Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.zip.ZipEntry;
Expand All @@ -55,6 +56,7 @@
*/
public class ChangeSetWriterProvider implements ChangeSetWriterService, SaveState {
private static final Logger LOG = LoggerFactory.getLogger(ChangeSetWriterProvider.class);
private static final long INACTIVITY_THRESHOLD_MILLIS = TimeUnit.MINUTES.toMillis(5);

/**
* Represents the various states of the ChangeSetWriterProvider during its lifecycle.
Expand Down Expand Up @@ -132,7 +134,10 @@ private ChangeSetWriterProvider() {
public void writeToChangeSet(Entity entity, DataActivity activity) {
try {
switch (activity) {
case SYNCHRONIZABLE_EDIT -> this.entitiesToWrite.put(entity);
case SYNCHRONIZABLE_EDIT -> {
this.entitiesToWrite.put(entity);
LOG.trace("ChangeSetWriterProvider queued entity for changeset write: {}", entity);
}
case LOADING_CHANGE_SET, INITIALIZE, LOCAL_EDIT, DATA_REPAIR -> {
}
}
Expand Down Expand Up @@ -200,6 +205,7 @@ private File newZipFile() {
*/
private void startService() {
Thread.ofVirtual().name("ChangeSetWriterProvider-ServiceThread").start(() -> {
LOG.trace("Starting ChangeSetWriterProvider on service thread: {}", Thread.currentThread());
Semaphore changeSetWriter = new Semaphore(1);
try {
changeSetWriter.acquireUninterruptibly();
Expand All @@ -219,21 +225,25 @@ private void startService() {
EntityToTinkarSchemaTransformer.getInstance();

final File zipfile = newZipFile();
LOG.trace("ChangeSetWriterProvider starting new zip file: {}", zipfile.getAbsolutePath());
try (FileOutputStream fos = new FileOutputStream(zipfile);
BufferedOutputStream bos = new BufferedOutputStream(fos);
ZipOutputStream zos = new ZipOutputStream(bos)) {
// Create a single entry for all changes in this zip file
final ZipEntry zipEntry = new ZipEntry("Entities");
zos.putNextEntry(zipEntry);
try {
AtomicLong lastWriteTimeMillis = new AtomicLong(System.currentTimeMillis());
while (threadStateMap.get(Thread.currentThread()) == STATE.RUNNING) {
final Entity<EntityVersion> entityToWrite = this.entitiesToWrite.poll(250, TimeUnit.MILLISECONDS);
if (entityToWrite != null) {
lastWriteTimeMillis.set(System.currentTimeMillis());
if (entityToWrite.uncommitted()) {
// We will write uncommitted versions at the end of the thread to prevent bloat from uncommitted changes,
// unless they are committed before the thread stops.
ImmutableIntList uncommittedStampNids = entityToWrite.uncommittedStampNids();
uncommittedStampNids.forEach(stampNid -> {
LOG.trace("ChangeSetWriterProvider caching uncommitted entity for stampNid {}:\n{}", stampNid, entityToWrite);
uncommittedEntitiesByStamp.remove(stampNid, entityToWrite);
uncommittedEntitiesByStamp.put(stampNid, entityToWrite);
});
Expand All @@ -247,6 +257,11 @@ private void startService() {
}
}
}
if (System.currentTimeMillis() - lastWriteTimeMillis.get() > INACTIVITY_THRESHOLD_MILLIS) {
LOG.info("Rotating ChangeSetWriterProvider, no activity for {} minutes.",
TimeUnit.MILLISECONDS.toMinutes(INACTIVITY_THRESHOLD_MILLIS));
TinkExecutor.threadPool().submit(this::save);
}
}
} catch (InterruptedException e) {
}
Expand All @@ -256,8 +271,8 @@ private void startService() {
stampsCount, moduleList, authorList, entityTransformer, zos));
zos.closeEntry();
if (entityCount.sum() > 0) {
LOG.info("Data zipEntry size: " + zipEntry.getSize());
LOG.info("Data zipEntry compressed size: " + zipEntry.getCompressedSize());
LOG.debug("Data zipEntry size: " + zipEntry.getSize());
LOG.debug("Data zipEntry compressed size: " + zipEntry.getCompressedSize());

// Write Manifest File
final ZipEntry manifestEntry = new ZipEntry("META-INF/MANIFEST.MF");
Expand All @@ -270,17 +285,17 @@ private void startService() {
moduleList,
authorList).getBytes(StandardCharsets.UTF_8));
zos.closeEntry();
zos.flush();
}
// Cleanup
zos.flush();
zos.finish();

} catch (IOException e) {
threadStateMap.put(Thread.currentThread(), STATE.FAILED);
throw new RuntimeException(e);
} finally {
if (zipfile.exists()) {
if (zipfile.length() == 0 || entityCount.sum() == 0) {
if (entityCount.sum() == 0) {
zipfile.delete();
}
}
Expand Down Expand Up @@ -338,7 +353,7 @@ private static void writeEntity(LongAdder entityCount,
try {
TinkarMsg tinkarMsg = entityTransformer.transform(entityToWrite);
tinkarMsg.writeDelimitedTo(zos);
LOG.info("Wrote: {}", entityToWrite);
LOG.debug("ChangeSetWriterProvider wrote Entity:\n{}", entityToWrite);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -386,6 +401,9 @@ private String generateManifestContent(LongAdder entityCount,
*/
@Override
public CompletableFuture<Void> save() {
if (serviceThread.get() != null) {
LOG.trace("Rotating ChangeSetWriterProvider on service thread: {}", serviceThread.get());
}
return checkpoint(true);
}

Expand Down Expand Up @@ -434,6 +452,7 @@ private CompletableFuture<Void> checkpoint(boolean restart) {
}
}
if (interruptedThread != null) {
LOG.trace("Stopping ChangeSetWriterProvider on service thread: {}", interruptedThread);
threadStateMap.put(interruptedThread, (restart?STATE.ROTATING:STATE.STOPPED));
}
if (restart) {
Expand All @@ -445,6 +464,7 @@ private CompletableFuture<Void> checkpoint(boolean restart) {
Semaphore changeSetWriter = threadSemaphoreMap.remove(interruptedThread);
if (changeSetWriter != null) {
changeSetWriter.acquireUninterruptibly();
LOG.trace("Stopped ChangeSetWriterProvider on service thread: {}", interruptedThread);
}
}
return null;
Expand Down