Skip to content

AGTMETRICS-236 add minimal sketch support to direct client #268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions src/main/java/com/timgroup/statsd/DirectSketch.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.timgroup.statsd;

import java.util.Arrays;

class DirectSketch {
static final double gamma = 130.0 / 128;
static final double minValue = 1e-9;
static final double logGamma = Math.log(gamma);
static final int bias = 1 - (int)(Math.floor(Math.log(minValue) / logGamma));
static final int posInfKey = (1 << 15) - 1;
static final int negInfKey = -posInfKey;
static final int maxCount = Integer.MAX_VALUE;

ProtobufWriter keys = new ProtobufWriter();
ProtobufWriter bins = new ProtobufWriter();

double min;
double max;
double sum;
double cnt;

int topKey;
int topCount;

static int key(double value) {
if (value < 0) {
return -key(-value);
}

if (value < minValue) {
return 0;
}

int key = (int)Math.rint(Math.log(value) / logGamma) + bias;
if (key > posInfKey) {
return posInfKey;
}
return key;
}

void reset() {
min = 0;
max = 0;
sum = 0;
cnt = 0;
keys.clear();
bins.clear();

topKey = negInfKey - 1;
topCount = 0;
}

void append(int key, int count) {
keys.bareLong(key);
bins.bareVarint(count);
}

void build(final long[] values, final double sampleRate) {
reset();
buildInner(values, sampleRate);
keys.flip();
bins.flip();
}

private void buildInner(final long[] values, double sampleRate) {
if (values == null || values.length == 0) {
return;
}

Arrays.sort(values);

if (Double.isNaN(sampleRate) || sampleRate <= 0 || sampleRate > 1) {
sampleRate = 1;
}

final double sampleSize = 1 / sampleRate;
min = values[0];
max = values[0];
cnt = sampleSize * (double)values.length;

for (long val : values) {
min = Math.min(min, val);
max = Math.max(max, val);
sum += val * sampleSize;

int key = key(val);

if (key == topKey) {
int remain = (int)sampleSize;
while (topCount > maxCount - remain) {
remain -= maxCount - topCount;
append(key, maxCount);
topCount = 0;
}
topCount += remain;
} else {
if (topCount > 0) {
append(topKey, topCount);
}
topKey = key;
topCount = (int)sampleSize;
}
}

append(topKey, topCount);
}

void serialize(ProtobufWriter pw, long timestamp) {
pw.fieldVarint(1, timestamp);
pw.fieldVarint(2, (long)cnt);
pw.fieldDouble(3, min);
pw.fieldDouble(4, max);
pw.fieldDouble(5, sum / cnt);
pw.fieldDouble(6, sum);
pw.fieldPacked(7, keys);
pw.fieldPacked(8, bins);
}
}
18 changes: 18 additions & 0 deletions src/main/java/com/timgroup/statsd/DirectStatsDClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,22 @@ public interface DirectStatsDClient extends StatsDClient {
* @param tags array of tags to be added to the data
*/
void recordDistributionValues(String aspect, long[] values, double sampleRate, String... tags);

/**
* Record values for the specified named distribution.
*
* <p>Values with an explicit timestamp are never aggregated and will be recorded as the metric value at the
* given time.</p>
*
* <p>This method is a DataDog extension, and may not work with other servers.</p>
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect the name of the distribution
* @param values the values to be incorporated in the distribution. This method consumes the array.
* @param sampleRate percentage of time metric to be sent
* @param tags array of tags to be added to the data
*/
void recordSketchWithTimestamp(String aspect, long[] values, double sampleRate, long timestamp, String... tags);

}
3 changes: 2 additions & 1 deletion src/main/java/com/timgroup/statsd/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public enum Type {
HISTOGRAM("h"),
DISTRIBUTION("d"),
EVENT("_e"),
SERVICE_CHECK("_sc");
SERVICE_CHECK("_sc"),
SKETCH("S");

private final String type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ public class NoOpDirectStatsDClient extends NoOpStatsDClient implements DirectSt
@Override public void recordDistributionValues(String aspect, double[] values, double sampleRate, String... tags) { }

@Override public void recordDistributionValues(String aspect, long[] values, double sampleRate, String... tags) { }

@Override public void recordSketchWithTimestamp(
String aspect, long[] values, double sampleRate, long timestamp, String... tags) { }

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ public void recordDistributionValues(String aspect, long[] values, double sample
}
}

