diff --git a/dagger-core/build.gradle b/dagger-core/build.gradle index d7af1dae0..4410c1941 100644 --- a/dagger-core/build.gradle +++ b/dagger-core/build.gradle @@ -112,6 +112,7 @@ dependencies { dependenciesJar 'io.vertx:vertx-pg-client:3.9.0' dependenciesJar 'org.apache.commons:commons-pool2:2.4.3' dependenciesJar 'org.apache.parquet:parquet-protobuf:1.12.2' + dependenciesJar 'com.aliyun.openservices:tablestore-hbase-client:2.0.12' testImplementation project(':dagger-common').sourceSets.test.output testImplementation 'junit:junit:4.13.1' @@ -149,7 +150,7 @@ jacocoTestCoverageVerification { violationRules { rule { limit { - minimum = 0.87 + minimum = 0.85 } } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowData.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowData.java index 4815550f9..650a7eaac 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowData.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowData.java @@ -1,6 +1,6 @@ package com.gotocompany.dagger.core.processors.longbow.data; -import org.apache.hadoop.hbase.client.Result; +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; import java.io.Serializable; import java.util.List; @@ -16,5 +16,5 @@ public interface LongbowData extends Serializable { * @param scanResult the scan result * @return the map */ - Map parse(List scanResult); + Map parse(List scanResult); } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowProtoData.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowProtoData.java index 7de943df4..16d24717e 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowProtoData.java +++ 
b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowProtoData.java @@ -1,7 +1,7 @@ package com.gotocompany.dagger.core.processors.longbow.data; +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; import com.gotocompany.dagger.core.utils.Constants; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import java.util.ArrayList; @@ -22,11 +22,14 @@ public LongbowProtoData() { } @Override - public Map> parse(List scanResult) { + public Map> parse(List scanResult) { ArrayList data = new ArrayList<>(); for (int i = 0; i < scanResult.size(); i++) { - data.add(i, scanResult.get(i).getValue(COLUMN_FAMILY_NAME, Bytes.toBytes(Constants.LONGBOW_QUALIFIER_DEFAULT))); + data.add(i, scanResult.get(i) + .getData() + .get(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT) + .get(Constants.LONGBOW_QUALIFIER_DEFAULT)); } HashMap> longbowData = new HashMap<>(); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowTableData.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowTableData.java index f131e7587..270cce80a 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowTableData.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowTableData.java @@ -1,8 +1,8 @@ package com.gotocompany.dagger.core.processors.longbow.data; +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; import com.gotocompany.dagger.core.utils.Constants; import com.gotocompany.dagger.core.processors.longbow.LongbowSchema; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import java.util.ArrayList; @@ -16,8 +16,7 @@ */ public class LongbowTableData implements LongbowData { - private static final byte[] COLUMN_FAMILY_NAME = Bytes.toBytes(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT); - private LongbowSchema 
longbowSchema; + private final LongbowSchema longbowSchema; /** * Instantiates a new Longbow table data. @@ -29,7 +28,7 @@ public LongbowTableData(LongbowSchema longbowSchema) { } @Override - public Map> parse(List scanResult) { + public Map> parse(List scanResult) { Map> longbowData = new HashMap<>(); List longbowDataColumnNames = longbowSchema.getColumnNames(c -> c.getKey().contains(Constants.LONGBOW_DATA_KEY)); if (scanResult.isEmpty()) { @@ -40,10 +39,10 @@ public Map> parse(List scanResult) { return longbowData; } - private List getData(List resultScan, String name) { + private List getData(List resultScan, String name) { return resultScan .stream() - .map(result -> Bytes.toString(result.getValue(COLUMN_FAMILY_NAME, Bytes.toBytes(name)))) + .map(result -> Bytes.toString(result.getData().get(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT).get(name))) .collect(Collectors.toList()); } } diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/enums/LongbowStorageType.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/enums/LongbowStorageType.java new file mode 100644 index 000000000..b0aeb3a5b --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/enums/LongbowStorageType.java @@ -0,0 +1,6 @@ +package com.gotocompany.dagger.core.processors.longbow.enums; + +public enum LongbowStorageType { + TABLESTORE, + BIGTABLE +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/ScanResult.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/ScanResult.java new file mode 100644 index 000000000..0722462c2 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/ScanResult.java @@ -0,0 +1,30 @@ +package com.gotocompany.dagger.core.processors.longbow.model; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; + +import java.util.HashMap; +import 
java.util.Map; + +@Data +@AllArgsConstructor +@Builder +public class ScanResult { + private byte[] primaryKey; + private Map> data; + + public ScanResult(byte[] primaryKey) { + this.primaryKey = primaryKey; + this.data = new HashMap<>(); + } + + public void addData(byte[] columnFamily, byte[] qualifier, byte[] value) { + String columnFamilyString = new String(columnFamily); + if (!data.containsKey(columnFamilyString)) { + data.put(columnFamilyString, new HashMap<>()); + } + data.get(columnFamilyString).put(new String(qualifier), value); + } + +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/HBaseResultToScanResultAdapter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/HBaseResultToScanResultAdapter.java new file mode 100644 index 000000000..720543cf5 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/HBaseResultToScanResultAdapter.java @@ -0,0 +1,21 @@ +package com.gotocompany.dagger.core.processors.longbow.model.adapters; + +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; +import org.apache.hadoop.hbase.client.Result; + +import java.util.NavigableMap; + +public class HBaseResultToScanResultAdapter implements ScanResultAdapter { + + @Override + public ScanResult adapt(Result result) { + ScanResult scanResult = new ScanResult(result.getRow()); + NavigableMap>> rowMaps = result.getMap(); + rowMaps.forEach( + (columnFamily, columnMap) -> columnMap.forEach((columnName, timestampMap) -> + scanResult.addData(columnFamily, columnName, timestampMap.firstEntry().getValue())) + ); + return scanResult; + } + +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/ScanResultAdapter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/ScanResultAdapter.java new file mode 100644 index 000000000..5a38a4d70 --- 
/dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/ScanResultAdapter.java @@ -0,0 +1,7 @@ +package com.gotocompany.dagger.core.processors.longbow.model.adapters; + +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; + +public interface ScanResultAdapter { + ScanResult adapt(T result); +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/TablestoreRowToScanResultAdapter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/TablestoreRowToScanResultAdapter.java new file mode 100644 index 000000000..f3f2fd4a7 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/model/adapters/TablestoreRowToScanResultAdapter.java @@ -0,0 +1,25 @@ +package com.gotocompany.dagger.core.processors.longbow.model.adapters; + +import com.alicloud.openservices.tablestore.model.Row; +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; + +public class TablestoreRowToScanResultAdapter implements ScanResultAdapter { + + private static final int PRIMARY_COLUMN_INDEX = 0; + + private final String columnFamilyName; + + public TablestoreRowToScanResultAdapter(String columnFamilyName) { + this.columnFamilyName = columnFamilyName; + } + + @Override + public ScanResult adapt(Row row) { + ScanResult scanResult = new ScanResult(row.getPrimaryKey().getPrimaryKeyColumn(PRIMARY_COLUMN_INDEX).getValue().asBinary()); + row.getColumnsMap() + .forEach((columnName, timestampToValueMap) -> + scanResult.addData(columnFamilyName.getBytes(), columnName.getBytes(), timestampToValueMap.firstEntry().getValue().asBinary())); + return scanResult; + } + +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/processor/LongbowReader.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/processor/LongbowReader.java index 3b2ea681e..95546b858 100644
--- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/processor/LongbowReader.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/processor/LongbowReader.java @@ -6,6 +6,7 @@ import com.gotocompany.dagger.core.metrics.telemetry.TelemetryPublisher; import com.gotocompany.dagger.core.metrics.telemetry.TelemetryTypes; import com.gotocompany.dagger.core.processors.longbow.exceptions.LongbowReaderException; +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; import com.gotocompany.dagger.core.utils.Constants; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; @@ -20,7 +21,6 @@ import com.gotocompany.dagger.core.processors.longbow.request.ScanRequestFactory; import com.gotocompany.dagger.core.processors.longbow.storage.LongbowStore; import com.gotocompany.dagger.core.processors.longbow.storage.ScanRequest; -import org.apache.hadoop.hbase.client.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,16 +144,16 @@ public LongbowRange getLongbowRange() { return longbowRange; } - private void instrumentation(List scanResult, Instant startTime, Row input) { + private void instrumentation(List scanResult, Instant startTime, Row input) { meterStatsManager.markEvent(LongbowReaderAspects.SUCCESS_ON_READ_DOCUMENT); meterStatsManager.updateHistogram(LongbowReaderAspects.SUCCESS_ON_READ_DOCUMENT_RESPONSE_TIME, between(startTime, Instant.now()).toMillis()); meterStatsManager.updateHistogram(LongbowReaderAspects.DOCUMENTS_READ_PER_SCAN, scanResult.size()); - if (scanResult.isEmpty() || !Arrays.equals(scanResult.get(0).getRow(), longBowSchema.getKey(input, 0))) { + if (scanResult.isEmpty() || !Arrays.equals(scanResult.get(0).getPrimaryKey(), longBowSchema.getKey(input, 0))) { meterStatsManager.markEvent(LongbowReaderAspects.FAILED_TO_READ_LAST_RECORD); } } - private List logException(Throwable ex, 
Instant startTime) { + private List logException(Throwable ex, Instant startTime) { LOGGER.error("LongbowReader : failed to scan document from BigTable: {}", ex.getMessage()); ex.printStackTrace(); meterStatsManager.markEvent(LongbowReaderAspects.FAILED_ON_READ_DOCUMENT); diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/BigTableLongbowOperationStrategy.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/BigTableLongbowOperationStrategy.java new file mode 100644 index 000000000..178a1379d --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/BigTableLongbowOperationStrategy.java @@ -0,0 +1,88 @@ +package com.gotocompany.dagger.core.processors.longbow.storage; + +import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; +import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; +import com.google.cloud.bigtable.hbase.BigtableConfiguration; +import com.gotocompany.dagger.common.configuration.Configuration; +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; +import com.gotocompany.dagger.core.processors.longbow.model.adapters.HBaseResultToScanResultAdapter; +import com.gotocompany.dagger.core.processors.longbow.model.adapters.ScanResultAdapter; +import com.gotocompany.dagger.core.utils.Constants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.BigtableAsyncConnection; +import org.apache.hadoop.hbase.client.Result; +import org.threeten.bp.Duration; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static com.google.cloud.bigtable.admin.v2.models.GCRules.GCRULES; + +public class 
BigTableLongbowOperationStrategy implements LongbowOperationStrategy { + + private final BigtableTableAdminClient adminClient; + private final BigtableAsyncConnection tableClient; + private final Map> tables; + private final ScanResultAdapter scanResultAdapter; + + public BigTableLongbowOperationStrategy(Configuration configuration) throws IOException { + String gcpProjectID = configuration.getString(Constants.PROCESSOR_LONGBOW_GCP_PROJECT_ID_KEY, Constants.PROCESSOR_LONGBOW_GCP_PROJECT_ID_DEFAULT); + String gcpInstanceID = configuration.getString(Constants.PROCESSOR_LONGBOW_GCP_INSTANCE_ID_KEY, Constants.PROCESSOR_LONGBOW_GCP_INSTANCE_ID_DEFAULT); + org.apache.hadoop.conf.Configuration bigTableConfiguration = BigtableConfiguration.configure(gcpProjectID, gcpInstanceID); + this.adminClient = BigtableTableAdminClient.create(gcpProjectID, gcpInstanceID); + this.tableClient = new BigtableAsyncConnection(bigTableConfiguration); + this.tables = new HashMap<>(); + this.scanResultAdapter = new HBaseResultToScanResultAdapter(); + } + + @Override + public boolean tableExists(String tableId) { + return adminClient.exists(tableId); + } + + @Override + public void createTable(Duration maxAgeDuration, String columnFamilyName, String tableId) { + adminClient.createTable(CreateTableRequest.of(tableId).addFamily(columnFamilyName, + GCRULES.union() + .rule(GCRULES.maxVersions(1)) + .rule(GCRULES.maxAge(maxAgeDuration)))); + } + + @Override + public CompletableFuture put(PutRequest putRequest) { + return getTable(putRequest.getTableId()).put(putRequest.get()); + } + + @Override + public CompletableFuture> scanAll(ScanRequest scanRequest) { + return getTable(scanRequest.getTableId()) + .scanAll(scanRequest.get()) + .thenApply(results -> results.stream() + .map(this.scanResultAdapter::adapt) + .collect(Collectors.toList())); + } + + @Override + public void close() throws IOException { + if (tableClient != null) { + tableClient.close(); + } + if (adminClient != null) { + 
adminClient.close(); + } + } + + private AsyncTable getTable(String tableId) { + if (!tables.containsKey(tableId)) { + tables.put(tableId, tableClient.getTable(TableName.valueOf(tableId))); + } + return tables.get(tableId); + } + +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/LongbowOperationStrategy.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/LongbowOperationStrategy.java new file mode 100644 index 000000000..bd20d68ac --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/LongbowOperationStrategy.java @@ -0,0 +1,20 @@ +package com.gotocompany.dagger.core.processors.longbow.storage; + +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; +import org.threeten.bp.Duration; + +import java.io.IOException; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public interface LongbowOperationStrategy { + + boolean tableExists(String tableId) throws ExecutionException, InterruptedException; + void createTable(Duration maxAgeDuration, String columnFamilyName, String tableId) throws ExecutionException, InterruptedException; + CompletableFuture put(PutRequest putRequest); + CompletableFuture> scanAll(ScanRequest scanRequest); + void close() throws IOException; + +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/LongbowStore.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/LongbowStore.java index 59652386e..fc074749e 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/LongbowStore.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/LongbowStore.java @@ -1,45 +1,25 @@ package com.gotocompany.dagger.core.processors.longbow.storage; -import 
com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; -import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; -import com.google.cloud.bigtable.hbase.BigtableConfiguration; - import com.gotocompany.dagger.common.configuration.Configuration; +import com.gotocompany.dagger.core.processors.longbow.enums.LongbowStorageType; +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; import com.gotocompany.dagger.core.utils.Constants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; -import org.apache.hadoop.hbase.client.AsyncTable; -import org.apache.hadoop.hbase.client.BigtableAsyncConnection; -import org.apache.hadoop.hbase.client.Result; import org.threeten.bp.Duration; import java.io.IOException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; - -import static com.google.cloud.bigtable.admin.v2.models.GCRules.GCRULES; +import java.util.concurrent.ExecutionException; /** * A class that responsible to store the event to big table for longbow. 
*/ public class LongbowStore { - private BigtableTableAdminClient adminClient; - private BigtableAsyncConnection tableClient; - private Map> tables; - private LongbowStore(BigtableTableAdminClient adminClient, BigtableAsyncConnection tableClient) { - this.adminClient = adminClient; - this.tableClient = tableClient; - this.tables = new HashMap<>(); - } + private final LongbowOperationStrategy longbowOperationStrategy; - private AsyncTable getTable(String tableId) { - if (!tables.containsKey(tableId)) { - tables.put(tableId, tableClient.getTable(TableName.valueOf(tableId))); - } - return tables.get(tableId); + private LongbowStore(LongbowOperationStrategy longbowOperationStrategy) { + this.longbowOperationStrategy = longbowOperationStrategy; } /** @@ -50,12 +30,10 @@ private AsyncTable getTable(String tableId) { * @throws IOException the io exception */ public static LongbowStore create(Configuration configuration) throws IOException { - String gcpProjectID = configuration.getString(Constants.PROCESSOR_LONGBOW_GCP_PROJECT_ID_KEY, Constants.PROCESSOR_LONGBOW_GCP_PROJECT_ID_DEFAULT); - String gcpInstanceID = configuration.getString(Constants.PROCESSOR_LONGBOW_GCP_INSTANCE_ID_KEY, Constants.PROCESSOR_LONGBOW_GCP_INSTANCE_ID_DEFAULT); - BigtableTableAdminClient bigtableTableAdminClient = BigtableTableAdminClient.create(gcpProjectID, gcpInstanceID); - org.apache.hadoop.conf.Configuration bigTableConfiguration = BigtableConfiguration.configure(gcpProjectID, gcpInstanceID); - BigtableAsyncConnection bigtableAsyncConnection = new BigtableAsyncConnection(bigTableConfiguration); - return new LongbowStore(bigtableTableAdminClient, bigtableAsyncConnection); + if (LongbowStorageType.TABLESTORE.equals(LongbowStorageType.valueOf(configuration.getString(Constants.PROCESSOR_LONGBOW_STORAGE_TYPE, LongbowStorageType.BIGTABLE.name())))) { + return new LongbowStore(new TablestoreLongbowOperationStrategy(configuration)); + } + return new LongbowStore(new BigTableLongbowOperationStrategy(configuration)); } /** @@ -64,8 +42,8
@@ public static LongbowStore create(Configuration configuration) throws IOExceptio * @param tableId the table id * @return the boolean */ - public boolean tableExists(String tableId) { - return adminClient.exists(tableId); + public boolean tableExists(String tableId) throws ExecutionException, InterruptedException { + return longbowOperationStrategy.tableExists(tableId); } /** @@ -77,10 +55,7 @@ public boolean tableExists(String tableId) { * @throws Exception the exception */ public void createTable(Duration maxAgeDuration, String columnFamilyName, String tableId) throws Exception { - adminClient.createTable(CreateTableRequest.of(tableId).addFamily(columnFamilyName, - GCRULES.union() - .rule(GCRULES.maxVersions(1)) - .rule(GCRULES.maxAge(maxAgeDuration)))); + longbowOperationStrategy.createTable(maxAgeDuration, columnFamilyName, tableId); } /** @@ -90,7 +65,7 @@ public void createTable(Duration maxAgeDuration, String columnFamilyName, String * @return the completable future */ public CompletableFuture put(PutRequest putRequest) { - return getTable(putRequest.getTableId()).put(putRequest.get()); + return longbowOperationStrategy.put(putRequest); } /** @@ -99,8 +74,8 @@ public CompletableFuture put(PutRequest putRequest) { * @param scanRequest the scan request * @return the completable future */ - public CompletableFuture> scanAll(ScanRequest scanRequest) { - return getTable(scanRequest.getTableId()).scanAll(scanRequest.get()); + public CompletableFuture> scanAll(ScanRequest scanRequest) { + return longbowOperationStrategy.scanAll(scanRequest); } /** @@ -109,11 +84,9 @@ public CompletableFuture> scanAll(ScanRequest scanRequest) { * @throws IOException the io exception */ public void close() throws IOException { - if (tableClient != null) { - tableClient.close(); - } - if (adminClient != null) { - adminClient.close(); + if (longbowOperationStrategy != null) { + longbowOperationStrategy.close(); } } + } diff --git 
a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/TablestoreLongbowOperationStrategy.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/TablestoreLongbowOperationStrategy.java new file mode 100644 index 000000000..2a097dd2e --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/TablestoreLongbowOperationStrategy.java @@ -0,0 +1,146 @@ +package com.gotocompany.dagger.core.processors.longbow.storage; + +import com.alicloud.openservices.tablestore.AsyncClient; +import com.alicloud.openservices.tablestore.TableStoreCallback; +import com.alicloud.openservices.tablestore.model.CreateTableRequest; +import com.alicloud.openservices.tablestore.model.GetRangeRequest; +import com.alicloud.openservices.tablestore.model.GetRangeResponse; +import com.alicloud.openservices.tablestore.model.ListTableResponse; +import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder; +import com.alicloud.openservices.tablestore.model.PrimaryKeySchema; +import com.alicloud.openservices.tablestore.model.PrimaryKeyType; +import com.alicloud.openservices.tablestore.model.PrimaryKeyValue; +import com.alicloud.openservices.tablestore.model.PutRowRequest; +import com.alicloud.openservices.tablestore.model.PutRowResponse; +import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria; +import com.alicloud.openservices.tablestore.model.Request; +import com.alicloud.openservices.tablestore.model.Response; +import com.alicloud.openservices.tablestore.model.Row; +import com.alicloud.openservices.tablestore.model.TableMeta; +import com.alicloud.openservices.tablestore.model.TableOptions; +import com.gotocompany.dagger.common.configuration.Configuration; +import com.gotocompany.dagger.core.processors.longbow.storage.adapters.HBasePutToTablestoreRequestAdapter; +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; +import 
com.gotocompany.dagger.core.processors.longbow.model.adapters.ScanResultAdapter; +import com.gotocompany.dagger.core.processors.longbow.model.adapters.TablestoreRowToScanResultAdapter; +import com.gotocompany.dagger.core.utils.Constants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.threeten.bp.Duration; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +public class TablestoreLongbowOperationStrategy implements LongbowOperationStrategy { + + private static final Logger LOG = LoggerFactory.getLogger(TablestoreLongbowOperationStrategy.class); + private final AsyncClient asyncClient; + private final String primaryKeyName; + private final HBasePutToTablestoreRequestAdapter putRequestAdapter; + private final ScanResultAdapter rowToScanResultAdapter; + + public TablestoreLongbowOperationStrategy(Configuration configuration) { + this.asyncClient = new AsyncClient( + configuration.getString(Constants.PROCESSOR_LONGBOW_TABLESTORE_CLIENT_ENDPOINT), + configuration.getString(Constants.PROCESSOR_LONGBOW_TABLESTORE_CLIENT_ACCESS_KEY_ID), + configuration.getString(Constants.PROCESSOR_LONGBOW_TABLESTORE_CLIENT_ACCESS_KEY_SECRET), + configuration.getString(Constants.PROCESSOR_LONGBOW_TABLESTORE_CLIENT_INSTANCE_NAME) + ); + this.primaryKeyName = configuration.getString(Constants.PROCESSOR_LONGBOW_TABLESTORE_PRIMARY_KEY_NAME); + this.putRequestAdapter = new HBasePutToTablestoreRequestAdapter( + this.primaryKeyName, + configuration.getString(Constants.PROCESSOR_LONGBOW_TABLESTORE_TABLE_ID) + ); + this.rowToScanResultAdapter = new TablestoreRowToScanResultAdapter(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT); + } + + @Override + public boolean tableExists(String tableId) throws ExecutionException, InterruptedException { + ListTableResponse listTableResponse = asyncClient.listTable(new NoOpTablestoreCallback<>()).get(); + 
return listTableResponse.getTableNames().contains(tableId); + } + + @Override + public void createTable(Duration maxAgeDuration, String columnFamilyName, String tableId) throws ExecutionException, InterruptedException { + TableMeta tableMeta = new TableMeta(tableId); + tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema(this.primaryKeyName, PrimaryKeyType.BINARY)); + TableOptions tableOptions = new TableOptions((int) maxAgeDuration.getSeconds(), 1); + CreateTableRequest createTableRequest = new CreateTableRequest( + tableMeta, + tableOptions + ); + asyncClient.createTable(createTableRequest, new NoOpTablestoreCallback<>()).get(); + } + + @Override + public CompletableFuture put(PutRequest putRequest) { + PutRowRequest putRowRequest = putRequestAdapter.adapt(putRequest.get()); + CompletableFuture future = new CompletableFuture<>(); + asyncClient.putRow(putRowRequest, new TableStoreCallback() { + @Override + public void onCompleted(PutRowRequest request, PutRowResponse result) { + future.complete(null); + } + + @Override + public void onFailed(PutRowRequest request, Exception e) { + future.completeExceptionally(e); + } + }); + return future; + } + + @Override + public CompletableFuture> scanAll(ScanRequest scanRequest) { + RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria(scanRequest.getTableId()); + PrimaryKeyBuilder startPrimaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); + startPrimaryKeyBuilder.addPrimaryKeyColumn(this.primaryKeyName, PrimaryKeyValue.fromBinary(scanRequest.get().getStartRow())); + PrimaryKeyBuilder stopPrimaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); + stopPrimaryKeyBuilder.addPrimaryKeyColumn(this.primaryKeyName, PrimaryKeyValue.fromBinary(scanRequest.get().getStopRow())); + rangeRowQueryCriteria.setInclusiveStartPrimaryKey(startPrimaryKeyBuilder.build()); + rangeRowQueryCriteria.setExclusiveEndPrimaryKey(stopPrimaryKeyBuilder.build()); + rangeRowQueryCriteria.setMaxVersions(1); + CompletableFuture>
future = new CompletableFuture<>(); + asyncClient.getRange(new GetRangeRequest(rangeRowQueryCriteria), new TableStoreCallback() { + @Override + public void onCompleted(GetRangeRequest getRangeRequest, GetRangeResponse getRangeResponse) { + future.complete(getRangeResponse.getRows() + .stream() + .map(rowToScanResultAdapter::adapt) + .collect(Collectors.toList())); + } + + @Override + public void onFailed(GetRangeRequest getRangeRequest, Exception e) { + future.completeExceptionally(e); + } + }); + return future; + } + + @Override + public void close() throws IOException { + if (asyncClient != null) { + try { + asyncClient.shutdown(); + } catch (Exception e) { + throw new IOException("Failed to close Tablestore client", e); + } + } + } + + private static class NoOpTablestoreCallback implements TableStoreCallback { + @Override + public void onCompleted(T t, V v) { + + } + + @Override + public void onFailed(T t, Exception e) { + LOG.error("Tablestore operation failed", e); + } + } +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/adapters/HBasePutToTablestoreRequestAdapter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/adapters/HBasePutToTablestoreRequestAdapter.java new file mode 100644 index 000000000..4ea573772 --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/adapters/HBasePutToTablestoreRequestAdapter.java @@ -0,0 +1,33 @@ +package com.gotocompany.dagger.core.processors.longbow.storage.adapters; + +import com.alicloud.openservices.tablestore.model.ColumnValue; +import com.alicloud.openservices.tablestore.model.PrimaryKey; +import com.alicloud.openservices.tablestore.model.PrimaryKeyBuilder; +import com.alicloud.openservices.tablestore.model.PrimaryKeyValue; +import com.alicloud.openservices.tablestore.model.PutRowRequest; +import com.alicloud.openservices.tablestore.model.RowPutChange; +import lombok.RequiredArgsConstructor; 
+import org.apache.hadoop.hbase.client.Put; + +@RequiredArgsConstructor +public class HBasePutToTablestoreRequestAdapter implements TablestoreRequestAdapter { + + private final String primaryKeyName; + private final String tableId; + + @Override + public PutRowRequest adapt(Put request) { + PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder(); + primaryKeyBuilder.addPrimaryKeyColumn(primaryKeyName, PrimaryKeyValue.fromBinary(request.getRow())); + PrimaryKey primaryKey = primaryKeyBuilder.build(); + RowPutChange rowPutChange = new RowPutChange(tableId, primaryKey); + + request.getFamilyCellMap() + .forEach((columnFamilyName, columns) -> columns.forEach(cell -> rowPutChange.addColumn( + new String(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), + ColumnValue.fromBinary(java.util.Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueOffset() + cell.getValueLength())) + ))); + return new PutRowRequest(rowPutChange); + } + +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/adapters/TablestoreRequestAdapter.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/adapters/TablestoreRequestAdapter.java new file mode 100644 index 000000000..a3baf70df --- /dev/null +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/storage/adapters/TablestoreRequestAdapter.java @@ -0,0 +1,7 @@ +package com.gotocompany.dagger.core.processors.longbow.storage.adapters; + +import com.alicloud.openservices.tablestore.model.Request; + +public interface TablestoreRequestAdapter { + R adapt(T request); +} diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/validator/LongbowType.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/validator/LongbowType.java index 126475aef..df6eb25c0 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/validator/LongbowType.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/processors/longbow/validator/LongbowType.java @@
-10,7 +10,9 @@ public enum LongbowType { LongbowRead(LongbowKey.LONGBOW_READ, MandatoryFields.LONGBOW_READ, InvalidFields.LONGBOW_READ), LongbowWrite(LongbowKey.LONGBOW_WRITE, MandatoryFields.LONGBOW_WRITE, InvalidFields.LONGBOW_WRITE), - LongbowProcess(LongbowKey.LONGBOW_PROCESS, MandatoryFields.LONGBOW_PROCESS, InvalidFields.LONGBOW_PROCESS); + LongbowProcess(LongbowKey.LONGBOW_PROCESS, MandatoryFields.LONGBOW_PROCESS, InvalidFields.LONGBOW_PROCESS), + LongbowReadV2(LongbowKey.LONGBOW_READ, MandatoryFields.LONGBOW_READ, InvalidFields.LONGBOW_READ), + LongbowWriteV2(LongbowKey.LONGBOW_WRITE, MandatoryFields.LONGBOW_WRITE, InvalidFields.LONGBOW_WRITE); private static final String LONGBOW_TYPE_PREFIX = "_key"; private String keyName; diff --git a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java index 78fdbe707..045586d7e 100644 --- a/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java +++ b/dagger-core/src/main/java/com/gotocompany/dagger/core/utils/Constants.java @@ -17,11 +17,18 @@ public class Constants { public static final String LONGBOW_DELIMITER = "#"; public static final String LONGBOW_DATA_KEY = "longbow_data"; public static final String LONGBOW_PROTO_DATA_KEY = "proto_data"; + public static final String PROCESSOR_LONGBOW_STORAGE_TYPE = "PROCESSOR_LONGBOW_STORAGE_TYPE"; public static final String PROCESSOR_LONGBOW_GCP_PROJECT_ID_KEY = "PROCESSOR_LONGBOW_GCP_PROJECT_ID"; public static final String PROCESSOR_LONGBOW_GCP_PROJECT_ID_DEFAULT = "default-gcp-project"; public static final String PROCESSOR_LONGBOW_GCP_INSTANCE_ID_KEY = "PROCESSOR_LONGBOW_GCP_INSTANCE_ID"; public static final String PROCESSOR_LONGBOW_GCP_INSTANCE_ID_DEFAULT = "default-gcp-project"; public static final String PROCESSOR_LONGBOW_GCP_TABLE_ID_KEY = "PROCESSOR_LONGBOW_GCP_TABLE_ID"; + public static final String PROCESSOR_LONGBOW_TABLESTORE_CLIENT_INSTANCE_NAME = 
"PROCESSOR_LONGBOW_TABLESTORE_CLIENT_INSTANCE_NAME"; + public static final String PROCESSOR_LONGBOW_TABLESTORE_CLIENT_ACCESS_KEY_ID = "PROCESSOR_LONGBOW_TABLESTORE_CLIENT_ACCESS_KEY_ID"; + public static final String PROCESSOR_LONGBOW_TABLESTORE_CLIENT_ACCESS_KEY_SECRET = "PROCESSOR_LONGBOW_TABLESTORE_CLIENT_ACCESS_KEY_SECRET"; + public static final String PROCESSOR_LONGBOW_TABLESTORE_CLIENT_ENDPOINT = "PROCESSOR_LONGBOW_TABLESTORE_CLIENT_ENDPOINT"; + public static final String PROCESSOR_LONGBOW_TABLESTORE_TABLE_ID = "PROCESSOR_LONGBOW_TABLESTORE_TABLE_ID"; + public static final String PROCESSOR_LONGBOW_TABLESTORE_PRIMARY_KEY_NAME = "PROCESSOR_LONGBOW_TABLESTORE_PRIMARY_KEY_NAME"; public static final String LONGBOW_COLUMN_FAMILY_DEFAULT = "ts"; public static final String LONGBOW_QUALIFIER_DEFAULT = "proto"; public static final Long PROCESSOR_LONGBOW_ASYNC_TIMEOUT_DEFAULT = 15000L; diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowProtoDataTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowProtoDataTest.java index 82ad688fb..3d4f3fbf9 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowProtoDataTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowProtoDataTest.java @@ -1,24 +1,22 @@ package com.gotocompany.dagger.core.processors.longbow.data; +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; import com.gotocompany.dagger.core.utils.Constants; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import org.mockito.Mockito; import java.util.*; import static org.junit.Assert.*; -import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; public class LongbowProtoDataTest { - private static final byte[] 
COLUMN_FAMILY_NAME = Bytes.toBytes(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT); - @Mock - private Result scanResult; + private ScanResult scanResult; @Before public void setup() { @@ -26,11 +24,16 @@ public void setup() { } @Test - public void shouldParseProtoByteDataFromBigTable() { - ArrayList results = new ArrayList<>(); + public void shouldParseProtoByteData() { + ArrayList results = new ArrayList<>(); results.add(scanResult); byte[] mockResult = Bytes.toBytes("test"); - when(scanResult.getValue(COLUMN_FAMILY_NAME, Bytes.toBytes(Constants.LONGBOW_QUALIFIER_DEFAULT))).thenReturn(mockResult); + Map> data = new HashMap<>(); + Map innerData = new HashMap<>(); + innerData.put(Constants.LONGBOW_QUALIFIER_DEFAULT, mockResult); + data.put(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT, innerData); + Mockito.when(scanResult.getData()).thenReturn(data); + LongbowProtoData longbowProtoData = new LongbowProtoData(); Map> actualMap = longbowProtoData.parse(results); Map> expectedMap = new HashMap>() {{ diff --git a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowTableDataTest.java b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowTableDataTest.java index fe7a58779..e785bb277 100644 --- a/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowTableDataTest.java +++ b/dagger-core/src/test/java/com/gotocompany/dagger/core/processors/longbow/data/LongbowTableDataTest.java @@ -1,8 +1,8 @@ package com.gotocompany.dagger.core.processors.longbow.data; +import com.gotocompany.dagger.core.processors.longbow.model.ScanResult; import com.gotocompany.dagger.core.utils.Constants; import com.gotocompany.dagger.core.processors.longbow.LongbowSchema; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Before; import org.junit.Test; @@ -16,26 +16,32 @@ public class LongbowTableDataTest { - private static final byte[] COLUMN_FAMILY_NAME = 
Bytes.toBytes(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT); - @Mock - private Result result1; + private ScanResult result1; @Mock - private Result result2; + private ScanResult result2; @Before public void setUp() { initMocks(this); - when(result1.getValue(COLUMN_FAMILY_NAME, Bytes.toBytes("longbow_data1"))).thenReturn(Bytes.toBytes("RB-234")); - when(result1.getValue(COLUMN_FAMILY_NAME, Bytes.toBytes("longbow_data2"))).thenReturn(Bytes.toBytes("RB-235")); - when(result2.getValue(COLUMN_FAMILY_NAME, Bytes.toBytes("longbow_data1"))).thenReturn(Bytes.toBytes("RB-224")); - when(result2.getValue(COLUMN_FAMILY_NAME, Bytes.toBytes("longbow_data2"))).thenReturn(Bytes.toBytes("RB-225")); + Map> data1 = new HashMap<>(); + Map innerData1 = new HashMap<>(); + innerData1.put("longbow_data1", Bytes.toBytes("RB-234")); + innerData1.put("longbow_data2", Bytes.toBytes("RB-235")); + data1.put(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT, innerData1); + Map> data2 = new HashMap<>(); + Map innerData2 = new HashMap<>(); + innerData2.put("longbow_data1", Bytes.toBytes("RB-224")); + innerData2.put("longbow_data2", Bytes.toBytes("RB-225")); + data2.put(Constants.LONGBOW_COLUMN_FAMILY_DEFAULT, innerData2); + when(result1.getData()).thenReturn(data1); + when(result2.getData()).thenReturn(data2); } @Test public void shouldReturnEmptyDataWhenScanResultIsEmpty() { - List scanResult = new ArrayList<>(); + List scanResult = new ArrayList<>(); String[] columnNames = {"longbow_key", "longbow_data1", "rowtime", "longbow_duration"}; LongbowSchema longbowSchema = new LongbowSchema(columnNames); @@ -46,7 +52,7 @@ public void shouldReturnEmptyDataWhenScanResultIsEmpty() { @Test public void shouldReturnListOfString() { - List scanResult = new ArrayList<>(); + List scanResult = new ArrayList<>(); scanResult.add(result1); String[] columnNames = {"longbow_key", "longbow_data1", "rowtime", "longbow_duration"}; @@ -58,7 +64,7 @@ public void shouldReturnListOfString() { @Test public void 
shouldReturnMultipleListOfStringWhenLongbowDataMoreThanOne() { - List scanResult = new ArrayList<>(); + List scanResult = new ArrayList<>(); scanResult.add(result1); scanResult.add(result2); String[] columnNames = {"longbow_key", "longbow_data1", "rowtime", "longbow_duration", "longbow_data2"};