Skip to content

[fix][client]Failure to load encryption key should not prevent creation of producer on client side #24291

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: branch-3.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3145,9 +3145,8 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myenc-topic1")
.addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader())
.create();
Assert.fail("Producer creation should not succeed if failing to read key");
} catch (Exception e) {
// ok
Assert.fail("Producer creation should not fail if failing to read key");
}

// 2. Producer with valid key name
Expand Down Expand Up @@ -3291,6 +3290,204 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
log.info("-- Exiting {} test --", methodName);
}

@Test(timeOut = 100000)
public void testSendFailureWhenProducerFailsToLoadEncryptionKey() throws Exception {
log.info("-- Starting {} test --", methodName);

class EncKeyReader implements CryptoKeyReader {

final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

@Override
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
log.error("Failed to read certificate from {}", CERT_FILE_PATH);
}
}
return null;
}

@Override
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
log.error("Failed to read certificate from {}", CERT_FILE_PATH);
}
}
return null;
}
}

// 1. Invalid key name
try {
@Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myenc-topic1")
.addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader())
.enableBatching(false).create();
producer.send("my-test-message".getBytes());
Assert.fail("Producer send should not succeed if failing to read key");
} catch (Exception e) {
// OK
}
// 2. Invalid key name with ProducerCryptoFailureAction.SEND
try {
@Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myenc-topic1")
.addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader())
.cryptoFailureAction(ProducerCryptoFailureAction.SEND)
.enableBatching(false).create();
MessageId messageId = producer.send("my-test-message".getBytes());
Assert.assertNotNull(messageId);
} catch (Exception e) {
Assert.fail("Producer send should not fail if crypto failure action is ProducerCryptoFailureAction.SEND");
}
}

@Test(timeOut = 100000)
public void testSendFailureWhenProducerFailsToLoadEncryptionKeyWithBatching() throws Exception {
log.info("-- Starting {} test --", methodName);

class EncKeyReader implements CryptoKeyReader {

final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

@Override
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
log.error("Failed to read certificate from {}", CERT_FILE_PATH);
}
}
return null;
}

@Override
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
log.error("Failed to read certificate from {}", CERT_FILE_PATH);
}
}
return null;
}
}

// 1. Invalid key name
try {
@Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myenc-topic1")
.addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader())
.create();
for (int i = 0; i < 10; i++) {
producer.send(("my-message-" + i).getBytes());
}
Assert.fail("Producer send should not succeed if failing to read key");
} catch (Exception e) {
// OK
}
// 2. Invalid key name with ProducerCryptoFailureAction.SEND
try {
@Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myenc-topic1")
.addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader())
.cryptoFailureAction(ProducerCryptoFailureAction.SEND)
.create();
int cnt = 0;
for (int i = 0; i < 10; i++) {
producer.send(("my-message-" + i).getBytes());
cnt++;
}
Assert.assertEquals(cnt, 10);
} catch (Exception e) {
Assert.fail("Producer send should not fail if crypto failure action is ProducerCryptoFailureAction.SEND");
}
}

@Test(timeOut = 100000)
public void testSendFailureWhenProducerFailsToLoadEncryptionKeyWithChunking() throws Exception {
log.info("-- Starting {} test --", methodName);

class EncKeyReader implements CryptoKeyReader {

final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

@Override
public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/public-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
log.error("Failed to read certificate from {}", CERT_FILE_PATH);
}
}
return null;
}

@Override
public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
String CERT_FILE_PATH = "./src/test/resources/certificate/private-key." + keyName;
if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
try {
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
return keyInfo;
} catch (IOException e) {
log.error("Failed to read certificate from {}", CERT_FILE_PATH);
}
}
return null;
}
}

// 1. Invalid key name
try {
@Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myenc-topic1")
.addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader())
.enableBatching(false).enableChunking(true).create();
producer.send("my-test-message".getBytes());
Assert.fail("Producer send should not succeed if failing to read key");
} catch (Exception e) {
// OK
}
// 2. Invalid key name with ProducerCryptoFailureAction.SEND
try {
@Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myenc-topic1")
.addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader())
.cryptoFailureAction(ProducerCryptoFailureAction.SEND)
.enableBatching(false).enableChunking(true).create();
MessageId messageId = producer.send("my-test-message".getBytes());
Assert.assertNotNull(messageId);
} catch (Exception e) {
Assert.fail("Producer send should not fail if crypto failure action is ProducerCryptoFailureAction.SEND");
}
}

private String decryptMessage(TopicMessageImpl<byte[]> msg,
String encryptionKeyName,
CryptoKeyReader reader) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2298,9 +2298,8 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
.addEncryptionKey("client-non-existant-rsa.pem")
.cryptoKeyReader(new EncKeyReader())
.create();
Assert.fail("Producer creation should not succeed if failing to read key");
} catch (Exception e) {
// ok
Assert.fail("Producer creation should not fail if failing to read key");
}

// 2. Producer with valid key name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,7 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
try {
msgCrypto.addPublicKeyCipher(conf.getEncryptionKeys(), conf.getCryptoKeyReader());
} catch (CryptoException e) {
if (!producerCreatedFuture.isDone()) {
log.warn("[{}] [{}] [{}] Failed to add public key cipher.", topic, producerName, producerId);
producerCreatedFuture.completeExceptionally(
PulsarClientException.wrap(e,
String.format("The producer %s of the topic %s "
+ "adds the public key cipher was failed",
producerName, topic)));
}
}
}), 0L, 4L, TimeUnit.HOURS);
}
Expand Down Expand Up @@ -2375,6 +2368,9 @@ private void batchMessageAndSend(boolean shouldScheduleNextBatchFlush) {
batchMessageContainer.resetPayloadAfterFailedPublishing();
log.warn("[{}] [{}] Failed to create batch message for sending. Batch payloads have been reset and"
+ " messages will be retried in subsequent batches.", topic, producerName, t);
if (t instanceof PulsarClientException.CryptoException) {
shouldScheduleNextBatchFlush = false;
}
} finally {
if (shouldScheduleNextBatchFlush) {
maybeScheduleBatchFlushTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,8 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
try {
pulsarClient.newProducer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
.addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
Assert.fail("Producer creation should not suceed if failing to read key");
} catch (Exception e) {
// ok
Assert.fail("Producer creation should not fail if failing to read key");
}

// 2. Producer with valid key name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,8 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
try {
pulsarClient.newProducer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
.addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
Assert.fail("Producer creation should not suceed if failing to read key");
} catch (Exception e) {
// ok
Assert.fail("Producer creation should not fail if failing to read key");
}

// 2. Producer with valid key name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> metad
}

@Test
public void testEncryptionFailure() throws Exception {
public void testEncryptioEncryptionFailnFailure() throws Exception {

class EncKeyReader implements CryptoKeyReader {

Expand Down Expand Up @@ -399,9 +399,8 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
try {
pulsarClient.newProducer().topic("persistent://my-property/use/myenc-ns/myenc-topic1")
.addEncryptionKey("client-non-existant-rsa.pem").cryptoKeyReader(new EncKeyReader()).create();
Assert.fail("Producer creation should not suceed if failing to read key");
} catch (Exception e) {
// ok
Assert.fail("Producer creation should not fail if failing to read key");
}

// 2. Producer with valid key name
Expand Down
Loading