Skip to content
Draft
2 changes: 1 addition & 1 deletion env/local.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ INPUT_SCHEMA_PROTO_CLASS=com.gotocompany.firehose.consumer.TestMessage
# APPLICATION_THREAD_COUNT=2
# TRACE_JAEGAR_ENABLE=true
# LOG_LEVEL=info
# INPUT_SCHEMA_DATA_TYPE=protobuf
#
#
#############################################
Expand Down Expand Up @@ -122,7 +123,6 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
# SINK_HTTP_OAUTH2_CLIENT_SECRET=client-secret
# SINK_HTTP_OAUTH2_SCOPE=User:read, sys:info
#
#
#############################################
#
## INFLUX SINK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ public List<Message> readMessages() {
List<Message> messages = new ArrayList<>();

for (ConsumerRecord<byte[], byte[]> record : records) {
messages.add(new Message(record.key(), record.value(), record.topic(), record.partition(), record.offset(), record.headers(), record.timestamp(), System.currentTimeMillis()));
Message msg = new Message(record.key(), record.value(), record.topic(), record.partition(), record.offset(), record.headers(), record.timestamp(), System.currentTimeMillis());
msg.setInputSchemaType(consumerConfig.getInputSchemaType());
messages.add(msg);
firehoseInstrumentation.logDebug("Pulled record: {}", record);
}

return messages;
}

Expand Down
25 changes: 22 additions & 3 deletions src/main/java/com/gotocompany/firehose/message/Message.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gotocompany.firehose.message;


