diff --git a/ARQUITECTURE.md b/ARQUITECTURE.md new file mode 100644 index 00000000..e06d772f --- /dev/null +++ b/ARQUITECTURE.md @@ -0,0 +1,60 @@ +```mermaid + +sequenceDiagram + box Data Consolidation + participant ConsolidationService + end + + participant SegmentClient + box HTTP + participant QueueHttp + participant LooperHttp + participant SegmentAPI + end + box File + participant QueueFile + participant WriteFile + participant File + participant WatchFile + end + + activate ConsolidationService + ConsolidationService->>+SegmentClient: enqueue + SegmentClient<<->>QueueHttp: offer + alt QueueHttp overflow + SegmentClient<<->>QueueFile: put + end + SegmentClient->>-ConsolidationService: + deactivate ConsolidationService + + loop consume QueueHttp + LooperHttp->>QueueHttp:take + activate LooperHttp + end + LooperHttp->>SegmentAPI: batchUpload + note over LooperHttp,SegmentAPI: Batch + note over LooperHttp,SegmentAPI: CircuitBreaker and Retry + note over LooperHttp,SegmentAPI: HTTP requests submitted to a pool + deactivate LooperHttp + + alt retry exhausted or circuit open + note over LooperHttp: pool threads + LooperHttp->>QueueFile: put + end + + loop consume QueueFile + WriteFile->>QueueFile:take + activate WriteFile + end + WriteFile->>File: write + note over WriteFile: Batch and save file + deactivate WriteFile + + note over WatchFile: check last written + activate WatchFile + loop watch QueueFile + WatchFile->>File: read and remove + WatchFile->>QueueHttp: offer + end + deactivate WatchFile +``` diff --git a/analytics/pom.xml b/analytics/pom.xml index 8c80856c..8e61a10a 100644 --- a/analytics/pom.xml +++ b/analytics/pom.xml @@ -39,9 +39,21 @@ findbugs provided - <dependency> - <groupId>com.segment.backo</groupId> - <artifactId>backo</artifactId> - </dependency> + <dependency> + <groupId>dev.failsafe</groupId> + <artifactId>failsafe</artifactId> + <version>3.3.2</version> + </dependency> + <dependency> + <groupId>dev.failsafe</groupId> + <artifactId>failsafe-retrofit</artifactId> + <version>3.3.2</version> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.18.0</version> + </dependency> junit @@ -67,7 +79,19 @@ org.mockito mockito-core test + <dependency> + <groupId>org.wiremock</groupId> + <artifactId>wiremock-standalone</artifactId> + <version>3.2.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>4.2.2</version> + <scope>test</scope> + </dependency> diff --git a/analytics/src/main/java/com/segment/analytics/Analytics.java b/analytics/src/main/java/com/segment/analytics/Analytics.java index 81af36c7..bccce351 100644 --- a/analytics/src/main/java/com/segment/analytics/Analytics.java +++ b/analytics/src/main/java/com/segment/analytics/Analytics.java @@ -90,11 +90,6 @@ public boolean offer(MessageBuilder builder) { return client.offer(message); } - /** Flush events in the message queue. */ - public void flush() { - client.flush(); - } - /** Stops this instance from processing further requests. */ public void shutdown() { client.shutdown(); @@ -144,9 +139,7 @@ public static class Builder { private ThreadFactory threadFactory; private int flushQueueSize; private int maximumFlushAttempts; - private int maximumQueueSizeInBytes; private long flushIntervalInMillis; - private List<Callback> callbacks; private int queueCapacity; private boolean forceTlsV1 = false; private GsonBuilder gsonBuilder; @@ -273,17 +266,6 @@ public Builder flushQueueSize(int flushQueueSize) { return this; } - /** Set the queueSize at which flushes should be triggered. */ - @Beta - public Builder maximumQueueSizeInBytes(int bytes) { - if (bytes < 1) { - throw new IllegalArgumentException("maximumQueueSizeInBytes must not be less than 1."); - } - - this.maximumQueueSizeInBytes = bytes; - return this; - } - /** Set the interval at which the queue should be flushed. 
*/ @Beta public Builder flushInterval(long flushInterval, TimeUnit unit) { @@ -323,21 +305,6 @@ public Builder threadFactory(ThreadFactory threadFactory) { return this; } - /** Add a {@link Callback} to be notified when an event is processed. */ - public Builder callback(Callback callback) { - if (callback == null) { - throw new NullPointerException("Null callback"); - } - if (callbacks == null) { - callbacks = new ArrayList<>(); - } - if (callbacks.contains(callback)) { - throw new IllegalStateException("Callback is already registered."); - } - callbacks.add(callback); - return this; - } - /** Use a {@link Plugin} to configure the builder. */ @Beta public Builder plugin(Plugin plugin) { @@ -390,9 +357,6 @@ public Analytics build() { if (flushQueueSize == 0) { flushQueueSize = Platform.get().defaultFlushQueueSize(); } - if (maximumQueueSizeInBytes == 0) { - maximumQueueSizeInBytes = MESSAGE_QUEUE_MAX_BYTE_SIZE; - } if (maximumFlushAttempts == 0) { maximumFlushAttempts = 3; } @@ -412,11 +376,6 @@ public Analytics build() { if (threadFactory == null) { threadFactory = Platform.get().defaultThreadFactory(); } - if (callbacks == null) { - callbacks = Collections.emptyList(); - } else { - callbacks = Collections.unmodifiableList(callbacks); - } HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor( @@ -456,21 +415,17 @@ public void log(String message) { SegmentService segmentService = restAdapter.create(SegmentService.class); - AnalyticsClient analyticsClient = - AnalyticsClient.create( - endpoint, - segmentService, - queueCapacity, - flushQueueSize, - flushIntervalInMillis, - maximumFlushAttempts, - maximumQueueSizeInBytes, - log, - threadFactory, - networkExecutor, - callbacks, - writeKey, - gson); + AnalyticsClient analyticsClient = AnalyticsClient.create( + endpoint, + segmentService, + queueCapacity, + flushQueueSize, + flushIntervalInMillis, + log, + threadFactory, + networkExecutor, + writeKey, + gson); return new Analytics(analyticsClient, messageTransformers, messageInterceptors, log); } diff --git a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java index f7560004..981a948f 100644 --- a/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java +++ b/analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java @@ -5,32 +5,35 @@ import static com.segment.analytics.Log.Level.VERBOSE; import com.google.gson.Gson; -import com.segment.analytics.Callback; import com.segment.analytics.Log; import com.segment.analytics.http.SegmentService; import com.segment.analytics.http.UploadResponse; import com.segment.analytics.messages.Batch; import com.segment.analytics.messages.Message; -import com.segment.backo.Backo; +import dev.failsafe.CircuitBreaker; +import dev.failsafe.CircuitBreakerOpenException; +import dev.failsafe.Failsafe; +import dev.failsafe.FailsafeExecutor; +import dev.failsafe.RetryPolicy; +import dev.failsafe.retrofit.FailsafeCall; import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; -import 
java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import okhttp3.HttpUrl; import retrofit2.Call; import retrofit2.Response; @@ -38,10 +41,9 @@ public class AnalyticsClient { private static final Map CONTEXT; private static final int BATCH_MAX_SIZE = 1024 * 500; - private static final int MSG_MAX_SIZE = 1024 * 32; private static final Charset ENCODING = StandardCharsets.UTF_8; private Gson gsonInstance; - private static final String instanceId = UUID.randomUUID().toString(); + private static final String instanceId = UUID.randomUUID().toString(); // TODO configurable ? static { Map library = new LinkedHashMap<>(); @@ -56,17 +58,15 @@ public class AnalyticsClient { private final BlockingQueue messageQueue; private final HttpUrl uploadUrl; private final SegmentService service; - private final int size; - private final int maximumRetries; - private final int maximumQueueByteSize; - private int currentQueueSizeInBytes; + private final int flushQueueSize; + private final long flushIntervalInMillis; private final Log log; - private final List callbacks; private final ExecutorService networkExecutor; - private final ExecutorService looperExecutor; - private final ScheduledExecutorService flushScheduler; + private final Thread looperThread; private final AtomicBoolean isShutDown; private final String writeKey; + private final FailsafeExecutor> failsafe; + private final FallbackAppender fallback; public static AnalyticsClient create( HttpUrl uploadUrl, @@ -74,12 +74,9 @@ public static AnalyticsClient create( int queueCapacity, int flushQueueSize, long flushIntervalInMillis, - int maximumRetries, - int maximumQueueSizeInBytes, Log log, ThreadFactory threadFactory, ExecutorService networkExecutor, - List callbacks, String writeKey, Gson gsonInstance) { return new AnalyticsClient( @@ -88,12 +85,9 @@ public static AnalyticsClient create( segmentService, flushQueueSize, flushIntervalInMillis, - maximumRetries, - maximumQueueSizeInBytes, log, threadFactory, networkExecutor, - callbacks, new AtomicBoolean(false), writeKey, gsonInstance); @@ -103,46 +97,55 @@ public AnalyticsClient( BlockingQueue messageQueue, HttpUrl uploadUrl, SegmentService service, - int maxQueueSize, + int flushQueueSize, long flushIntervalInMillis, - int maximumRetries, - int maximumQueueSizeInBytes, Log log, ThreadFactory threadFactory, ExecutorService networkExecutor, - List callbacks, AtomicBoolean isShutDown, String writeKey, Gson gsonInstance) { this.messageQueue = messageQueue; this.uploadUrl = uploadUrl; this.service = service; - this.size = maxQueueSize; - this.maximumRetries = maximumRetries; - this.maximumQueueByteSize = maximumQueueSizeInBytes; + this.flushQueueSize = flushQueueSize; + this.flushIntervalInMillis = flushIntervalInMillis; this.log = log; - this.callbacks = callbacks; - this.looperExecutor = Executors.newSingleThreadExecutor(threadFactory); + this.looperThread = threadFactory.newThread(new Looper()); this.networkExecutor = networkExecutor; this.isShutDown = isShutDown; this.writeKey = writeKey; this.gsonInstance = gsonInstance; - - this.currentQueueSizeInBytes = 0; - - if (!isShutDown.get()) looperExecutor.submit(new Looper()); - - flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); - flushScheduler.scheduleAtFixedRate( - new 
Runnable() { - @Override - public void run() { - flush(); - } - }, - flushIntervalInMillis, - flushIntervalInMillis, - TimeUnit.MILLISECONDS); + looperThread.start(); + + CircuitBreaker<Response<UploadResponse>> breaker = CircuitBreaker.<Response<UploadResponse>>builder() + // 10 failures within 2 minutes open the circuit + .withFailureThreshold(10, Duration.ofMinutes(2)) + // once open, wait 30 seconds before going half-open + .withDelay(Duration.ofSeconds(30)) + // after 1 success the circuit is closed + .withSuccessThreshold(1) + // 5xx or rate limit is an error + .handleResultIf(response -> is5xx(response.code()) || response.code() == 429) + .onOpen(el -> System.err.println("***\nOPEN\n***")) + .onHalfOpen(el -> System.err.println("***\nHALF OPEN\n***")) + .onClose(el -> System.err.println("***\nCLOSED\n***")) + .build(); + + RetryPolicy<Response<UploadResponse>> retry = RetryPolicy.<Response<UploadResponse>>builder() + .withMaxAttempts(5) + .withBackoff(1, 300, ChronoUnit.SECONDS) + .withJitter(.2) + // retry on IOException + .handle(IOException.class) + // retry on 5xx + .handleResultIf(response -> is5xx(response.code())) + // stop retrying on rate limit + .abortIf(response -> response.code() == 429) + .build(); + + this.failsafe = Failsafe.with(retry, breaker).with(networkExecutor); + this.fallback = new FallbackAppender(this); } public int messageSizeInBytes(Message message) { @@ -151,74 +154,33 @@ public int messageSizeInBytes(Message message) { return stringifiedMessage.getBytes(ENCODING).length; } - private Boolean isBackPressuredAfterSize(int incomingSize) { - int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON); - int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE; - // Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time - return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9; - } - public boolean offer(Message message) { return messageQueue.offer(message); } - public void enqueue(Message message) { - if (message != StopMessage.STOP && isShutDown.get()) { - log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message); - return; - } - - try { - // @jorgen25 message here could be regular msg, POISON or STOP. Only do regular logic if its - // valid message - if (message != StopMessage.STOP && message != FlushMessage.POISON) { - int messageByteSize = messageSizeInBytes(message); - - // @jorgen25 check if message is below 32kb limit for individual messages, no need to check - // for extra characters - if (messageByteSize <= MSG_MAX_SIZE) { - if (isBackPressuredAfterSize(messageByteSize)) { - this.currentQueueSizeInBytes = messageByteSize; - messageQueue.put(FlushMessage.POISON); - messageQueue.put(message); - - log.print(VERBOSE, "Maximum storage size has been hit Flushing..."); - } else { - messageQueue.put(message); - this.currentQueueSizeInBytes += messageByteSize; - } - } else { - log.print( - ERROR, "Message was above individual limit. MessageId: %s", message.messageId()); - throw new IllegalArgumentException( - "Message was above individual limit. 
MessageId: " + message.messageId()); + public void enqueue(Message message) { + if (isShutDown.get()) { + log.print(ERROR, "Attempt to enqueue a message when shutdown has been called %s.", message); + return; + } + if (!messageQueue.offer(message)) { + handleError(message); + } + else { + System.err.println("enqueued " + message.messageId()); } - } else { - messageQueue.put(message); - } - } catch (InterruptedException e) { - log.print(ERROR, e, "Interrupted while adding message %s.", message); - Thread.currentThread().interrupt(); - } - } - - public void flush() { - if (!isShutDown.get()) { - enqueue(FlushMessage.POISON); } - } + + // FIXME closeable public void shutdown() { if (isShutDown.compareAndSet(false, true)) { final long start = System.currentTimeMillis(); // first let's tell the system to stop - enqueue(StopMessage.STOP); + looperThread.interrupt(); + fallback.close(); - // we can shutdown the flush scheduler without worrying - flushScheduler.shutdownNow(); - - shutdownAndWait(looperExecutor, "looper"); shutdownAndWait(networkExecutor, "network"); log.print( @@ -226,7 +188,7 @@ public void shutdown() { } } - public void shutdownAndWait(ExecutorService executor, String name) { + private void shutdownAndWait(ExecutorService executor, String name) { try { executor.shutdown(); final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS); @@ -247,30 +209,21 @@ public void shutdownAndWait(ExecutorService executor, String name) { * messages, it triggers a flush. */ class Looper implements Runnable { - private boolean stop; public Looper() { - this.stop = false; } @Override public void run() { LinkedList messages = new LinkedList<>(); - AtomicInteger currentBatchSize = new AtomicInteger(); + int currentBatchSize = 0; boolean batchSizeLimitReached = false; int contextSize = gsonInstance.toJson(CONTEXT).getBytes(ENCODING).length; try { - while (!stop) { - Message message = messageQueue.take(); - - if (message == StopMessage.STOP) { - log.print(VERBOSE, "Stopping the Looper"); - stop = true; - } else if (message == FlushMessage.POISON) { - if (!messages.isEmpty()) { - log.print(VERBOSE, "Flushing messages."); - } - } else { + while (!Thread.currentThread().isInterrupted()) { + Message message = messageQueue.poll(flushIntervalInMillis, TimeUnit.MILLISECONDS); + + if (message != null) { // we do +1 because we are accounting for this new message we just took from the queue // which is not in list yet // need to check if this message is going to make us go over the limit considering @@ -278,9 +231,9 @@ public void run() { int defaultBatchSize = BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1); int msgSize = messageSizeInBytes(message); - if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { + if (currentBatchSize + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) { messages.add(message); - currentBatchSize.addAndGet(msgSize); + currentBatchSize+=msgSize; } else { // put message that did not make the cut this time back on the queue, we already took // this message if we dont put it back its lost @@ -288,9 +241,13 @@ public void run() { batchSizeLimitReached = true; } } + + if (messages.isEmpty()) { + continue; + } - Boolean isBlockingSignal = message == FlushMessage.POISON || message == StopMessage.STOP; - Boolean isOverflow = messages.size() >= size; + Boolean isBlockingSignal = message == null; + Boolean isOverflow = messages.size() >= flushQueueSize; if (!messages.isEmpty() && (isOverflow || isBlockingSignal || 
batchSizeLimitReached)) { Batch batch = Batch.create(CONTEXT, new ArrayList<>(messages), writeKey); @@ -299,10 +256,22 @@ public void run() { "Batching %s message(s) into batch %s.", batch.batch().size(), batch.sequence()); - networkExecutor.submit( - BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); - currentBatchSize.set(0); + Call call = service.upload(uploadUrl, batch); + FailsafeCall failsafeCall = + FailsafeCall.with(failsafe).compose(call); + failsafeCall.executeAsync() + .thenAccept(r -> { + if(is5xx(r.code()) || r.code() == 429) { + handleError(batch, null); + } + }) + .exceptionally(t -> { + handleError(batch, t); + return null; + }); + + currentBatchSize = 0; messages.clear(); if (batchSizeLimitReached) { // If this is true that means the last message that would make us go over the limit @@ -315,117 +284,34 @@ public void run() { } } catch (InterruptedException e) { log.print(DEBUG, "Looper interrupted while polling for messages."); - Thread.currentThread().interrupt(); - } + // XXX CANCEL UPLOAD + } catch (Exception e) { + e.printStackTrace(); + } + // SEND pending log.print(VERBOSE, "Looper stopped"); } + } - - static class BatchUploadTask implements Runnable { - private static final Backo BACKO = - Backo.builder() // - .base(TimeUnit.SECONDS, 15) // - .cap(TimeUnit.HOURS, 1) // - .jitter(1) // - .build(); - - private final AnalyticsClient client; - private final Backo backo; - final Batch batch; - private final int maxRetries; - - static BatchUploadTask create(AnalyticsClient client, Batch batch, int maxRetries) { - return new BatchUploadTask(client, BACKO, batch, maxRetries); - } - - BatchUploadTask(AnalyticsClient client, Backo backo, Batch batch, int maxRetries) { - this.client = client; - this.batch = batch; - this.backo = backo; - this.maxRetries = maxRetries; - } - - private void notifyCallbacksWithException(Batch batch, Exception exception) { - for (Message message : batch.batch()) { - for (Callback callback : client.callbacks) { - callback.failure(message, exception); - } + + void handleError(Batch batch, Throwable t) { + if(t instanceof CompletionException ) { + if(t.getCause() instanceof CircuitBreakerOpenException) { + System.err.println("OPEN"); + } } - } - - /** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. */ - boolean upload() { - client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence()); - - try { - Call call = client.service.upload(client.uploadUrl, batch); - Response response = call.execute(); - - if (response.isSuccessful()) { - client.log.print(VERBOSE, "Uploaded batch %s.", batch.sequence()); - - for (Message message : batch.batch()) { - for (Callback callback : client.callbacks) { - callback.success(message); - } - } - - return false; - } - - int status = response.code(); - if (is5xx(status)) { - client.log.print( - DEBUG, "Could not upload batch %s due to server error. Retrying.", batch.sequence()); - return true; - } else if (status == 429) { - client.log.print( - DEBUG, "Could not upload batch %s due to rate limiting. Retrying.", batch.sequence()); - return true; - } - - client.log.print(DEBUG, "Could not upload batch %s. Giving up.", batch.sequence()); - notifyCallbacksWithException(batch, new IOException(response.errorBody().string())); - - return false; - } catch (IOException error) { - client.log.print(DEBUG, error, "Could not upload batch %s. Retrying.", batch.sequence()); - - return true; - } catch (Exception exception) { - client.log.print(DEBUG, "Could not upload batch %s. 
Giving up.", batch.sequence()); - - notifyCallbacksWithException(batch, exception); - - return false; - } - } - - @Override - public void run() { - int attempt = 0; - for (; attempt <= maxRetries; attempt++) { - boolean retry = upload(); - if (!retry) return; - try { - backo.sleep(attempt); - } catch (InterruptedException e) { - client.log.print( - DEBUG, "Thread interrupted while backing off for batch %s.", batch.sequence()); - return; - } + for(Message msg : batch.batch()) { + fallback.add(msg); } + } - client.log.print(ERROR, "Could not upload batch %s. Retries exhausted.", batch.sequence()); - notifyCallbacksWithException( - batch, new IOException(Integer.toString(attempt) + " retries exhausted")); - } + void handleError(Message msg) { + fallback.add(msg); + } private static boolean is5xx(int status) { return status >= 500 && status < 600; - } } - public static class BatchUtility { /** diff --git a/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java b/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java new file mode 100644 index 00000000..1f85e392 --- /dev/null +++ b/analytics/src/main/java/com/segment/analytics/internal/FallbackAppender.java @@ -0,0 +1,225 @@ +package com.segment.analytics.internal; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.segment.analytics.gson.AutoValueAdapterFactory; +import com.segment.analytics.gson.ISO8601DateAdapter; +import com.segment.analytics.messages.Message; +import com.segment.analytics.messages.TrackMessage; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.charset.StandardCharsets; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import org.apache.commons.io.FileSystem; + +public class FallbackAppender { + + private static final int FLUSH_MS = 100; + private static final int BATCH = 20; + private static final int LASTMESSAGE_RETRY_MS = 10_000; + private static final String PATH = "pending"; + + private final AnalyticsClient client; + private final BlockingQueue queue; + private final File file; + private final Lock lock = new ReentrantLock(); + private final Thread writer; + private final Thread reader; + private final Gson gson; + + private transient long lastMessage; + + public FallbackAppender(AnalyticsClient client) { + this.client = client; + this.file = new File(PATH); + this.queue = new ArrayBlockingQueue(100); + this.writer = new Thread(new FileWriter()); // XXX threadFactory daemon + this.reader = new Thread(new FileReader()); // XXX threadFactory daemon + this.gson = new GsonBuilder() + .registerTypeAdapterFactory(new AutoValueAdapterFactory()) + .registerTypeAdapter(Date.class, new ISO8601DateAdapter()) + .create(); + + file.delete(); // FIXME do not remove on start + + this.lastMessage = System.currentTimeMillis(); + this.writer.start(); + this.reader.start(); + } + + public void close() { + reader.interrupt(); + writer.interrupt(); + } + + // block !!! 
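+ // NOTE: ArrayBlockingQueue.put below blocks the caller whenever the 100-element fallback queue is full, + // so producers are throttled until the FileWriter thread drains pending messages to disk.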
+ public void add(Message msg) { + try { + System.err.println("failed " + msg.messageId()); + queue.put(msg); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + class FileReader implements Runnable { + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + if (queue.isEmpty() && System.currentTimeMillis() - lastMessage > LASTMESSAGE_RETRY_MS) { + if (file.length() == 0) { + continue; + } + + List msgs; + try { + msgs = truncate(20); // XXX messageSize + if (msgs.isEmpty()) { + continue; + } + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + lastMessage = System.currentTimeMillis(); + continue; + } + + while (!msgs.isEmpty()) { + boolean canEnqueue = true; + for (int i = msgs.size() - 1; canEnqueue && i >= 0; i--) { + Message msg = msgs.get(i); + canEnqueue = client.offer(msg); + if (canEnqueue) { + msgs.remove(i); + System.err.println("reenqueued " + msg.messageId()); + } else { + // slow down next iteration when http queue overflow + try { + Thread.sleep(1_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + } + + try { + Thread.sleep(1_000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } + + class FileWriter implements Runnable { + @Override + public void run() { + final List batch = new ArrayList<>(); + while (!Thread.currentThread().isInterrupted()) { + try { + final Message msg = queue.poll(FLUSH_MS, TimeUnit.MILLISECONDS); + if (msg == null) { + if (!batch.isEmpty()) { + write(batch); + } + } else { + batch.add(msg); + if (batch.size() >= BATCH) { + write(batch); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (!batch.isEmpty()) { + write(batch); + } + } + } + + List truncate(int numMessages) throws IOException { + lock.lock(); + + if (!file.exists()) { + lock.unlock(); + return Collections.emptyList(); + } + + try (ReversedLinesFileReader reader = ReversedLinesFileReader.builder() + .setPath(file.toPath()) + .setBufferSize(FileSystem.getCurrent().getBlockSize()) + .setCharset(StandardCharsets.UTF_8) + .get()) { + + return reader.readLines(numMessages).stream() + .map(this::fromJson) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } finally { + lock.unlock(); + } + } + + private static final byte[] NEW_LINE = System.lineSeparator().getBytes(StandardCharsets.UTF_8); + + private void write(List batch) { + lock.lock(); + try (FileChannel fileChannel = FileChannel.open( + file.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE); + OutputStream os = Channels.newOutputStream(fileChannel); + FileLock fileLock = fileChannel.lock(); ) { + + for (Message msg : batch) { + os.write(toJson(msg).getBytes(StandardCharsets.UTF_8)); + os.write(NEW_LINE); + } + + fileChannel.force(true); + + batch.clear(); + + lastMessage = System.currentTimeMillis(); + } catch (IOException e) { + e.printStackTrace(); // FIXME + } finally { + lock.unlock(); + } + } + + private String toJson(final Message msg) { + try { + return gson.toJson(msg); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } + + private Message fromJson(final String msg) { + try { + // FIXME only track + return gson.fromJson(msg, TrackMessage.class); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } +} diff --git a/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java 
b/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java deleted file mode 100644 index b3ee9dc2..00000000 --- a/analytics/src/main/java/com/segment/analytics/internal/FlushMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.segment.analytics.internal; - -import com.segment.analytics.messages.Message; -import java.util.Date; -import java.util.Map; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -class FlushMessage implements Message { - static final FlushMessage POISON = new FlushMessage(); - - private FlushMessage() {} - - @Nonnull - @Override - public Type type() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public String messageId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Date sentAt() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public Date timestamp() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map context() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String anonymousId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String userId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map integrations() { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() { - return "FlushMessage{}"; - } -} diff --git a/analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java b/analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java new file mode 100644 index 00000000..09a7a773 --- /dev/null +++ b/analytics/src/main/java/com/segment/analytics/internal/ReversedLinesFileReader.java @@ -0,0 +1,552 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.segment.analytics.internal; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.FileSystem; +import org.apache.commons.io.StandardLineSeparator; +import org.apache.commons.io.build.AbstractStreamBuilder; + +/** + * Reads lines in a file reversely (similar to a BufferedReader, but starting at the last line). Useful for e.g. searching in log files. + *

+ * To build an instance, use {@link Builder}. + *

+ * + * @see Builder + * @since 2.2 + */ +public class ReversedLinesFileReader implements Closeable { + + // @formatter:off + /** + * Builds a new {@link ReversedLinesFileReader}. + * + *

+ * For example: + *

+ *
{@code
+     * ReversedLinesFileReader r = ReversedLinesFileReader.builder()
+     *   .setPath(path)
+     *   .setBufferSize(4096)
+     *   .setCharset(StandardCharsets.UTF_8)
+     *   .get();}
+     * 
+ * + * @see #get() + * @since 2.12.0 + */ + // @formatter:on + public static class Builder extends AbstractStreamBuilder { + + /** + * Constructs a new {@link Builder}. + */ + public Builder() { + setBufferSizeDefault(DEFAULT_BLOCK_SIZE); + setBufferSize(DEFAULT_BLOCK_SIZE); + } + + /** + * Builds a new {@link ReversedLinesFileReader}. + *

+ * You must set input that supports {@link #getInputStream()} on this builder, otherwise, this method throws an exception. + *

+ *

+ * This builder use the following aspects: + *

+ *
    + *
  • {@link #getInputStream()}
  • + *
  • {@link #getBufferSize()}
  • + *
  • {@link #getCharset()}
  • + *
+ * + * @return a new instance. + * @throws IllegalStateException if the {@code origin} is {@code null}. + * @throws UnsupportedOperationException if the origin cannot be converted to a {@link Path}. + * @throws IOException if an I/O error occurs. + * @see #getPath() + * @see #getBufferSize() + * @see #getCharset() + */ + @Override + public ReversedLinesFileReader get() throws IOException { + return new ReversedLinesFileReader(getPath(), getBufferSize(), getCharset()); + } + } + + private final class FilePart { + private final long no; + + private final byte[] data; + + private byte[] leftOver; + + private int currentLastBytePos; + + /** + * Constructs a new instance. + * + * @param no the part number + * @param length its length + * @param leftOverOfLastFilePart remainder + * @throws IOException if there is a problem reading the file + */ + private FilePart(final long no, final int length, final byte[] leftOverOfLastFilePart) throws IOException { + this.no = no; + final int dataLength = length + (leftOverOfLastFilePart != null ? leftOverOfLastFilePart.length : 0); + this.data = new byte[dataLength]; + final long off = (no - 1) * blockSize; + + // read data + if (no > 0 /* file not empty */) { + channel.position(off); + final int countRead = channel.read(ByteBuffer.wrap(data, 0, length)); + if (countRead != length) { + throw new IllegalStateException("Count of requested bytes and actually read bytes don't match"); + } + } + // copy left over part into data arr + if (leftOverOfLastFilePart != null) { + System.arraycopy(leftOverOfLastFilePart, 0, data, length, leftOverOfLastFilePart.length); + } + this.currentLastBytePos = data.length - 1; + this.leftOver = null; + } + + /** + * Constructs the buffer containing any leftover bytes. + */ + private void createLeftOver() { + final int lineLengthBytes = currentLastBytePos + 1; + if (lineLengthBytes > 0) { + // create left over for next block + leftOver = Arrays.copyOf(data, lineLengthBytes); + } else { + leftOver = null; + } + currentLastBytePos = -1; + } + + /** + * Finds the new-line sequence and return its length. + * + * @param data buffer to scan + * @param i start offset in buffer + * @return length of newline sequence or 0 if none found + */ + private int getNewLineMatchByteCount(final byte[] data, final int i) { + for (final byte[] newLineSequence : newLineSequences) { + boolean match = true; + for (int j = newLineSequence.length - 1; j >= 0; j--) { + final int k = i + j - (newLineSequence.length - 1); + match &= k >= 0 && data[k] == newLineSequence[j]; + } + if (match) { + return newLineSequence.length; + } + } + return 0; + } + + /** + * Reads a line. 
+ * + * @return the line or null + */ + private String readLine() { // NOPMD Bug in PMD + + String line = null; + int newLineMatchByteCount; + + final boolean isLastFilePart = no == 1; + + int i = currentLastBytePos; + while (i > -1) { + + if (!isLastFilePart && i < avoidNewlineSplitBufferSize) { + // avoidNewlineSplitBuffer: for all except the last file part we + // take a few bytes to the next file part to avoid splitting of newlines + createLeftOver(); + break; // skip last few bytes and leave it to the next file part + } + + // check for newline + if ((newLineMatchByteCount = getNewLineMatchByteCount(data, i)) > 0 /* found newline */) { + final int lineStart = i + 1; + final int lineLengthBytes = currentLastBytePos - lineStart + 1; + + if (lineLengthBytes < 0) { + throw new IllegalStateException("Unexpected negative line length=" + lineLengthBytes); + } + final byte[] lineData = Arrays.copyOfRange(data, lineStart, lineStart + lineLengthBytes); + + line = new String(lineData, charset); + + currentLastBytePos = i - newLineMatchByteCount; + break; // found line + } + + // move cursor + i -= byteDecrement; + + // end of file part handling + if (i < 0) { + createLeftOver(); + break; // end of file part + } + } + + // last file part handling + if (isLastFilePart && leftOver != null) { + // there will be no line break anymore, this is the first line of the file + line = new String(leftOver, charset); + leftOver = null; + } + + return line; + } + + /** + * Handles block rollover + * + * @return the new FilePart or null + * @throws IOException if there was a problem reading the file + */ + private FilePart rollOver() throws IOException { + + if (currentLastBytePos > -1) { + throw new IllegalStateException("Current currentLastCharPos unexpectedly positive... " + + "last readLine() should have returned something! currentLastCharPos=" + currentLastBytePos); + } + + if (no > 1) { + return new FilePart(no - 1, blockSize, leftOver); + } + // NO 1 was the last FilePart, we're finished + if (leftOver != null) { + throw new IllegalStateException("Unexpected leftover of the last block: leftOverOfThisFilePart=" + + new String(leftOver, charset)); + } + return null; + } + } + + private static final String EMPTY_STRING = ""; + + private static final int DEFAULT_BLOCK_SIZE = FileSystem.getCurrent().getBlockSize(); + + /** + * Constructs a new {@link Builder}. + * + * @return a new {@link Builder}. + * @since 2.12.0 + */ + public static Builder builder() { + return new Builder(); + } + + private final int blockSize; + private final Charset charset; + private final FileChannel channel; + private final FileLock fileLock; + private final long totalByteLength; + private final long totalBlockCount; + private final byte[][] newLineSequences; + private final int avoidNewlineSplitBufferSize; + private final int byteDecrement; + private FilePart currentFilePart; + private boolean trailingNewlineOfFileSkipped; + + /** + * Constructs a ReversedLinesFileReader with default block size of 4KB and the + * platform's default encoding. + * + * @param file the file to be read + * @throws IOException if an I/O error occurs. + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final File file) throws IOException { + this(file, DEFAULT_BLOCK_SIZE, Charset.defaultCharset()); + } + + /** + * Constructs a ReversedLinesFileReader with default block size of 4KB and the + * specified encoding. 
+ * + * @param file the file to be read + * @param charset the charset to use, null uses the default Charset. + * @throws IOException if an I/O error occurs. + * @since 2.5 + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final File file, final Charset charset) throws IOException { + this(file.toPath(), charset); + } + + /** + * Constructs a ReversedLinesFileReader with the given block size and encoding. + * + * @param file the file to be read + * @param blockSize size of the internal buffer (for ideal performance this + * should match with the block size of the underlying file + * system). + * @param charset the encoding of the file, null uses the default Charset. + * @throws IOException if an I/O error occurs. + * @since 2.3 + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final File file, final int blockSize, final Charset charset) throws IOException { + this(file.toPath(), blockSize, charset); + } + + /** + * Constructs a ReversedLinesFileReader with the given block size and encoding. + * + * @param file the file to be read + * @param blockSize size of the internal buffer (for ideal performance this + * should match with the block size of the underlying file + * system). + * @param charsetName the encoding of the file, null uses the default Charset. + * @throws IOException if an I/O error occurs + * @throws java.nio.charset.UnsupportedCharsetException if the encoding is not supported + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final File file, final int blockSize, final String charsetName) throws IOException { + this(file.toPath(), blockSize, charsetName); + } + + /** + * Constructs a ReversedLinesFileReader with default block size of 4KB and the + * specified encoding. + * + * @param file the file to be read + * @param charset the charset to use, null uses the default Charset. + * @throws IOException if an I/O error occurs. + * @since 2.7 + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final Path file, final Charset charset) throws IOException { + this(file, DEFAULT_BLOCK_SIZE, charset); + } + + /** + * Constructs a ReversedLinesFileReader with the given block size and encoding. + * + * @param file the file to be read + * @param blockSize size of the internal buffer (for ideal performance this + * should match with the block size of the underlying file + * system). + * @param charset the encoding of the file, null uses the default Charset. + * @throws IOException if an I/O error occurs. 
+ * @since 2.7 + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final Path file, final int blockSize, final Charset charset) throws IOException { + this.blockSize = blockSize; + this.charset = Charsets.toCharset(charset); + + // --- check & prepare encoding --- + final CharsetEncoder charsetEncoder = this.charset.newEncoder(); + final float maxBytesPerChar = charsetEncoder.maxBytesPerChar(); + if (maxBytesPerChar == 1f || this.charset == StandardCharsets.UTF_8) { + // all one byte encodings are no problem + byteDecrement = 1; + } else if (this.charset == Charset.forName("Shift_JIS") + || // Same as for UTF-8 + // http://www.herongyang.com/Unicode/JIS-Shift-JIS-Encoding.html + this.charset == Charset.forName("windows-31j") + || // Windows code page 932 (Japanese) + this.charset == Charset.forName("x-windows-949") + || // Windows code page 949 (Korean) + this.charset == Charset.forName("gbk") + || // Windows code page 936 (Simplified Chinese) + this.charset == Charset.forName("x-windows-950")) { // Windows code page 950 (Traditional Chinese) + byteDecrement = 1; + } else if (this.charset == StandardCharsets.UTF_16BE || this.charset == StandardCharsets.UTF_16LE) { + // UTF-16 new line sequences are not allowed as second tuple of four byte + // sequences, + // however byte order has to be specified + byteDecrement = 2; + } else if (this.charset == StandardCharsets.UTF_16) { + throw new UnsupportedEncodingException( + "For UTF-16, you need to specify the byte order (use UTF-16BE or " + "UTF-16LE)"); + } else { + throw new UnsupportedEncodingException( + "Encoding " + charset + " is not supported yet (feel free to " + "submit a patch)"); + } + + // NOTE: The new line sequences are matched in the order given, so it is + // important that \r\n is BEFORE \n + this.newLineSequences = new byte[][] { + StandardLineSeparator.CRLF.getBytes(this.charset), + StandardLineSeparator.LF.getBytes(this.charset), + StandardLineSeparator.CR.getBytes(this.charset) + }; + + this.avoidNewlineSplitBufferSize = newLineSequences[0].length; + + // Open file + this.channel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE); + this.fileLock = channel.lock(); + this.totalByteLength = channel.size(); + int lastBlockLength = (int) (this.totalByteLength % blockSize); + if (lastBlockLength > 0) { + this.totalBlockCount = this.totalByteLength / blockSize + 1; + } else { + this.totalBlockCount = this.totalByteLength / blockSize; + if (this.totalByteLength > 0) { + lastBlockLength = blockSize; + } + } + this.currentFilePart = new FilePart(totalBlockCount, lastBlockLength, null); + } + + /** + * Constructs a ReversedLinesFileReader with the given block size and encoding. + * + * @param file the file to be read + * @param blockSize size of the internal buffer (for ideal performance this + * should match with the block size of the underlying file + * system). + * @param charsetName the encoding of the file, null uses the default Charset. + * @throws IOException if an I/O error occurs + * @throws java.nio.charset.UnsupportedCharsetException if the encoding is not supported + * @since 2.7 + * @deprecated Use {@link #builder()}, {@link Builder}, and {@link Builder#get()} + */ + @Deprecated + public ReversedLinesFileReader(final Path file, final int blockSize, final String charsetName) throws IOException { + this(file, blockSize, Charsets.toCharset(charsetName)); + } + + /** + * Closes underlying resources. 
+ * + * @throws IOException if an I/O error occurs. + */ + @Override + public void close() throws IOException { + fileLock.release(); + channel.close(); + } + + /** + * Returns the lines of the file from bottom to top. + * + * @return the next line or null if the start of the file is reached + * @throws IOException if an I/O error occurs. + */ + public String readLine() throws IOException { + + String line = currentFilePart.readLine(); + while (line == null) { + currentFilePart = currentFilePart.rollOver(); + if (currentFilePart == null) { + // no more FileParts: we're done, leave line set to null + break; + } + line = currentFilePart.readLine(); + } + + // aligned behavior with BufferedReader that doesn't return a last, empty line + if (EMPTY_STRING.equals(line) && !trailingNewlineOfFileSkipped) { + trailingNewlineOfFileSkipped = true; + line = readLine(); + } + + return line; + } + + /** + * Returns {@code lineCount} lines of the file from bottom to top. + *

+ * If there are less than {@code lineCount} lines in the file, then that's what + * you get. + *

+ *

+ * Note: You can easily flip the result with {@link Collections#reverse(List)}. + *

+ * + * @param lineCount How many lines to read. + * @return A new list + * @throws IOException if an I/O error occurs. + * @since 2.8.0 + */ + public List readLines(final int lineCount) throws IOException { + if (lineCount < 0) { + throw new IllegalArgumentException("lineCount < 0"); + } + final ArrayList arrayList = new ArrayList<>(lineCount); + for (int i = 0; i < lineCount; i++) { + final String line = readLine(); + if (line == null) { + channel.truncate(0); + return arrayList; + } + arrayList.add(line); + } + + long truncateTo = (this.currentFilePart.no - 1) * blockSize; + truncateTo += this.currentFilePart.currentLastBytePos + 1; + channel.truncate(truncateTo); + + channel.force(true); + + return arrayList; + } + + /** + * Returns the last {@code lineCount} lines of the file. + *

+ * If there are less than {@code lineCount} lines in the file, then that's what + * you get. + *

+ * + * @param lineCount How many lines to read. + * @return A String. + * @throws IOException if an I/O error occurs. + * @since 2.8.0 + */ + public String toString(final int lineCount) throws IOException { + final List lines = readLines(lineCount); + Collections.reverse(lines); + return lines.isEmpty() ? EMPTY_STRING : String.join(System.lineSeparator(), lines) + System.lineSeparator(); + } +} diff --git a/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java b/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java deleted file mode 100644 index eccd278c..00000000 --- a/analytics/src/main/java/com/segment/analytics/internal/StopMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.segment.analytics.internal; - -import com.segment.analytics.messages.Message; -import java.util.Date; -import java.util.Map; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -class StopMessage implements Message { - static final StopMessage STOP = new StopMessage(); - - private StopMessage() {} - - @Nonnull - @Override - public Type type() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public String messageId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Date sentAt() { - throw new UnsupportedOperationException(); - } - - @Nonnull - @Override - public Date timestamp() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map context() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String anonymousId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public String userId() { - throw new UnsupportedOperationException(); - } - - @Nullable - @Override - public Map integrations() { - throw new UnsupportedOperationException(); - } - - @Override - public String toString() { - return "StopMessage{}"; - } -} diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java index 31596e90..e1c282ba 100644 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java +++ b/analytics/src/test/java/com/segment/analytics/AnalyticsBuilderTest.java @@ -381,46 +381,12 @@ public void buildsWithThreadFactory() { assertThat(analytics).isNotNull(); } - @Test - public void nullCallback() { - try { - builder.callback(null); - fail("Should fail for null callback"); - } catch (NullPointerException e) { - assertThat(e).hasMessage("Null callback"); - } - } - - @Test - public void duplicateCallback() { - Callback callback = mock(Callback.class); - try { - builder.callback(callback).callback(callback); - } catch (IllegalStateException e) { - assertThat(e).hasMessage("Callback is already registered."); - } - } - - @Test - public void buildsWithValidCallback() { - Analytics analytics = builder.callback(mock(Callback.class)).build(); - assertThat(analytics).isNotNull(); - } - @Test public void buildsWithForceTlsV1() { Analytics analytics = builder.forceTlsVersion1().build(); assertThat(analytics).isNotNull(); } - @Test - public void multipleCallbacks() { - Analytics analytics = - builder.callback(mock(Callback.class)).callback(mock(Callback.class)).build(); - - assertThat(analytics).isNotNull(); - } - @Test public void nullPlugin() { try { diff --git a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java b/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java deleted file mode 100644 
index 8be3012e..00000000 --- a/analytics/src/test/java/com/segment/analytics/AnalyticsTest.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.segment.analytics; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; - -import com.segment.analytics.TestUtils.MessageBuilderTest; -import com.segment.analytics.internal.AnalyticsClient; -import com.segment.analytics.messages.Message; -import com.segment.analytics.messages.MessageBuilder; -import com.squareup.burst.BurstJUnit4; -import java.lang.reflect.Field; -import java.time.LocalDateTime; -import java.time.temporal.ChronoUnit; -import java.util.Collections; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; - -@RunWith(BurstJUnit4.class) -public class AnalyticsTest { - @Mock AnalyticsClient client; - @Mock Log log; - @Mock MessageTransformer messageTransformer; - @Mock MessageInterceptor messageInterceptor; - Analytics analytics; - - @Before - public void setUp() { - initMocks(this); - - analytics = - new Analytics( - client, - Collections.singletonList(messageTransformer), - Collections.singletonList(messageInterceptor), - log); - } - - @Test - public void enqueueIsDispatched(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("prateek"); - Message message = messageBuilder.build(); - when(messageTransformer.transform(messageBuilder)).thenReturn(true); - when(messageInterceptor.intercept(any(Message.class))).thenReturn(message); - - analytics.enqueue(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor).intercept(any(Message.class)); - verify(client).enqueue(message); - } - - @Test - public void doesNotEnqueueWhenTransformerReturnsFalse(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("prateek"); - when(messageTransformer.transform(messageBuilder)).thenReturn(false); - - analytics.enqueue(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor, never()).intercept(any(Message.class)); - verify(client, never()).enqueue(any(Message.class)); - } - - @Test - public void doesNotEnqueueWhenInterceptorReturnsNull(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("prateek"); - when(messageTransformer.transform(messageBuilder)).thenReturn(true); - - analytics.enqueue(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor).intercept(any(Message.class)); - verify(client, never()).enqueue(any(Message.class)); - } - - @Test - public void shutdownIsDispatched() { - analytics.shutdown(); - - verify(client).shutdown(); - } - - @Test - public void flushIsDispatched() { - analytics.flush(); - - verify(client).flush(); - } - - @Test - public void offerIsDispatched(MessageBuilderTest builder) { - MessageBuilder messageBuilder = builder.get().userId("dummy"); - Message message = messageBuilder.build(); - when(messageTransformer.transform(messageBuilder)).thenReturn(true); - when(messageInterceptor.intercept(any(Message.class))).thenReturn(message); - - 
analytics.offer(messageBuilder); - - verify(messageTransformer).transform(messageBuilder); - verify(messageInterceptor).intercept(any(Message.class)); - verify(client).offer(message); - } - - @Test - public void threadSafeTest(MessageBuilderTest builder) - throws NoSuchFieldException, IllegalAccessException, InterruptedException { - // we want to test if msgs get lost during a multithreaded env - Analytics analytics = Analytics.builder("testWriteKeyForIssue321").build(); - // So we just want to spy on the client of an Analytics object created normally - Field clientField = analytics.getClass().getDeclaredField("client"); - clientField.setAccessible(true); - AnalyticsClient spy = spy((AnalyticsClient) clientField.get(analytics)); - clientField.set(analytics, spy); - - // we are going to run this test for a specific amount of seconds - int millisRunning = 200; - LocalDateTime initialTime = LocalDateTime.now(); - LocalDateTime now; - - // and a set number of threads will be using the library - ExecutorService service = Executors.newFixedThreadPool(20); - AtomicInteger counter = new AtomicInteger(); - - MessageBuilder messageBuilder = builder.get().userId("jorgen25"); - - do { - service.submit( - () -> { - analytics.enqueue(messageBuilder); - counter.incrementAndGet(); - }); - now = LocalDateTime.now(); - } while (initialTime.until(now, ChronoUnit.MILLIS) < millisRunning); - - service.shutdown(); - while (!service.isShutdown() || !service.isTerminated()) {} - - verify(spy, times(counter.get())).enqueue(any(Message.class)); - } -} diff --git a/analytics/src/test/java/com/segment/analytics/SegmentTest.java b/analytics/src/test/java/com/segment/analytics/SegmentTest.java new file mode 100644 index 00000000..26474649 --- /dev/null +++ b/analytics/src/test/java/com/segment/analytics/SegmentTest.java @@ -0,0 +1,141 @@ +package com.segment.analytics; + +import static com.github.tomakehurst.wiremock.client.WireMock.okJson; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig; + +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.junit.WireMockRule; +import com.github.tomakehurst.wiremock.stubbing.ServeEvent; +import com.segment.analytics.messages.TrackMessage; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import wiremock.com.fasterxml.jackson.core.JsonProcessingException; +import wiremock.com.fasterxml.jackson.databind.JsonNode; +import wiremock.com.fasterxml.jackson.databind.ObjectMapper; +import wiremock.com.google.common.util.concurrent.RateLimiter; + +public class SegmentTest { + + @Rule + public WireMockRule wireMockRule = new WireMockRule( + wireMockConfig() + .port(8088) + // .dynamicPort() + .gzipDisabled(true), + false); + + Analytics analytics; + + @Before + public void confWireMockAndClient() { + stubFor(post(urlEqualTo("/v1/import/")).willReturn(okJson("{\"success\": \"true\"}"))); + 
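+ // happy-path stub: the import endpoint answers 200 until a test swaps in a failing response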
+ analytics = Analytics.builder("write-key") + .endpoint(wireMockRule.baseUrl()) + .flushInterval(1, TimeUnit.SECONDS) + .flushQueueSize(20) + .queueCapacity(50) + // default HTTP client + .build(); + } + + @After + public void tearDown() { + analytics.shutdown(); + } + + @Test + public void messagesEnqueuedDuringOutageAreEventuallyDelivered() throws Throwable { + + stubFor(post(urlEqualTo("/v1/import/")) + .willReturn( + WireMock.aResponse().withStatus(503).withBody("fail").withUniformRandomDelay(100, 1_000))); + + int requestsPerSecond = 10; + int numClients = 10; + int timeToRun = 90_000; + int timeToRestore = 30_000; + + long start = System.currentTimeMillis(); + boolean upAgain = false; + final AtomicInteger id = new AtomicInteger(0); + List<String> ids = new ArrayList<>(); + + RateLimiter rate = RateLimiter.create(requestsPerSecond); + ExecutorService exec = Executors.newWorkStealingPool(numClients); + + while (System.currentTimeMillis() - start < timeToRun) { + if (rate.tryAcquire()) { + // Record the id on this thread: ids is an unsynchronized ArrayList, + // so mutating it from the pool threads would be a data race. + String msgid = "m" + id.getAndIncrement(); + ids.add(msgid); + exec.submit(() -> analytics.enqueue( + TrackMessage.builder("my-track").messageId(msgid).userId("userId"))); + } + Thread.sleep(1); + if (!upAgain && System.currentTimeMillis() - start > timeToRestore) { + upAgain = true; + stubFor(post(urlEqualTo("/v1/import/")) + .willReturn(okJson("{\"success\": \"true\"}").withUniformRandomDelay(100, 1_000))); + System.err.println("UP AGAIN"); + } + } + + Awaitility.await() + .atMost(10, TimeUnit.MINUTES) + .pollInterval(1, TimeUnit.SECONDS) + .until(() -> sentMessagesEqualsTo(ids.toArray(new String[ids.size()]))); + + exec.shutdownNow(); + exec.awaitTermination(10, TimeUnit.SECONDS); + } + + private static final ObjectMapper OM = new ObjectMapper(); + + private boolean sentMessagesEqualsTo(String... msgIds) { + return new HashSet<>(sentMessages()).equals(new HashSet<>(Arrays.asList(msgIds))); + } + + private Set<String> sentMessages() { + Set<String> messageIds = new HashSet<>(); + for (ServeEvent event : wireMockRule.getAllServeEvents()) { + if (event.getResponse().getStatus() != 200) { + continue; + } + + JsonNode batch; + try { + JsonNode json = OM.readTree(event.getRequest().getBodyAsString()); + batch = json.get("batch"); + if (batch == null) { + continue; + } + } catch (JsonProcessingException e) { + continue; + } + Iterator<JsonNode> msgs = batch.elements(); + while (msgs.hasNext()) { + messageIds.add(msgs.next().get("messageId").asText()); + } + } + System.err.println("Confirmed msgs : " + messageIds.size()); + return messageIds; + } +} diff --git a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java b/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java deleted file mode 100644 index 74f04e13..00000000 --- a/analytics/src/test/java/com/segment/analytics/internal/AnalyticsClientTest.java +++ /dev/null @@ -1,969 +0,0 @@ -package com.segment.analytics.internal; - -import static com.segment.analytics.internal.FlushMessage.POISON; -import static com.segment.analytics.internal.StopMessage.STOP; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static
org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.openMocks; - -import com.google.gson.Gson; -import com.segment.analytics.Callback; -import com.segment.analytics.Log; -import com.segment.analytics.TestUtils.MessageBuilderTest; -import com.segment.analytics.http.SegmentService; -import com.segment.analytics.http.UploadResponse; -import com.segment.analytics.internal.AnalyticsClient.BatchUploadTask; -import com.segment.analytics.messages.Batch; -import com.segment.analytics.messages.Message; -import com.segment.analytics.messages.TrackMessage; -import com.segment.backo.Backo; -import com.squareup.burst.BurstJUnit4; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Queue; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import okhttp3.ResponseBody; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatcher; -import org.mockito.Mock; -import org.mockito.Spy; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import retrofit2.Call; -import retrofit2.Response; -import retrofit2.mock.Calls; - -@RunWith(BurstJUnit4.class) // -public class AnalyticsClientTest { - // Backo instance for testing which trims down the wait times. - private static final Backo BACKO = - Backo.builder().base(TimeUnit.NANOSECONDS, 1).factor(1).build(); - - private int DEFAULT_RETRIES = 10; - private int MAX_BATCH_SIZE = 1024 * 500; // 500kb - private int MAX_MSG_SIZE = 1024 * 32; // 32kb //This is the limit for a message object - private int MSG_MAX_CREATE_SIZE = - MAX_MSG_SIZE - - 200; // Once we create msg object with this size it barely below 32 threshold so good - // for tests - private static String writeKey = "writeKey"; - - Log log = Log.NONE; - - ThreadFactory threadFactory; - @Spy LinkedBlockingQueue messageQueue; - @Mock SegmentService segmentService; - @Mock ExecutorService networkExecutor; - @Mock Callback callback; - @Mock UploadResponse response; - - AtomicBoolean isShutDown; - - @Before - public void setUp() { - openMocks(this); - - isShutDown = new AtomicBoolean(false); - threadFactory = Executors.defaultThreadFactory(); - } - - // Defers loading the client until tests can initialize all required - // dependencies. 
- AnalyticsClient newClient() { - return new AnalyticsClient( - messageQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE, - log, - threadFactory, - networkExecutor, - Collections.singletonList(callback), - isShutDown, - writeKey, - new Gson()); - } - - @Test - public void enqueueAddsToQueue(MessageBuilderTest builder) throws InterruptedException { - AnalyticsClient client = newClient(); - - Message message = builder.get().userId("prateek").build(); - client.enqueue(message); - - verify(messageQueue).put(message); - } - - @Test - public void shutdown() throws InterruptedException { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - client.shutdown(); - - verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); - } - - @Test - public void flushInsertsPoison() throws InterruptedException { - AnalyticsClient client = newClient(); - - client.flush(); - - verify(messageQueue).put(FlushMessage.POISON); - } - - /** Wait until the queue is drained. */ - static void wait(Queue queue) { - // noinspection StatementWithEmptyBody - while (queue.size() > 0) {} - } - - /** - * Verify that a {@link BatchUploadTask} was submitted to the executor, and return the {@link - * BatchUploadTask#batch} it was uploading.. - */ - static Batch captureBatch(ExecutorService executor) { - final ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); - verify(executor, timeout(1000)).submit(runnableArgumentCaptor.capture()); - final BatchUploadTask task = (BatchUploadTask) runnableArgumentCaptor.getValue(); - return task.batch; - } - - private static String generateDataOfSize(int msgSize) { - char[] chars = new char[msgSize]; - Arrays.fill(chars, 'a'); - - return new String(chars); - } - - private static String generateDataOfSizeSpecialChars( - int sizeInBytes, boolean slightlyBelowLimit) { - StringBuilder builder = new StringBuilder(); - Character[] specialChars = new Character[] {'$', '¢', 'ह', '€', '한', '©', '¶'}; - int currentSize = 0; - String smileyFace = "\uD83D\uDE01"; - // 😁 = '\uD83D\uDE01'; - Random rand = new Random(); - int loopCount = 1; - while (currentSize < sizeInBytes) { - int randomNum; - // decide if regular/special character - if (loopCount > 3 && loopCount % 4 == 0) { - randomNum = rand.nextInt(((specialChars.length - 1) - 0) + 1) + 0; - builder.append(specialChars[randomNum]); - } else if (loopCount > 9 && loopCount % 10 == 0) { - builder.append(smileyFace); - } else { - // random letter from a - z - randomNum = rand.nextInt(('z' - 'a') + 1) + 'a'; - builder.append((char) randomNum); - } - - // check size so far - String temp = builder.toString(); - currentSize = temp.getBytes(StandardCharsets.UTF_8).length; - if (slightlyBelowLimit && ((sizeInBytes - currentSize) < 500)) { - break; - } - loopCount++; - } - return builder.toString(); - } - - @Test - public void flushSubmitsToExecutor() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - TrackMessage first = TrackMessage.builder("foo").userId("bar").build(); - TrackMessage second = TrackMessage.builder("qaz").userId("qux").build(); - client.enqueue(first); - client.enqueue(second); - client.flush(); - wait(messageQueue); - - assertThat(captureBatch(networkExecutor).batch()).containsExactly(first, second); - } - - @Test - public void enqueueMaxTriggersFlush() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - // Enqueuing 51 
messages (> 50) should trigger flush. - for (int i = 0; i < 51; i++) { - client.enqueue(TrackMessage.builder("Event " + i).userId("bar").build()); - } - wait(messageQueue); - - // Verify that the executor saw the batch. - assertThat(captureBatch(networkExecutor).batch()).hasSize(50); - } - - @Test - public void shouldBeAbleToCalculateMessageSize() { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSize(1024 * 33)); - - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - // can't test for exact size cause other attributes come in play - assertThat(client.messageSizeInBytes(bigMessage)).isGreaterThan(1024 * 33); - } - - @Test - public void dontFlushUntilReachesMaxSize() throws InterruptedException { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - - properties.put("property2", generateDataOfSize(MAX_BATCH_SIZE - 200)); - - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - // throw new InterruptedException(e.getMessage()); - } - - wait(messageQueue); - - verify(networkExecutor, never()).submit(any(Runnable.class)); - } - - /** - * Modified this test case since we are changing logic to NOT allow messages bigger than 32 kbs - * individually to be enqueued, hence had to lower the size of the generated msg here. chose - * MSG_MAX_CREATE_SIZE because it will generate a message just below the limit of 32 kb after it - * creates a Message object modified the number of events that will be created since the batch - * creation logic was also changed to not allow batches larger than 500 kb meaning every 15/16 - * events the queue will be backPressured and poisoned/flushed (3 times) (purpose of test) AND - * there will be 4 batches submitted (15 msgs, 1 msg, 15 msg, 15 msg) so purpose of test case - * stands - * - * @throws InterruptedException - */ - @Test - public void flushHowManyTimesNecessaryToStayWithinLimit() throws InterruptedException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - Collections.singletonList(callback), - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - - properties.put("property3", generateDataOfSize(MSG_MAX_CREATE_SIZE)); - - for (int i = 0; i < 46; i++) { - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - client.enqueue(bigMessage); - verify(messageQueue).put(bigMessage); - } - - wait(messageQueue); - /** - * modified from expected 4 to expected 3 times, since we removed the inner loop. 
The inner loop - * was forcing to message list created from the queue to keep making batches even if its a 1 - * message batch until the message list is empty, that was forcing the code to make one last - * batch of 1 msg in size bumping the number of times a batch would be submitted from 3 to 4 - */ - verify(networkExecutor, times(3)).submit(any(Runnable.class)); - } - - /** - * Had to slightly change test case since we are now modifying the logic to NOT allow messages - * above 32 KB in size So needed to change size of generated msg to MSG_MAX_CREATE_SIZE to keep - * purpose of test case intact which is to test the scenario for several messages eventually - * filling up the queue and flushing. Batches submitted will change from 1 to 2 because the queue - * will be backpressured at 16 (at this point queue is over the 500KB batch limit so its flushed - * and when batch is created 16 will be above 500kbs limit so it creates one batch for 15 msg and - * another one for the remaining single message so 500kb limit per batch is not violated - * - * @throws InterruptedException - */ - @Test - public void flushWhenMultipleMessagesReachesMaxSize() throws InterruptedException { - AnalyticsClient client = newClient(); - Map properties = new HashMap(); - properties.put("property3", generateDataOfSize(MSG_MAX_CREATE_SIZE)); - - for (int i = 0; i < 16; i++) { - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("bar").properties(properties).build(); - client.enqueue(bigMessage); - } - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - verify(networkExecutor, times(2)).submit(any(Runnable.class)); - } - - @Test - public void enqueueBeforeMaxDoesNotTriggerFlush() { - messageQueue = new LinkedBlockingQueue<>(); - AnalyticsClient client = newClient(); - - // Enqueuing 5 messages (< 50) should not trigger flush. - for (int i = 0; i < 5; i++) { - client.enqueue(TrackMessage.builder("Event " + i).userId("bar").build()); - } - wait(messageQueue); - - // Verify that the executor didn't see anything. - verify(networkExecutor, never()).submit(any(Runnable.class)); - } - - static Batch batchFor(Message message) { - return Batch.create( - Collections.emptyMap(), Collections.singletonList(message), writeKey); - } - - @Test - public void batchRetriesForNetworkErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - Response successResponse = Response.success(200, response); - Response failureResponse = Response.error(429, ResponseBody.create(null, "")); - - // Throw a network error 3 times. - when(segmentService.upload(null, batch)) - .thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(failureResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); - verify(callback).success(trackMessage); - } - - @Test - public void batchRetriesForHTTP5xxErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error 3 times. 
- - Response successResponse = Response.success(200, response); - Response failResponse = - Response.error(500, ResponseBody.create(null, "Server Error")); - when(segmentService.upload(null, batch)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); - verify(callback).success(trackMessage); - } - - @Test - public void batchRetriesForHTTP429Errors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error 3 times. - Response successResponse = Response.success(200, response); - Response failResponse = - Response.error(429, ResponseBody.create(null, "Rate Limited")); - when(segmentService.upload(null, batch)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(failResponse)) - .thenReturn(Calls.response(successResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify that we tried to upload 4 times, 3 failed and 1 succeeded. - verify(segmentService, times(4)).upload(null, batch); - verify(callback).success(trackMessage); - } - - @Test - public void batchDoesNotRetryForNon5xxAndNon429HTTPErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - // Throw a HTTP error that should not be retried. - Response failResponse = - Response.error(404, ResponseBody.create(null, "Not Found")); - when(segmentService.upload(null, batch)).thenReturn(Calls.response(failResponse)); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify we only tried to upload once. - verify(segmentService).upload(null, batch); - verify(callback).failure(eq(trackMessage), any(IOException.class)); - } - - @Test - public void batchDoesNotRetryForNonNetworkErrors() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - Call networkFailure = Calls.failure(new RuntimeException()); - when(segmentService.upload(null, batch)).thenReturn(networkFailure); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, DEFAULT_RETRIES); - batchUploadTask.run(); - - // Verify we only tried to upload once. 
- verify(segmentService).upload(null, batch); - verify(callback).failure(eq(trackMessage), any(RuntimeException.class)); - } - - @Test - public void givesUpAfterMaxRetries() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - when(segmentService.upload(null, batch)) - .thenAnswer( - new Answer>() { - public Call answer(InvocationOnMock invocation) { - Response failResponse = - Response.error(429, ResponseBody.create(null, "Not Found")); - return Calls.response(failResponse); - } - }); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 10); - batchUploadTask.run(); - - // DEFAULT_RETRIES == maxRetries - // tries 11(one normal run + 10 retries) even though default is 50 in AnalyticsClient.java - verify(segmentService, times(11)).upload(null, batch); - verify(callback) - .failure( - eq(trackMessage), - argThat( - new ArgumentMatcher() { - @Override - public boolean matches(IOException exception) { - return exception.getMessage().equals("11 retries exhausted"); - } - })); - } - - @Test - public void hasDefaultRetriesSetTo3() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - when(segmentService.upload(null, batch)) - .thenAnswer( - new Answer>() { - public Call answer(InvocationOnMock invocation) { - Response failResponse = - Response.error(429, ResponseBody.create(null, "Not Found")); - return Calls.response(failResponse); - } - }); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 3); - batchUploadTask.run(); - - // DEFAULT_RETRIES == maxRetries - // tries 11(one normal run + 10 retries) - verify(segmentService, times(4)).upload(null, batch); - verify(callback) - .failure( - eq(trackMessage), - argThat( - new ArgumentMatcher() { - @Override - public boolean matches(IOException exception) { - return exception.getMessage().equals("4 retries exhausted"); - } - })); - } - - @Test - public void flushWhenNotShutDown() throws InterruptedException { - AnalyticsClient client = newClient(); - - client.flush(); - verify(messageQueue).put(POISON); - } - - @Test - public void flushWhenShutDown() throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.flush(); - - verify(messageQueue, times(0)).put(any(Message.class)); - } - - @Test - public void enqueueWithRegularMessageWhenNotShutdown(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient client = newClient(); - - final Message message = builder.get().userId("foo").build(); - client.enqueue(message); - - verify(messageQueue).put(message); - } - - @Test - public void enqueueWithRegularMessageWhenShutdown(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.enqueue(builder.get().userId("foo").build()); - - verify(messageQueue, times(0)).put(any(Message.class)); - } - - @Test - public void enqueueWithStopMessageWhenShutdown() throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.enqueue(STOP); - - verify(messageQueue).put(STOP); - } - - @Test - public void shutdownWhenAlreadyShutDown() throws InterruptedException { - AnalyticsClient client = newClient(); - isShutDown.set(true); - - client.shutdown(); - - verify(messageQueue, times(0)).put(any(Message.class)); - 
verifyNoInteractions(networkExecutor, callback, segmentService); - } - - @Test - public void shutdownWithNoMessageInTheQueue() throws InterruptedException { - AnalyticsClient client = newClient(); - client.shutdown(); - - verify(messageQueue).put(STOP); - verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); - verifyNoMoreInteractions(networkExecutor); - } - - @Test - public void shutdownWithMessagesInTheQueue(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient client = newClient(); - - client.enqueue(builder.get().userId("foo").build()); - client.shutdown(); - - verify(messageQueue).put(STOP); - verify(networkExecutor).shutdown(); - verify(networkExecutor).awaitTermination(1, TimeUnit.SECONDS); - verify(networkExecutor).submit(any(AnalyticsClient.BatchUploadTask.class)); - } - - @Test - public void neverRetries() { - AnalyticsClient client = newClient(); - TrackMessage trackMessage = TrackMessage.builder("foo").userId("bar").build(); - Batch batch = batchFor(trackMessage); - - when(segmentService.upload(null, batch)) - .thenAnswer( - new Answer>() { - public Call answer(InvocationOnMock invocation) { - Response failResponse = - Response.error(429, ResponseBody.create(null, "Not Found")); - return Calls.response(failResponse); - } - }); - - BatchUploadTask batchUploadTask = new BatchUploadTask(client, BACKO, batch, 0); - batchUploadTask.run(); - - // runs once but never retries - verify(segmentService, times(1)).upload(null, batch); - verify(callback) - .failure( - eq(trackMessage), - argThat( - new ArgumentMatcher() { - @Override - public boolean matches(IOException exception) { - return exception.getMessage().equals("1 retries exhausted"); - } - })); - } - - /** - * ********************************************************************************************** - * Test cases for Size check - * ********************************************************************************************* - */ - - /** Individual Size check happy path regular chars */ - @Test - public void checkForIndividualMessageSizeLessThanLimit() { - AnalyticsClient client = newClient(); - int msgSize = 1024 * 31; // 31KB - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSize(msgSize)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - client.enqueue(bigMessage); - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isLessThanOrEqualTo(sizeLimit); - } - - /** Individual Size check sad path regular chars (over the limit) */ - @Test - public void checkForIndividualMessageSizeOverLimit() throws IllegalArgumentException { - AnalyticsClient client = newClient(); - int msgSize = MAX_MSG_SIZE + 1; // BARELY over the limit - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSize(msgSize)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isGreaterThan(sizeLimit); - } - - /** Individual Size check happy path special chars */ - @Test - public void checkForIndividualMessageSizeSpecialCharsLessThanLimit() { - AnalyticsClient 
client = newClient(); - int msgSize = MAX_MSG_SIZE; // 32KB - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - - Map properties = new HashMap(); - properties.put("property1", generateDataOfSizeSpecialChars(msgSize, true)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - client.enqueue(bigMessage); - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isLessThanOrEqualTo(sizeLimit); - } - - /** Individual Size check sad path special chars (over the limit) */ - @Test - public void checkForIndividualMessageSizeSpecialCharsAboveLimit() { - AnalyticsClient client = newClient(); - int msgSize = MAX_MSG_SIZE; // 32KB - int sizeLimit = MAX_MSG_SIZE; // 32KB = 32768 - Map properties = new HashMap(); - - properties.put("property1", generateDataOfSizeSpecialChars(msgSize, false)); - - TrackMessage bigMessage = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - - try { - client.enqueue(bigMessage); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - int msgActualSize = client.messageSizeInBytes(bigMessage); - assertThat(msgActualSize).isGreaterThan(sizeLimit); - } - - /** - * ***************************************************************************************************************** - * Test cases for enqueue modified logic - * *************************************************************************************************************** - */ - @Test - public void enqueueVerifyPoisonIsNotCheckedForSize() throws InterruptedException { - AnalyticsClient clientSpy = spy(newClient()); - - clientSpy.enqueue(POISON); - verify(messageQueue).put(POISON); - verify(clientSpy, never()).messageSizeInBytes(POISON); - } - - @Test - public void enqueueVerifyStopIsNotCheckedForSize() throws InterruptedException { - AnalyticsClient clientSpy = spy(newClient()); - - clientSpy.enqueue(STOP); - verify(messageQueue).put(STOP); - verify(clientSpy, never()).messageSizeInBytes(STOP); - } - - @Test - public void enqueueVerifyRegularMessageIsEnqueuedAndCheckedForSize(MessageBuilderTest builder) - throws InterruptedException { - AnalyticsClient clientSpy = spy(newClient()); - - Message message = builder.get().userId("jorgen25").build(); - clientSpy.enqueue(message); - verify(messageQueue).put(message); - verify(clientSpy, times(1)).messageSizeInBytes(message); - } - - /** - * This test case was to prove the limit in batch is not being respected so will probably delete - * it later NOTE: Used to be a test case created to prove huge messages above the limit are still - * being submitted in batch modified it to prove they are not anymore after changing logic in - * analyticsClient - * - * @param builder - * @throws InterruptedException - */ - @Test - public void enqueueSingleMessageAboveLimitWhenNotShutdown(MessageBuilderTest builder) - throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = newClient(); - - // Message is above batch limit - final String massData = generateDataOfSizeSpecialChars(MAX_MSG_SIZE, false); - Map integrationOpts = new HashMap<>(); - integrationOpts.put("massData", massData); - Message message = - builder.get().userId("foo").integrationOptions("someKey", integrationOpts).build(); - - try { - client.enqueue(message); - } catch (IllegalArgumentException e) { - assertThat(e).isExactlyInstanceOf(e.getClass()); - } - - wait(messageQueue); - - // Message is above MSG/BATCH size limit so it should 
not be put in queue - verify(messageQueue, never()).put(message); - // And since it was never in the queue, it was never submitted in batch - verify(networkExecutor, never()).submit(any(AnalyticsClient.BatchUploadTask.class)); - } - - @Test - public void enqueueVerifyRegularMessagesSpecialCharactersBelowLimit(MessageBuilderTest builder) - throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = newClient(); - int msgSize = 1024 * 18; // 18KB - - for (int i = 0; i < 2; i++) { - final String data = generateDataOfSizeSpecialChars(msgSize, true); - Map integrationOpts = new HashMap<>(); - integrationOpts.put("data", data); - Message message = - builder.get().userId("jorgen25").integrationOptions("someKey", integrationOpts).build(); - client.enqueue(message); - verify(messageQueue).put(message); - } - client.enqueue(POISON); - verify(messageQueue).put(POISON); - - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - - verify(networkExecutor, times(1)).submit(any(AnalyticsClient.BatchUploadTask.class)); - } - - /** - * ****************************************************************************************************************** - * Test cases for Batch creation logic - * **************************************************************************************************************** - */ - - /** - * Several messages are enqueued and then submitted in a batch - * - * @throws InterruptedException - */ - @Test - public void submitBatchBelowThreshold() throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - Collections.singletonList(callback), - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - properties.put("property3", generateDataOfSizeSpecialChars(((int) (MAX_MSG_SIZE * 0.9)), true)); - - for (int i = 0; i < 15; i++) { - TrackMessage bigMessage = - TrackMessage.builder("Big Event").userId("jorgen25").properties(properties).build(); - client.enqueue(bigMessage); - verify(messageQueue).put(bigMessage); - } - client.enqueue(POISON); - wait(messageQueue); - - client.shutdown(); - while (!isShutDown.get()) {} - verify(networkExecutor, times(1)).submit(any(Runnable.class)); - } - - /** - * Enqueued several messages above threshold of 500Kbs so queue gets backpressured at some point - * and several batches have to be created to not violate threshold - * - * @throws InterruptedException - */ - @Test - public void submitBatchAboveThreshold() throws InterruptedException, IllegalArgumentException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - Collections.singletonList(callback), - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - properties.put("property3", generateDataOfSizeSpecialChars(MAX_MSG_SIZE, true)); - - for (int i = 0; i < 100; i++) { - TrackMessage message = - TrackMessage.builder("Big Event").userId("jorgen25").properties(properties).build(); - client.enqueue(message); - verify(messageQueue).put(message); - } - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - - verify(networkExecutor, times(8)).submit(any(Runnable.class)); - } - - @Test - public void submitManySmallMessagesBatchAboveThreshold() throws 
InterruptedException { - AnalyticsClient client = - new AnalyticsClient( - messageQueue, - null, - segmentService, - 50, - TimeUnit.HOURS.toMillis(1), - 0, - MAX_BATCH_SIZE * 4, - log, - threadFactory, - networkExecutor, - Collections.singletonList(callback), - isShutDown, - writeKey, - new Gson()); - - Map properties = new HashMap(); - properties.put("property3", generateDataOfSizeSpecialChars(1024 * 8, true)); - - for (int i = 0; i < 600; i++) { - TrackMessage message = - TrackMessage.builder("Event").userId("jorgen25").properties(properties).build(); - client.enqueue(message); - verify(messageQueue).put(message); - } - wait(messageQueue); - client.shutdown(); - while (!isShutDown.get()) {} - - verify(networkExecutor, times(21)).submit(any(Runnable.class)); - } -}
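
The deleted `AnalyticsClientTest` pinned the old Backo-era retry contract: retry network errors, HTTP 5xx and HTTP 429 with backoff; fail immediately on other HTTP errors; give up after a bounded number of attempts. Below is a minimal sketch of that same contract expressed as Failsafe 3.x policies; it is illustrative rather than the PR's actual wiring, and the names `UploadPolicySketch` and `uploadWithPolicies` are invented for the example.

```java
package com.segment.analytics.internal;

import com.segment.analytics.http.SegmentService;
import com.segment.analytics.http.UploadResponse;
import com.segment.analytics.messages.Batch;
import dev.failsafe.CircuitBreaker;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import retrofit2.Response;

/** Sketch only: approximates the semantics the deleted tests covered. */
final class UploadPolicySketch {

  static Response<UploadResponse> uploadWithPolicies(SegmentService service, Batch batch) {
    // Retry transient failures: network errors, HTTP 5xx and 429. Anything else
    // (e.g. a 404) is returned immediately, matching the deleted
    // batchDoesNotRetryForNon5xxAndNon429HTTPErrors test.
    RetryPolicy<Response<UploadResponse>> retry =
        RetryPolicy.<Response<UploadResponse>>builder()
            .handle(IOException.class)
            .handleResultIf(r -> r.code() == 429 || r.code() >= 500)
            .withBackoff(1, 30, ChronoUnit.SECONDS)
            .withMaxRetries(3) // bounded attempts, as in givesUpAfterMaxRetries
            .build();

    // Once the endpoint fails repeatedly, stop hammering it until it recovers.
    CircuitBreaker<Response<UploadResponse>> breaker =
        CircuitBreaker.<Response<UploadResponse>>builder()
            .handleResultIf(r -> r.code() >= 500)
            .withFailureThreshold(5)
            .withDelay(Duration.ofSeconds(30))
            .build();

    // Outermost policy first: retries wrap the breaker. Each attempt creates a
    // fresh Retrofit Call, since a Call instance can only be executed once.
    return Failsafe.with(retry, breaker)
        .get(() -> service.upload(null, batch).execute());
  }
}
```

Failsafe throws `CircuitBreakerOpenException` when the breaker rejects an attempt, so a caller can distinguish an open circuit from exhausted retries and, for example, divert the batch somewhere durable instead of dropping it.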
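The deleted size-limit tests distinguish a per-message cap (`MAX_MSG_SIZE`, 32 KB) from a per-batch cap (`MAX_BATCH_SIZE`, 500 KB), both measured on the UTF-8 byte length of the serialized payload rather than on character count, which is why `generateDataOfSizeSpecialChars` checks `getBytes(StandardCharsets.UTF_8).length` and mixes in multi-byte characters. A minimal sketch of that measurement, assuming the size is taken from the Gson-serialized form as `AnalyticsClient.messageSizeInBytes` appears to do; the class and method names here are invented:

```java
import com.google.gson.Gson;
import java.nio.charset.StandardCharsets;

// Sketch only: the real check lives in AnalyticsClient.
final class MessageSizeSketch {
  static final int MAX_MSG_SIZE = 1024 * 32;    // 32 KB per message
  static final int MAX_BATCH_SIZE = 1024 * 500; // 500 KB per batch
  private static final Gson GSON = new Gson();

  /** UTF-8 byte length of the serialized message. */
  static int messageSizeInBytes(Object message) {
    return GSON.toJson(message).getBytes(StandardCharsets.UTF_8).length;
  }

  /** Mirrors what the deleted enqueue tests expect: oversized messages are rejected. */
  static void checkSize(Object message) {
    if (messageSizeInBytes(message) > MAX_MSG_SIZE) {
      throw new IllegalArgumentException("message exceeds the 32 KB limit");
    }
  }
}
```

Counting bytes rather than chars matters because a string of n chars can serialize to up to 4n bytes: '¢' is 2 bytes in UTF-8, 'ह' and '€' are 3, and the 😁 surrogate pair is 4, which is exactly what the special-character generator in the deleted tests probes.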