diff --git a/src/main/java/se/yolean/kafka/keyvalue/CacheRecord.java b/src/main/java/se/yolean/kafka/keyvalue/CacheRecord.java new file mode 100644 index 0000000..77b62d7 --- /dev/null +++ b/src/main/java/se/yolean/kafka/keyvalue/CacheRecord.java @@ -0,0 +1,67 @@ +package se.yolean.kafka.keyvalue; + +import java.io.Serializable; +import java.util.Arrays; + +public final class CacheRecord implements Serializable { + + private static final long serialVersionUID = 1L; + + private final byte[] value; + private final long timestamp; + + CacheRecord(byte[] value, UpdateRecord update) { + this.value = value; + this.timestamp = update.getTimestamp(); + } + + public byte[] getValue() { + return value; + } + + public String getVstr() { + return new String(getValue()); + } + + /** + * @return kafka's record timestamp, regardless of org.apache.kafka.common.record.TimestampType value + */ + public long getTimestamp() { + return timestamp; + } + + public String getTstr() { + return Long.toString(getTimestamp()); + } + + @Override + public String toString() { + return value.length + "@" + timestamp; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) (timestamp ^ (timestamp >>> 32)); + result = prime * result + Arrays.hashCode(value); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + CacheRecord other = (CacheRecord) obj; + if (timestamp != other.timestamp) + return false; + if (!Arrays.equals(value, other.value)) + return false; + return true; + } + +} diff --git a/src/main/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnce.java b/src/main/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnce.java index 6db6b1f..5220385 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnce.java +++ b/src/main/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnce.java @@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.health.HealthCheck; import org.eclipse.microprofile.health.HealthCheckResponse; @@ -98,7 +99,7 @@ public enum Stage { @Inject //@javax.inject.Named("cache") - Map cache; + Map cache; @Inject OnUpdate onupdate; @@ -237,7 +238,7 @@ public void run() { } } - void run(final KafkaConsumer consumer, final Map cache, final long polls) throws + void run(final KafkaConsumer consumer, final Map cache, final long polls) throws InterruptedException, org.apache.kafka.common.errors.TimeoutException, org.apache.kafka.clients.consumer.NoOffsetForPartitionException, @@ -304,9 +305,10 @@ void run(final KafkaConsumer consumer, final Map Iterator> records = polled.iterator(); while (records.hasNext()) { ConsumerRecord record = records.next(); - UpdateRecord update = new UpdateRecord(record.topic(), record.partition(), record.offset(), record.key()); + UpdateRecord update = new UpdateRecord(record.topic(), record.partition(), record.offset(), record.key(), + record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE ? UpdateRecord.NO_TIMESTAMP : record.timestamp()); toStats(update); - cache.put(record.key(), record.value()); + cache.put(record.key(), new CacheRecord(record.value(), update)); Long start = nextUncommitted.get(update.getTopicPartition()); if (start == null) { throw new IllegalStateException("There's no start offset for " + update.getTopicPartition() + ", at consumed offset " + update.getOffset() + " key " + update.getKey()); @@ -349,7 +351,7 @@ public Long getCurrentOffset(String topicName, int partition) { } @Override - public byte[] getValue(String key) { + public CacheRecord getValue(String key) { return cache.get(key); } @@ -359,7 +361,7 @@ public Iterator getKeys() { } @Override - public Iterator getValues() { + public Iterator getValues() { return cache.values().iterator(); } diff --git a/src/main/java/se/yolean/kafka/keyvalue/KafkaCache.java b/src/main/java/se/yolean/kafka/keyvalue/KafkaCache.java index e394de0..39ba48a 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/KafkaCache.java +++ b/src/main/java/se/yolean/kafka/keyvalue/KafkaCache.java @@ -30,7 +30,7 @@ public interface KafkaCache { */ boolean isReady(); - byte[] getValue(String key); + CacheRecord getValue(String key); /** * @param topicName @@ -41,6 +41,6 @@ public interface KafkaCache { Iterator getKeys(); - Iterator getValues(); + Iterator getValues(); } \ No newline at end of file diff --git a/src/main/java/se/yolean/kafka/keyvalue/UpdateRecord.java b/src/main/java/se/yolean/kafka/keyvalue/UpdateRecord.java index e2fc65a..3678cf1 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/UpdateRecord.java +++ b/src/main/java/se/yolean/kafka/keyvalue/UpdateRecord.java @@ -22,7 +22,7 @@ public class UpdateRecord implements Serializable { private static final long serialVersionUID = 1L; - private static final long NO_TIMESTAMP = -1; + public static final long NO_TIMESTAMP = -1; private final TopicPartition topicPartition; private final long offset; @@ -32,20 +32,12 @@ public class UpdateRecord implements Serializable { private long timestamp = NO_TIMESTAMP; - public UpdateRecord( - String topic, - int partition, - long offset, - String key) { + public UpdateRecord(String topic, int partition, long offset, String key, long timestamp) { this.topicPartition = new TopicPartition(topic, partition); this.offset = offset; this.key = key; this.string = topicPartition.toString() + '-' + offset + '[' + key + ']'; this.hashCode = string.hashCode(); - } - - public UpdateRecord(String topic, int partition, long offset, String key, long timestamp) { - this(topic, partition, offset, key); this.timestamp = timestamp; } @@ -70,8 +62,8 @@ TopicPartition getTopicPartition() { } /** - * Timestamp is just a value we carry during processing, not serialized to clients - * (at least not until we have a convincing use case for including it in onupdate). + * @return kafka's record timestamp + * @throws IllegalStateException indicating org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE */ public long getTimestamp() { if (timestamp == NO_TIMESTAMP) { diff --git a/src/main/java/se/yolean/kafka/keyvalue/http/CacheResource.java b/src/main/java/se/yolean/kafka/keyvalue/http/CacheResource.java index 114fc56..e1e8606 100644 --- a/src/main/java/se/yolean/kafka/keyvalue/http/CacheResource.java +++ b/src/main/java/se/yolean/kafka/keyvalue/http/CacheResource.java @@ -37,11 +37,13 @@ import org.eclipse.microprofile.health.HealthCheck; import org.eclipse.microprofile.health.HealthCheckResponse; +import se.yolean.kafka.keyvalue.CacheRecord; import se.yolean.kafka.keyvalue.KafkaCache; @Path("/cache/v1") public class CacheResource implements HealthCheck { + private static final String TIMESTAMP_RESPONSE_HEADER = "x-kkv-record-timestamp"; @Inject // Note that this can be null if cache is still in it's startup event handler KafkaCache cache = null; @@ -69,7 +71,7 @@ void requireUpToDateCache() throws javax.ws.rs.ServiceUnavailableException { * @throws NotFoundException If the key wasn't in the cache or if the value * somehow was null */ - byte[] getCacheValue(String key) throws NotFoundException { + CacheRecord getCacheValue(String key) throws NotFoundException { requireUpToDateCache(); if (key == null) { throw new javax.ws.rs.BadRequestException("Request key can not be null"); @@ -77,7 +79,7 @@ byte[] getCacheValue(String key) throws NotFoundException { if (key == "") { throw new javax.ws.rs.BadRequestException("Request key can not be empty"); } - final byte[] value = cache.getValue(key); + final CacheRecord value = cache.getValue(key); if (value == null) { throw new NotFoundException(); } @@ -87,9 +89,10 @@ byte[] getCacheValue(String key) throws NotFoundException { @GET @Path("/raw/{key}") @Produces(MediaType.APPLICATION_OCTET_STREAM) - public byte[] valueByKey(@PathParam("key") final String key, @Context UriInfo uriInfo) { + public Response valueByKey(@PathParam("key") final String key, @Context UriInfo uriInfo) { requireUpToDateCache(); - return getCacheValue(key); + CacheRecord value = getCacheValue(key); + return Response.ok().header(TIMESTAMP_RESPONSE_HEADER, Long.toString(value.getTimestamp())).entity(value.getValue()).build(); } @GET @@ -163,13 +166,13 @@ public void write(OutputStream out) throws IOException, WebApplicationException @Produces(MediaType.TEXT_PLAIN) public Response values() { requireUpToDateCache(); - Iterator values = cache.getValues(); + Iterator values = cache.getValues(); StreamingOutput stream = new StreamingOutput() { @Override public void write(OutputStream out) throws IOException, WebApplicationException { while (values.hasNext()) { - out.write(values.next()); + out.write(values.next().getValue()); out.write('\n'); } } diff --git a/src/test/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnceIntegrationTest.java b/src/test/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnceIntegrationTest.java index 4d44710..8b2f273 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnceIntegrationTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnceIntegrationTest.java @@ -102,7 +102,7 @@ void testSingleRun() throws InterruptedException, ExecutionException { assertEquals(2, consumer.cache.size(), "Should have consumed two records with different key"); assertTrue(consumer.cache.containsKey("k1"), "Should contain the first key"); - assertEquals("v1", new String(consumer.cache.get("k1")), "Should have the first key's value"); + assertEquals("v1", consumer.cache.get("k1").getVstr(), "Should have the first key's value"); producer.send(new ProducerRecord(TOPIC, "k1", "v2".getBytes())).get(); // TODO per-test kafka topic: producer.send(new ProducerRecord(TOPIC, "k3", "v2".getBytes())).get(); @@ -110,9 +110,9 @@ void testSingleRun() throws InterruptedException, ExecutionException { consumer.run(); // TODO per-test kafka topic: assertEquals(3, consumer.cache.size(), "Should have got the additional key from the last batch"); - assertEquals("v2", new String(consumer.cache.get("k1")), "Value should come from the latest record"); + assertEquals("v2", consumer.cache.get("k1").getVstr(), "Value should come from the latest record"); - Mockito.verify(consumer.onupdate).handle(new UpdateRecord(TOPIC, 2, 1, "k1")); + Mockito.verify(consumer.onupdate).handle(new UpdateRecord(TOPIC, 2, 1, "k1", 1)); // API extended after this test was written. We should probably verify order too. Mockito.verify(consumer.onupdate, Mockito.atLeast(3)).pollStart(Collections.singletonList(TOPIC)); @@ -124,7 +124,7 @@ void testSingleRun() throws InterruptedException, ExecutionException { // verify KafkaCache interface methods, as the REST resource uses that API KafkaCache cache = (KafkaCache) consumer; - assertEquals("v2", new String(cache.getValue("k1"))); + assertEquals("v2", cache.getValue("k1").getVstr()); // TODO assertEquals(1, cache.getCurrentOffset(TOPIC, 0)); // TODO assertEquals(null, cache.getCurrentOffset(TOPIC, 1)); @@ -137,11 +137,11 @@ void testSingleRun() throws InterruptedException, ExecutionException { assertEquals("k2", keys.next()); assertFalse(keys.hasNext()); - Iterator values = cache.getValues(); + Iterator values = cache.getValues(); assertTrue(values.hasNext()); - assertEquals("v2", new String(values.next())); + assertEquals("v2", values.next().getVstr()); assertTrue(values.hasNext()); - assertEquals("v1", new String(values.next())); + assertEquals("v1", values.next().getVstr()); } @Test diff --git a/src/test/java/se/yolean/kafka/keyvalue/ErrorHandlingKafkaIntegrationTest.java b/src/test/java/se/yolean/kafka/keyvalue/ErrorHandlingKafkaIntegrationTest.java index 6db2c47..2b67426 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/ErrorHandlingKafkaIntegrationTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/ErrorHandlingKafkaIntegrationTest.java @@ -80,7 +80,7 @@ void testBrokerDisconnect() throws Exception { assertEquals(Stage.Polling, consumer.stage); // To be able to see where we exited we're not resetting stage at the end of runs assertEquals(1, consumer.cache.size(), "Should be operational now, before we mess with connections"); - assertEquals("v1", new String(consumer.cache.get("k1")), "Should have the first key's value"); + assertEquals("v1", consumer.cache.get("k1").getVstr(), "Should have the first key's value"); KafkaBroker broker = kafka.getKafkaBrokers().iterator().next(); @@ -97,7 +97,7 @@ void testBrokerDisconnect() throws Exception { consumer.run(); assertEquals(2, consumer.cache.size(), "Cache should be operational again, after the broker was restarted"); - assertEquals("v1", new String(consumer.cache.get("k2")), "Should have the first key's value"); + assertEquals("v1", consumer.cache.get("k2").getVstr(), "Should have the first key's value"); producer.close(); } diff --git a/src/test/java/se/yolean/kafka/keyvalue/UpdateRecordTest.java b/src/test/java/se/yolean/kafka/keyvalue/UpdateRecordTest.java index 0c0996e..dc0d1f7 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/UpdateRecordTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/UpdateRecordTest.java @@ -22,17 +22,17 @@ class UpdateRecordTest { @Test void testToString() { - assertEquals("t1-123-45678[kx]", new UpdateRecord("t1", 123, 45678, "kx").toString()); + assertEquals("t1-123-45678[kx]", new UpdateRecord("t1", 123, 45678, "kx", 1).toString()); } @Test void testHashCode() { - assertEquals("t1-123-45678[kx]".hashCode(), new UpdateRecord("t1", 123, 45678, "kx").hashCode()); + assertEquals("t1-123-45678[kx]".hashCode(), new UpdateRecord("t1", 123, 45678, "kx", 2).hashCode()); } @Test void testGetTopicPartition() { - UpdateRecord u = new UpdateRecord("t1", 123, 45678, "kx"); + UpdateRecord u = new UpdateRecord("t1", 123, 45678, "kx", 3); assertEquals("t1", u.getTopicPartition().topic()); assertEquals(123, u.getTopicPartition().partition()); assertEquals("t1-123", u.getTopicPartition().toString()); @@ -41,9 +41,9 @@ void testGetTopicPartition() { @Test void testEquals() { - assertTrue(new UpdateRecord("1", 1, 2, "x").equals(new UpdateRecord("1", 1, 2, "x"))); - assertFalse(new UpdateRecord("1", 1, 2, "x").equals(new UpdateRecord("1", 1, 2, "x2"))); - assertFalse(new UpdateRecord("1", 1, 2, "x").equals(new UpdateRecord("1", 12, 2, "x"))); + assertTrue(new UpdateRecord("1", 1, 2, "x", 4).equals(new UpdateRecord("1", 1, 2, "x", 4))); + assertFalse(new UpdateRecord("1", 1, 2, "x", 5).equals(new UpdateRecord("1", 1, 2, "x2", 5))); + assertFalse(new UpdateRecord("1", 1, 2, "x", 6).equals(new UpdateRecord("1", 12, 2, "x", 6))); } /* We no longer exchange these objects over HTTP so jackson annotations were removed diff --git a/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java b/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java index 4ecf7a6..069e8c6 100644 --- a/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java +++ b/src/test/java/se/yolean/kafka/keyvalue/onupdate/UpdatesBodyPerTopicJSONTest.java @@ -41,7 +41,7 @@ void testEmpty() throws UnsupportedEncodingException { @Test void test1() throws UnsupportedEncodingException { UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t1"); - body.handle(new UpdateRecord("t", 1, 3, "k1")); + body.handle(new UpdateRecord("t", 1, 3, "k1", 1)); Map headers = body.getHeaders(); ByteArrayOutputStream content = new ByteArrayOutputStream(); body.getContent(content); @@ -55,8 +55,8 @@ void test1() throws UnsupportedEncodingException { @Test void test2() throws UnsupportedEncodingException { UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2"); - body.handle(new UpdateRecord("t", 0, 10, "k1")); - body.handle(new UpdateRecord("t", 0, 11, "k2")); + body.handle(new UpdateRecord("t", 0, 10, "k1", 2)); + body.handle(new UpdateRecord("t", 0, 11, "k2", 3)); body.getHeaders(); assertEquals( "{\"v\":1,\"topic\":\"t2\",\"offsets\":{\"0\":11},\"updates\":{\"k1\":{},\"k2\":{}}}",