import com.gotocompany.firehose.config.enums.InputSchemaType;
import com.gotocompany.firehose.exception.DefaultException;
import com.gotocompany.depot.error.ErrorInfo;
import com.gotocompany.depot.error.ErrorType;
Expand Down Expand Up @@ -29,6 +30,8 @@ public class Message {
private long consumeTimestamp;
@Setter
private ErrorInfo errorInfo;
@Setter
private InputSchemaType inputSchemaType;

public void setDefaultErrorIfNotPresent() {
if (errorInfo == null) {
Expand All @@ -54,7 +57,7 @@ public Message(byte[] logKey, byte[] logMessage, String topic, int partition, lo
}

/**
* Instantiates a new Message without providing errorType.
* Instantiates a new Message without providing errorType and inputSchemaType.
*
* @param logKey
* @param logMessage
Expand All @@ -66,7 +69,7 @@ public Message(byte[] logKey, byte[] logMessage, String topic, int partition, lo
* @param consumeTimestamp
*/
public Message(byte[] logKey, byte[] logMessage, String topic, int partition, long offset, Headers headers, long timestamp, long consumeTimestamp) {
this(logKey, logMessage, topic, partition, offset, headers, timestamp, consumeTimestamp, null);
this(logKey, logMessage, topic, partition, offset, headers, timestamp, consumeTimestamp, null, InputSchemaType.PROTOBUF);
}

public Message(Message message, ErrorInfo errorInfo) {
Expand All @@ -78,7 +81,23 @@ public Message(Message message, ErrorInfo errorInfo) {
message.getHeaders(),
message.getTimestamp(),
message.getConsumeTimestamp(),
errorInfo);
errorInfo,
message.getInputSchemaType()
);
}

public Message(Message message, ErrorInfo errorInfo, InputSchemaType inputSchemaType) {
this(message.getLogKey(),
message.getLogMessage(),
message.getTopic(),
message.getPartition(),
message.getOffset(),
message.getHeaders(),
message.getTimestamp(),
message.getConsumeTimestamp(),
errorInfo,
inputSchemaType
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gotocompany.firehose.serializer;


import com.gotocompany.firehose.config.enums.InputSchemaType;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.exception.DeserializerException;
import com.google.gson.ExclusionStrategy;
Expand All @@ -17,6 +18,7 @@
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -55,6 +57,17 @@ public String serialize(Message message) throws DeserializerException {
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", message.getTopic());

if (message.getInputSchemaType() == InputSchemaType.JSON) {
JSONParser parser = new JSONParser();
JSONObject json = (JSONObject) parser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8));
jsonObject.put("logMessage", gson.toJson(json));
if (message.getLogKey() != null && message.getLogKey().length != 0) {
jsonObject.put("logKey", new String(message.getLogKey(), StandardCharsets.UTF_8));
}

return jsonObject.toJSONString();
}

if (message.getLogKey() != null && message.getLogKey().length != 0) {
DynamicMessage key = protoParser.parse(message.getLogKey());
jsonObject.put("logKey", this.gson.toJson(convertDynamicMessageToJson(key)));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.gotocompany.firehose.serializer;


import com.gotocompany.firehose.config.enums.InputSchemaType;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.exception.DeserializerException;
import com.gotocompany.firehose.exception.ConfigurationException;
Expand All @@ -12,9 +13,11 @@
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.gotocompany.stencil.Parser;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -77,9 +80,18 @@ public String serialize(Message message) throws DeserializerException {
try {
String jsonMessage;
String jsonString;
// only supports messages not keys
DynamicMessage msg = protoParser.parse(message.getLogMessage());
jsonMessage = JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames().print(msg);

if (message.getInputSchemaType() == InputSchemaType.JSON) {
System.out.println(new String(message.getLogMessage()));
JSONObject json = (JSONObject) jsonParser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8));
System.out.println(json.toJSONString());
jsonMessage = json.toJSONString();
} else {
// only supports messages not keys
DynamicMessage msg = protoParser.parse(message.getLogMessage());
jsonMessage = JsonFormat.printer().includingDefaultValueFields().preservingProtoFieldNames().print(msg);
}

String finalMessage = httpSinkJsonBodyTemplate;
for (String path : pathsToReplace) {
if (path.equals(ALL_FIELDS_FROM_TEMPLATE)) {
Expand All @@ -91,7 +103,8 @@ public String serialize(Message message) throws DeserializerException {
finalMessage = finalMessage.replace(path, jsonString);
}
return finalMessage;
} catch (InvalidProtocolBufferException | PathNotFoundException e) {
} catch (InvalidProtocolBufferException | ParseException | PathNotFoundException e) {
e.printStackTrace();
throw new DeserializerException(e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.gotocompany.firehose.config.HttpSinkConfig;
import com.gotocompany.firehose.config.enums.HttpSinkDataFormatType;
import com.gotocompany.firehose.config.enums.InputSchemaType;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
import com.gotocompany.firehose.serializer.JsonWrappedProtoByte;
import com.gotocompany.firehose.serializer.MessageSerializer;
Expand All @@ -24,12 +25,16 @@ public class SerializerFactory {

public MessageSerializer build() {
FirehoseInstrumentation firehoseInstrumentation = new FirehoseInstrumentation(statsDReporter, SerializerFactory.class);
if (isProtoSchemaEmpty() || httpSinkConfig.getSinkHttpDataFormat() == HttpSinkDataFormatType.PROTO) {
if ( (httpSinkConfig.getInputSchemaType() == InputSchemaType.PROTOBUF && isProtoSchemaEmpty()) || httpSinkConfig.getSinkHttpDataFormat() == HttpSinkDataFormatType.PROTO) {
firehoseInstrumentation.logDebug("Serializer type: JsonWrappedProtoByte");
// Fallback to json wrapped proto byte

// todo(sushmith):
// here output is proto, but input expected is also proto.
// need to have json as input and proto as output.
// this is currently not possible because of the way we are using the parser.
return new JsonWrappedProtoByte();
}

if (httpSinkConfig.getSinkHttpDataFormat() == HttpSinkDataFormatType.JSON) {
Parser protoParser = stencilClient.getParser(httpSinkConfig.getInputSchemaProtoClass());
if (httpSinkConfig.getSinkHttpJsonBodyTemplate().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package com.gotocompany.firehose.sink.http.request.uri;


import com.google.gson.JsonObject;
import com.gotocompany.firehose.config.enums.InputSchemaType;
import com.gotocompany.firehose.exception.JsonParseException;
import com.gotocompany.firehose.message.Message;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.gotocompany.stencil.Parser;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

Expand All @@ -18,16 +25,23 @@
public class UriParser {
private Parser protoParser;
private String parserMode;
private JSONParser jsonParser;


public UriParser(Parser protoParser, String parserMode) {
this.protoParser = protoParser;
this.parserMode = parserMode;
this.jsonParser = new JSONParser();
}

public String parse(Message message, String serviceUrl) {
DynamicMessage parsedMessage = parseEsbMessage(message);
return parseServiceUrl(parsedMessage, serviceUrl);
public UriParser(Parser protoParser, String parserMode, JSONParser jsonParser) {
this.protoParser = protoParser;
this.parserMode = parserMode;
this.jsonParser = jsonParser;
}

public String parse(Message message, String serviceUrl) {
return parseServiceUrl(message, serviceUrl);
}

private DynamicMessage parseEsbMessage(Message message) {
Expand All @@ -40,7 +54,18 @@ private DynamicMessage parseEsbMessage(Message message) {
return parsedMessage;
}

private String parseServiceUrl(DynamicMessage data, String serviceUrl) {
private JSONObject parseJsonMessage(Message message) {
JSONObject jsonObject;
try {
jsonObject = (JSONObject) jsonParser.parse(new String(message.getLogMessage(), StandardCharsets.UTF_8));
} catch (ParseException e) {
throw new JsonParseException(e.getMessage(), e.getCause());
}
return jsonObject;
}


private String parseServiceUrl(Message message, String serviceUrl) {
if (StringUtils.isEmpty(serviceUrl)) {
throw new IllegalArgumentException("Service URL '" + serviceUrl + "' is invalid");
}
Expand All @@ -55,16 +80,43 @@ private String parseServiceUrl(DynamicMessage data, String serviceUrl) {

String urlPattern = urlStrings[0];
String urlVariables = StringUtils.join(Arrays.copyOfRange(urlStrings, 1, urlStrings.length), ",");
String renderedUrl = renderStringUrl(data, urlPattern, urlVariables);
return StringUtils.isEmpty(urlVariables)
? urlPattern
: renderedUrl;

if (StringUtils.isEmpty(urlVariables)) {
return urlPattern;
}

String renderedUrl;
if (message.getInputSchemaType() == InputSchemaType.JSON) {
JSONObject json = parseJsonMessage(message);
renderedUrl = renderStringUrl(json, urlPattern, urlVariables);
} else {
// InputSchemaType.PROTOBUF
DynamicMessage data = parseEsbMessage(message);
renderedUrl = renderStringUrl(data, urlPattern, urlVariables);
}

return renderedUrl;
}

private String renderStringUrl(DynamicMessage parsedMessage, String pattern, String patternVariables) {
if (StringUtils.isEmpty(patternVariables)) {
return pattern;
private String renderStringUrl(JSONObject jsonObject, String pattern, String patternVariables) {
List<String> patternVariablesList = Arrays.asList(patternVariables.split(","));
Object[] patternVariableData = patternVariablesList
.stream()
.map(field -> getDataByFieldName(jsonObject, field))
.toArray();
return String.format(pattern, patternVariableData);
}

private Object getDataByFieldName(JSONObject jsonObject, String fieldName) {
if (!jsonObject.containsKey(fieldName)) {
throw new IllegalArgumentException("Invalid json field name: " + fieldName);
}

return jsonObject.get(fieldName);
}


private String renderStringUrl(DynamicMessage parsedMessage, String pattern, String patternVariables) {
List<String> patternVariableFieldNumbers = Arrays.asList(patternVariables.split(","));
Object[] patternVariableData = patternVariableFieldNumbers
.stream()
Expand Down
Loading