Skip to content

Commit

Permalink
chore: dlq-fixes (#18)
Browse files Browse the repository at this point in the history
* chore: dlq-fixes

* chore: version upgrade

* chore: fix tests
  • Loading branch information
lavkesh authored Aug 30, 2023
1 parent 7bc38fc commit 663de0a
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 23 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'com.gotocompany'
version '0.8.12'
version '0.8.13'

def projName = "firehose"

Expand Down Expand Up @@ -101,7 +101,7 @@ dependencies {
implementation platform('com.google.cloud:libraries-bom:20.5.0')
implementation 'com.google.cloud:google-cloud-storage:2.20.1'
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
implementation group: 'com.gotocompany', name: 'depot', version: '0.4.9'
implementation group: 'com.gotocompany', name: 'depot', version: '0.6.0'
implementation group: 'com.networknt', name: 'json-schema-validator', version: '1.0.59' exclude group: 'org.slf4j'

testImplementation group: 'junit', name: 'junit', version: '4.11'
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/gotocompany/firehose/config/GCSConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ public interface GCSConfig extends Config {
@Key("${GCS_TYPE}_GCS_BUCKET_NAME")
String getGCSBucketName();

@Key("${GCS_TYPE}_GCS_DIRECTORY_PREFIX")
String getGCSDirectoryPrefix();

@Key("${GCS_TYPE}_GCS_CREDENTIAL_PATH")
String getGCSCredentialPath();

Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/gotocompany/firehose/config/S3Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ public interface S3Config extends Config {
@Key("${S3_TYPE}_S3_BUCKET_NAME")
String getS3BucketName();

@Key("${S3_TYPE}_S3_DIRECTORY_PREFIX")
String getS3DirectoryPrefix();

@Key("${S3_TYPE}_S3_ACCESS_KEY")
String getS3AccessKey();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@
import org.threeten.bp.Duration;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Date;

public class GoogleCloudStorage implements BlobStorage {
private static final Logger LOGGER = LoggerFactory.getLogger(GoogleCloudStorage.class);
private final GCSConfig gcsConfig;
private final Storage storage;

public GoogleCloudStorage(GCSConfig gcsConfig) throws IOException {
this(gcsConfig, GoogleCredentials.fromStream(new FileInputStream(gcsConfig.getGCSCredentialPath())));
this(gcsConfig, GoogleCredentials.fromStream(Files.newInputStream(Paths.get(gcsConfig.getGCSCredentialPath()))));
checkBucket();
logRetentionPolicy();
}
Expand Down Expand Up @@ -73,29 +71,34 @@ private void logRetentionPolicy() {
Storage.BucketGetOption.fields(Storage.BucketField.RETENTION_POLICY),
Storage.BucketGetOption.userProject(gcsConfig.getGCloudProjectID()));
LOGGER.info("Retention Policy for {}", bucketName);
LOGGER.info("Retention Period: {}", bucket.getRetentionPeriod());
LOGGER.info("Retention Period: {}", bucket.getRetentionPeriodDuration());
if (bucket.retentionPolicyIsLocked() != null && bucket.retentionPolicyIsLocked()) {
LOGGER.info("Retention Policy is locked");
}
if (bucket.getRetentionEffectiveTime() != null) {
LOGGER.info("Effective Time: {}", new Date(bucket.getRetentionEffectiveTime()));
}
}

@Override
public void store(String objectName, String filePath) throws BlobStorageException {
String finalPath = createPath(objectName);
try {
byte[] content = Files.readAllBytes(Paths.get(filePath));
store(objectName, content);
store(finalPath, content);
} catch (IOException e) {
LOGGER.error("Failed to read local file {}", filePath);
throw new BlobStorageException("file_io_error", "File Read failed", e);
}
}

private String createPath(String objectName) {
String prefix = gcsConfig.getGCSDirectoryPrefix();
return prefix == null || prefix.isEmpty()
? objectName : Paths.get(prefix, objectName).toString();
}

@Override
public void store(String objectName, byte[] content) throws BlobStorageException {
BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(gcsConfig.getGCSBucketName(), objectName)).build();
String finalPath = createPath(objectName);
BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(gcsConfig.getGCSBucketName(), finalPath)).build();
String blobPath = String.join(File.separator, blobInfo.getBucket(), blobInfo.getName());
try {
storage.create(blobInfo, content, Storage.BlobTargetOption.userProject(gcsConfig.getGCloudProjectID()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ private void checkBucket() {

@Override
public void store(String objectName, String filePath) throws BlobStorageException {
String finalPath = createPath(objectName);
try {
byte[] content = Files.readAllBytes(Paths.get(filePath));
store(objectName, content);
store(finalPath, content);
} catch (IOException e) {
LOGGER.error("Failed to read local file {}", filePath);
throw new BlobStorageException("file_io_error", "File Read failed", e);
Expand All @@ -83,10 +84,11 @@ public void store(String objectName, String filePath) throws BlobStorageExceptio

@Override
public void store(String objectName, byte[] content) throws BlobStorageException {
String finalPath = createPath(objectName);
try {
PutObjectRequest putObject = PutObjectRequest.builder()
.bucket(s3Config.getS3BucketName())
.key(objectName)
.key(finalPath)
.build();
s3Client.putObject(putObject, RequestBody.fromBytes(content));
LOGGER.info("Created object in S3 {}", objectName);
Expand All @@ -95,4 +97,10 @@ public void store(String objectName, byte[] content) throws BlobStorageException
throw new BlobStorageException(ase.getMessage(), ase.getMessage(), ase);
}
}

private String createPath(String objectName) {
String prefix = s3Config.getS3DirectoryPrefix();
return prefix == null || prefix.isEmpty()
? objectName : Paths.get(prefix, objectName).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ private String convertToString(Message message) {
message.getPartition(),
message.getOffset(),
message.getTimestamp(),
message.getErrorInfo().toString()));
message.getErrorInfo().toString(),
message.getErrorInfo().getErrorType().name()));
} catch (JsonProcessingException e) {
log.warn("Not able to convert message into json", e);
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ public class DlqMessage {
private long timestamp;
@JsonProperty("error")
private String error;
@JsonProperty("error_type")
private String errorType;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,22 @@
public class GoogleCloudStorageTest {

@Test
public void shouldCallStorage() throws BlobStorageException {
public void shouldCallStorageWithPrefix() throws BlobStorageException {
GCSConfig config = ConfigFactory.create(GCSConfig.class, new HashMap<Object, Object>() {{
put("GCS_TYPE", "SOME_TYPE");
put("SOME_TYPE_GCS_BUCKET_NAME", "TestBucket");
put("SOME_TYPE_GCS_GOOGLE_CLOUD_PROJECT_ID", "projectID");
put("SOME_TYPE_GCS_DIRECTORY_PREFIX", "some-name");
}});
Storage storage = Mockito.mock(Storage.class);
GoogleCloudStorage gcs = new GoogleCloudStorage(config, storage);
BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of("TestBucket", "some-name/test")).build();
gcs.store("test", new byte[]{});
Mockito.verify(storage, Mockito.times(1)).create(blobInfo, new byte[]{}, Storage.BlobTargetOption.userProject("projectID"));
}

@Test
public void shouldCallStorageWithoutPrefix() throws BlobStorageException {
GCSConfig config = ConfigFactory.create(GCSConfig.class, new HashMap<Object, Object>() {{
put("GCS_TYPE", "SOME_TYPE");
put("SOME_TYPE_GCS_BUCKET_NAME", "TestBucket");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public void shouldWriteMessagesWithoutErrorInfoToObjectStorage() throws IOExcept
Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size());

verify(blobStorage).store(contains("booking/2020-01-02"),
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}").getBytes()));
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}").getBytes()));
verify(blobStorage).store(contains("booking/2020-01-01"),
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\"}").getBytes()));
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception test, ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}").getBytes()));
}

@Test
Expand All @@ -69,11 +69,11 @@ public void shouldWriteMessageErrorTypesToObjectStorage() throws IOException, Bl
Assert.assertEquals(0, blobStorageDLQWriter.write(messages).size());

verify(blobStorage).store(contains("booking/2020-01-02"),
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: SINK_UNKNOWN_ERROR\"}").getBytes()));
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":3,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":4,\"timestamp\":1577923200000,\"error\":\"Exception , ErrorType: SINK_UNKNOWN_ERROR\",\"error_type\":\"SINK_UNKNOWN_ERROR\"}").getBytes()));
verify(blobStorage).store(contains("booking/2020-01-01"),
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception null, ErrorType: SINK_UNKNOWN_ERROR\"}").getBytes()));
eq(("{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":1,\"timestamp\":1577836800000,\"error\":\"Exception , ErrorType: DESERIALIZATION_ERROR\",\"error_type\":\"DESERIALIZATION_ERROR\"}\n"
+ "{\"key\":\"MTIz\",\"value\":\"YWJj\",\"topic\":\"booking\",\"partition\":1,\"offset\":2,\"timestamp\":1577836800000,\"error\":\"Exception null, ErrorType: SINK_UNKNOWN_ERROR\",\"error_type\":\"SINK_UNKNOWN_ERROR\"}").getBytes()));
}

@Test
Expand Down

0 comments on commit 663de0a

Please sign in to comment.