@Override
public void recordSketchWithTimestamp(String aspect, long[] values, double sampleRate, long timestamp, String... tags) {
if (values != null && values.length > 0) {
sendMetric(new LongSketchMessage(aspect, values, sampleRate, timestamp, tags));
}
}

abstract class MultiValuedStatsDMessage extends Message {
private final double sampleRate; // NaN for none
private final long timestamp; // zero for none
Expand Down Expand Up @@ -153,4 +160,58 @@ protected void writeValueTo(StringBuilder buffer, int index) {
buffer.append(values[index]);
}
}

final ProtobufWriter pw = new ProtobufWriter();
final DirectSketch sk = new DirectSketch();

final class LongSketchMessage extends Message {
final long[] values;
final double sampleRate;
final long timestamp;

LongSketchMessage(String aspect, long[] values, double sampleRate, long timestamp, String[] tags) {
super(aspect, Message.Type.SKETCH, tags);
this.sampleRate = sampleRate;
this.values = values;
this.timestamp = timestamp;

}

@Override
public final boolean canAggregate() {
return false;
}

@Override
public final void aggregate(Message message) {}

@Override
public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
sk.build(values, sampleRate);

pw.clear();
sk.serialize(pw, timestamp);

builder
.append(prefix)
.append(aspect)
.append(":");

pw.flip();
pw.encodeAscii(builder);

builder.append("|S");

if (timestamp != 0) {
builder.append("|T").append(timestamp);
}
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}
tagString(tags, builder);
builder.append("\n");

return false;
}
}
}
139 changes: 139 additions & 0 deletions src/main/java/com/timgroup/statsd/ProtobufWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package com.timgroup.statsd;

import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Formatter;

class ProtobufWriter {
enum Ty {
Varint(0),
Double(1),
Bytes(2);

final byte tag;
Ty(int tag) {
this.tag = (byte)tag;
}
}

ByteBuffer buf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);

void bareVarint(long value) {
reserve(10);
do {
byte bv = (byte)(value & 127);
value >>>= 7;
if (value != 0) {
bv |= 128;
}
buf.put(bv);
}
while (value != 0);
}

void bareLong(long value) {
bareVarint((value << 1) ^ (value >> 63));
}

void bareDouble(double value) {
reserve(8);
buf.putDouble(value);
}

void fieldHeader(Ty ty, int id) {
reserve(1);
buf.put((byte)(id << 3 | ty.tag));
}

void fieldVarint(int id, long value) {
fieldHeader(Ty.Varint, id);
bareVarint(value);
}

void fieldDouble(int id, double value) {
fieldHeader(Ty.Double, id);
bareDouble(value);
}

void fieldPacked(int id, ProtobufWriter pw) {
fieldHeader(Ty.Bytes, id);
bareVarint(pw.buf.remaining());
reserve(pw.buf.remaining());
buf.put(pw.buf);
}

void reserve(int more) {
if (buf.remaining() >= more) {
return;
}
grow(more);
}

void grow(int more) {
final int pos = buf.position();
final int newSize = growSize(buf.capacity(), more);
buf = ByteBuffer.wrap(Arrays.copyOf(buf.array(), newSize)).order(buf.order());
buf.position(pos);
}

static int growSize(int capacity, int more) {
if (capacity > Integer.MAX_VALUE - more) {
throw new BufferOverflowException();
}
final int newSize = capacity + more;
if (capacity < Integer.MAX_VALUE / 2 && newSize < capacity * 2) {
return capacity * 2;
}
return newSize;
}

void clear() {
buf.clear();
}

void flip() {
buf.flip();
}

static final String b64chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

void encodeAscii(StringBuilder sb) {
int bits = 0;
int left = 0;
do {
if (left < 6 && buf.hasRemaining()) {
int val = buf.get();
if (val < 0) {
val += 256;
}
bits |= val << (8 - left);
left += 8;
}
sb.append(b64chars.charAt(bits >> 10));
bits = (bits << 6) & 0xffff;
left -= 6;
}
while (left > 0 || buf.hasRemaining());
}

@Override
public String toString() {
Formatter fmt = new Formatter();
fmt.format("[");
for (int i = 0; i < buf.limit(); i++) {
fmt.format(" %02x", buf.get(i));
}
fmt.format(" ]");
return fmt.toString();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof ProtobufWriter) {
return buf.equals(((ProtobufWriter)obj).buf);
}
return false;
}
}
Loading