S3 Output support

xdev-developer committed Apr 5, 2024
1 parent 9712287 commit caaca05
Showing 9 changed files with 360 additions and 22 deletions.
25 changes: 25 additions & 0 deletions README.md
@@ -3,6 +3,8 @@
[![Scala Steward badge](https://img.shields.io/badge/Scala_Steward-helping-blue.svg?style=flat&logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAA4AAAAQCAMAAAARSr4IAAAAVFBMVEUAAACHjojlOy5NWlrKzcYRKjGFjIbp293YycuLa3pYY2LSqql4f3pCUFTgSjNodYRmcXUsPD/NTTbjRS+2jomhgnzNc223cGvZS0HaSD0XLjbaSjElhIr+AAAAAXRSTlMAQObYZgAAAHlJREFUCNdNyosOwyAIhWHAQS1Vt7a77/3fcxxdmv0xwmckutAR1nkm4ggbyEcg/wWmlGLDAA3oL50xi6fk5ffZ3E2E3QfZDCcCN2YtbEWZt+Drc6u6rlqv7Uk0LdKqqr5rk2UCRXOk0vmQKGfc94nOJyQjouF9H/wCc9gECEYfONoAAAAASUVORK5CYII=)](https://scala-steward.org)
[![Coverage Status](https://coveralls.io/repos/github/xdev-developer/gen4s/badge.svg?branch=main)](https://coveralls.io/github/xdev-developer/gen4s?branch=main)

![Build status](https://github.com/xdev-developer/gen4s/actions/workflows/build.yml/badge.svg)

Gen4s is a powerful data generation tool designed for developers and QA engineers.

Features:
@@ -377,6 +379,29 @@ output {
}
```

### AWS S3 Output
```properties
output {
writer {
type = s-3-output
bucket = "test-bucket"
key = "key-%s.json"
region = "us-east-1"
endpoint = "http://localhost:4566"
part-size-mb = 5
}
transformers = ["json-minify"]
validators = ["json", "missing-vars"]
}
```

The available options for configuring an S3 output are:

- `bucket`: The name of the S3 bucket the generated data is written to.
- `key`: The object key pattern. The `%s` placeholder is replaced with a unique identifier for each generated object (see the sketch below).
- `region`: The AWS region the S3 bucket is located in.
- `endpoint`: Optional URL of the S3 service endpoint. Useful for testing against local S3-compatible services such as LocalStack.
- `part-size-mb`: The part size, in megabytes, used for multipart uploads to the S3 bucket. Defaults to 5.
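
To make the `key` pattern concrete, here is a small illustrative sketch. The UUID is an assumption made for illustration; the actual identifier format is generated internally by Gen4s and may differ.

```scala
// Illustration only: expanding the configured key pattern into an object key.
// Assumption: the unique identifier is a UUID; Gen4s may use a different scheme.
val keyPattern = "key-%s.json"
val objectKey  = keyPattern.format(java.util.UUID.randomUUID())
// e.g. "key-2f6b1c1e-8d9a-4a8e-9f1a-0c3d2b7e5a11.json"
```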

#### Transformers

17 changes: 17 additions & 0 deletions app/src/main/scala/io/gen4s/conf/OutputConfig.scala
@@ -1,5 +1,9 @@
package io.gen4s.conf

import java.net.URI

import scala.util.Try

import cats.implicits.*
import io.gen4s.core.templating.{OutputTransformer, OutputValidator}
import io.gen4s.core.Domain.BootstrapServers
@@ -11,9 +15,12 @@ import eu.timepit.refined.pureconfig.*
import eu.timepit.refined.types.numeric.PosInt
import eu.timepit.refined.types.string.NonEmptyString
import pureconfig.*
import pureconfig.error.CannotConvert
import pureconfig.error.FailureReason
import pureconfig.generic.derivation.default.*
import pureconfig.module.enumeratum.*
import software.amazon.awssdk.endpoints.Endpoint
import software.amazon.awssdk.regions.Region

given ConfigReader[Topic] = ConfigReader.fromString { value =>
Topic(value).asRight[FailureReason]
@@ -25,6 +32,16 @@ given ConfigReader[BootstrapServers] = ConfigReader.fromString { value =>

given ConfigReader[KafkaProducerConfig] = ConfigReader.derived[KafkaProducerConfig]

given ConfigReader[Region] = ConfigReader.fromString { value =>
Region.of(value).asRight[FailureReason]
}

given ConfigReader[Endpoint] = ConfigReader.fromString { value =>
Try(new URI(value)).toEither
.map(u => Endpoint.builder().url(u).build)
.leftMap(e => CannotConvert(value, "Endpoint", e.getMessage))
}

final case class OutputConfig(
writer: Output,
transformers: Set[OutputTransformer] = Set.empty[OutputTransformer],
26 changes: 26 additions & 0 deletions app/src/test/scala/io/gen4s/test/OutputLoaderTest.scala
@@ -14,6 +16,8 @@ import io.gen4s.outputs.StdOutput

import eu.timepit.refined.types.numeric.PosInt
import eu.timepit.refined.types.string.NonEmptyString
import software.amazon.awssdk.endpoints.Endpoint
import software.amazon.awssdk.regions.Region

class OutputLoaderTest extends AsyncFreeSpec with AsyncIOSpec with Matchers {

@@ -119,6 +121,30 @@ class OutputLoaderTest extends AsyncFreeSpec with AsyncIOSpec with Matchers {
)
}
}

"Load s3 output" in {
load[IO]("""
writer: {
type: s-3-output
bucket: "test-bucket"
key: "key-%s.json"
region: "us-east-1"
endpoint: "http://localhost:4566"
part-size-mb: 5
}
transformers = []
validators = []
""".stripMargin)
.asserting { out =>
out.writer shouldBe S3Output(
bucket = NonEmptyString.unsafeFrom("test-bucket"),
key = NonEmptyString.unsafeFrom("key-%s.json"),
region = Region.of("us-east-1"),
endpoint = Some(Endpoint.builder().url(new java.net.URI("http://localhost:4566")).build())
)
}
}
}

}
1 change: 1 addition & 0 deletions build.sbt
@@ -62,6 +62,7 @@ lazy val outputs = project
Dependencies.Fs2,
Dependencies.Fs2Kafka,
Dependencies.Fs2Io,
Dependencies.Fs2S3,
Dependencies.Sttp,
Dependencies.ApacheCommons,
Dependencies.Enumeratum,
101 changes: 101 additions & 0 deletions outputs/src/main/scala/io/gen4s/outputs/Output.scala
@@ -10,22 +10,41 @@ import io.gen4s.core.Domain.*
import enumeratum.EnumEntry
import eu.timepit.refined.types.numeric.PosInt
import eu.timepit.refined.types.string.NonEmptyString
import fs2.aws.s3.models.Models.PartSizeMB
import software.amazon.awssdk.endpoints.Endpoint
import software.amazon.awssdk.regions.Region

sealed trait Output {
def description(): String
}

/**
* Represents the standard output.
*/
final case class StdOutput() extends Output {
override def description(): String = "std-output"
}

/**
* Base trait for Kafka outputs.
*/
trait KafkaOutputBase {
val topic: Topic
val headers: Map[String, String]
val batchSize: PosInt
val decodeInputAsKeyValue: Boolean
}

/**
* Represents a Kafka output.
*
* @param topic the topic to which the data will be published.
* @param bootstrapServers the bootstrap servers for the Kafka cluster.
* @param decodeInputAsKeyValue whether to decode the input as key-value pairs.
* @param headers the headers to be included in the Kafka message.
* @param batchSize the batch size for publishing messages.
* @param producerConfig the configuration for the Kafka producer.
*/
final case class KafkaOutput(
topic: Topic,
bootstrapServers: BootstrapServers,
@@ -41,6 +60,15 @@ final case class KafkaOutput(
override def description(): String = s"Kafka output: topic: $topic, bootstrap-servers: $bootstrapServers"
}

/**
* Represents the configuration for Avro serialization in Kafka.
*
* @param schemaRegistryUrl The URL of the schema registry. This is where Avro schemas are stored and retrieved.
* @param keySchema An optional file that contains the Avro schema for the key of the Kafka message. If not provided, the key is assumed to be a string.
* @param valueSchema An optional file that contains the Avro schema for the value of the Kafka message. If not provided, the value is assumed to be a string.
* @param autoRegisterSchemas A boolean flag that indicates whether to automatically register new schemas with the schema registry. If set to false, all schemas must be pre-registered.
* @param registryClientMaxCacheSize The maximum number of schemas that the client will cache from the schema registry. This can help improve performance by reducing the number of requests to the schema registry.
*/
final case class AvroConfig(
schemaRegistryUrl: String,
keySchema: Option[File] = None,
@@ -49,6 +77,17 @@ final case class AvroConfig(
registryClientMaxCacheSize: Int = 1000
)

/**
* Represents a Kafka output with Avro serialization.
*
* @param topic the topic to which the data will be published.
* @param bootstrapServers the bootstrap servers for the Kafka cluster.
* @param avroConfig the configuration for Avro serialization.
* @param decodeInputAsKeyValue whether to decode the input as key-value pairs.
* @param headers the headers to be included in the Kafka message.
* @param batchSize the batch size for publishing messages.
* @param producerConfig the configuration for the Kafka producer.
*/
final case class KafkaAvroOutput(
topic: Topic,
bootstrapServers: BootstrapServers,
@@ -60,23 +99,54 @@ final case class KafkaAvroOutput(
extends Output
with KafkaOutputBase {

/**
* Provides the Kafka producer configuration.
* If no configuration is provided, it defaults to the KafkaProducerConfig.default.
*
* @return the Kafka producer configuration.
*/
def kafkaProducerConfig: KafkaProducerConfig = producerConfig.getOrElse(KafkaProducerConfig.default)

override def description(): String = s"Kafka avro output: topic: $topic, bootstrap-servers: $bootstrapServers"
}

/**
* Represents the configuration for a Protobuf descriptor.
*
* @param file The file that contains the Protobuf descriptor.
* @param messageType The type of the message in the Protobuf descriptor (e.g. `Person`).
*/
final case class ProtobufDescriptorConfig(
file: File,
messageType: String
)

/**
* Represents the configuration for Protobuf serialization in Kafka.
*
* @param schemaRegistryUrl The URL of the schema registry. This is where Protobuf schemas are stored and retrieved.
* @param valueDescriptor The configuration for the Protobuf descriptor.
* @param autoRegisterSchemas A boolean flag that indicates whether to automatically register new schemas with the schema registry. If set to false, all schemas must be pre-registered.
* @param registryClientMaxCacheSize The maximum number of schemas that the client will cache from the schema registry. This can help improve performance by reducing the number of requests to the schema registry.
*/
final case class ProtobufConfig(
schemaRegistryUrl: String,
valueDescriptor: ProtobufDescriptorConfig,
autoRegisterSchemas: Boolean = false,
registryClientMaxCacheSize: Int = 1000
)

/**
* Represents a Kafka output with Protobuf serialization.
*
* @param topic the topic to which the data will be published.
* @param bootstrapServers the bootstrap servers for the Kafka cluster.
* @param protoConfig the configuration for Protobuf serialization.
* @param decodeInputAsKeyValue whether to decode the input as key-value pairs.
* @param headers the headers to be included in the Kafka message.
* @param batchSize the batch size for publishing messages.
* @param producerConfig the configuration for the Kafka producer.
*/
final case class KafkaProtobufOutput(
topic: Topic,
bootstrapServers: BootstrapServers,
@@ -113,6 +183,16 @@ object HttpContentTypes extends enumeratum.Enum[HttpContentTypes] {
case object TextXml extends HttpContentTypes("text/xml")
}

/**
* Represents an HTTP output.
*
* @param url the URL to which the data will be sent.
* @param method the HTTP method to be used (POST, PUT, etc.).
* @param parallelism the level of parallelism for sending data. Defaults to 1.
* @param headers the headers to be included in the HTTP request. Defaults to an empty map.
* @param contentType the content type of the HTTP request. Defaults to TextPlain.
* @param stopOnError a flag indicating whether to stop on error. If true, the process stops when an error occurs. Defaults to true.
*/
final case class HttpOutput(
url: String,
method: HttpMethods,
@@ -124,10 +204,31 @@ final case class HttpOutput(
override def description(): String = s"Http output: url: $url, method: $method"
}

/**
* Represents a File System output.
*
* @param dir the directory where the output file will be created.
* @param filenamePattern the pattern for the filename. The current system time in milliseconds is used to format the filename.
*/
final case class FsOutput(dir: NonEmptyString, filenamePattern: NonEmptyString) extends Output {

/**
* Constructs the path for the output file.
*
* @return the path of the output file.
*/
def path(): Path =
Paths.get(dir.value, FilenameUtils.getName(filenamePattern.value.format(System.currentTimeMillis())))

override def description(): String = s"File System output: path: ${path()}"
}

final case class S3Output(
bucket: NonEmptyString,
key: NonEmptyString,
region: Region,
endpoint: Option[Endpoint] = None,
partSizeMb: PartSizeMB = PartSizeMB.unsafeFrom(5))
extends Output {
override def description(): String = s"S3 output: region: $region, bucket: $bucket, key: $key"
}
41 changes: 21 additions & 20 deletions outputs/src/main/scala/io/gen4s/outputs/OutputStreamExecutor.scala
@@ -8,7 +8,8 @@ import cats.implicits.*
import io.gen4s.core.templating.Template
import io.gen4s.core.Domain.NumberOfSamplesToGenerate
import io.gen4s.outputs.processors.*
import io.gen4s.outputs.processors.kafka.{KafkaAvroOutputProcessor, KafkaOutputProcessor, KafkaProtobufOutputProcessor}
import io.gen4s.outputs.processors.aws.S3OutputProcessor
import io.gen4s.outputs.processors.kafka.*

import fs2.io.file.Files

@@ -28,6 +29,13 @@ trait OutputStreamExecutor[F[_]] {

object OutputStreamExecutor {

/**
* Factory method for creating an instance of OutputStreamExecutor.
*
* This method creates an instance of OutputStreamExecutor with a set of predefined processors for different output types.
*
* @return an instance of OutputStreamExecutor.
*/
def make[F[_]: Async: EffConsole: Files: Logger](): OutputStreamExecutor[F] = new OutputStreamExecutor[F] {

private val stdProcessor = new StdOutputProcessor[F]()
@@ -36,26 +44,19 @@ object OutputStreamExecutor {
private val kafkaAvroProcessor = new KafkaAvroOutputProcessor[F]()
private val kafkaProtobufProcessor = new KafkaProtobufOutputProcessor[F]()
private val httpOutputProcessor = new HttpOutputProcessor[F]()

override def write(n: NumberOfSamplesToGenerate, flow: fs2.Stream[F, Template], output: Output): F[Unit] =
output match {
case out: StdOutput => Logger[F].info("Writing data to std-output") *> stdProcessor.process(n, flow, out)
case out: FsOutput => Logger[F].info(s"Writing data file ${out.path()}") *> fsProcessor.process(n, flow, out)
case out: HttpOutput =>
Logger[F].info(s"Writing data to HTTP endpoint ${out.url}") *>
httpOutputProcessor.process(n, flow, out)

case out: KafkaOutput =>
Logger[F].info(s"Writing data to kafka brokers: ${out.bootstrapServers}, topic ${out.topic}") *>
kafkaProcessor.process(n, flow, out)

case out: KafkaAvroOutput =>
Logger[F].info(
s"Writing data to kafka brokers: ${out.bootstrapServers}, topic <${out.topic}>, registry: ${out.avroConfig.schemaRegistryUrl}"
) *>
kafkaAvroProcessor.process(n, flow, out)

private val s3OutputProcessor = new S3OutputProcessor[F]()

override def write(n: NumberOfSamplesToGenerate, flow: fs2.Stream[F, Template], output: Output): F[Unit] = {
val io = output match {
case out: StdOutput => stdProcessor.process(n, flow, out)
case out: FsOutput => fsProcessor.process(n, flow, out)
case out: HttpOutput => httpOutputProcessor.process(n, flow, out)
case out: KafkaOutput => kafkaProcessor.process(n, flow, out)
case out: KafkaAvroOutput => kafkaAvroProcessor.process(n, flow, out)
case out: KafkaProtobufOutput => kafkaProtobufProcessor.process(n, flow, out)
case out: S3Output => s3OutputProcessor.process(n, flow, out)
}
Logger[F].info(s"Writing data to ${output.description()}") *> io
}
}
}
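
The refactored `write` method above routes each `Output` subtype to its processor and consolidates per-output logging into a single `Logger[F].info(output.description())` call, with `S3Output` handled by the new `S3OutputProcessor`. Below is a minimal usage sketch, not part of this commit: it assumes log4cats' `Slf4jLogger` satisfies the `Logger` constraint, that `EffConsole` and `Files` instances resolve automatically for `IO`, and that `NumberOfSamplesToGenerate` has a simple `apply` constructor; the real wiring in Gen4s may differ.

```scala
import cats.effect.{IO, IOApp}

import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import io.gen4s.core.Domain.NumberOfSamplesToGenerate
import io.gen4s.core.templating.Template
import io.gen4s.outputs.{OutputStreamExecutor, S3Output}

import eu.timepit.refined.types.string.NonEmptyString
import software.amazon.awssdk.regions.Region

object S3OutputSketch extends IOApp.Simple {

  def run: IO[Unit] =
    Slf4jLogger.create[IO].flatMap { logger =>
      given Logger[IO] = logger

      val executor = OutputStreamExecutor.make[IO]()

      val output = S3Output(
        bucket = NonEmptyString.unsafeFrom("test-bucket"),
        key    = NonEmptyString.unsafeFrom("key-%s.json"),
        region = Region.of("us-east-1")
      )

      // Placeholder stream: in Gen4s the templates come from the configured input.
      val templates: fs2.Stream[IO, Template] = fs2.Stream.empty

      // Assumed constructor for NumberOfSamplesToGenerate; the real factory may differ.
      executor.write(NumberOfSamplesToGenerate(1), templates, output)
    }
}
```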
(3 more changed files are not shown on this page.)