diff --git a/src/main/java/com/timgroup/statsd/DirectSketch.java b/src/main/java/com/timgroup/statsd/DirectSketch.java new file mode 100644 index 00000000..7c075bd3 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/DirectSketch.java @@ -0,0 +1,118 @@ +package com.timgroup.statsd; + +import java.util.Arrays; + +class DirectSketch { + static final double gamma = 130.0 / 128; + static final double minValue = 1e-9; + static final double logGamma = Math.log(gamma); + static final int bias = 1 - (int)(Math.floor(Math.log(minValue) / logGamma)); + static final int posInfKey = (1 << 15) - 1; + static final int negInfKey = -posInfKey; + static final int maxCount = Integer.MAX_VALUE; + + ProtobufWriter keys = new ProtobufWriter(); + ProtobufWriter bins = new ProtobufWriter(); + + double min; + double max; + double sum; + double cnt; + + int topKey; + int topCount; + + static int key(double value) { + if (value < 0) { + return -key(-value); + } + + if (value < minValue) { + return 0; + } + + int key = (int)Math.rint(Math.log(value) / logGamma) + bias; + if (key > posInfKey) { + return posInfKey; + } + return key; + } + + void reset() { + min = 0; + max = 0; + sum = 0; + cnt = 0; + keys.clear(); + bins.clear(); + + topKey = negInfKey - 1; + topCount = 0; + } + + void append(int key, int count) { + keys.bareLong(key); + bins.bareVarint(count); + } + + void build(final long[] values, final double sampleRate) { + reset(); + buildInner(values, sampleRate); + keys.flip(); + bins.flip(); + } + + private void buildInner(final long[] values, double sampleRate) { + if (values == null || values.length == 0) { + return; + } + + Arrays.sort(values); + + if (Double.isNaN(sampleRate) || sampleRate <= 0 || sampleRate > 1) { + sampleRate = 1; + } + + final double sampleSize = 1 / sampleRate; + min = values[0]; + max = values[0]; + cnt = sampleSize * (double)values.length; + + for (long val : values) { + min = Math.min(min, val); + max = Math.max(max, val); + sum += val * sampleSize; + + int key = key(val); + + if (key == topKey) { + int remain = (int)sampleSize; + while (topCount > maxCount - remain) { + remain -= maxCount - topCount; + append(key, maxCount); + topCount = 0; + } + topCount += remain; + } else { + if (topCount > 0) { + append(topKey, topCount); + } + topKey = key; + topCount = (int)sampleSize; + } + } + + append(topKey, topCount); + } + + void serialize(ProtobufWriter pw, long timestamp) { + pw.fieldVarint(1, timestamp); + pw.fieldVarint(2, (long)cnt); + pw.fieldDouble(3, min); + pw.fieldDouble(4, max); + pw.fieldDouble(5, sum / cnt); + pw.fieldDouble(6, sum); + pw.fieldPacked(7, keys); + pw.fieldPacked(8, bins); + } +} diff --git a/src/main/java/com/timgroup/statsd/DirectStatsDClient.java b/src/main/java/com/timgroup/statsd/DirectStatsDClient.java index b5f74492..36a45e17 100644 --- a/src/main/java/com/timgroup/statsd/DirectStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/DirectStatsDClient.java @@ -43,4 +43,22 @@ public interface DirectStatsDClient extends StatsDClient { * @param tags array of tags to be added to the data */ void recordDistributionValues(String aspect, long[] values, double sampleRate, String... tags); + + /** + * Record values for the specified named distribution. + * + *
Values with an explicit timestamp are never aggregated and will be recorded as the metric value at the + * given time.
+ * + *This method is a DataDog extension, and may not work with other servers.
+ * + *This method is non-blocking and is guaranteed not to throw an exception.
+ * + * @param aspect the name of the distribution + * @param values the values to be incorporated in the distribution. This method consumes the array. + * @param sampleRate percentage of time metric to be sent + * @param tags array of tags to be added to the data + */ + void recordSketchWithTimestamp(String aspect, long[] values, double sampleRate, long timestamp, String... tags); + } diff --git a/src/main/java/com/timgroup/statsd/Message.java b/src/main/java/com/timgroup/statsd/Message.java index 322f8861..e47f47eb 100644 --- a/src/main/java/com/timgroup/statsd/Message.java +++ b/src/main/java/com/timgroup/statsd/Message.java @@ -24,7 +24,8 @@ public enum Type { HISTOGRAM("h"), DISTRIBUTION("d"), EVENT("_e"), - SERVICE_CHECK("_sc"); + SERVICE_CHECK("_sc"), + SKETCH("S"); private final String type; diff --git a/src/main/java/com/timgroup/statsd/NoOpDirectStatsDClient.java b/src/main/java/com/timgroup/statsd/NoOpDirectStatsDClient.java index a9c16852..744d2e40 100644 --- a/src/main/java/com/timgroup/statsd/NoOpDirectStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NoOpDirectStatsDClient.java @@ -8,4 +8,8 @@ public class NoOpDirectStatsDClient extends NoOpStatsDClient implements DirectSt @Override public void recordDistributionValues(String aspect, double[] values, double sampleRate, String... tags) { } @Override public void recordDistributionValues(String aspect, long[] values, double sampleRate, String... tags) { } + + @Override public void recordSketchWithTimestamp( + String aspect, long[] values, double sampleRate, long timestamp, String... tags) { } + } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingDirectStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingDirectStatsDClient.java index 9b7de0ee..3a1ea81b 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingDirectStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingDirectStatsDClient.java @@ -20,6 +20,13 @@ public void recordDistributionValues(String aspect, long[] values, double sample } } + @Override + public void recordSketchWithTimestamp(String aspect, long[] values, double sampleRate, long timestamp, String... tags) { + if (values != null && values.length > 0) { + sendMetric(new LongSketchMessage(aspect, values, sampleRate, timestamp, tags)); + } + } + abstract class MultiValuedStatsDMessage extends Message { private final double sampleRate; // NaN for none private final long timestamp; // zero for none @@ -153,4 +160,58 @@ protected void writeValueTo(StringBuilder buffer, int index) { buffer.append(values[index]); } } + + final ProtobufWriter pw = new ProtobufWriter(); + final DirectSketch sk = new DirectSketch(); + + final class LongSketchMessage extends Message { + final long[] values; + final double sampleRate; + final long timestamp; + + LongSketchMessage(String aspect, long[] values, double sampleRate, long timestamp, String[] tags) { + super(aspect, Message.Type.SKETCH, tags); + this.sampleRate = sampleRate; + this.values = values; + this.timestamp = timestamp; + + } + + @Override + public final boolean canAggregate() { + return false; + } + + @Override + public final void aggregate(Message message) {} + + @Override + public final boolean writeTo(StringBuilder builder, int capacity, String containerID) { + sk.build(values, sampleRate); + + pw.clear(); + sk.serialize(pw, timestamp); + + builder + .append(prefix) + .append(aspect) + .append(":"); + + pw.flip(); + pw.encodeAscii(builder); + + builder.append("|S"); + + if (timestamp != 0) { + builder.append("|T").append(timestamp); + } + if (containerID != null && !containerID.isEmpty()) { + builder.append("|c:").append(containerID); + } + tagString(tags, builder); + builder.append("\n"); + + return false; + } + } } diff --git a/src/main/java/com/timgroup/statsd/ProtobufWriter.java b/src/main/java/com/timgroup/statsd/ProtobufWriter.java new file mode 100644 index 00000000..cfa08477 --- /dev/null +++ b/src/main/java/com/timgroup/statsd/ProtobufWriter.java @@ -0,0 +1,139 @@ +package com.timgroup.statsd; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.Formatter; + +class ProtobufWriter { + enum Ty { + Varint(0), + Double(1), + Bytes(2); + + final byte tag; + Ty(int tag) { + this.tag = (byte)tag; + } + } + + ByteBuffer buf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN); + + void bareVarint(long value) { + reserve(10); + do { + byte bv = (byte)(value & 127); + value >>>= 7; + if (value != 0) { + bv |= 128; + } + buf.put(bv); + } + while (value != 0); + } + + void bareLong(long value) { + bareVarint((value << 1) ^ (value >> 63)); + } + + void bareDouble(double value) { + reserve(8); + buf.putDouble(value); + } + + void fieldHeader(Ty ty, int id) { + reserve(1); + buf.put((byte)(id << 3 | ty.tag)); + } + + void fieldVarint(int id, long value) { + fieldHeader(Ty.Varint, id); + bareVarint(value); + } + + void fieldDouble(int id, double value) { + fieldHeader(Ty.Double, id); + bareDouble(value); + } + + void fieldPacked(int id, ProtobufWriter pw) { + fieldHeader(Ty.Bytes, id); + bareVarint(pw.buf.remaining()); + reserve(pw.buf.remaining()); + buf.put(pw.buf); + } + + void reserve(int more) { + if (buf.remaining() >= more) { + return; + } + grow(more); + } + + void grow(int more) { + final int pos = buf.position(); + final int newSize = growSize(buf.capacity(), more); + buf = ByteBuffer.wrap(Arrays.copyOf(buf.array(), newSize)).order(buf.order()); + buf.position(pos); + } + + static int growSize(int capacity, int more) { + if (capacity > Integer.MAX_VALUE - more) { + throw new BufferOverflowException(); + } + final int newSize = capacity + more; + if (capacity < Integer.MAX_VALUE / 2 && newSize < capacity * 2) { + return capacity * 2; + } + return newSize; + } + + void clear() { + buf.clear(); + } + + void flip() { + buf.flip(); + } + + static final String b64chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + void encodeAscii(StringBuilder sb) { + int bits = 0; + int left = 0; + do { + if (left < 6 && buf.hasRemaining()) { + int val = buf.get(); + if (val < 0) { + val += 256; + } + bits |= val << (8 - left); + left += 8; + } + sb.append(b64chars.charAt(bits >> 10)); + bits = (bits << 6) & 0xffff; + left -= 6; + } + while (left > 0 || buf.hasRemaining()); + } + + @Override + public String toString() { + Formatter fmt = new Formatter(); + fmt.format("["); + for (int i = 0; i < buf.limit(); i++) { + fmt.format(" %02x", buf.get(i)); + } + fmt.format(" ]"); + return fmt.toString(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ProtobufWriter) { + return buf.equals(((ProtobufWriter)obj).buf); + } + return false; + } +} diff --git a/src/test/java/com/timgroup/statsd/DirectSketchTest.java b/src/test/java/com/timgroup/statsd/DirectSketchTest.java new file mode 100644 index 00000000..fd827848 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/DirectSketchTest.java @@ -0,0 +1,116 @@ +package com.timgroup.statsd; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static com.timgroup.statsd.DirectSketch.key; + +public class DirectSketchTest { + DirectSketch s = new DirectSketch(); + + @Test + public void keys() { + final double oneSmaller = Double.longBitsToDouble(Double.doubleToLongBits(1e-9)-1); + assertEquals(32767, key(1e300)); + assertEquals(DirectSketch.bias, key(1)); + assertEquals(0, key(oneSmaller)); + assertEquals(0, key(0)); + assertEquals(0, key(-oneSmaller)); + assertEquals(-DirectSketch.bias, key(-1)); + assertEquals(-32767, key(-1e300)); + } + + @Test + public void basic() { + s.build((long[])null, 1); + assertEquals(0, s.min, 0); + assertEquals(0, s.max, 0); + assertEquals(0, s.sum, 0); + assertEquals(0, s.cnt, 0); + assertEquals(longs(), s.keys); + assertEquals(varints(), s.bins); + + s.build(new long[]{423, 234}, 0); + assertEquals(234, s.min, 0); + assertEquals(423, s.max, 0); + assertEquals(657, s.sum, 0); + assertEquals(2, s.cnt, 0); + assertEquals(longs(1690, 1728), s.keys); + assertEquals(varints(1, 1), s.bins); + ProtobufWriter pw = new ProtobufWriter(); + s.serialize(pw, 1747236777); + StringBuilder sb = new StringBuilder(); + pw.flip(); + pw.encodeAscii(sb); + assertEquals("CKnvksEGEAIZAAAAAABAbUAhAAAAAABwekApAAAAAACIdEAxAAAAAACIhEA6BLQagBtCAgEB", sb.toString()); + + s.build(new long[]{}, 1); + assertEquals(0, s.min, 0); + assertEquals(0, s.max, 0); + assertEquals(0, s.sum, 0); + assertEquals(0, s.cnt, 0); + assertEquals(longs(), s.keys); + assertEquals(varints(), s.bins); + + long[] values = new long[]{4, 2, 3, 1, 0, 3, -1, -2}; + s.build(values, 2); + + assertEquals(-2, s.min, 0); + assertEquals(4, s.max, 0); + assertEquals(10, s.sum, 0); + assertEquals((double)values.length, s.cnt, 0); + + assertEquals(longs(-1383, -1338, 0, 1338, 1383, 1409, 1427), s.keys); + assertEquals(varints(1, 1, 1, 1, 1, 2, 1), s.bins); + + ProtobufWriter w = new ProtobufWriter(); + s.serialize(w, 1000); + w.flip(); + assertEquals("["+ + " 08 e8 07" + + " 10 08" + + " 19 00 00 00 00 00 00 00 c0" + + " 21 00 00 00 00 00 00 10 40" + + " 29 00 00 00 00 00 00 f4 3f" + + " 31 00 00 00 00 00 00 24 40" + + " 3a 0d cd 15 f3 14 00 f4 14 ce 15 82 16 a6 16" + + " 42 07 01 01 01 01 01 02 01" + + " ]", w.toString()); + } + + @Test + public void overflow() { + final int maxInt = Integer.MAX_VALUE; + + long[] values = new long[]{ 1, 1, 1, 1, 1, 1 }; + s.build(values, 1e-9); + + assertEquals(0, s.min, 1); + assertEquals(0, s.max, 1); + assertEquals((double)values.length * (1 / 1e-9), s.cnt, 0); + assertEquals(longs(1338, 1338, 1338), s.keys); + assertEquals(varints(maxInt, maxInt, 1705032700), s.bins); + + s.build(values, 1/(double)(maxInt)); + assertEquals(longs(1338, 1338, 1338, 1338, 1338, 1338), s.keys); + assertEquals(varints(maxInt, maxInt, maxInt, maxInt, maxInt, maxInt), s.bins); + } + + ProtobufWriter varints(long... vals) { + ProtobufWriter w = new ProtobufWriter(); + for (long v : vals) { + w.bareVarint(v); + } + w.flip(); + return w; + } + + ProtobufWriter longs(long... vals) { + ProtobufWriter w = new ProtobufWriter(); + for (long v : vals) { + w.bareLong(v); + } + w.flip(); + return w; + } +} diff --git a/src/test/java/com/timgroup/statsd/NonBlockingDirectStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingDirectStatsDClientTest.java index ad1ff7a7..a7de7818 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingDirectStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingDirectStatsDClientTest.java @@ -19,7 +19,7 @@ public class NonBlockingDirectStatsDClientTest { private static final int STATSD_SERVER_PORT = 17256; - private static final int MAX_PACKET_SIZE = 64; + private static final int MAX_PACKET_SIZE = 8182; private static DirectStatsDClient client; private static DummyStatsDServer server; @@ -102,8 +102,24 @@ public void sends_multivalued_distribution_to_statsd_with_tags_and_sampling_rate assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mydistribution:423:234|d|@1.000000|#baz,foo:bar"))); } + @Test(timeout = 5000L) + public void sends_sketch_to_statsd() { + client.recordSketchWithTimestamp("mysketch", new long[] { 423L, 234L }, 1, 1747236777, "foo:bar", "baz"); + server.waitForMessage("my.prefix"); + assertThat(server.messagesReceived(), hasItem(comparesEqualTo("my.prefix.mysketch:CKnvksEGEAIZAAAAAABAbUAhAAAAAABwekApAAAAAACIdEAxAAAAAACIhEA6BLQagBtCAgEB|S|T1747236777|#baz,foo:bar"))); + } + @Test(timeout = 5000L) public void sends_too_long_multivalued_distribution_to_statsd() { + DirectStatsDClient client = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .hostname("localhost") + .port(STATSD_SERVER_PORT) + .enableTelemetry(false) + .originDetectionEnabled(false) + .maxPacketSizeBytes(64) + .buildDirectStatsDClient(); + long[] values = {423L, 234L, 456L, 512L, 345L, 898L, 959876543123L, 667L}; client.recordDistributionValues("mydistribution", values, 0.4, "foo:bar", "baz"); diff --git a/src/test/java/com/timgroup/statsd/ProtobufWriterTest.java b/src/test/java/com/timgroup/statsd/ProtobufWriterTest.java new file mode 100644 index 00000000..8540e801 --- /dev/null +++ b/src/test/java/com/timgroup/statsd/ProtobufWriterTest.java @@ -0,0 +1,107 @@ +package com.timgroup.statsd; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class ProtobufWriterTest { + ProtobufWriter w = new ProtobufWriter(); + + @Test + public void varint() { + String[] cases = new String[]{ + " 0 [ 00 ]", + " 1 [ 01 ]", + " 128 [ 80 01 ]", + " 129 [ 81 01 ]", + " 255 [ ff 01 ]", + " 256 [ 80 02 ]", + " 16384 [ 80 80 01 ]", + " 0x7fffffff [ ff ff ff ff 07 ]", + " 0x80000000 [ 80 80 80 80 08 ]", + "0x7fffffffffffffff [ ff ff ff ff ff ff ff ff 7f ]" + }; + + for (String c : cases) { + String[] pair = c.trim().split(" +", 2); + + w.clear(); + w.bareVarint(Long.decode(pair[0])); + w.flip(); + assertEquals(c, pair[1], w.toString()); + } + } + + @Test + public void zigzag() { + String[] cases = new String[] { + " 0 [ 00 ]", + " -1 [ 01 ]", + " 1 [ 02 ]", + " -2 [ 03 ]", + " 2 [ 04 ]", + " -128 [ ff 01 ]", + " 128 [ 80 02 ]", + " 0x7fffffff [ fe ff ff ff 0f ]", + " -0x80000000 [ ff ff ff ff 0f ]", + " 0x7fffffffffffffff [ fe ff ff ff ff ff ff ff ff 01 ]", + "-0x8000000000000000 [ ff ff ff ff ff ff ff ff ff 01 ]", + }; + + for (String c : cases) { + String[] pair = c.trim().split(" +", 2); + + w.clear(); + w.bareLong(Long.decode(pair[0])); + w.flip(); + assertEquals(c, pair[1], w.toString()); + } + } + + @Test + public void fields() { + ProtobufWriter v = new ProtobufWriter(); + v.bareLong(1); + v.bareLong(2); + v.bareLong(3); + v.flip(); + + w.clear(); + w.fieldVarint(1, 256); + w.fieldDouble(2, 3.14); + w.fieldPacked(3, v); + w.flip(); + + assertEquals("[ 08 80 02 11 1f 85 eb 51 b8 1e 09 40 1a 03 02 04 06 ]", w.toString()); + + StringBuilder b = new StringBuilder(); + w.encodeAscii(b); + assertEquals("CIACER+F61G4HglAGgMCBAY", b.toString()); + } + + @Test + public void varintReserve() { + w.buf.position(w.buf.limit()); + w.bareVarint(Long.MIN_VALUE); + } + + @Test + public void growSize() { + assertEquals(16, ProtobufWriter.growSize(8, 4)); + assertEquals(100, ProtobufWriter.growSize(8, 92)); + assertEquals(0x4000_0000, ProtobufWriter.growSize(0x2000_0000, 4)); + assertEquals(0x4000_1000, ProtobufWriter.growSize(0x4000_0000, 0x1000)); + + Exception ex = null; + try { + ProtobufWriter.growSize(0x4000_0000, 0x4000_0000); + } catch (BufferOverflowException e) { + ex = e; + } + assertNotNull(ex); + } + +}