diff --git a/src/main/java/com/timgroup/statsd/BufferPool.java b/src/main/java/com/timgroup/statsd/BufferPool.java index 298236de..86650690 100644 --- a/src/main/java/com/timgroup/statsd/BufferPool.java +++ b/src/main/java/com/timgroup/statsd/BufferPool.java @@ -11,7 +11,6 @@ public class BufferPool { private final int bufferSize; private final boolean direct; - BufferPool(final int poolSize, int bufferSize, final boolean direct) throws InterruptedException { size = poolSize; diff --git a/src/main/java/com/timgroup/statsd/Message.java b/src/main/java/com/timgroup/statsd/Message.java index 322f8861..47084a5d 100644 --- a/src/main/java/com/timgroup/statsd/Message.java +++ b/src/main/java/com/timgroup/statsd/Message.java @@ -56,11 +56,10 @@ protected Message(String aspect, Message.Type type, String[] tags) { * * @param builder StringBuilder the text representation will be written to. * @param capacity The capacity of the send buffer. - * @param containerID The container ID to be appended to the message. * @return boolean indicating whether the message was partially written to the builder. * If true, the method will be called again with the same arguments to continue writing. */ - abstract boolean writeTo(StringBuilder builder, int capacity, String containerID); + abstract boolean writeTo(StringBuilder builder, int capacity); /** * Aggregate message. diff --git a/src/main/java/com/timgroup/statsd/NonBlockingDirectStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingDirectStatsDClient.java index 9b7de0ee..387c4037 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingDirectStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingDirectStatsDClient.java @@ -42,21 +42,21 @@ public final void aggregate(Message message) { } @Override - public final boolean writeTo(StringBuilder builder, int capacity, String containerID) { - int metadataSize = metadataSize(builder, containerID); + public final boolean writeTo(StringBuilder builder, int capacity) { + int metadataSize = metadataSize(builder); writeHeadMetadata(builder); boolean partialWrite = writeValuesTo(builder, capacity - metadataSize); - writeTailMetadata(builder, containerID); + writeTailMetadata(builder); return partialWrite; } - private int metadataSize(StringBuilder builder, String containerID) { + private int metadataSize(StringBuilder builder) { if (metadataSize == -1) { final int previousLength = builder.length(); final int previousEncodedLength = Utf8.encodedLength(builder); writeHeadMetadata(builder); - writeTailMetadata(builder, containerID); + writeTailMetadata(builder); metadataSize = Utf8.encodedLength(builder) - previousEncodedLength; builder.setLength(previousLength); } @@ -67,7 +67,7 @@ private void writeHeadMetadata(StringBuilder builder) { builder.append(prefix).append(aspect); } - private void writeTailMetadata(StringBuilder builder, String containerID) { + private void writeTailMetadata(StringBuilder builder) { builder.append('|').append(type); if (!Double.isNaN(sampleRate)) { builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate)); @@ -76,11 +76,7 @@ private void writeTailMetadata(StringBuilder builder, String containerID) { builder.append("|T").append(timestamp); } tagString(tags, builder); - if (containerID != null && !containerID.isEmpty()) { - builder.append("|c:").append(containerID); - } - - builder.append('\n'); + writeMessageTail(builder); } private boolean writeValuesTo(StringBuilder builder, int remainingCapacity) { diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java index eb2c0f49..aaf9c679 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java @@ -19,7 +19,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadLocalRandom; - /** * A simple StatsD client implementation facilitating metrics recording. * @@ -174,111 +173,47 @@ protected static String format(ThreadLocal formatter, Number value protected final StatsDSender statsDSender; protected StatsDSender telemetryStatsDSender; protected final Telemetry telemetry; + final String telemetryTags; private final int maxPacketSizeBytes; private final boolean blocking; + private final String containerID; /** * Create a new StatsD client communicating with a StatsD instance on the - * specified host and port. All messages send via this client will have - * their keys prefixed with the specified string. The new client will - * attempt to open a connection to the StatsD server immediately upon - * instantiation, and may throw an exception if that a connection cannot - * be established. Once a client has been instantiated in this way, all - * exceptions thrown during subsequent usage are passed to the specified - * handler and then consumed, guaranteeing that failures in metrics will - * not affect normal code execution. + * host and port specified by the given builder. + * The builder must be resolved before calling this internal constructor. + * + * @param builder + * the resolved configuration builder * - * @param prefix - * the prefix to apply to keys sent via this client - * @param constantTags - * tags to be added to all content sent - * @param errorHandler - * handler to use when an exception occurs during usage, may be null to indicate noop - * @param addressLookup - * yields the IP address and socket of the StatsD server - * @param telemetryAddressLookup - * yields the IP address and socket of the StatsD telemetry server destination - * @param queueSize - * the maximum amount of unprocessed messages in the Queue. - * @param timeout - * the timeout in milliseconds for blocking operations. Applies to unix sockets only. - * @param bufferSize - * the socket buffer size in bytes. Applies to unix sockets only. - * @param maxPacketSizeBytes - * the maximum number of bytes for a message that can be sent - * @param entityID - * the entity id value used with an internal tag for tracking client entity. - * If "entityID=null" the client default the value with the environment variable "DD_ENTITY_ID". - * If the environment variable is not defined, the internal tag is not added. - * @param poolSize - * The size for the network buffer pool. - * @param processorWorkers - * The number of processor worker threads assembling buffers for submission. - * @param senderWorkers - * The number of sender worker threads submitting buffers to the socket. - * @param blocking - * Blocking or non-blocking implementation for statsd message queue. - * @param enableTelemetry - * Boolean to enable client telemetry. - * @param telemetryFlushInterval - * Telemetry flush interval integer, in milliseconds. - * @param aggregationFlushInterval - * Aggregation flush interval integer, in milliseconds. 0 disables aggregation. - * @param aggregationShards - * Aggregation flush interval integer, in milliseconds. 0 disables aggregation. - * @param containerID - * Allows passing the container ID, this will be used by the Agent to enrich - * metrics with container tags. - * This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0. - * When configured, the provided container ID is prioritized over the container ID discovered - * via Origin Detection. When entityID or DD_ENTITY_ID are set, this value is ignored. - * @param originDetectionEnabled - * Enable/disable the client origin detection. - * This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0. - * When enabled, the client tries to discover its container ID and sends it to the Agent - * to enrich the metrics with container tags. - * Origin detection can be disabled by configuring the environment variabe DD_ORIGIN_DETECTION_ENABLED=false - * The client tries to read the container ID by parsing the file /proc/self/cgroup. - * This is not supported on Windows. - * The client prioritizes the value passed via or entityID or DD_ENTITY_ID (if set) over the container ID. - * @param connectionTimeout - * the timeout in milliseconds for connecting to the StatsD server. Applies to unix sockets only. - * It is also used to detect if a connection is still alive and re-establish a new one if needed. - * @throws StatsDClientException - * if the client could not be started + * @see NonBlockingStatsDClientBuilder#resolve() */ - NonBlockingStatsDClient(final String prefix, final int queueSize, final String[] constantTags, - final StatsDClientErrorHandler errorHandler, final Callable addressLookup, - final Callable telemetryAddressLookup, final int timeout, final int bufferSize, - final int maxPacketSizeBytes, String entityID, final int poolSize, final int processorWorkers, - final int senderWorkers, boolean blocking, final boolean enableTelemetry, final int telemetryFlushInterval, - final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory customThreadFactory, - String containerID, final boolean originDetectionEnabled, final int connectionTimeout) - throws StatsDClientException { - - if ((prefix != null) && (!prefix.isEmpty())) { - this.prefix = prefix + "."; + public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) throws StatsDClientException { + + if (builder.prefix != null && !builder.prefix.isEmpty()) { + prefix = builder.prefix + "."; } else { - this.prefix = ""; + prefix = ""; } - if (errorHandler == null) { + + if (builder.errorHandler == null) { handler = NO_OP_HANDLER; } else { - handler = errorHandler; + handler = builder.errorHandler; } - this.blocking = blocking; - this.maxPacketSizeBytes = maxPacketSizeBytes; + blocking = builder.blocking; + maxPacketSizeBytes = builder.maxPacketSizeBytes; { List costantPreTags = new ArrayList<>(); - if (constantTags != null) { - for (final String constantTag : constantTags) { + if (builder.constantTags != null) { + for (final String constantTag : builder.constantTags) { costantPreTags.add(constantTag); } } // Support "dd.internal.entity_id" internal tag. - updateTagsWithEntityID(costantPreTags, entityID); + updateTagsWithEntityID(costantPreTags, builder.entityID); for (final Literal literal : Literal.values()) { final String envVal = literal.envVal(); if (envVal != null && !envVal.trim().isEmpty()) { @@ -293,44 +228,44 @@ protected static String format(ThreadLocal formatter, Number value } costantPreTags = null; // Origin detection - boolean originEnabled = isOriginDetectionEnabled(containerID, originDetectionEnabled); - containerID = getContainerID(containerID, originEnabled); + boolean originEnabled = isOriginDetectionEnabled(builder.containerID, builder.originDetectionEnabled); + containerID = getContainerID(builder.containerID, originEnabled); } try { - clientChannel = createByteChannel(addressLookup, timeout, connectionTimeout, bufferSize); + clientChannel = createByteChannel(builder.addressLookup, builder.timeout, builder.connectionTimeout, + builder.socketBufferSize); - ThreadFactory threadFactory = customThreadFactory != null ? customThreadFactory : new StatsDThreadFactory(); + ThreadFactory threadFactory = builder.threadFactory != null ? builder.threadFactory : new StatsDThreadFactory(); - statsDProcessor = createProcessor(queueSize, handler, getPacketSize(clientChannel), poolSize, - processorWorkers, blocking, aggregationFlushInterval, aggregationShards, threadFactory, containerID); + int aggregationFlushInterval = builder.enableAggregation ? builder.aggregationFlushInterval : 0; + statsDProcessor = createProcessor(builder.queueSize, handler, getPacketSize(clientChannel), builder.bufferPoolSize, + builder.processorWorkers, builder.blocking, aggregationFlushInterval, builder.aggregationShards, threadFactory); Properties properties = new Properties(); properties.load(getClass().getClassLoader().getResourceAsStream( "dogstatsd/version.properties")); - String telemetryTags = tagString(new String[]{CLIENT_TRANSPORT_TAG + clientChannel.getTransportType(), - CLIENT_VERSION_TAG + properties.getProperty("dogstatsd_client_version"), - CLIENT_TAG}, new StringBuilder()).toString(); + telemetryTags = tagString(new String[]{CLIENT_TRANSPORT_TAG + clientChannel.getTransportType(), + CLIENT_VERSION_TAG + properties.getProperty("dogstatsd_client_version"), + CLIENT_TAG}, new StringBuilder()).toString(); - if (addressLookup == telemetryAddressLookup) { + if (builder.addressLookup == builder.telemetryAddressLookup) { telemetryClientChannel = clientChannel; telemetryStatsDProcessor = statsDProcessor; } else { - telemetryClientChannel = createByteChannel(telemetryAddressLookup, timeout, connectionTimeout, bufferSize); + telemetryClientChannel = createByteChannel(builder.telemetryAddressLookup, builder.timeout, + builder.connectionTimeout, builder.socketBufferSize); // similar settings, but a single worker and non-blocking. - telemetryStatsDProcessor = createProcessor(queueSize, handler, getPacketSize(telemetryClientChannel), - poolSize, 1, false, 0, aggregationShards, threadFactory, containerID); + telemetryStatsDProcessor = createProcessor(builder.queueSize, handler, getPacketSize(telemetryClientChannel), + builder.bufferPoolSize, 1, false, 0, builder.aggregationShards, threadFactory); } - this.telemetry = new Telemetry.Builder() - .tags(telemetryTags) - .processor(telemetryStatsDProcessor) - .build(); + telemetry = new Telemetry(this); statsDSender = createSender(handler, clientChannel, statsDProcessor.getBufferPool(), - statsDProcessor.getOutboundQueue(), senderWorkers, threadFactory); + statsDProcessor.getOutboundQueue(), builder.senderWorkers, threadFactory); telemetryStatsDSender = statsDSender; if (telemetryStatsDProcessor != statsDProcessor) { @@ -341,8 +276,8 @@ protected static String format(ThreadLocal formatter, Number value } // set telemetry - statsDProcessor.setTelemetry(this.telemetry); - statsDSender.setTelemetry(this.telemetry); + statsDProcessor.setTelemetry(telemetry); + statsDSender.setTelemetry(telemetry); } catch (final Exception e) { throw new StatsDClientException("Failed to start StatsD client", e); @@ -351,47 +286,25 @@ protected static String format(ThreadLocal formatter, Number value statsDProcessor.startWorkers("StatsD-Processor-"); statsDSender.startWorkers("StatsD-Sender-"); - if (enableTelemetry) { + if (builder.enableTelemetry) { if (telemetryStatsDProcessor != statsDProcessor) { telemetryStatsDProcessor.startWorkers("StatsD-TelemetryProcessor-"); telemetryStatsDSender.startWorkers("StatsD-TelemetrySender-"); } - this.telemetry.start(telemetryFlushInterval); + telemetry.start(builder.telemetryFlushInterval); } } - /** - * Create a new StatsD client communicating with a StatsD instance on the - * host and port specified by the given builder. - * The builder must be resolved before calling this internal constructor. - * - * @param builder - * the resolved configuration builder - * - * @see NonBlockingStatsDClientBuilder#resolve() - */ - public NonBlockingStatsDClient(final NonBlockingStatsDClientBuilder builder) throws StatsDClientException { - this(builder.prefix, builder.queueSize, builder.constantTags, builder.errorHandler, - builder.addressLookup, builder.telemetryAddressLookup, builder.timeout, - builder.socketBufferSize, builder.maxPacketSizeBytes, builder.entityID, - builder.bufferPoolSize, builder.processorWorkers, builder.senderWorkers, - builder.blocking, builder.enableTelemetry, builder.telemetryFlushInterval, - (builder.enableAggregation ? builder.aggregationFlushInterval : 0), - builder.aggregationShards, builder.threadFactory, builder.containerID, - builder.originDetectionEnabled, builder.connectionTimeout); - } - protected StatsDProcessor createProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int bufferPoolSize, final int workers, final boolean blocking, - final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory threadFactory, - final String containerID) + final int aggregationFlushInterval, final int aggregationShards, final ThreadFactory threadFactory) throws Exception { if (blocking) { return new StatsDBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, - workers, aggregationFlushInterval, aggregationShards, threadFactory, containerID); + workers, aggregationFlushInterval, aggregationShards, threadFactory); } else { return new StatsDNonBlockingProcessor(queueSize, handler, maxPacketSizeBytes, bufferPoolSize, - workers, aggregationFlushInterval, aggregationShards, threadFactory, containerID); + workers, aggregationFlushInterval, aggregationShards, threadFactory); } } @@ -524,7 +437,7 @@ protected StatsDMessage(String aspect, Message.Type type, T value, double sample } @Override - public final boolean writeTo(StringBuilder builder, int capacity, String containerID) { + public final boolean writeTo(StringBuilder builder, int capacity) { builder.append(prefix).append(aspect).append(':'); writeValue(builder); builder.append('|').append(type); @@ -535,11 +448,7 @@ public final boolean writeTo(StringBuilder builder, int capacity, String contain builder.append("|T").append(timestamp); } tagString(this.tags, builder); - if (containerID != null && !containerID.isEmpty()) { - builder.append("|c:").append(containerID); - } - - builder.append('\n'); + writeMessageTail(builder); return false; } @@ -552,6 +461,12 @@ public boolean canAggregate() { protected abstract void writeValue(StringBuilder builder); } + void writeMessageTail(StringBuilder builder) { + if (containerID != null && !containerID.isEmpty()) { + builder.append("|c:").append(containerID); + } + builder.append('\n'); + } boolean sendMetric(final Message message) { return send(message); @@ -1144,7 +1059,7 @@ private StringBuilder eventMap(final Event event, StringBuilder res) { @Override public void recordEvent(final Event event, final String... eventTags) { statsDProcessor.send(new AlphaNumericMessage(Message.Type.EVENT, "") { - @Override public boolean writeTo(StringBuilder builder, int capacity, String containerID) { + @Override public boolean writeTo(StringBuilder builder, int capacity) { final String title = escapeEventString(prefix + event.getTitle()); final String text = escapeEventString(event.getText()); builder.append(Message.Type.EVENT.toString()) @@ -1162,11 +1077,7 @@ public void recordEvent(final Event event, final String... eventTags) { eventMap(event, builder); tagString(eventTags, builder); - if (containerID != null && !containerID.isEmpty()) { - builder.append("|c:").append(containerID); - } - - builder.append('\n'); + writeMessageTail(builder); return false; } }); @@ -1201,7 +1112,7 @@ private int getUtf8Length(final String text) { public void recordServiceCheckRun(final ServiceCheck sc) { statsDProcessor.send(new AlphaNumericMessage(Message.Type.SERVICE_CHECK, "") { @Override - public boolean writeTo(StringBuilder sb, int capacity, String containerID) { + public boolean writeTo(StringBuilder sb, int capacity) { // see http://docs.datadoghq.com/guides/dogstatsd/#service-checks sb.append(Message.Type.SERVICE_CHECK.toString()) .append("|") @@ -1218,11 +1129,7 @@ public boolean writeTo(StringBuilder sb, int capacity, String containerID) { if (sc.getMessage() != null) { sb.append("|m:").append(sc.getEscapedMessage()); } - if (containerID != null && !containerID.isEmpty()) { - sb.append("|c:").append(containerID); - } - - sb.append('\n'); + writeMessageTail(sb); return false; } }); @@ -1289,16 +1196,12 @@ protected void writeValue(StringBuilder builder) { } @Override - protected final boolean writeTo(StringBuilder builder, int capacity, String containerID) { + protected final boolean writeTo(StringBuilder builder, int capacity) { builder.append(prefix).append(aspect).append(':'); writeValue(builder); builder.append('|').append(type); tagString(this.tags, builder); - if (containerID != null && !containerID.isEmpty()) { - builder.append("|c:").append(containerID); - } - - builder.append('\n'); + writeMessageTail(builder); return false; } }); @@ -1346,4 +1249,39 @@ private String getContainerID(String containerID, boolean originDetectionEnabled private int getPacketSize(ClientChannel chan) { return maxPacketSizeBytes > 0 ? maxPacketSizeBytes : chan.getMaxPacketSizeBytes(); } + + class TelemetryMessage extends NumericMessage { + private final String tagsString; // pre-baked comma separeated tags string + + protected TelemetryMessage(String metric, Integer value, String tags) { + super(metric, Message.Type.COUNT, value, null); + this.tagsString = tags; + this.done = true; // dont aggregate telemetry messages for now + } + + @Override + public final boolean writeTo(StringBuilder builder, int capacity) { + builder.append(aspect) + .append(':') + .append(this.value) + .append('|') + .append(type) + .append(tagsString); + writeMessageTail(builder); + return false; + } + } + + public void sendTelemetryMetric(String metric, Integer value) { + telemetryStatsDProcessor.send(new TelemetryMessage(metric, value, telemetryTags)); + } + + void sendTelemetryMetric(String metric, Integer value, String tags) { + StringBuilder tagsBuilder = new StringBuilder(); + tagsBuilder.setLength(0); + tagsBuilder.append(telemetryTags); + tagsBuilder.append(','); // telemetryTags is never empty + tagsBuilder.append(tags); + telemetryStatsDProcessor.send(new TelemetryMessage(metric, value, tagsBuilder.toString())); + } } diff --git a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java index 2ceedccd..c396f69f 100644 --- a/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java +++ b/src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java @@ -11,43 +11,93 @@ import java.util.concurrent.Callable; import java.util.concurrent.ThreadFactory; +/** + * Create a new StatsD client communicating with a StatsD instance on the + * specified host and port. All messages send via this client will have + * their keys prefixed with the specified string. The new client will + * attempt to open a connection to the StatsD server immediately upon + * instantiation, and may throw an exception if that a connection cannot + * be established. Once a client has been instantiated in this way, all + * exceptions thrown during subsequent usage are passed to the specified + * handler and then consumed, guaranteeing that failures in metrics will + * not affect normal code execution. + */ public class NonBlockingStatsDClientBuilder implements Cloneable { - /** - * 1400 chosen as default here so that the number of bytes in a message plus the number of bytes required - * for additional udp headers should be under the 1500 Maximum Transmission Unit for ethernet. - * See https://github.com/DataDog/java-dogstatsd-client/pull/17 for discussion. - */ - + /** The maximum number of bytes for a message that can be sent. */ public int maxPacketSizeBytes = 0; public int port = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT; public int telemetryPort = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT; + /** The maximum amount of unprocessed messages in the queue. */ public int queueSize = NonBlockingStatsDClient.DEFAULT_QUEUE_SIZE; + /** The timeout in milliseconds for blocking operations. Applies to unix sockets only. */ public int timeout = NonBlockingStatsDClient.SOCKET_TIMEOUT_MS; + /** The size for the network buffer pool. */ public int bufferPoolSize = NonBlockingStatsDClient.DEFAULT_POOL_SIZE; + /** The socket buffer size in bytes. Applies to unix sockets only. */ public int socketBufferSize = NonBlockingStatsDClient.SOCKET_BUFFER_BYTES; + /** The number of processor worker threads assembling buffers for submission. */ public int processorWorkers = NonBlockingStatsDClient.DEFAULT_PROCESSOR_WORKERS; + /** The number of sender worker threads submitting buffers to the socket. */ public int senderWorkers = NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS; + /** Blocking or non-blocking implementation for statsd message queue. */ public boolean blocking = NonBlockingStatsDClient.DEFAULT_BLOCKING; + /** Enable sending client telemetry. */ public boolean enableTelemetry = NonBlockingStatsDClient.DEFAULT_ENABLE_TELEMETRY; public boolean enableAggregation = NonBlockingStatsDClient.DEFAULT_ENABLE_AGGREGATION; + /** Telemetry flush interval, in milliseconds. */ public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL; + /** Aggregation flush interval, in milliseconds. 0 disables aggregation. */ public int aggregationFlushInterval = StatsDAggregator.DEFAULT_FLUSH_INTERVAL; public int aggregationShards = StatsDAggregator.DEFAULT_SHARDS; + /** + * Enable/disable the client origin detection. + * + *

This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0. + * When enabled, the client tries to discover its container ID and sends it to the Agent + * to enrich the metrics with container tags. + * Origin detection can be disabled by configuring the environment variabe DD_ORIGIN_DETECTION_ENABLED=false + * The client tries to read the container ID by parsing the file /proc/self/cgroup. + * This is not supported on Windows. + */ public boolean originDetectionEnabled = NonBlockingStatsDClient.DEFAULT_ENABLE_ORIGIN_DETECTION; + /** + * The timeout in milliseconds for connecting to the StatsD server. Applies to unix sockets only. + * + *

It is also used to detect if a connection is still alive and re-establish a new one if needed. + */ public int connectionTimeout = NonBlockingStatsDClient.SOCKET_CONNECT_TIMEOUT_MS; + /** Yields the IP address and socket of the StatsD server. */ public Callable addressLookup; + /** Yields the IP address and socket of the StatsD telemetry server destination. */ public Callable telemetryAddressLookup; public String hostname; public String telemetryHostname; public String namedPipe; + + /** The prefix to apply to keys sent via this client. */ public String prefix; + + /** The entity id value used with an internal tag for tracking client entity. + * + *

If null the client default the value with the environment variable "DD_ENTITY_ID". + * If the environment variable is not defined, the internal tag is not added. + */ public String entityID; + /** Tags to be added to all content sent. */ public String[] constantTags; + /** + * Allows passing the container ID, this will be used by the Agent to enrich + * metrics with container tags. + * + *

This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0. + * When configured, the provided container ID is prioritized over the container ID discovered + * via Origin Detection. When entityID or DD_ENTITY_ID are set, this value is ignored. + */ public String containerID; - + /** Handler to use when an exception occurs during usage, may be null to indicate noop. */ public StatsDClientErrorHandler errorHandler; public ThreadFactory threadFactory; @@ -63,26 +113,35 @@ public NonBlockingStatsDClientBuilder telemetryPort(int val) { return this; } + /** The maximum amount of unprocessed messages in the queue. */ public NonBlockingStatsDClientBuilder queueSize(int val) { queueSize = val; return this; } + /** The timeout in milliseconds for blocking operations. Applies to unix sockets only. */ public NonBlockingStatsDClientBuilder timeout(int val) { timeout = val; return this; } + /** + * The timeout in milliseconds for connecting to the StatsD server. Applies to unix sockets only. + * + *

It is also used to detect if a connection is still alive and re-establish a new one if needed. + */ public NonBlockingStatsDClientBuilder connectionTimeout(int val) { connectionTimeout = val; return this; } + /** The size for the network buffer pool. */ public NonBlockingStatsDClientBuilder bufferPoolSize(int val) { bufferPoolSize = val; return this; } + /** The socket buffer size in bytes. Applies to unix sockets only. */ public NonBlockingStatsDClientBuilder socketBufferSize(int val) { socketBufferSize = val; return this; @@ -93,26 +152,31 @@ public NonBlockingStatsDClientBuilder maxPacketSizeBytes(int val) { return this; } + /** The number of processor worker threads assembling buffers for submission. */ public NonBlockingStatsDClientBuilder processorWorkers(int val) { processorWorkers = val; return this; } + /** The number of sender worker threads submitting buffers to the socket. */ public NonBlockingStatsDClientBuilder senderWorkers(int val) { senderWorkers = val; return this; } + /** Blocking or non-blocking implementation for statsd message queue. */ public NonBlockingStatsDClientBuilder blocking(boolean val) { blocking = val; return this; } + /** Yields the IP address and socket of the StatsD server. */ public NonBlockingStatsDClientBuilder addressLookup(Callable val) { addressLookup = val; return this; } + /** Yields the IP address and socket of the StatsD telemetry server destination. */ public NonBlockingStatsDClientBuilder telemetryAddressLookup(Callable val) { telemetryAddressLookup = val; return this; @@ -143,26 +207,35 @@ public NonBlockingStatsDClientBuilder telemetryAddress(String address) { return this; } + /** The prefix to apply to keys sent via this client. */ public NonBlockingStatsDClientBuilder prefix(String val) { prefix = val; return this; } + /** The entity id value used with an internal tag for tracking client entity. + * + *

If null the client default the value with the environment variable "DD_ENTITY_ID". + * If the environment variable is not defined, the internal tag is not added. + */ public NonBlockingStatsDClientBuilder entityID(String val) { entityID = val; return this; } + /** Tags to be added to all content sent. */ public NonBlockingStatsDClientBuilder constantTags(String... val) { constantTags = val; return this; } + /** Handler to use when an exception occurs during usage, may be null to indicate noop. */ public NonBlockingStatsDClientBuilder errorHandler(StatsDClientErrorHandler val) { errorHandler = val; return this; } + /** Enable sending client telemetry. */ public NonBlockingStatsDClientBuilder enableTelemetry(boolean val) { enableTelemetry = val; return this; @@ -173,11 +246,13 @@ public NonBlockingStatsDClientBuilder enableAggregation(boolean val) { return this; } + /** Telemetry flush interval, in milliseconds. */ public NonBlockingStatsDClientBuilder telemetryFlushInterval(int val) { telemetryFlushInterval = val; return this; } + /** Aggregation flush interval, in milliseconds. 0 disables aggregation. */ public NonBlockingStatsDClientBuilder aggregationFlushInterval(int val) { aggregationFlushInterval = val; return this; @@ -193,11 +268,30 @@ public NonBlockingStatsDClientBuilder threadFactory(ThreadFactory val) { return this; } + /** + * Allows passing the container ID, this will be used by the Agent to enrich + * metrics with container tags. + * + *

This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >=7.35.0. + * When configured, the provided container ID is prioritized over the container ID discovered + * via Origin Detection. When entityID or DD_ENTITY_ID are set, this value is ignored. + */ public NonBlockingStatsDClientBuilder containerID(String val) { containerID = val; return this; } + /** + * Enable/disable the client origin detection. + * + *

This feature requires Datadog Agent version >=6.35.0 && <7.0.0 or Agent versions >7.35.0. + * When enabled, the client tries to discover its container ID and sends it to the Agent + * to enrich the metrics with container tags. + * Origin detection can be disabled by configuring the environment variabe DD_ORIGIN_DETECTION_ENABLED=false + * The client tries to read the container ID by parsing the file /proc/self/cgroup. + * This is not supported on Windows. + * The client prioritizes the value passed via or entityID or DD_ENTITY_ID (if set) over the container ID. + */ public NonBlockingStatsDClientBuilder originDetectionEnabled(boolean val) { originDetectionEnabled = val; return this; @@ -426,7 +520,4 @@ private static int getPortFromEnvVar(final int defaultPort) { } } } - - } - diff --git a/src/main/java/com/timgroup/statsd/NumericMessage.java b/src/main/java/com/timgroup/statsd/NumericMessage.java index 6ba59ff4..a1b39940 100644 --- a/src/main/java/com/timgroup/statsd/NumericMessage.java +++ b/src/main/java/com/timgroup/statsd/NumericMessage.java @@ -1,8 +1,6 @@ package com.timgroup.statsd; - public abstract class NumericMessage extends Message { - protected Number value; protected NumericMessage(Message.Type type) { @@ -14,7 +12,6 @@ protected NumericMessage(String aspect, Message.Type type, T value, String[] tag this.value = value; } - /** * Aggregate message. * @@ -59,6 +56,4 @@ public Number getValue() { public void setValue(Number value) { this.value = value; } - } - diff --git a/src/main/java/com/timgroup/statsd/StatsDAggregator.java b/src/main/java/com/timgroup/statsd/StatsDAggregator.java index be7182fb..438c39ea 100644 --- a/src/main/java/com/timgroup/statsd/StatsDAggregator.java +++ b/src/main/java/com/timgroup/statsd/StatsDAggregator.java @@ -7,7 +7,6 @@ import java.util.Timer; import java.util.TimerTask; - public class StatsDAggregator { public static int DEFAULT_FLUSH_INTERVAL = 2000; // 2s public static int DEFAULT_SHARDS = 4; // 4 partitions to reduce contention. @@ -89,7 +88,6 @@ public boolean aggregateMessage(Message message) { return false; } - int hash = message.hashCode(); int bucket = Math.abs(hash % this.shardGranularity); Map map = aggregateMetrics.get(bucket); diff --git a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java index f32005b9..7ba94b99 100644 --- a/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDBlockingProcessor.java @@ -9,7 +9,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; - public class StatsDBlockingProcessor extends StatsDProcessor { private final BlockingQueue messages; @@ -30,10 +29,10 @@ protected boolean haveMessages() { StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, final int aggregatorFlushInterval, final int aggregatorShards, - final ThreadFactory threadFactory, final String containerID) throws Exception { + final ThreadFactory threadFactory) throws Exception { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, - aggregatorFlushInterval, aggregatorShards, threadFactory, containerID); + aggregatorFlushInterval, aggregatorShards, threadFactory); this.messages = new ArrayBlockingQueue<>(queueSize); } diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 0ff9d8ca..64874151 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -9,7 +9,6 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; - public class StatsDNonBlockingProcessor extends StatsDProcessor { private final Queue messages; @@ -38,10 +37,10 @@ protected boolean haveMessages() { StatsDNonBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, final int aggregatorFlushInterval, final int aggregatorShards, - final ThreadFactory threadFactory, final String containerID) throws Exception { + final ThreadFactory threadFactory) throws Exception { super(queueSize, handler, maxPacketSizeBytes, poolSize, workers, - aggregatorFlushInterval, aggregatorShards, threadFactory, containerID); + aggregatorFlushInterval, aggregatorShards, threadFactory); this.qsize = new AtomicInteger(0); this.messages = new ConcurrentLinkedQueue<>(); } diff --git a/src/main/java/com/timgroup/statsd/StatsDProcessor.java b/src/main/java/com/timgroup/statsd/StatsDProcessor.java index 55ffab53..456624bf 100644 --- a/src/main/java/com/timgroup/statsd/StatsDProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDProcessor.java @@ -37,8 +37,6 @@ public abstract class StatsDProcessor { protected volatile boolean shutdown; volatile boolean shutdownAgg; - String containerID; - protected abstract class ProcessingTask implements Runnable { protected StringBuilder builder = new StringBuilder(); char[] charBuffer = new char[maxPacketSizeBytes]; @@ -99,7 +97,7 @@ protected void processLoop() { boolean partialWrite; do { builder.setLength(0); - partialWrite = message.writeTo(builder, sendBuffer.capacity(), containerID); + partialWrite = message.writeTo(builder, sendBuffer.capacity()); int lowerBoundSize = builder.length(); if (sendBuffer.capacity() < lowerBoundSize) { @@ -185,7 +183,7 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) { StatsDProcessor(final int queueSize, final StatsDClientErrorHandler handler, final int maxPacketSizeBytes, final int poolSize, final int workers, final int aggregatorFlushInterval, final int aggregatorShards, - final ThreadFactory threadFactory, final String containerID) throws Exception { + final ThreadFactory threadFactory) throws Exception { this.handler = handler; this.threadFactory = threadFactory; @@ -199,8 +197,6 @@ protected void writeBuilderToSendBuffer(ByteBuffer sendBuffer) { this.endSignal = new CountDownLatch(workers); this.closeSignal = new CountDownLatch(workers); this.aggregator = new StatsDAggregator(this, aggregatorShards, aggregatorFlushInterval); - - this.containerID = containerID; } protected abstract ProcessingTask createProcessingTask(); diff --git a/src/main/java/com/timgroup/statsd/StatsDSender.java b/src/main/java/com/timgroup/statsd/StatsDSender.java index ea910a85..b5269454 100644 --- a/src/main/java/com/timgroup/statsd/StatsDSender.java +++ b/src/main/java/com/timgroup/statsd/StatsDSender.java @@ -28,7 +28,6 @@ public class StatsDSender { private volatile Telemetry telemetry; - StatsDSender(final WritableByteChannel clientChannel, final StatsDClientErrorHandler handler, BufferPool pool, BlockingQueue buffers, final int workers, final ThreadFactory threadFactory) { diff --git a/src/main/java/com/timgroup/statsd/Telemetry.java b/src/main/java/com/timgroup/statsd/Telemetry.java index abca7a66..27fb8ca0 100644 --- a/src/main/java/com/timgroup/statsd/Telemetry.java +++ b/src/main/java/com/timgroup/statsd/Telemetry.java @@ -40,11 +40,8 @@ public class Telemetry { protected final String aggregatedContextsMetric = "datadog.dogstatsd.client.aggregated_context"; protected final String aggregatedContextsByTypeMetric = "datadog.dogstatsd.client.aggregated_context_by_type"; - protected String tags; - protected StringBuilder tagBuilder = new StringBuilder(); - - public StatsDProcessor processor; protected Timer timer; + NonBlockingStatsDClient client; protected class TelemetryTask extends TimerTask { private Telemetry telemetry; @@ -55,63 +52,12 @@ protected class TelemetryTask extends TimerTask { } public void run() { - this.telemetry.flush(); - } - } - - class TelemetryMessage extends NumericMessage { - private final String tagsString; // pre-baked comma separeated tags string - - protected TelemetryMessage(String metric, Integer value, String tags) { - super(metric, Message.Type.COUNT, value, null); - this.tagsString = tags; - this.done = true; // dont aggregate telemetry messages for now - } - - @Override - public final boolean writeTo(StringBuilder builder, int capacity, String containerID) { - builder.append(aspect) - .append(':') - .append(this.value) - .append('|') - .append(type) - .append(tagsString); - - if (containerID != null && !containerID.isEmpty()) { - builder.append("|c:").append(containerID); - } - - builder.append('\n'); // already has the statsd separator baked-in - return false; + telemetry.flush(); } } - Telemetry(final String tags, final StatsDProcessor processor) { - // precompute metrics lines with tags - this.tags = tags; - this.processor = processor; - this.timer = null; - } - - public static class Builder { - private String tags; - private StatsDProcessor processor; - - public Builder() {} - - public Builder tags(String tags) { - this.tags = tags; - return this; - } - - public Builder processor(StatsDProcessor processor) { - this.processor = processor; - return this; - } - - public Telemetry build() { - return new Telemetry(this.tags, this.processor); - } + Telemetry(final NonBlockingStatsDClient client) { + this.client = client; } /** @@ -122,16 +68,16 @@ public Telemetry build() { */ public void start(final long flushInterval) { // flush the telemetry at regualar interval - this.timer = new Timer(true); - this.timer.scheduleAtFixedRate(new TelemetryTask(this), flushInterval, flushInterval); + timer = new Timer(true); + timer.scheduleAtFixedRate(new TelemetryTask(this), flushInterval, flushInterval); } /** * Stops the flush timer for the telemetry. */ public void stop() { - if (this.timer != null) { - this.timer.cancel(); + if (timer != null) { + timer.cancel(); } } @@ -143,61 +89,26 @@ public void flush() { // be spread out among processor worker and we flush every 5s by // default - processor.send(new TelemetryMessage(this.metricsSentMetric, this.metricsSent.getAndSet(0), tags)); - processor.send(new TelemetryMessage(this.eventsSentMetric, this.eventsSent.getAndSet(0), tags)); - processor.send(new TelemetryMessage(this.serviceChecksSentMetric, this.serviceChecksSent.getAndSet(0), tags)); - processor.send(new TelemetryMessage(this.bytesSentMetric, this.bytesSent.getAndSet(0), tags)); - processor.send(new TelemetryMessage(this.bytesDroppedMetric, this.bytesDropped.getAndSet(0), tags)); - processor.send(new TelemetryMessage(this.packetsSentMetric, this.packetsSent.getAndSet(0), tags)); - processor.send(new TelemetryMessage(this.packetsDroppedMetric, this.packetsDropped.getAndSet(0), tags)); - processor.send(new TelemetryMessage(this.packetsDroppedQueueMetric, this.packetsDroppedQueue.getAndSet(0), tags)); - processor.send(new TelemetryMessage(this.aggregatedContextsMetric, this.aggregatedContexts.getAndSet(0), tags)); + client.sendTelemetryMetric(metricsSentMetric, metricsSent.getAndSet(0)); + client.sendTelemetryMetric(eventsSentMetric, eventsSent.getAndSet(0)); + client.sendTelemetryMetric(serviceChecksSentMetric, serviceChecksSent.getAndSet(0)); + client.sendTelemetryMetric(bytesSentMetric, bytesSent.getAndSet(0)); + client.sendTelemetryMetric(bytesDroppedMetric, bytesDropped.getAndSet(0)); + client.sendTelemetryMetric(packetsSentMetric, packetsSent.getAndSet(0)); + client.sendTelemetryMetric(packetsDroppedMetric, packetsDropped.getAndSet(0)); + client.sendTelemetryMetric(packetsDroppedQueueMetric, packetsDroppedQueue.getAndSet(0)); + client.sendTelemetryMetric(aggregatedContextsMetric, aggregatedContexts.getAndSet(0)); // developer metrics - processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.gaugeSent.getAndSet(0), - getTelemetryTags(tags, Message.Type.GAUGE))); - processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.countSent.getAndSet(0), - getTelemetryTags(tags, Message.Type.COUNT))); - processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.setSent.getAndSet(0), - getTelemetryTags(tags, Message.Type.SET))); - processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.histogramSent.getAndSet(0), - getTelemetryTags(tags, Message.Type.HISTOGRAM))); - processor.send(new TelemetryMessage(this.metricsByTypeSentMetric, this.distributionSent.getAndSet(0), - getTelemetryTags(tags, Message.Type.DISTRIBUTION))); - - processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedGaugeContexts.getAndSet(0), - getTelemetryTags(tags, Message.Type.GAUGE))); - processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedCountContexts.getAndSet(0), - getTelemetryTags(tags, Message.Type.COUNT))); - processor.send(new TelemetryMessage(this.aggregatedContextsByTypeMetric, this.aggregatedSetContexts.getAndSet(0), - getTelemetryTags(tags, Message.Type.SET))); - } - - protected String getTelemetryTags(String tags, Message.Type type) { + client.sendTelemetryMetric(metricsByTypeSentMetric, gaugeSent.getAndSet(0), "metrics_type:gauge"); + client.sendTelemetryMetric(metricsByTypeSentMetric, countSent.getAndSet(0), "metrics_type:count"); + client.sendTelemetryMetric(metricsByTypeSentMetric, setSent.getAndSet(0), "metrics_type:set"); + client.sendTelemetryMetric(metricsByTypeSentMetric, histogramSent.getAndSet(0), "metrics_type:histogram"); + client.sendTelemetryMetric(metricsByTypeSentMetric, distributionSent.getAndSet(0), "metrics_type:distribution"); - tagBuilder.setLength(0); - tagBuilder.append(tags); - switch (type) { - case GAUGE: - tagBuilder.append(",metrics_type:gauge"); - break; - case COUNT: - tagBuilder.append(",metrics_type:count"); - break; - case SET: - tagBuilder.append(",metrics_type:set"); - break; - case HISTOGRAM: - tagBuilder.append(",metrics_type:histogram"); - break; - case DISTRIBUTION: - tagBuilder.append(",metrics_type:distribution"); - break; - default: - break; - } - - return tagBuilder.toString(); + client.sendTelemetryMetric(aggregatedContextsByTypeMetric, aggregatedGaugeContexts.getAndSet(0), "metrics_type:gauge"); + client.sendTelemetryMetric(aggregatedContextsByTypeMetric, aggregatedCountContexts.getAndSet(0), "metrics_type:count"); + client.sendTelemetryMetric(aggregatedContextsByTypeMetric, aggregatedSetContexts.getAndSet(0), "metrics_type:set"); } /** @@ -207,7 +118,7 @@ protected String getTelemetryTags(String tags, Message.Type type) { * Value to increase metric with */ public void incrMetricsSent(final int value) { - this.metricsSent.addAndGet(value); + metricsSent.addAndGet(value); } /** @@ -242,101 +153,91 @@ public void incrMetricsSent(final int value, Message.Type type) { } public void incrGaugeSent(final int value) { - this.gaugeSent.addAndGet(value); + gaugeSent.addAndGet(value); } public void incrCountSent(final int value) { - this.countSent.addAndGet(value); + countSent.addAndGet(value); } - public void incrHistogramSent(final int value) { - this.histogramSent.addAndGet(value); + histogramSent.addAndGet(value); } - public void incrDistributionSent(final int value) { - this.distributionSent.addAndGet(value); + distributionSent.addAndGet(value); } public void incrSetSent(final int value) { - this.setSent.addAndGet(value); + setSent.addAndGet(value); } public void incrEventsSent(final int value) { - this.eventsSent.addAndGet(value); + eventsSent.addAndGet(value); } public void incrServiceChecksSent(final int value) { - this.serviceChecksSent.addAndGet(value); + serviceChecksSent.addAndGet(value); } public void incrBytesSent(final int value) { - this.bytesSent.addAndGet(value); + bytesSent.addAndGet(value); } public void incrBytesDropped(final int value) { - this.bytesDropped.addAndGet(value); + bytesDropped.addAndGet(value); } public void incrPacketSent(final int value) { - this.packetsSent.addAndGet(value); + packetsSent.addAndGet(value); } public void incrPacketDropped(final int value) { - this.packetsDropped.addAndGet(value); + packetsDropped.addAndGet(value); } public void incrPacketDroppedQueue(final int value) { - this.packetsDroppedQueue.addAndGet(value); + packetsDroppedQueue.addAndGet(value); } public void incrAggregatedContexts(final int value) { - this.aggregatedContexts.addAndGet(value); + aggregatedContexts.addAndGet(value); } public void incrAggregatedGaugeContexts(final int value) { - this.aggregatedGaugeContexts.addAndGet(value); + aggregatedGaugeContexts.addAndGet(value); } public void incrAggregatedCountContexts(final int value) { - this.aggregatedCountContexts.addAndGet(value); + aggregatedCountContexts.addAndGet(value); } public void incrAggregatedSetContexts(final int value) { - this.aggregatedSetContexts.addAndGet(value); + aggregatedSetContexts.addAndGet(value); } /** * Resets all counter in the telemetry (this is useful for tests purposes). */ public void reset() { - this.metricsSent.set(0); - this.eventsSent.set(0); - this.serviceChecksSent.set(0); - this.bytesSent.set(0); - this.bytesDropped.set(0); - this.packetsSent.set(0); - this.packetsDropped.set(0); - this.packetsDroppedQueue.set(0); - this.aggregatedContexts.set(0); - - this.gaugeSent.set(0); - this.countSent.set(0); - this.histogramSent.set(0); - this.distributionSent.set(0); - this.setSent.set(0); - - this.aggregatedGaugeContexts.set(0); - this.aggregatedCountContexts.set(0); - this.aggregatedSetContexts.set(0); - } - - /** - * Gets the telemetry tags string. - * @return this Telemetry instance applied tags. - */ - public String getTags() { - return this.tags; + metricsSent.set(0); + eventsSent.set(0); + serviceChecksSent.set(0); + bytesSent.set(0); + bytesDropped.set(0); + packetsSent.set(0); + packetsDropped.set(0); + packetsDroppedQueue.set(0); + aggregatedContexts.set(0); + + gaugeSent.set(0); + countSent.set(0); + histogramSent.set(0); + distributionSent.set(0); + setSent.set(0); + + aggregatedGaugeContexts.set(0); + aggregatedCountContexts.set(0); + aggregatedSetContexts.set(0); } } diff --git a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java index d910d786..23e2b8ed 100644 --- a/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java +++ b/src/main/java/com/timgroup/statsd/UnixStreamClientChannel.java @@ -19,7 +19,6 @@ public class UnixStreamClientChannel implements ClientChannel { private final int connectionTimeout; private final int bufferSize; - private SocketChannel delegate; private final ByteBuffer delimiterBuffer = ByteBuffer.allocateDirect(Integer.SIZE / Byte.SIZE).order(ByteOrder.LITTLE_ENDIAN); @@ -157,10 +156,9 @@ private void connect() throws IOException { throw e; } - this.delegate = delegate; } - + @Override public void close() throws IOException { disconnect(); diff --git a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java index 7b484fc2..e37c427e 100644 --- a/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java +++ b/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java @@ -1560,7 +1560,7 @@ public TestAlphaNumericMessage(String aspect, Type type, String value, String[] } @Override - boolean writeTo(StringBuilder builder, int capacity, String containerID) { + boolean writeTo(StringBuilder builder, int capacity) { return false; } } diff --git a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java index 1cbf9432..1682cf0e 100644 --- a/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java +++ b/src/test/java/com/timgroup/statsd/StatsDAggregatorTest.java @@ -56,7 +56,7 @@ protected FakeMessage(String aspect, Message.Type type, T value) { } @Override - protected boolean writeTo(StringBuilder builder, int capacity, String containerID) { + protected boolean writeTo(StringBuilder builder, int capacity) { return false; } } @@ -67,7 +67,7 @@ protected FakeAlphaMessage(String aspect, Message.Type type, String value) { } @Override - protected boolean writeTo(StringBuilder builder, int capacity, String containerID) { + protected boolean writeTo(StringBuilder builder, int capacity) { return false; } } @@ -81,7 +81,7 @@ public static class FakeProcessor extends StatsDProcessor { private final AtomicInteger messageAggregated = new AtomicInteger(0); FakeProcessor(final StatsDClientErrorHandler handler) throws Exception { - super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory(), null); + super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory()); this.messages = new ConcurrentLinkedQueue<>(); } @@ -145,12 +145,6 @@ public void clear() { public static void start() throws Exception { fakeProcessor = new FakeProcessor(NO_OP_HANDLER); - // set telemetry - Telemetry telemetry = new Telemetry.Builder() - .processor(fakeProcessor) - .build(); - fakeProcessor.setTelemetry(telemetry); - // 15s flush period should be enough for all tests to be done - flushes will be manual StatsDAggregator aggregator = new StatsDAggregator(fakeProcessor, StatsDAggregator.DEFAULT_SHARDS, 3000L); fakeProcessor.aggregator = aggregator; @@ -272,7 +266,7 @@ public void test_aggregation_degradation_to_treenodes() { for (int i = 0; i < numMessages; i++) { fakeProcessor.send(new NumericMessage("some.counter", Message.Type.COUNT, 1, tags[i % numTags]) { @Override - boolean writeTo(StringBuilder builder, int capacity, String containerID) { + boolean writeTo(StringBuilder builder, int capacity) { return false; } diff --git a/src/test/java/com/timgroup/statsd/StatsDTestMessage.java b/src/test/java/com/timgroup/statsd/StatsDTestMessage.java index 61411ac1..e62cb722 100644 --- a/src/test/java/com/timgroup/statsd/StatsDTestMessage.java +++ b/src/test/java/com/timgroup/statsd/StatsDTestMessage.java @@ -9,7 +9,7 @@ protected StatsDTestMessage(String aspect, Message.Type type, T value, double sa } @Override - public final boolean writeTo(StringBuilder builder, int capacity, String containerID) { + public final boolean writeTo(StringBuilder builder, int capacity) { builder.append("test.").append(aspect).append(':'); writeValue(builder); builder.append('|').append(type); @@ -17,9 +17,6 @@ public final boolean writeTo(StringBuilder builder, int capacity, String contain builder.append('|').append('@').append(NonBlockingStatsDClient.format(NonBlockingStatsDClient.SAMPLE_RATE_FORMATTER, sampleRate)); } NonBlockingStatsDClient.tagString(this.tags, "", builder); - if (containerID != null && !containerID.isEmpty()) { - builder.append("|c:").append(containerID); - } builder.append('\n'); return false; diff --git a/src/test/java/com/timgroup/statsd/TelemetryTest.java b/src/test/java/com/timgroup/statsd/TelemetryTest.java index e18f6af4..fd496417 100644 --- a/src/test/java/com/timgroup/statsd/TelemetryTest.java +++ b/src/test/java/com/timgroup/statsd/TelemetryTest.java @@ -2,10 +2,12 @@ import java.util.logging.Logger; import org.junit.After; -import org.junit.AfterClass; +import org.junit.Before; import org.junit.Assume; -import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import java.io.IOException; import java.util.List; @@ -16,7 +18,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.equalTo; - +@RunWith(Parameterized.class) public class TelemetryTest { private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() { @Override public void handle(final Exception ex) { /* No-op */ } @@ -35,7 +37,7 @@ public static class FakeProcessor extends StatsDProcessor { public final List messages = new ArrayList<>(); FakeProcessor(final StatsDClientErrorHandler handler) throws Exception { - super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory(), null); + super(0, handler, 0, 1, 1, 0, 0, new StatsDThreadFactory()); } @@ -70,7 +72,7 @@ protected synchronized List getMessagesAsStrings() { ArrayList stringMessages = new ArrayList<>(messages.size()); for(Message m : messages) { sb.setLength(0); - m.writeTo(sb, Integer.MAX_VALUE, this.containerID); + m.writeTo(sb, Integer.MAX_VALUE); stringMessages.add(sb.toString()); } return stringMessages; @@ -83,28 +85,9 @@ public void clear() { } } - private static final int STATSD_SERVER_PORT = 17254; - private static final NonBlockingStatsDClientBuilder builder = new NonBlockingStatsDClientBuilder() - .prefix("my.prefix") - .hostname("localhost") - .constantTags("test") - .port(STATSD_SERVER_PORT) - .originDetectionEnabled(false) - .enableTelemetry(false); // disable telemetry so we can control calls to "flush" - private static NonBlockingStatsDClient client = builder.build(); - - // telemetry client - private static final NonBlockingStatsDClientBuilder telemetryBuilder = new NonBlockingStatsDClientBuilder() - .prefix("my.prefix") - .hostname("localhost") - .constantTags("test") - .port(STATSD_SERVER_PORT) - .enableAggregation(false) - .originDetectionEnabled(false) - .enableTelemetry(false); // disable telemetry so we can control calls to "flush" - private static NonBlockingStatsDClient telemetryClient = telemetryBuilder.build(); + private static NonBlockingStatsDClient client; - private static DummyStatsDServer server; + private static UDPDummyStatsDServer server; private static FakeProcessor fakeProcessor; private static String computeTelemetryTags() throws IOException, Exception { @@ -115,19 +98,50 @@ private static String computeTelemetryTags() throws IOException, Exception { } private static String telemetryTags; + private final String containerID; + private final String tail; + + @Parameters + public static Object[][] parameters() { + return new Object[][]{ + { null }, + { "my-fake-container-id" }, + }; + } - @BeforeClass - public static void start() throws IOException, Exception { - server = new UDPDummyStatsDServer(STATSD_SERVER_PORT); + public TelemetryTest(String containerID) { + this.containerID = containerID; + + StringBuilder sb = new StringBuilder(); + if (containerID != null) { + sb.append("|c:").append(containerID); + } + sb.append("\n"); + tail = sb.toString(); + } + + @Before + public void start() throws IOException, Exception { + server = new UDPDummyStatsDServer(0); fakeProcessor = new FakeProcessor(LOGGING_HANDLER); - client.telemetry.processor = fakeProcessor; - telemetryClient.telemetry.processor = fakeProcessor; + NonBlockingStatsDClientBuilder builder = new NonBlockingStatsDClientBuilder() + .prefix("my.prefix") + .hostname("localhost") + .constantTags("test") + .port(server.getPort()) + .originDetectionEnabled(false) + .enableTelemetry(false); // disable telemetry so we can control calls to "flush" + if (containerID != null) { + builder.containerID(containerID); + } + client = builder.build(); + client.telemetryStatsDProcessor = fakeProcessor; telemetryTags = computeTelemetryTags(); } - @AfterClass - public static void stop() throws Exception { + @After + public void stop() throws Exception { try { client.stop(); server.close(); @@ -136,139 +150,122 @@ public static void stop() throws Exception { } } - @After - public void clear() { - server.clear(); - client.telemetry.reset(); - telemetryClient.telemetry.reset(); - fakeProcessor.clear(); - fakeProcessor.containerID = null; - } - @Test(timeout = 5000L) public void telemetry_incrManuallyIncrData() throws Exception { - telemetryClient.telemetry.incrMetricsSent(1); - telemetryClient.telemetry.incrGaugeSent(1); - telemetryClient.telemetry.incrCountSent(1); - telemetryClient.telemetry.incrSetSent(1); - telemetryClient.telemetry.incrHistogramSent(1); - telemetryClient.telemetry.incrDistributionSent(1); - telemetryClient.telemetry.incrMetricsSent(1, Message.Type.GAUGE); // adds to metricsSent - telemetryClient.telemetry.incrMetricsSent(1, Message.Type.COUNT); // adds to metricsSent - telemetryClient.telemetry.incrMetricsSent(1, Message.Type.SET); // adds to metricsSent - telemetryClient.telemetry.incrMetricsSent(1, Message.Type.HISTOGRAM); // adds to metricsSent - telemetryClient.telemetry.incrMetricsSent(1, Message.Type.DISTRIBUTION); // adds to metricsSent - telemetryClient.telemetry.incrEventsSent(2); - telemetryClient.telemetry.incrServiceChecksSent(3); - telemetryClient.telemetry.incrBytesSent(4); - telemetryClient.telemetry.incrBytesDropped(5); - telemetryClient.telemetry.incrPacketSent(6); - telemetryClient.telemetry.incrPacketDropped(7); - telemetryClient.telemetry.incrPacketDroppedQueue(8); - telemetryClient.telemetry.incrAggregatedContexts(9); - telemetryClient.telemetry.incrAggregatedGaugeContexts(10); - telemetryClient.telemetry.incrAggregatedCountContexts(11); - telemetryClient.telemetry.incrAggregatedSetContexts(12); - - assertThat(telemetryClient.telemetry.metricsSent.get(), equalTo(6)); - assertThat(telemetryClient.telemetry.gaugeSent.get(), equalTo(2)); - assertThat(telemetryClient.telemetry.countSent.get(), equalTo(2)); - assertThat(telemetryClient.telemetry.setSent.get(), equalTo(2)); - assertThat(telemetryClient.telemetry.histogramSent.get(), equalTo(2)); - assertThat(telemetryClient.telemetry.distributionSent.get(), equalTo(2)); - assertThat(telemetryClient.telemetry.eventsSent.get(), equalTo(2)); - assertThat(telemetryClient.telemetry.serviceChecksSent.get(), equalTo(3)); - assertThat(telemetryClient.telemetry.bytesSent.get(), equalTo(4)); - assertThat(telemetryClient.telemetry.bytesDropped.get(), equalTo(5)); - assertThat(telemetryClient.telemetry.packetsSent.get(), equalTo(6)); - assertThat(telemetryClient.telemetry.packetsDropped.get(), equalTo(7)); - assertThat(telemetryClient.telemetry.packetsDroppedQueue.get(), equalTo(8)); - assertThat(telemetryClient.telemetry.aggregatedContexts.get(), equalTo(9)); - assertThat(telemetryClient.telemetry.aggregatedGaugeContexts.get(), equalTo(10)); - assertThat(telemetryClient.telemetry.aggregatedCountContexts.get(), equalTo(11)); - assertThat(telemetryClient.telemetry.aggregatedSetContexts.get(), equalTo(12)); - - telemetryClient.telemetry.flush(); - - assertThat(telemetryClient.telemetry.metricsSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.gaugeSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.countSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.setSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.histogramSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.distributionSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.eventsSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.serviceChecksSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.bytesSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.bytesDropped.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.packetsSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.packetsDropped.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.packetsDroppedQueue.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.aggregatedContexts.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.aggregatedGaugeContexts.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.aggregatedCountContexts.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.aggregatedSetContexts.get(), equalTo(0)); + client.telemetry.incrMetricsSent(1); + client.telemetry.incrGaugeSent(1); + client.telemetry.incrCountSent(1); + client.telemetry.incrSetSent(1); + client.telemetry.incrHistogramSent(1); + client.telemetry.incrDistributionSent(1); + client.telemetry.incrMetricsSent(1, Message.Type.GAUGE); // adds to metricsSent + client.telemetry.incrMetricsSent(1, Message.Type.COUNT); // adds to metricsSent + client.telemetry.incrMetricsSent(1, Message.Type.SET); // adds to metricsSent + client.telemetry.incrMetricsSent(1, Message.Type.HISTOGRAM); // adds to metricsSent + client.telemetry.incrMetricsSent(1, Message.Type.DISTRIBUTION); // adds to metricsSent + client.telemetry.incrEventsSent(2); + client.telemetry.incrServiceChecksSent(3); + client.telemetry.incrBytesSent(4); + client.telemetry.incrBytesDropped(5); + client.telemetry.incrPacketSent(6); + client.telemetry.incrPacketDropped(7); + client.telemetry.incrPacketDroppedQueue(8); + client.telemetry.incrAggregatedContexts(9); + client.telemetry.incrAggregatedGaugeContexts(10); + client.telemetry.incrAggregatedCountContexts(11); + client.telemetry.incrAggregatedSetContexts(12); + + assertThat(client.telemetry.metricsSent.get(), equalTo(6)); + assertThat(client.telemetry.gaugeSent.get(), equalTo(2)); + assertThat(client.telemetry.countSent.get(), equalTo(2)); + assertThat(client.telemetry.setSent.get(), equalTo(2)); + assertThat(client.telemetry.histogramSent.get(), equalTo(2)); + assertThat(client.telemetry.distributionSent.get(), equalTo(2)); + assertThat(client.telemetry.eventsSent.get(), equalTo(2)); + assertThat(client.telemetry.serviceChecksSent.get(), equalTo(3)); + assertThat(client.telemetry.bytesSent.get(), equalTo(4)); + assertThat(client.telemetry.bytesDropped.get(), equalTo(5)); + assertThat(client.telemetry.packetsSent.get(), equalTo(6)); + assertThat(client.telemetry.packetsDropped.get(), equalTo(7)); + assertThat(client.telemetry.packetsDroppedQueue.get(), equalTo(8)); + assertThat(client.telemetry.aggregatedContexts.get(), equalTo(9)); + assertThat(client.telemetry.aggregatedGaugeContexts.get(), equalTo(10)); + assertThat(client.telemetry.aggregatedCountContexts.get(), equalTo(11)); + assertThat(client.telemetry.aggregatedSetContexts.get(), equalTo(12)); + + client.telemetry.flush(); + + assertThat(client.telemetry.metricsSent.get(), equalTo(0)); + assertThat(client.telemetry.gaugeSent.get(), equalTo(0)); + assertThat(client.telemetry.countSent.get(), equalTo(0)); + assertThat(client.telemetry.setSent.get(), equalTo(0)); + assertThat(client.telemetry.histogramSent.get(), equalTo(0)); + assertThat(client.telemetry.distributionSent.get(), equalTo(0)); + assertThat(client.telemetry.eventsSent.get(), equalTo(0)); + assertThat(client.telemetry.serviceChecksSent.get(), equalTo(0)); + assertThat(client.telemetry.bytesSent.get(), equalTo(0)); + assertThat(client.telemetry.bytesDropped.get(), equalTo(0)); + assertThat(client.telemetry.packetsSent.get(), equalTo(0)); + assertThat(client.telemetry.packetsDropped.get(), equalTo(0)); + assertThat(client.telemetry.packetsDroppedQueue.get(), equalTo(0)); + assertThat(client.telemetry.aggregatedContexts.get(), equalTo(0)); + assertThat(client.telemetry.aggregatedGaugeContexts.get(), equalTo(0)); + assertThat(client.telemetry.aggregatedCountContexts.get(), equalTo(0)); + assertThat(client.telemetry.aggregatedSetContexts.get(), equalTo(0)); List statsdMessages = fakeProcessor.getMessagesAsStrings() ; assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.metrics:6|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.metrics:6|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); + hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + telemetryTags + ",metrics_type:gauge" + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); + hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + telemetryTags + ",metrics_type:count" + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); + hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + telemetryTags + ",metrics_type:set" + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.HISTOGRAM) + "\n")); + hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + telemetryTags + ",metrics_type:histogram" + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.DISTRIBUTION) + "\n")); + hasItem("datadog.dogstatsd.client.metrics_by_type:2|c|#test," + telemetryTags + ",metrics_type:distribution" + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.events:2|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.events:2|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.service_checks:3|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.service_checks:3|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.bytes_sent:4|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.bytes_sent:4|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.bytes_dropped:5|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.bytes_dropped:5|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.packets_sent:6|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.packets_sent:6|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.packets_dropped:7|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.packets_dropped:7|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.packets_dropped_queue:8|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.packets_dropped_queue:8|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.aggregated_context:9|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.aggregated_context:9|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.aggregated_context_by_type:10|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); + hasItem("datadog.dogstatsd.client.aggregated_context_by_type:10|c|#test," + telemetryTags + ",metrics_type:gauge" + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.aggregated_context_by_type:11|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); + hasItem("datadog.dogstatsd.client.aggregated_context_by_type:11|c|#test," + telemetryTags + ",metrics_type:count" + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.aggregated_context_by_type:12|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); + hasItem("datadog.dogstatsd.client.aggregated_context_by_type:12|c|#test," + telemetryTags + ",metrics_type:set" + tail)); } @Test(timeout = 5000L) @@ -287,31 +284,31 @@ public void telemetry_incrMetricsSent() throws Exception { List statsdMessages = fakeProcessor.getMessagesAsStrings() ; assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.metrics:1|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.metrics:1|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.bytes_sent:29|c|#test," + telemetryTags + "\n")); + hasItem(String.format("datadog.dogstatsd.client.bytes_sent:%d|c|#test,%s%s", 28 + tail.length(), telemetryTags, tail))); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.bytes_dropped:0|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.bytes_dropped:0|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.packets_sent:1|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.packets_sent:1|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.packets_dropped:0|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.packets_dropped:0|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.packets_dropped_queue:0|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.packets_dropped_queue:0|c|#test," + telemetryTags + tail)); assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.aggregated_context:0|c|#test," + telemetryTags + "\n")); + hasItem("datadog.dogstatsd.client.aggregated_context:0|c|#test," + telemetryTags + tail)); } @Test(timeout = 5000L) @@ -373,14 +370,14 @@ public void telemetry_flushInterval() throws Exception { List statsdMessages = fakeProcessor.getMessagesAsStrings() ; - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics:1|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_sent:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_dropped:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_sent:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped_queue:0|c|#test," + telemetryTags + "\n")); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics:1|c|#test," + telemetryTags + tail)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags + tail)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags + tail)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_sent:0|c|#test," + telemetryTags + tail)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_dropped:0|c|#test," + telemetryTags + tail)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_sent:0|c|#test," + telemetryTags + tail)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped:0|c|#test," + telemetryTags + tail)); + assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped_queue:0|c|#test," + telemetryTags + tail)); } @Test(timeout = 5000L) @@ -434,120 +431,6 @@ public void telemetry_SentData() throws Exception { assertThat(client.telemetry.metricsSent.get(), equalTo(1)); assertThat(client.telemetry.packetsSent.get(), equalTo(1)); - assertThat(client.telemetry.bytesSent.get(), equalTo(27)); - } - - @Test(timeout = 5000L) - public void telemetry_DevModeData() throws Exception { - - - telemetryClient.gauge("gauge", 24); - telemetryClient.count("count", 1); - telemetryClient.histogram("histo", 1); - telemetryClient.distribution("distro", 1); - - // leaving time to the server to flush metrics (equivalent to waitForMessage) - while (telemetryClient.telemetry.metricsSent.get() == 0 - || telemetryClient.telemetry.packetsSent.get() == 0 - || telemetryClient.telemetry.bytesSent.get() == 0) { - try { - Thread.sleep(50L); - } catch (InterruptedException e) {} - } - - assertThat(telemetryClient.telemetry.metricsSent.get(), equalTo(4)); - assertThat(telemetryClient.telemetry.gaugeSent.get(), equalTo(1)); - assertThat(telemetryClient.telemetry.countSent.get(), equalTo(1)); - assertThat(telemetryClient.telemetry.setSent.get(), equalTo(0)); - assertThat(telemetryClient.telemetry.histogramSent.get(), equalTo(1)); - assertThat(telemetryClient.telemetry.distributionSent.get(), equalTo(1)); - assertThat(telemetryClient.telemetry.packetsSent.get(), equalTo(1)); - assertThat(telemetryClient.telemetry.bytesSent.get(), equalTo(106)); - - // Start flush timer with a 50ms interval - telemetryClient.telemetry.start(50L); - - // Wait for the flush to happen - while (telemetryClient.telemetry.metricsSent.get() != 0) { - try { - Thread.sleep(30L); - } catch (InterruptedException e) {} - } - telemetryClient.telemetry.stop(); - - assertThat(telemetryClient.telemetry.metricsSent.get(), equalTo(0)); - List statsdMessages = fakeProcessor.getMessagesAsStrings(); - - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics:4|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:1|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:1|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:0|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:1|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.HISTOGRAM) + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.metrics_by_type:1|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.DISTRIBUTION) + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_sent:106|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.bytes_dropped:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_sent:1|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.packets_dropped_queue:0|c|#test," + telemetryTags + "\n")); - // aggregation is disabled - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context:0|c|#test," + telemetryTags + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context_by_type:0|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.GAUGE) + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context_by_type:0|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.COUNT) + "\n")); - assertThat(statsdMessages, hasItem("datadog.dogstatsd.client.aggregated_context_by_type:0|c|#test," + - telemetryClient.telemetry.getTelemetryTags(telemetryTags, Message.Type.SET) + "\n")); - } - - @Test(timeout = 5000L) - public void telemetry_containerID() throws Exception { - final String fakeContainerID = "fake-container-id"; - client.count("mycount", 24); - fakeProcessor.containerID = fakeContainerID; - - // wait for the "mycount" to be sent - server.waitForMessage("mycount"); - server.clear(); - fakeProcessor.clear(); - - assertThat(client.telemetry.metricsSent.get(), equalTo(1)); - client.telemetry.flush(); - assertThat(client.telemetry.metricsSent.get(), equalTo(0)); - - List statsdMessages = fakeProcessor.getMessagesAsStrings() ; - - assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.metrics:1|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); - - assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.events:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); - - assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.service_checks:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); - - assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.bytes_sent:29|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); - - assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.bytes_dropped:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); - - assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.packets_sent:1|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); - - assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.packets_dropped:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); - - assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.packets_dropped_queue:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); - - assertThat(statsdMessages, - hasItem("datadog.dogstatsd.client.aggregated_context:0|c|#test," + telemetryTags + "|c:" + fakeContainerID + "\n")); + assertThat(client.telemetry.bytesSent.get(), equalTo(26 + tail.length())); } } diff --git a/src/test/java/com/timgroup/statsd/UDPDummyStatsDServer.java b/src/test/java/com/timgroup/statsd/UDPDummyStatsDServer.java index 181e6481..1bea7162 100644 --- a/src/test/java/com/timgroup/statsd/UDPDummyStatsDServer.java +++ b/src/test/java/com/timgroup/statsd/UDPDummyStatsDServer.java @@ -31,4 +31,8 @@ public void close() throws IOException { //ignore } } + + int getPort() throws IOException { + return ((InetSocketAddress)server.getLocalAddress()).getPort(); + } } diff --git a/style.xml b/style.xml index e951e185..c407406c 100644 --- a/style.xml +++ b/style.xml @@ -85,6 +85,8 @@ + +