Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,14 @@ private static void parseBuckets(SimpleOrderedMap<Object> solrFacets, FacetField

List<FacetField.Bucket> buckets = new ArrayList<>();
for (SimpleOrderedMap<Object> solrBucket: solrBuckets) {
int count = 0;
long count = 0;
String value = "";
FacetField subfield;
List<FacetField> subfields = new ArrayList<>();
for (int i = 0; i < solrBucket.size(); i++) {
String fullname = solrBucket.getName(i);
if ("count".equals(fullname)) {
count = (int) solrBucket.getVal(i);
count = ((Number) solrBucket.getVal(i)).longValue();
} else if ("val".equals(fullname)) {
value = solrBucket.getVal(i).toString();
} else {
Expand Down
139 changes: 121 additions & 18 deletions commons-lib/src/main/java/org/opencb/commons/ProgressLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package org.opencb.commons;

import org.apache.commons.lang3.tuple.Pair;
import org.opencb.commons.run.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DecimalFormat;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -43,33 +45,43 @@ public class ProgressLogger {

private final String message;
private final int numLinesLog;
private final long logFrequencyMillis;
private boolean progressRateEnabled = true;
private long progressRateWindowSizeSeconds;
private boolean progressRateMillionHours = false; // If true, progress rate is in millions of elements per hour
private long totalCount;
private boolean isApproximated; // Total count is an approximated value
private final AtomicReference<Future<Long>> futureTotalCount = new AtomicReference<>();
private final AtomicLong count;
private final long startTime;
private final LinkedList<Pair<Long, Long>> times = new LinkedList<>();

private double batchSize;

private Logger logger = LoggerFactory.getLogger(ProgressLogger.class);

public ProgressLogger(String message) {
this(message, 0, null, 200);
this(message, 0, null, 200, 0);
}

/**
 * Creates a progress logger driven by a time interval.
 *
 * Delegates to the private constructor with a total count of 0, no future total count,
 * {@code numLinesLog} set to 0, and the given frequency converted to milliseconds.
 *
 * @param message      Message prefix for every log line
 * @param logFrequency Amount of time between log lines, expressed in {@code timeUnit}
 * @param timeUnit     Unit of {@code logFrequency}; must not be null
 */
public ProgressLogger(String message, long logFrequency, TimeUnit timeUnit) {
this(message, 0, null, 0, timeUnit.toMillis(logFrequency));
}

public ProgressLogger(String message, long totalCount) {
this(message, totalCount, null, 200);
this(message, totalCount, null, 200, 0);
}

public ProgressLogger(String message, long totalCount, int numLinesLog) {
this(message, totalCount, null, numLinesLog);
this(message, totalCount, null, numLinesLog, 0);
}

public ProgressLogger(String message, Future<Long> futureTotalCount) {
this(message, 0, futureTotalCount, 200);
this(message, 0, futureTotalCount, 200, 0);
}

public ProgressLogger(String message, Future<Long> futureTotalCount, int numLinesLog) {
this(message, 0, futureTotalCount, numLinesLog);
this(message, 0, futureTotalCount, numLinesLog, 0);
}

/**
Expand All @@ -79,10 +91,10 @@ public ProgressLogger(String message, Future<Long> futureTotalCount, int numLine
* @param numLinesLog Number of lines to print
*/
public ProgressLogger(String message, Callable<Long> totalCountCallable, int numLinesLog) {
this(message, 0, getFuture(totalCountCallable), numLinesLog);
this(message, 0, getFuture(totalCountCallable), numLinesLog, 0);
}

private ProgressLogger(String message, long totalCount, Future<Long> futureTotalCount, int numLinesLog) {
private ProgressLogger(String message, long totalCount, Future<Long> futureTotalCount, int numLinesLog, long logFrequencyMillis) {
if (message.endsWith(" ")) {
this.message = message;
} else {
Expand All @@ -92,12 +104,21 @@ private ProgressLogger(String message, long totalCount, Future<Long> futureTotal
this.totalCount = totalCount;
this.futureTotalCount.set(futureTotalCount);
this.count = new AtomicLong();
if (totalCount == 0) {
batchSize = DEFAULT_BATCH_SIZE;
if (logFrequencyMillis > 0) {
this.logFrequencyMillis = logFrequencyMillis;
batchSize = 0;
} else {
updateBatchSize();
// Avoid not logging for too long. Log at least once a minute by default
this.logFrequencyMillis = TimeUnit.MINUTES.toMillis(1);
if (totalCount == 0) {
batchSize = DEFAULT_BATCH_SIZE;
} else {
updateBatchSize();
}
}
isApproximated = false;
startTime = System.currentTimeMillis();
progressRateWindowSizeSeconds = 60;
}


Expand All @@ -118,6 +139,25 @@ public ProgressLogger setApproximateTotalCount(long aproximateTotalCount) {
return this;
}

/**
 * Sets the size of the time window used when reporting the recent (relative) progress rate.
 *
 * The value is stored in whole seconds, so any sub-second remainder is truncated by
 * {@link TimeUnit#toSeconds(long)}.
 *
 * @param progressRateWindowSize Window size, expressed in {@code timeUnit}
 * @param timeUnit               Unit of {@code progressRateWindowSize}; must not be null
 * @return this instance, to allow call chaining
 */
public ProgressLogger setProgressRateWindowSize(int progressRateWindowSize, TimeUnit timeUnit) {
    long windowSeconds = timeUnit.toSeconds(progressRateWindowSize);
    this.progressRateWindowSizeSeconds = windowSeconds;
    return this;
}

/**
 * Switches the progress rate units to millions of elements per hour.
 * Shorthand for {@code setProgressRateAtMillionsPerHours(true)}.
 *
 * @return this instance, to allow call chaining
 */
public ProgressLogger setProgressRateAtMillionsPerHours() {
return setProgressRateAtMillionsPerHours(true);
}

/**
 * Selects the units used when printing the progress rate.
 *
 * @param progressRateMillionHours if true, rates are printed as millions of elements per hour ("M/h");
 *                                 otherwise as elements per second ("elements/s")
 * @return this instance, to allow call chaining
 */
public ProgressLogger setProgressRateAtMillionsPerHours(boolean progressRateMillionHours) {
this.progressRateMillionHours = progressRateMillionHours;
return this;
}

/**
 * Turns off the progress rate section (elapsed time and elements per unit of time) of the log lines.
 *
 * @return this instance, to allow call chaining
 */
public ProgressLogger disableProgressRate() {
this.progressRateEnabled = false;
return this;
}

public void increment(long delta) {
increment(delta, "", null);
}
Expand All @@ -135,13 +175,37 @@ private void increment(long delta, String message, Supplier<String> supplier) {
long count = previousCount + delta;

updateFutureTotalCount();
if ((int) (previousCount / batchSize) != (int) (count / batchSize) || count == totalCount && delta > 0) {
log(count, supplier == null ? message : supplier.get());
long currentTimeMillis = System.currentTimeMillis();
if (shouldLog(delta, previousCount, count, currentTimeMillis)) {
log(count, supplier == null ? message : supplier.get(), currentTimeMillis);
}
}

/**
 * Decides whether the increment that moved the counter from {@code previousCount} to {@code count}
 * should emit a log line.
 *
 * A line is logged when any of these holds:
 * <ul>
 *   <li>batch logging is active ({@code batchSize > 0}) and the increment crossed a batch boundary;</li>
 *   <li>time logging is active ({@code logFrequencyMillis > 0}) and more than {@code logFrequencyMillis}
 *       have elapsed since the last logged point (or since {@code startTime} if nothing was logged yet);</li>
 *   <li>the counter has just reached {@code totalCount} with a positive {@code delta}.</li>
 * </ul>
 *
 * @param delta             Amount added in this increment
 * @param previousCount     Counter value before the increment
 * @param count             Counter value after the increment
 * @param currentTimeMillis Current time in milliseconds
 * @return true if a log line should be written
 */
private boolean shouldLog(long delta, long previousCount, long count, long currentTimeMillis) {
    // Compare batch indexes as long: a double-to-int cast saturates at Integer.MAX_VALUE, so with an
    // int cast both indexes would compare equal forever once they exceed 2^31-1 and the batch
    // trigger would silently stop firing.
    if (batchSize > 0 && (long) (previousCount / batchSize) != (long) (count / batchSize)) {
        return true;
    }
    if (logFrequencyMillis > 0) {
        // NOTE(review): 'times' is modified inside the synchronized log(...) method but read here
        // without holding that lock; confirm whether concurrent increments need this read guarded.
        long lastLogTime = times.isEmpty() ? startTime : times.getLast().getRight();
        if (currentTimeMillis - lastLogTime > logFrequencyMillis) {
            return true;
        }
    }
    // Always report the final element.
    return count == totalCount && delta > 0;
}

protected synchronized void log(long count, String extraMessage) {
long totalCount = getTotalCount();
protected synchronized void log(long count, String extraMessage, long currentTimeMillis) {
times.add(Pair.of(count, currentTimeMillis));
if (times.size() > 5 && times.get(0).getRight() < currentTimeMillis - progressRateWindowSizeSeconds * 1000) {
// Remove old points that are outside the progress rate window
times.removeFirst();
}
long totalCount = this.totalCount;

StringBuilder sb = new StringBuilder(message).append(count);
if (totalCount > 0) {
Expand All @@ -152,6 +216,45 @@ protected synchronized void log(long count, String extraMessage) {
}
sb.append(totalCount).append(' ').append(DECIMAL_FORMAT.format(((float) (count)) / totalCount));
}
if (progressRateEnabled) {
float elapsedTime = (float) (currentTimeMillis - startTime) / 1000;
float progressRate = count / elapsedTime; // elements per second
boolean addRelativeTime = times.size() > 5 && elapsedTime > progressRateWindowSizeSeconds;
float relativeTime;
float relativeProgressRate; // elements per second
if (addRelativeTime) {
int idx = 5;
do {
Pair<Long, Long> relativePoint = times.get(times.size() - idx);
relativeTime = (float) (currentTimeMillis - relativePoint.getRight()) / 1000;
relativeProgressRate = (count - relativePoint.getLeft()) / relativeTime;
} while (relativeTime < progressRateWindowSizeSeconds && idx++ < times.size());

} else {
relativeTime = 0;
relativeProgressRate = 0;
}
String progressRateUnits;
String rateFormat;
if (progressRateMillionHours) {
progressRateUnits = "M/h";
rateFormat = "%.2f";
progressRate = (progressRate / 1_000_000) * 3600; // Convert to millions per hour
relativeProgressRate = (relativeProgressRate / 1_000_000) * 3600; // Convert to millions per hour
} else {
progressRateUnits = "elements/s";
rateFormat = "%.0f";
}
sb.append(" in ")
.append(String.format("%.2f", elapsedTime)).append("s (")
.append(String.format(rateFormat, progressRate)).append(" " + progressRateUnits + ")");
if (addRelativeTime) {
sb.append(", (")
.append(String.format(rateFormat, relativeProgressRate)).append(" " + progressRateUnits + " in last ")
.append(String.format("%.2f", relativeTime)).append("s")
.append(')');
}
}
if (!extraMessage.isEmpty() && (!extraMessage.startsWith(" ") && !extraMessage.startsWith(",") && !extraMessage.startsWith("."))) {
sb.append(' ');
}
Expand Down Expand Up @@ -181,10 +284,6 @@ private void updateFutureTotalCount() {
}
}

private long getTotalCount() {
return this.totalCount;
}

private void updateBatchSize() {
batchSize = Math.max((double) totalCount / numLinesLog, MIN_BATCH_SIZE);
}
Expand All @@ -196,6 +295,10 @@ private static Future<Long> getFuture(Callable<Long> totalCountCallable) {
return future;
}

/**
 * @return the current value of the progress counter
 */
public long getCount() {
return count.get();
}

public <T> Task<T, T> asTask() {
return asTask(null);
}
Expand Down
18 changes: 14 additions & 4 deletions commons-lib/src/main/java/org/opencb/commons/io/DataReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@

import org.opencb.commons.run.Task;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -163,4 +161,16 @@ public T next() {
};
}

/**
 * Applies the given action to every element, delegating to
 * {@code forEach(action, 1)}, i.e. using a batch size of 1.
 *
 * @param action Action to execute for each element
 */
@Override
default void forEach(Consumer<? super T> action) {
forEach(action, 1);
}

/**
 * Applies the given action to every element obtained from {@code iterator(batchSize)}.
 *
 * @param action    Action to execute for each element; must not be null
 * @param batchSize Batch size handed to {@code iterator(int)}
 * @throws NullPointerException if {@code action} is null
 */
default void forEach(Consumer<? super T> action, int batchSize) {
    Objects.requireNonNull(action);
    Iterator<T> elements = this.iterator(batchSize);
    while (elements.hasNext()) {
        action.accept(elements.next());
    }
}
}
57 changes: 57 additions & 0 deletions commons-lib/src/main/java/org/opencb/commons/io/DataWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,61 @@ public void post() throws Exception {
};
}

/**
 * Chains another writer after this one: the other writer is converted to a task with
 * {@code asTask()} and chained through {@code then(Task)}.
 *
 * @param nextWriter Writer to chain after this one; must not be null
 * @return a writer that combines this writer with {@code nextWriter}
 */
default DataWriter<T> then(DataWriter<T> nextWriter) {
    Task<T, ?> nextWriterTask = nextWriter.asTask();
    return then(nextWriterTask);
}

/**
 * Builds a writer that runs this writer and then the given task on the same data.
 *
 * <ul>
 *   <li>{@code open()} and {@code close()} are forwarded to this writer only.</li>
 *   <li>{@code pre()} and {@code post()} call this writer first, then the task; the returned value
 *       is the one produced by this writer.</li>
 *   <li>{@code write(batch)} writes the batch with this writer, then applies the task to the same
 *       batch. The task's result is discarded and this writer's result is returned.</li>
 * </ul>
 *
 * Runtime exceptions thrown by the task are propagated unchanged; any other exception is wrapped
 * in a {@link RuntimeException}.
 *
 * @param nextTask Task to execute after this writer
 * @return the combined writer
 */
default DataWriter<T> then(Task<T, ?> nextTask) {
    final DataWriter<T> first = this;
    return new DataWriter<T>() {

        // Leaves runtime exceptions untouched and wraps everything else.
        private RuntimeException asUnchecked(Exception e) {
            return e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
        }

        @Override
        public boolean open() {
            return first.open();
        }

        @Override
        public boolean close() {
            return first.close();
        }

        @Override
        public boolean pre() {
            boolean writerResult = first.pre();
            try {
                nextTask.pre();
            } catch (Exception e) {
                throw asUnchecked(e);
            }
            return writerResult;
        }

        @Override
        public boolean post() {
            boolean writerResult = first.post();
            try {
                nextTask.post();
            } catch (Exception e) {
                throw asUnchecked(e);
            }
            return writerResult;
        }

        @Override
        public boolean write(List<T> batch) {
            boolean writerResult = first.write(batch);
            try {
                nextTask.apply(batch);
            } catch (Exception e) {
                throw asUnchecked(e);
            }
            return writerResult;
        }
    };
}

}
Loading
Loading