Skip to content

AGTMETRICS-212 Cleanups and refactorings #269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/main/java/com/timgroup/statsd/BufferPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ public class BufferPool {
private final int bufferSize;
private final boolean direct;


BufferPool(final int poolSize, int bufferSize, final boolean direct) throws InterruptedException {

size = poolSize;
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/com/timgroup/statsd/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ protected Message(String aspect, Message.Type type, String[] tags) {
*
* @param builder StringBuilder the text representation will be written to.
* @param capacity The capacity of the send buffer.
* @param containerID The container ID to be appended to the message.
* @return boolean indicating whether the message was partially written to the builder.
* If true, the method will be called again with the same arguments to continue writing.
*/
abstract boolean writeTo(StringBuilder builder, int capacity, String containerID);
abstract boolean writeTo(StringBuilder builder, int capacity);

/**
* Aggregate message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ public final void aggregate(Message message) {
}

@Override
public final boolean writeTo(StringBuilder builder, int capacity, String containerID) {
int metadataSize = metadataSize(builder, containerID);
public final boolean writeTo(StringBuilder builder, int capacity) {
int metadataSize = metadataSize(builder);
writeHeadMetadata(builder);
boolean partialWrite = writeValuesTo(builder, capacity - metadataSize);
writeTailMetadata(builder, containerID);
writeTailMetadata(builder);
return partialWrite;

}

private int metadataSize(StringBuilder builder, String containerID) {
private int metadataSize(StringBuilder builder) {
if (metadataSize == -1) {
final int previousLength = builder.length();
final int previousEncodedLength = Utf8.encodedLength(builder);
writeHeadMetadata(builder);
writeTailMetadata(builder, containerID);
writeTailMetadata(builder);
metadataSize = Utf8.encodedLength(builder) - previousEncodedLength;
builder.setLength(previousLength);
}
Expand All @@ -67,7 +67,7 @@ private void writeHeadMetadata(StringBuilder builder) {
builder.append(prefix).append(aspect);
}

private void writeTailMetadata(StringBuilder builder, String containerID) {
private void writeTailMetadata(StringBuilder builder) {
builder.append('|').append(type);
if (!Double.isNaN(sampleRate)) {
builder.append('|').append('@').append(format(SAMPLE_RATE_FORMATTER, sampleRate));
Expand All @@ -76,11 +76,7 @@ private void writeTailMetadata(StringBuilder builder, String containerID) {
builder.append("|T").append(timestamp);
}
tagString(tags, builder);
if (containerID != null && !containerID.isEmpty()) {
builder.append("|c:").append(containerID);
}

builder.append('\n');
writeMessageTail(builder);
}

private boolean writeValuesTo(StringBuilder builder, int remainingCapacity) {
Expand Down
252 changes: 95 additions & 157 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClient.java

Large diffs are not rendered by default.

111 changes: 101 additions & 10 deletions src/main/java/com/timgroup/statsd/NonBlockingStatsDClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,93 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadFactory;

/**
* Create a new StatsD client communicating with a StatsD instance on the
* specified host and port. All messages send via this client will have
* their keys prefixed with the specified string. The new client will
* attempt to open a connection to the StatsD server immediately upon
* instantiation, and may throw an exception if that a connection cannot
* be established. Once a client has been instantiated in this way, all
* exceptions thrown during subsequent usage are passed to the specified
* handler and then consumed, guaranteeing that failures in metrics will
* not affect normal code execution.
*/
public class NonBlockingStatsDClientBuilder implements Cloneable {

/**
* 1400 chosen as default here so that the number of bytes in a message plus the number of bytes required
* for additional udp headers should be under the 1500 Maximum Transmission Unit for ethernet.
* See https://github.com/DataDog/java-dogstatsd-client/pull/17 for discussion.
*/

/** The maximum number of bytes for a message that can be sent. */
public int maxPacketSizeBytes = 0;
public int port = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT;
public int telemetryPort = NonBlockingStatsDClient.DEFAULT_DOGSTATSD_PORT;
/** The maximum amount of unprocessed messages in the queue. */
public int queueSize = NonBlockingStatsDClient.DEFAULT_QUEUE_SIZE;
/** The timeout in milliseconds for blocking operations. Applies to unix sockets only. */
public int timeout = NonBlockingStatsDClient.SOCKET_TIMEOUT_MS;
/** The size for the network buffer pool. */
public int bufferPoolSize = NonBlockingStatsDClient.DEFAULT_POOL_SIZE;
/** The socket buffer size in bytes. Applies to unix sockets only. */
public int socketBufferSize = NonBlockingStatsDClient.SOCKET_BUFFER_BYTES;
/** The number of processor worker threads assembling buffers for submission. */
public int processorWorkers = NonBlockingStatsDClient.DEFAULT_PROCESSOR_WORKERS;
/** The number of sender worker threads submitting buffers to the socket. */
public int senderWorkers = NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS;
/** Blocking or non-blocking implementation for statsd message queue. */
public boolean blocking = NonBlockingStatsDClient.DEFAULT_BLOCKING;
/** Enable sending client telemetry. */
public boolean enableTelemetry = NonBlockingStatsDClient.DEFAULT_ENABLE_TELEMETRY;
public boolean enableAggregation = NonBlockingStatsDClient.DEFAULT_ENABLE_AGGREGATION;
/** Telemetry flush interval, in milliseconds. */
public int telemetryFlushInterval = Telemetry.DEFAULT_FLUSH_INTERVAL;
/** Aggregation flush interval, in milliseconds. 0 disables aggregation. */
public int aggregationFlushInterval = StatsDAggregator.DEFAULT_FLUSH_INTERVAL;
public int aggregationShards = StatsDAggregator.DEFAULT_SHARDS;
/**
* Enable/disable the client origin detection.
*
* <p>This feature requires Datadog Agent version &gt;=6.35.0 &amp;&amp; &lt;7.0.0 or Agent versions &gt;=7.35.0.
* When enabled, the client tries to discover its container ID and sends it to the Agent
* to enrich the metrics with container tags.
* Origin detection can be disabled by configuring the environment variabe DD_ORIGIN_DETECTION_ENABLED=false
* The client tries to read the container ID by parsing the file /proc/self/cgroup.
* This is not supported on Windows.
*/
public boolean originDetectionEnabled = NonBlockingStatsDClient.DEFAULT_ENABLE_ORIGIN_DETECTION;
/**
* The timeout in milliseconds for connecting to the StatsD server. Applies to unix sockets only.
*
* <p>It is also used to detect if a connection is still alive and re-establish a new one if needed.
*/
public int connectionTimeout = NonBlockingStatsDClient.SOCKET_CONNECT_TIMEOUT_MS;

/** Yields the IP address and socket of the StatsD server. */
public Callable<SocketAddress> addressLookup;
/** Yields the IP address and socket of the StatsD telemetry server destination. */
public Callable<SocketAddress> telemetryAddressLookup;

public String hostname;
public String telemetryHostname;
public String namedPipe;

/** The prefix to apply to keys sent via this client. */
public String prefix;

/** The entity id value used with an internal tag for tracking client entity.
*
* <p>If null the client default the value with the environment variable "DD_ENTITY_ID".
* If the environment variable is not defined, the internal tag is not added.
*/
public String entityID;
/** Tags to be added to all content sent. */
public String[] constantTags;
/**
* Allows passing the container ID, this will be used by the Agent to enrich
* metrics with container tags.
*
* <p>This feature requires Datadog Agent version &gt;=6.35.0 &amp;&amp; &lt;7.0.0 or Agent versions &gt;=7.35.0.
* When configured, the provided container ID is prioritized over the container ID discovered
* via Origin Detection. When entityID or DD_ENTITY_ID are set, this value is ignored.
*/
public String containerID;

/** Handler to use when an exception occurs during usage, may be null to indicate noop. */
public StatsDClientErrorHandler errorHandler;
public ThreadFactory threadFactory;

Expand All @@ -63,26 +113,35 @@ public NonBlockingStatsDClientBuilder telemetryPort(int val) {
return this;
}

/** The maximum amount of unprocessed messages in the queue. */
public NonBlockingStatsDClientBuilder queueSize(int val) {
queueSize = val;
return this;
}

/** The timeout in milliseconds for blocking operations. Applies to unix sockets only. */
public NonBlockingStatsDClientBuilder timeout(int val) {
timeout = val;
return this;
}

/**
* The timeout in milliseconds for connecting to the StatsD server. Applies to unix sockets only.
*
* <p>It is also used to detect if a connection is still alive and re-establish a new one if needed.
*/
public NonBlockingStatsDClientBuilder connectionTimeout(int val) {
connectionTimeout = val;
return this;
}

/** The size for the network buffer pool. */
public NonBlockingStatsDClientBuilder bufferPoolSize(int val) {
bufferPoolSize = val;
return this;
}

/** The socket buffer size in bytes. Applies to unix sockets only. */
public NonBlockingStatsDClientBuilder socketBufferSize(int val) {
socketBufferSize = val;
return this;
Expand All @@ -93,26 +152,31 @@ public NonBlockingStatsDClientBuilder maxPacketSizeBytes(int val) {
return this;
}

/** The number of processor worker threads assembling buffers for submission. */
public NonBlockingStatsDClientBuilder processorWorkers(int val) {
processorWorkers = val;
return this;
}

/** The number of sender worker threads submitting buffers to the socket. */
public NonBlockingStatsDClientBuilder senderWorkers(int val) {
senderWorkers = val;
return this;
}

/** Blocking or non-blocking implementation for statsd message queue. */
public NonBlockingStatsDClientBuilder blocking(boolean val) {
blocking = val;
return this;
}

/** Yields the IP address and socket of the StatsD server. */
public NonBlockingStatsDClientBuilder addressLookup(Callable<SocketAddress> val) {
addressLookup = val;
return this;
}

/** Yields the IP address and socket of the StatsD telemetry server destination. */
public NonBlockingStatsDClientBuilder telemetryAddressLookup(Callable<SocketAddress> val) {
telemetryAddressLookup = val;
return this;
Expand Down Expand Up @@ -143,26 +207,35 @@ public NonBlockingStatsDClientBuilder telemetryAddress(String address) {
return this;
}

/** The prefix to apply to keys sent via this client. */
public NonBlockingStatsDClientBuilder prefix(String val) {
prefix = val;
return this;
}

/** The entity id value used with an internal tag for tracking client entity.
*
* <p>If null the client default the value with the environment variable "DD_ENTITY_ID".
* If the environment variable is not defined, the internal tag is not added.
*/
public NonBlockingStatsDClientBuilder entityID(String val) {
entityID = val;
return this;
}

/** Tags to be added to all content sent. */
public NonBlockingStatsDClientBuilder constantTags(String... val) {
constantTags = val;
return this;
}

/** Handler to use when an exception occurs during usage, may be null to indicate noop. */
public NonBlockingStatsDClientBuilder errorHandler(StatsDClientErrorHandler val) {
errorHandler = val;
return this;
}

/** Enable sending client telemetry. */
public NonBlockingStatsDClientBuilder enableTelemetry(boolean val) {
enableTelemetry = val;
return this;
Expand All @@ -173,11 +246,13 @@ public NonBlockingStatsDClientBuilder enableAggregation(boolean val) {
return this;
}

/** Telemetry flush interval, in milliseconds. */
public NonBlockingStatsDClientBuilder telemetryFlushInterval(int val) {
telemetryFlushInterval = val;
return this;
}

/** Aggregation flush interval, in milliseconds. 0 disables aggregation. */
public NonBlockingStatsDClientBuilder aggregationFlushInterval(int val) {
aggregationFlushInterval = val;
return this;
Expand All @@ -193,11 +268,30 @@ public NonBlockingStatsDClientBuilder threadFactory(ThreadFactory val) {
return this;
}

/**
* Allows passing the container ID, this will be used by the Agent to enrich
* metrics with container tags.
*
* <p>This feature requires Datadog Agent version &gt;=6.35.0 &amp;&amp; &lt;7.0.0 or Agent versions &gt;=7.35.0.
* When configured, the provided container ID is prioritized over the container ID discovered
* via Origin Detection. When entityID or DD_ENTITY_ID are set, this value is ignored.
*/
public NonBlockingStatsDClientBuilder containerID(String val) {
containerID = val;
return this;
}

/**
* Enable/disable the client origin detection.
*
* <p>This feature requires Datadog Agent version &gt;=6.35.0 &amp;&amp; &lt;7.0.0 or Agent versions &gt;7.35.0.
* When enabled, the client tries to discover its container ID and sends it to the Agent
* to enrich the metrics with container tags.
* Origin detection can be disabled by configuring the environment variabe DD_ORIGIN_DETECTION_ENABLED=false
* The client tries to read the container ID by parsing the file /proc/self/cgroup.
* This is not supported on Windows.
* The client prioritizes the value passed via or entityID or DD_ENTITY_ID (if set) over the container ID.
*/
public NonBlockingStatsDClientBuilder originDetectionEnabled(boolean val) {
originDetectionEnabled = val;
return this;
Expand Down Expand Up @@ -426,7 +520,4 @@ private static int getPortFromEnvVar(final int defaultPort) {
}
}
}


}

5 changes: 0 additions & 5 deletions src/main/java/com/timgroup/statsd/NumericMessage.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.timgroup.statsd;


public abstract class NumericMessage<T extends Number> extends Message {

protected Number value;

protected NumericMessage(Message.Type type) {
Expand All @@ -14,7 +12,6 @@ protected NumericMessage(String aspect, Message.Type type, T value, String[] tag
this.value = value;
}


/**
* Aggregate message.
*
Expand Down Expand Up @@ -59,6 +56,4 @@ public Number getValue() {
public void setValue(Number value) {
this.value = value;
}

}

2 changes: 0 additions & 2 deletions src/main/java/com/timgroup/statsd/StatsDAggregator.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Timer;
import java.util.TimerTask;


public class StatsDAggregator {
public static int DEFAULT_FLUSH_INTERVAL = 2000; // 2s
public static int DEFAULT_SHARDS = 4; // 4 partitions to reduce contention.
Expand Down Expand Up @@ -89,7 +88,6 @@ public boolean aggregateMessage(Message message) {
return false;
}


int hash = message.hashCode();
int bucket = Math.abs(hash % this.shardGranularity);
Map<Message, Message> map = aggregateMetrics.get(bucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;


public class StatsDBlockingProcessor extends StatsDProcessor {

private final BlockingQueue<Message> messages;
Expand All @@ -30,10 +29,10 @@ protected boolean haveMessages() {
StatsDBlockingProcessor(final int queueSize, final StatsDClientErrorHandler handler,
final int maxPacketSizeBytes, final int poolSize, final int workers,
final int aggregatorFlushInterval, final int aggregatorShards,
final ThreadFactory threadFactory, final String containerID) throws Exception {
final ThreadFactory threadFactory) throws Exception {

super(queueSize, handler, maxPacketSizeBytes, poolSize, workers,
aggregatorFlushInterval, aggregatorShards, threadFactory, containerID);
aggregatorFlushInterval, aggregatorShards, threadFactory);
this.messages = new ArrayBlockingQueue<>(queueSize);
}

Expand Down
Loading