67 changes: 67 additions & 0 deletions src/main/java/se/yolean/kafka/keyvalue/CacheRecord.java
@@ -0,0 +1,67 @@
package se.yolean.kafka.keyvalue;

import java.io.Serializable;
import java.util.Arrays;

public final class CacheRecord implements Serializable {

private static final long serialVersionUID = 1L;

private final byte[] value;
private final long timestamp;

CacheRecord(byte[] value, UpdateRecord update) {
this.value = value;
this.timestamp = update.getTimestamp();
}

public byte[] getValue() {
return value;
}

public String getVstr() {
return new String(getValue());
}

/**
* @return Kafka's record timestamp, regardless of the org.apache.kafka.common.record.TimestampType value
*/
public long getTimestamp() {
return timestamp;
}

public String getTstr() {
return Long.toString(getTimestamp());
}

@Override
public String toString() {
return value.length + "@" + timestamp;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
result = prime * result + Arrays.hashCode(value);
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
CacheRecord other = (CacheRecord) obj;
if (timestamp != other.timestamp)
return false;
if (!Arrays.equals(value, other.value))
return false;
return true;
}

}
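
For orientation, here is a minimal usage sketch of the new class (an illustration, not part of this diff; the topic, offset and timestamp values are hypothetical). Since the CacheRecord constructor is package-private, the sketch has to live in the same package:

package se.yolean.kafka.keyvalue;

import java.nio.charset.StandardCharsets;

class CacheRecordUsageSketch {

  static CacheRecord fromConsumedValues() {
    // Hypothetical record coordinates standing in for a consumed Kafka record
    UpdateRecord update = new UpdateRecord("topic-a", 0, 42L, "k1", 1554000000000L);
    CacheRecord record = new CacheRecord("v1".getBytes(StandardCharsets.UTF_8), update);
    System.out.println(record.getVstr());      // "v1" (note: getVstr() uses the platform default charset)
    System.out.println(record.getTimestamp()); // 1554000000000
    System.out.println(record);                // "2@1554000000000", i.e. length@timestamp
    return record;
  }
}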
14 changes: 8 additions & 6 deletions src/main/java/se/yolean/kafka/keyvalue/ConsumerAtLeastOnce.java
@@ -36,6 +36,7 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
@@ -98,7 +99,7 @@ public enum Stage {

@Inject
//@javax.inject.Named("cache")
Map<String, byte[]> cache;
Map<String, CacheRecord> cache;

@Inject
OnUpdate onupdate;
@@ -237,7 +238,7 @@ public void run() {
}
}

void run(final KafkaConsumer<String, byte[]> consumer, final Map<String, byte[]> cache, final long polls) throws
void run(final KafkaConsumer<String, byte[]> consumer, final Map<String, CacheRecord> cache, final long polls) throws
InterruptedException,
org.apache.kafka.common.errors.TimeoutException,
org.apache.kafka.clients.consumer.NoOffsetForPartitionException,
@@ -304,9 +305,10 @@ void run(final KafkaConsumer<String, byte[]> consumer, final Map<String, byte[]>
Iterator<ConsumerRecord<String, byte[]>> records = polled.iterator();
while (records.hasNext()) {
ConsumerRecord<String, byte[]> record = records.next();
UpdateRecord update = new UpdateRecord(record.topic(), record.partition(), record.offset(), record.key());
UpdateRecord update = new UpdateRecord(record.topic(), record.partition(), record.offset(), record.key(),
record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE ? UpdateRecord.NO_TIMESTAMP : record.timestamp());
toStats(update);
cache.put(record.key(), record.value());
cache.put(record.key(), new CacheRecord(record.value(), update));
Long start = nextUncommitted.get(update.getTopicPartition());
if (start == null) {
throw new IllegalStateException("There's no start offset for " + update.getTopicPartition() + ", at consumed offset " + update.getOffset() + " key " + update.getKey());
@@ -349,7 +351,7 @@ public Long getCurrentOffset(String topicName, int partition) {
}

@Override
public byte[] getValue(String key) {
public CacheRecord getValue(String key) {
return cache.get(key);
}

@@ -359,7 +361,7 @@ public Iterator<String> getKeys() {
}

@Override
public Iterator<byte[]> getValues() {
public Iterator<CacheRecord> getValues() {
return cache.values().iterator();
}

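
The core of the consumer change is the timestamp guard in the poll loop above. As a standalone sketch of just that logic (an illustration, not code from this PR):

package se.yolean.kafka.keyvalue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.record.TimestampType;

final class TimestampGuardSketch {

  // Records whose broker/producer assigned no timestamp report NO_TIMESTAMP_TYPE;
  // those are mapped to UpdateRecord.NO_TIMESTAMP instead of forwarding the
  // meaningless timestamp() value.
  static long effectiveTimestamp(ConsumerRecord<String, byte[]> record) {
    return record.timestampType() == TimestampType.NO_TIMESTAMP_TYPE
        ? UpdateRecord.NO_TIMESTAMP
        : record.timestamp();
  }
}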
4 changes: 2 additions & 2 deletions src/main/java/se/yolean/kafka/keyvalue/KafkaCache.java
@@ -30,7 +30,7 @@ public interface KafkaCache {
*/
boolean isReady();

byte[] getValue(String key);
CacheRecord getValue(String key);

/**
* @param topicName
@@ -41,6 +41,6 @@

Iterator<String> getKeys();

Iterator<byte[]> getValues();
Iterator<CacheRecord> getValues();

}
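
Callers of the interface now get a CacheRecord wrapper instead of raw bytes. A minimal caller-side sketch (hypothetical helper, not part of this diff):

package se.yolean.kafka.keyvalue.example; // hypothetical package

import java.nio.charset.StandardCharsets;

import se.yolean.kafka.keyvalue.CacheRecord;
import se.yolean.kafka.keyvalue.KafkaCache;

class CacheLookupSketch {

  /** Returns the cached value decoded as UTF-8, or null when the key is absent. */
  static String lookup(KafkaCache cache, String key) {
    CacheRecord record = cache.getValue(key); // was byte[] before this change
    return record == null ? null : new String(record.getValue(), StandardCharsets.UTF_8);
  }
}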
16 changes: 4 additions & 12 deletions src/main/java/se/yolean/kafka/keyvalue/UpdateRecord.java
@@ -22,7 +22,7 @@ public class UpdateRecord implements Serializable {

private static final long serialVersionUID = 1L;

private static final long NO_TIMESTAMP = -1;
public static final long NO_TIMESTAMP = -1;

private final TopicPartition topicPartition;
private final long offset;
@@ -32,20 +32,12 @@ public class UpdateRecord implements Serializable {

private long timestamp = NO_TIMESTAMP;

public UpdateRecord(
String topic,
int partition,
long offset,
String key) {
public UpdateRecord(String topic, int partition, long offset, String key, long timestamp) {
this.topicPartition = new TopicPartition(topic, partition);
this.offset = offset;
this.key = key;
this.string = topicPartition.toString() + '-' + offset + '[' + key + ']';
this.hashCode = string.hashCode();
}

public UpdateRecord(String topic, int partition, long offset, String key, long timestamp) {
this(topic, partition, offset, key);
this.timestamp = timestamp;
}

@@ -70,8 +62,8 @@ TopicPartition getTopicPartition() {
}

/**
* Timestamp is just a value we carry during processing, not serialized to clients
* (at least not until we have a convincing use case for including it in onupdate).
* @return Kafka's record timestamp
* @throws IllegalStateException if the record was consumed with org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE
*/
public long getTimestamp() {
if (timestamp == NO_TIMESTAMP) {
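
Assuming the truncated method body throws as the new javadoc states, the timestamp contract can be exercised like this (hypothetical snippet, not part of this diff):

import se.yolean.kafka.keyvalue.UpdateRecord;

class TimestampContractSketch {

  public static void main(String[] args) {
    UpdateRecord noTimestamp = new UpdateRecord("t", 0, 0L, "k", UpdateRecord.NO_TIMESTAMP);
    try {
      noTimestamp.getTimestamp();
    } catch (IllegalStateException expected) {
      // Documented behavior for records consumed with TimestampType.NO_TIMESTAMP_TYPE
      System.out.println("record carried no timestamp");
    }
  }
}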
15 changes: 9 additions & 6 deletions src/main/java/se/yolean/kafka/keyvalue/http/CacheResource.java
@@ -37,11 +37,13 @@
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;

import se.yolean.kafka.keyvalue.CacheRecord;
import se.yolean.kafka.keyvalue.KafkaCache;

@Path("/cache/v1")
public class CacheResource implements HealthCheck {

private static final String TIMESTAMP_RESPONSE_HEADER = "x-kkv-record-timestamp";
@Inject // Note that this can be null if cache is still in its startup event handler
KafkaCache cache = null;

@@ -69,15 +71,15 @@ void requireUpToDateCache() throws javax.ws.rs.ServiceUnavailableException {
* @throws NotFoundException If the key wasn't in the cache or if the value
* somehow was null
*/
byte[] getCacheValue(String key) throws NotFoundException {
CacheRecord getCacheValue(String key) throws NotFoundException {
requireUpToDateCache();
if (key == null) {
throw new javax.ws.rs.BadRequestException("Request key can not be null");
}
if (key == "") {
throw new javax.ws.rs.BadRequestException("Request key can not be empty");
}
final byte[] value = cache.getValue(key);
final CacheRecord value = cache.getValue(key);
if (value == null) {
throw new NotFoundException();
}
@@ -87,9 +89,10 @@ byte[] getCacheValue(String key) throws NotFoundException {
@GET
@Path("/raw/{key}")
@Produces(MediaType.APPLICATION_OCTET_STREAM)
public byte[] valueByKey(@PathParam("key") final String key, @Context UriInfo uriInfo) {
public Response valueByKey(@PathParam("key") final String key, @Context UriInfo uriInfo) {
requireUpToDateCache();
return getCacheValue(key);
CacheRecord value = getCacheValue(key);
return Response.ok().header(TIMESTAMP_RESPONSE_HEADER, Long.toString(value.getTimestamp())).entity(value.getValue()).build();
}

@GET
@@ -163,13 +166,13 @@ public void write(OutputStream out) throws IOException, WebApplicationException {
@Produces(MediaType.TEXT_PLAIN)
public Response values() {
requireUpToDateCache();
Iterator<byte[]> values = cache.getValues();
Iterator<CacheRecord> values = cache.getValues();

StreamingOutput stream = new StreamingOutput() {
@Override
public void write(OutputStream out) throws IOException, WebApplicationException {
while (values.hasNext()) {
out.write(values.next());
out.write(values.next().getValue());
out.write('\n');
}
}
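
From a client's perspective, the change means the Kafka record timestamp now rides in a response header on /cache/v1/raw/{key}. A hypothetical Java 11 HttpClient sketch (host, port and key are placeholders; only the path and header name come from this diff):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class RawEndpointClientSketch {

  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8080/cache/v1/raw/k1"))
        .build();
    HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
    // The record value is the body; the Kafka record timestamp is in the new header
    String timestamp = response.headers().firstValue("x-kkv-record-timestamp").orElse("n/a");
    System.out.println(timestamp + " -> " + new String(response.body()));
  }
}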
@@ -102,17 +102,17 @@ void testSingleRun() throws InterruptedException, ExecutionException {

assertEquals(2, consumer.cache.size(), "Should have consumed two records with different key");
assertTrue(consumer.cache.containsKey("k1"), "Should contain the first key");
assertEquals("v1", new String(consumer.cache.get("k1")), "Should have the first key's value");
assertEquals("v1", consumer.cache.get("k1").getVstr(), "Should have the first key's value");

producer.send(new ProducerRecord<String,byte[]>(TOPIC, "k1", "v2".getBytes())).get();
// TODO per-test kafka topic: producer.send(new ProducerRecord<String,byte[]>(TOPIC, "k3", "v2".getBytes())).get();
assertEquals(2, consumer.cache.size(), "Nothing should happen unless run() is ongoing");

consumer.run();
// TODO per-test kafka topic: assertEquals(3, consumer.cache.size(), "Should have got the additional key from the last batch");
assertEquals("v2", new String(consumer.cache.get("k1")), "Value should come from the latest record");
assertEquals("v2", consumer.cache.get("k1").getVstr(), "Value should come from the latest record");

Mockito.verify(consumer.onupdate).handle(new UpdateRecord(TOPIC, 2, 1, "k1"));
Mockito.verify(consumer.onupdate).handle(new UpdateRecord(TOPIC, 2, 1, "k1", 1));

// API extended after this test was written. We should probably verify order too.
Mockito.verify(consumer.onupdate, Mockito.atLeast(3)).pollStart(Collections.singletonList(TOPIC));
@@ -124,7 +124,7 @@ void testSingleRun() throws InterruptedException, ExecutionException {

// verify KafkaCache interface methods, as the REST resource uses that API
KafkaCache cache = (KafkaCache) consumer;
assertEquals("v2", new String(cache.getValue("k1")));
assertEquals("v2", cache.getValue("k1").getVstr());

// TODO assertEquals(1, cache.getCurrentOffset(TOPIC, 0));
// TODO assertEquals(null, cache.getCurrentOffset(TOPIC, 1));
@@ -137,11 +137,11 @@ void testSingleRun() throws InterruptedException, ExecutionException {
assertEquals("k2", keys.next());
assertFalse(keys.hasNext());

Iterator<byte[]> values = cache.getValues();
Iterator<CacheRecord> values = cache.getValues();
assertTrue(values.hasNext());
assertEquals("v2", new String(values.next()));
assertEquals("v2", values.next().getVstr());
assertTrue(values.hasNext());
assertEquals("v1", new String(values.next()));
assertEquals("v1", values.next().getVstr());
}

@Test
@@ -80,7 +80,7 @@ void testBrokerDisconnect() throws Exception {
assertEquals(Stage.Polling, consumer.stage); // To be able to see where we exited we're not resetting stage at the end of runs

assertEquals(1, consumer.cache.size(), "Should be operational now, before we mess with connections");
assertEquals("v1", new String(consumer.cache.get("k1")), "Should have the first key's value");
assertEquals("v1", consumer.cache.get("k1").getVstr(), "Should have the first key's value");

KafkaBroker broker = kafka.getKafkaBrokers().iterator().next();

@@ -97,7 +97,7 @@ void testBrokerDisconnect() throws Exception {
consumer.run();

assertEquals(2, consumer.cache.size(), "Cache should be operational again, after the broker was restarted");
assertEquals("v1", new String(consumer.cache.get("k2")), "Should have the first key's value");
assertEquals("v1", consumer.cache.get("k2").getVstr(), "Should have the first key's value");

producer.close();
}
12 changes: 6 additions & 6 deletions src/test/java/se/yolean/kafka/keyvalue/UpdateRecordTest.java
@@ -22,17 +22,17 @@ class UpdateRecordTest {

@Test
void testToString() {
assertEquals("t1-123-45678[kx]", new UpdateRecord("t1", 123, 45678, "kx").toString());
assertEquals("t1-123-45678[kx]", new UpdateRecord("t1", 123, 45678, "kx", 1).toString());
}

@Test
void testHashCode() {
assertEquals("t1-123-45678[kx]".hashCode(), new UpdateRecord("t1", 123, 45678, "kx").hashCode());
assertEquals("t1-123-45678[kx]".hashCode(), new UpdateRecord("t1", 123, 45678, "kx", 2).hashCode());
}

@Test
void testGetTopicPartition() {
UpdateRecord u = new UpdateRecord("t1", 123, 45678, "kx");
UpdateRecord u = new UpdateRecord("t1", 123, 45678, "kx", 3);
assertEquals("t1", u.getTopicPartition().topic());
assertEquals(123, u.getTopicPartition().partition());
assertEquals("t1-123", u.getTopicPartition().toString());
@@ -41,9 +41,9 @@

@Test
void testEquals() {
assertTrue(new UpdateRecord("1", 1, 2, "x").equals(new UpdateRecord("1", 1, 2, "x")));
assertFalse(new UpdateRecord("1", 1, 2, "x").equals(new UpdateRecord("1", 1, 2, "x2")));
assertFalse(new UpdateRecord("1", 1, 2, "x").equals(new UpdateRecord("1", 12, 2, "x")));
assertTrue(new UpdateRecord("1", 1, 2, "x", 4).equals(new UpdateRecord("1", 1, 2, "x", 4)));
assertFalse(new UpdateRecord("1", 1, 2, "x", 5).equals(new UpdateRecord("1", 1, 2, "x2", 5)));
assertFalse(new UpdateRecord("1", 1, 2, "x", 6).equals(new UpdateRecord("1", 12, 2, "x", 6)));
}

/* We no longer exchange these objects over HTTP so jackson annotations were removed
@@ -41,7 +41,7 @@ void testEmpty() throws UnsupportedEncodingException {
@Test
void test1() throws UnsupportedEncodingException {
UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t1");
body.handle(new UpdateRecord("t", 1, 3, "k1"));
body.handle(new UpdateRecord("t", 1, 3, "k1", 1));
Map<String, String> headers = body.getHeaders();
ByteArrayOutputStream content = new ByteArrayOutputStream();
body.getContent(content);
@@ -55,8 +55,8 @@
@Test
void test2() throws UnsupportedEncodingException {
UpdatesBodyPerTopicJSON body = new UpdatesBodyPerTopicJSON("t2");
body.handle(new UpdateRecord("t", 0, 10, "k1"));
body.handle(new UpdateRecord("t", 0, 11, "k2"));
body.handle(new UpdateRecord("t", 0, 10, "k1", 2));
body.handle(new UpdateRecord("t", 0, 11, "k2", 3));
body.getHeaders();
assertEquals(
"{\"v\":1,\"topic\":\"t2\",\"offsets\":{\"0\":11},\"updates\":{\"k1\":{},\"k2\":{}}}",