Add firehose gcs connector
ekawinataa committed Aug 30, 2024
1 parent 1e2b0e0 commit 9dafc98
Showing 12 changed files with 190 additions and 9 deletions.
4 changes: 2 additions & 2 deletions build.gradle
@@ -117,11 +117,11 @@ dependencies {
protobuf {
generatedFilesBaseDir = "$projectDir/src/generated"
protoc {
artifact = "com.google.protobuf:protoc:3.1.0"
artifact = "com.google.protobuf:protoc:3.17.3"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:1.0.3"
artifact = "io.grpc:protoc-gen-grpc-java:1.60.1"
}
}
generateProtoTasks {
28 changes: 23 additions & 5 deletions env/local.properties
@@ -8,9 +8,17 @@
#
## Generic
#
+ GCS_TYPE=DLQ
+ SOURCE_KAFKA_CONSUMER_MODE=DLQ
KAFKA_RECORD_PARSER_MODE=message
SINK_TYPE=log
- INPUT_SCHEMA_PROTO_CLASS=com.gotocompany.firehose.consumer.TestMessage
+ INPUT_SCHEMA_PROTO_CLASS=demo.User
+
+ DLQ_GCS_GOOGLE_CLOUD_PROJECT_ID=gtfndata-integration
+ DLQ_GCS_BUCKET_NAME=g-gtfndata-core-dlq
+ DLQ_GCS_CREDENTIAL_PATH=cred.json
+ GCS_BLOB_ARCHIVE_PATH=test-dlq
+ GCS_BLOB_SOURCE_PATH_PREFIX=test-grpc
# INPUT_SCHEMA_PROTO_TO_COLUMN_MAPPING={"1":"order_number","2":"event_timestamp","3":"driver_id"}
# METRIC_STATSD_HOST=localhost
# METRIC_STATSD_PORT=8125
@@ -26,8 +34,8 @@ INPUT_SCHEMA_PROTO_CLASS=com.gotocompany.firehose.consumer.TestMessage
## Stencil Client
#
SCHEMA_REGISTRY_STENCIL_ENABLE=true
- SCHEMA_REGISTRY_STENCIL_URLS=http://localhost:8081/descriptors.bin
# SCHEMA_REGISTRY_STENCIL_FETCH_TIMEOUT_MS=10000
+ SCHEMA_REGISTRY_STENCIL_URLS=http://stencil.integration.gtfdata.io/v1beta1/namespaces/poc/schemas/demo
# SCHEMA_REGISTRY_STENCIL_FETCH_RETRIES=3
# SCHEMA_REGISTRY_STENCIL_FETCH_BACKOFF_MIN_MS=60000
# SCHEMA_REGISTRY_STENCIL_FETCH_AUTH_BEARER_TOKEN=tcDpw34J8d1
@@ -40,8 +48,8 @@ SCHEMA_REGISTRY_STENCIL_REFRESH_STRATEGY=LONG_POLLING
#
## Kafka Consumer
#
- SOURCE_KAFKA_BROKERS=localhost:9092
- SOURCE_KAFKA_TOPIC=test-topic
+ SOURCE_KAFKA_BROKERS=10.114.36.46:6668,10.114.36.48:6668,10.114.36.49:6668
+ SOURCE_KAFKA_TOPIC=test-grpc
# SOURCE_KAFKA_CONSUMER_CONFIG_MAX_POLL_RECORDS=500
# SOURCE_KAFKA_ASYNC_COMMIT_ENABLE=true
# SOURCE_KAFKA_CONSUMER_CONFIG_SESSION_TIMEOUT_MS=10000
@@ -70,7 +78,17 @@ SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-group-id
# RETRY_EXPONENTIAL_BACKOFF_INITIAL_MS=10
# RETRY_EXPONENTIAL_BACKOFF_RATE=2
# RETRY_EXPONENTIAL_BACKOFF_MAX_MS=60000
- # DLQ_ENABLE=true
+ DLQ_SINK_ENABLE=true
+ DLQ_WRITER_TYPE=kafka
+ DLQ_KAFKA_TOPIC=dlq-topic
+ DLQ_KAFKA_BROKERS=10.114.36.46:6668,10.114.36.48:6668,10.114.36.49:6668
+ DLQ_KAFKA_ACKS=all
+ DLQ_KAFKA_RETRIES=1
+ DLQ_KAFKA_BATCH_SIZE=100
+ DLQ_KAFKA_LINGER_MS=0
+ DLQ_KAFKA_BUFFER_MEMORY=333334
+ # DLQ_KAFKA_KEY_SERIALIZER=com.gotocompany.generic.GenericResponse
+ # DLQ_KAFKA_VALUE_SERIALIZER=com.gotocompany.generic.GenericResponse
#
#
#############################################
8 changes: 8 additions & 0 deletions src/main/java/com/gotocompany/firehose/config/BlobConsumerConfig.java
@@ -0,0 +1,8 @@
package com.gotocompany.firehose.config;

import org.aeonbits.owner.Config;

public interface BlobConsumerConfig extends Config {
@Config.Key("BLOB_SOURCE_TYPE")
String getSourceBlobBucketName();
}
1 change: 1 addition & 0 deletions src/main/java/com/gotocompany/firehose/config/GCSConfig.java
@@ -70,6 +70,7 @@ public interface GCSConfig extends Config {
@Key("${GCS_TYPE}_GCS_RETRY_RPC_MAX_TIMEOUT_MS")
@DefaultValue("5000")
Long getGCSRetryRPCMaxTimeoutMS();
+
}


20 changes: 20 additions & 0 deletions src/main/java/com/gotocompany/firehose/config/GcsBlobConsumerConfig.java
@@ -0,0 +1,20 @@
package com.gotocompany.firehose.config;

import org.aeonbits.owner.Config;

public interface GcsBlobConsumerConfig extends Config {
@Config.Key("GCS_BLOB_SOURCE_BUCKET_NAME")
String getSourceBlobBucketName();

@Config.Key("GCS_BLOB_SOURCE_SERVICE_ACCOUNT")
String getSourceBlobServiceAccount();

@Config.Key("GCS_BLOB_SOURCE_PROJECT_ID")
String getSourceBlobProjectId();

@Config.Key("GCS_BLOB_SOURCE_PATH_PREFIX")
String getSourcePathPrefix();

@Config.Key("GCS_BLOB_ARCHIVE_PATH")
String getSourceArchivePath();
}
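These keys mirror the GCS_BLOB_* entries added to env/local.properties above. A minimal sketch of how the owner library resolves them at runtime; the property values here are hypothetical, borrowed from the sample config:

import com.gotocompany.firehose.config.GcsBlobConsumerConfig;
import org.aeonbits.owner.ConfigFactory;

import java.util.HashMap;
import java.util.Map;

public class GcsBlobConsumerConfigExample {
    public static void main(String[] args) {
        // Hypothetical values; Firehose reads these from the environment/properties.
        Map<String, String> props = new HashMap<>();
        props.put("GCS_BLOB_SOURCE_BUCKET_NAME", "g-gtfndata-core-dlq");
        props.put("GCS_BLOB_SOURCE_PATH_PREFIX", "test-grpc");
        props.put("GCS_BLOB_ARCHIVE_PATH", "test-dlq");

        // owner binds each @Config.Key to its getter, as FirehoseConsumerFactory does below.
        GcsBlobConsumerConfig config = ConfigFactory.create(GcsBlobConsumerConfig.class, props);
        System.out.println(config.getSourcePathPrefix()); // prints "test-grpc"
    }
}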
3 changes: 2 additions & 1 deletion src/main/java/com/gotocompany/firehose/config/enums/KafkaConsumerMode.java
@@ -2,5 +2,6 @@

public enum KafkaConsumerMode {
ASYNC,
- SYNC
+ SYNC,
+ DLQ
}
11 changes: 11 additions & 0 deletions src/main/java/com/gotocompany/firehose/consumer/FirehoseConsumerFactory.java
@@ -1,8 +1,11 @@
package com.gotocompany.firehose.consumer;

+ import com.gotocompany.firehose.config.GcsBlobConsumerConfig;
import com.gotocompany.firehose.consumer.kafka.ConsumerAndOffsetManager;
import com.gotocompany.firehose.consumer.kafka.FirehoseKafkaConsumer;
import com.gotocompany.firehose.consumer.kafka.OffsetManager;
+ import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageFactory;
+ import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageType;
import io.jaegertracing.Configuration;
import com.gotocompany.depot.metrics.StatsDReporter;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
@@ -40,6 +43,7 @@
import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracerFactory;
import org.aeonbits.owner.ConfigFactory;
+ import org.apache.hadoop.fs.viewfs.ConfigUtil;

import java.util.ArrayList;
import java.util.Collections;
@@ -126,6 +130,13 @@ public FirehoseConsumer buildConsumer() {
kafkaConsumerConfig.isTraceJaegarEnable());
SinkFactory sinkFactory = new SinkFactory(kafkaConsumerConfig, statsDReporter, stencilClient, offsetManager);
sinkFactory.init();
+ if (kafkaConsumerConfig.getSourceKafkaConsumerMode().equals(KafkaConsumerMode.DLQ)) {
+     Sink sink = createSink(tracer, sinkFactory);
+     return new GcsBlobStorageFirehoseConsumer(BlobStorageFactory.createObjectStorage(BlobStorageType.GCS, config),
+             sink,
+             firehoseTracer,
+             ConfigFactory.create(GcsBlobConsumerConfig.class, config));
+ }
if (kafkaConsumerConfig.getSourceKafkaConsumerMode().equals(KafkaConsumerMode.SYNC)) {
Sink sink = createSink(tracer, sinkFactory);
ConsumerAndOffsetManager consumerAndOffsetManager = new ConsumerAndOffsetManager(Collections.singletonList(sink), offsetManager, firehoseKafkaConsumer, kafkaConsumerConfig, new FirehoseInstrumentation(statsDReporter, ConsumerAndOffsetManager.class));
77 changes: 77 additions & 0 deletions src/main/java/com/gotocompany/firehose/consumer/GcsBlobStorageFirehoseConsumer.java
@@ -0,0 +1,77 @@
package com.gotocompany.firehose.consumer;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.gotocompany.firehose.config.GcsBlobConsumerConfig;
import com.gotocompany.firehose.message.Message;
import com.gotocompany.firehose.sink.Sink;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage;
import com.gotocompany.firehose.tracer.SinkTracer;
import io.opentracing.Span;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

public class GcsBlobStorageFirehoseConsumer implements FirehoseConsumer {

private final BlobStorage blobStorage;
private final Sink sink;
private final SinkTracer sinkTracer;
private final GcsBlobConsumerConfig gcsBlobConsumerConfig;
private final ObjectMapper objectMapper;

public GcsBlobStorageFirehoseConsumer(BlobStorage blobStorage,
Sink sink,
SinkTracer sinkTracer,
GcsBlobConsumerConfig gcsBlobConsumerConfig) {
this.blobStorage = blobStorage;
this.sink = sink;
this.gcsBlobConsumerConfig = gcsBlobConsumerConfig;
this.sinkTracer = sinkTracer;
this.objectMapper = new ObjectMapper();
}

@Override
public void process() throws IOException {
List<String> fileNames = blobStorage.list(gcsBlobConsumerConfig.getSourcePathPrefix());

for (String fileName : fileNames) {
byte[] content = blobStorage.get(fileName);
List<Message> messages = parseBlobToMessages(content);
List<Span> spans = sinkTracer.startTrace(messages);
sink.pushMessage(messages);
sinkTracer.finishTrace(spans);
}

}

private List<Message> parseBlobToMessages(byte[] content) throws IOException {
InputStream inputStream = new ByteArrayInputStream(content);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
String line = bufferedReader.readLine();
List<Message> result = new ArrayList<>();
while (line != null) {
JsonNode node = objectMapper.readTree(line);
String logKey = node.get("key").asText();
String logMessage = node.get("value").asText();
String topic = node.get("topic").asText();
int partition = node.get("partition").asInt();
long offset = node.get("offset").asLong();
Message message = new Message(Base64.getDecoder().decode(logKey), Base64.getDecoder().decode(logMessage), topic, partition, offset);
result.add(message);
line = bufferedReader.readLine();
}
return result;
}

@Override
public void close() throws IOException {
sink.close();
}
}
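parseBlobToMessages expects each blob to be newline-delimited JSON, one Kafka record per line, with the key and value base64-encoded; that is the contract the code above reads back into Message objects. A self-contained sketch of that contract with hypothetical field values:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Base64;

public class DlqBlobLineExample {
    public static void main(String[] args) throws Exception {
        // One hypothetical line of a DLQ blob; "a2V5LTE=" decodes to "key-1", "aGVsbG8=" to "hello".
        String line = "{\"key\":\"a2V5LTE=\",\"value\":\"aGVsbG8=\","
                + "\"topic\":\"test-grpc\",\"partition\":0,\"offset\":42}";

        JsonNode node = new ObjectMapper().readTree(line);
        byte[] key = Base64.getDecoder().decode(node.get("key").asText());
        byte[] value = Base64.getDecoder().decode(node.get("value").asText());
        System.out.printf("%s[%d]@%d key=%s value=%s%n",
                node.get("topic").asText(), node.get("partition").asInt(),
                node.get("offset").asLong(), new String(key), new String(value));
        // prints: test-grpc[0]@42 key=key-1 value=hello
    }
}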
6 changes: 5 additions & 1 deletion src/main/java/com/gotocompany/firehose/launch/Main.java
@@ -1,5 +1,6 @@
package com.gotocompany.firehose.launch;

+ import com.gotocompany.firehose.config.enums.KafkaConsumerMode;
import com.gotocompany.firehose.consumer.FirehoseConsumer;
import com.gotocompany.firehose.consumer.FirehoseConsumerFactory;
import com.gotocompany.firehose.metrics.FirehoseInstrumentation;
@@ -42,7 +43,6 @@ private static void multiThreadedConsumers(KafkaConsumerConfig kafkaConsumerConf
kafkaConsumerConfig.getApplicationThreadCleanupDelay(),
new FirehoseInstrumentation(statsDReporter, Task.class),
taskFinished -> {
-
FirehoseConsumer firehoseConsumer = null;
try {
firehoseConsumer = new FirehoseConsumerFactory(kafkaConsumerConfig, statsDReporter).buildConsumer();
@@ -52,6 +52,10 @@ private static void multiThreadedConsumers(KafkaConsumerConfig kafkaConsumerConf
break;
}
firehoseConsumer.process();
+ if (KafkaConsumerMode.DLQ.equals(kafkaConsumerConfig.getSourceKafkaConsumerMode())) {
+     firehoseInstrumentation.logInfo("DLQ mode is enabled, exiting the loop!");
+     break;
+ }
}
} catch (Exception | Error e) {
ensureThreadInterruptStateIsClearedAndClose(firehoseConsumer, firehoseInstrumentation);
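With this change, DLQ mode turns the normally long-running worker into a one-shot batch job: process() replays every blob under the configured prefix once, then the loop exits instead of polling forever. Condensed view of the control flow (a sketch, not the verbatim source):

while (true) {
    firehoseConsumer.process();                      // one full pass over the source
    if (KafkaConsumerMode.DLQ.equals(kafkaConsumerConfig.getSourceKafkaConsumerMode())) {
        break;                                       // DLQ replay is one-shot
    }
}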
8 changes: 8 additions & 0 deletions src/main/java/com/gotocompany/firehose/sink/common/blobstorage/BlobStorage.java
@@ -1,10 +1,18 @@
package com.gotocompany.firehose.sink.common.blobstorage;

+ import com.google.cloud.storage.Blob;
+
+ import java.util.List;
+
/**
* Abstraction of any storage that store binary bytes as file.
*/
public interface BlobStorage {
void store(String objectName, String filePath) throws BlobStorageException;

void store(String objectName, byte[] content) throws BlobStorageException;
+
+ byte[] get(String filePath);
+
+ List<String> list(String prefix);
}
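The two new methods extend BlobStorage from a write-only to a read/write contract. A hypothetical in-memory implementation (not part of this commit) that satisfies it, handy as a test double for consumers of the interface:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InMemoryBlobStorage implements BlobStorage {
    private final Map<String, byte[]> objects = new LinkedHashMap<>();

    @Override
    public void store(String objectName, String filePath) {
        throw new UnsupportedOperationException("file-path uploads are not needed in tests");
    }

    @Override
    public void store(String objectName, byte[] content) {
        objects.put(objectName, content);
    }

    @Override
    public byte[] get(String filePath) {
        return objects.get(filePath);
    }

    @Override
    public List<String> list(String prefix) {
        List<String> names = new ArrayList<>();
        for (String name : objects.keySet()) {
            if (name.startsWith(prefix)) {
                names.add(name);  // mimic GCS prefix filtering
            }
        }
        return names;
    }
}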
20 changes: 20 additions & 0 deletions src/main/java/com/gotocompany/firehose/sink/common/blobstorage/gcs/GoogleCloudStorage.java
@@ -1,7 +1,9 @@
package com.gotocompany.firehose.sink.common.blobstorage.gcs;

+ import com.google.api.gax.paging.Page;
import com.google.api.gax.retrying.RetrySettings;
import com.google.auth.oauth2.GoogleCredentials;
+ import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
@@ -20,6 +22,10 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
+ import java.util.List;
+ import java.util.stream.Collectors;
+
+ import java.util.stream.StreamSupport;

public class GoogleCloudStorage implements BlobStorage {
private static final Logger LOGGER = LoggerFactory.getLogger(GoogleCloudStorage.class);
@@ -109,4 +115,18 @@ public void store(String objectName, byte[] content) throws BlobStorageException
throw new BlobStorageException(gcsErrorType, "GCS Upload failed", e);
}
}
+
+ @Override
+ public byte[] get(String filePath) {
+     return storage.readAllBytes(BlobId.of(gcsConfig.getGCSBucketName(), filePath));
+ }
+
+ @Override
+ public List<String> list(String prefix) {
+     Page<Blob> blobs = storage.list(gcsConfig.getGCSBucketName(), Storage.BlobListOption.prefix(prefix));
+
+     return StreamSupport.stream(blobs.iterateAll().spliterator(), false)
+             .map(BlobInfo::getName)
+             .collect(Collectors.toList());
+ }
}
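iterateAll() transparently fetches subsequent pages from the Storage API, so list() returns every object name under the prefix, and get() pulls a whole object into memory, which is acceptable for DLQ-sized files. A hypothetical usage, mirroring what GcsBlobStorageFirehoseConsumer does and assuming the map-based factory signature used in the diff:

import com.gotocompany.firehose.sink.common.blobstorage.BlobStorage;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageFactory;
import com.gotocompany.firehose.sink.common.blobstorage.BlobStorageType;

import java.util.Map;

public class GcsListExample {
    // 'config' must carry the DLQ_GCS_* keys shown in env/local.properties.
    static void replayPrefix(Map<String, String> config) {
        BlobStorage blobStorage = BlobStorageFactory.createObjectStorage(BlobStorageType.GCS, config);
        for (String name : blobStorage.list("test-grpc")) {
            byte[] content = blobStorage.get(name);   // whole object in memory
            System.out.printf("%s: %d bytes%n", name, content.length);
        }
    }
}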
13 changes: 13 additions & 0 deletions src/main/java/com/gotocompany/firehose/sink/common/blobstorage/s3/S3.java
@@ -15,14 +15,17 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+ import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

+ import javax.naming.OperationNotSupportedException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
+ import java.util.List;

public class S3 implements BlobStorage {
private static final Logger LOGGER = LoggerFactory.getLogger(S3.class);
@@ -98,6 +101,16 @@ public void store(String objectName, byte[] content) throws BlobStorageException
}
}

+ @Override
+ public byte[] get(String filePath) {
+     throw new UnsupportedOperationException("S3 get is not implemented yet");
+ }
+
+ @Override
+ public List<String> list(String prefix) {
+     throw new UnsupportedOperationException("S3 list is not implemented yet");
+ }
+
private String createPath(String objectName) {
String prefix = s3Config.getS3DirectoryPrefix();
return prefix == null || prefix.isEmpty()
