diff --git a/README.md b/README.md
index ee47945..9627d22 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,8 @@
 [![Scala Steward badge](https://img.shields.io/badge/Scala_Steward-helping-blue.svg?style=flat&logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAA4AAAAQCAMAAAARSr4IAAAAVFBMVEUAAACHjojlOy5NWlrKzcYRKjGFjIbp293YycuLa3pYY2LSqql4f3pCUFTgSjNodYRmcXUsPD/NTTbjRS+2jomhgnzNc223cGvZS0HaSD0XLjbaSjElhIr+AAAAAXRSTlMAQObYZgAAAHlJREFUCNdNyosOwyAIhWHAQS1Vt7a77/3fcxxdmv0xwmckutAR1nkm4ggbyEcg/wWmlGLDAA3oL50xi6fk5ffZ3E2E3QfZDCcCN2YtbEWZt+Drc6u6rlqv7Uk0LdKqqr5rk2UCRXOk0vmQKGfc94nOJyQjouF9H/wCc9gECEYfONoAAAAASUVORK5CYII=)](https://scala-steward.org)
 [![Coverage Status](https://coveralls.io/repos/github/xdev-developer/gen4s/badge.svg?branch=main)](https://coveralls.io/github/xdev-developer/gen4s?branch=main)
+![Build status](https://github.com/xdev-developer/gen4s/actions/workflows/build.yml/badge.svg)
+
 Gen4s is a powerful data generation tool designed for developers and QA engineers.
 
 Features:
@@ -377,6 +379,29 @@ output {
 }
 ```
 
+### AWS S3 Output
+```properties
+output {
+  writer {
+    type = s-3-output
+    bucket = "test-bucket"
+    key = "key-%s.json"
+    region = "us-east-1"
+    endpoint = "http://localhost:4566"
+    part-size-mb = 5
+  }
+  transformers = ["json-minify"]
+  validators = ["json", "missing-vars"]
+}
+```
+
+The available options for configuring an S3 output are:
+
+- `bucket`: The name of the S3 bucket where the output data will be written.
+- `key`: The object key pattern. The `%s` placeholder is replaced with a unique identifier.
+- `region`: The AWS region where the S3 bucket is located.
+- `endpoint`: An optional custom URL for the S3 service endpoint, useful for testing against local S3-compatible services such as LocalStack.
+- `part-size-mb`: The part size, in megabytes, used for multipart uploads to the S3 bucket. Defaults to 5.
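+
+For example, with `key = "key-%s.json"` each generated record is uploaded as a separate object under a key such as `key-<random-uuid>.json`; the placeholder is filled with a random UUID for every record.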
 
 #### Transformers
 
diff --git a/app/src/main/scala/io/gen4s/conf/OutputConfig.scala b/app/src/main/scala/io/gen4s/conf/OutputConfig.scala
index d0370d1..644e101 100644
--- a/app/src/main/scala/io/gen4s/conf/OutputConfig.scala
+++ b/app/src/main/scala/io/gen4s/conf/OutputConfig.scala
@@ -1,5 +1,9 @@
 package io.gen4s.conf
 
+import java.net.URI
+
+import scala.util.Try
+
 import cats.implicits.*
 import io.gen4s.core.templating.{OutputTransformer, OutputValidator}
 import io.gen4s.core.Domain.BootstrapServers
@@ -11,9 +15,12 @@ import eu.timepit.refined.pureconfig.*
 import eu.timepit.refined.types.numeric.PosInt
 import eu.timepit.refined.types.string.NonEmptyString
 import pureconfig.*
+import pureconfig.error.CannotConvert
 import pureconfig.error.FailureReason
 import pureconfig.generic.derivation.default.*
 import pureconfig.module.enumeratum.*
+import software.amazon.awssdk.endpoints.Endpoint
+import software.amazon.awssdk.regions.Region
 
 given ConfigReader[Topic] = ConfigReader.fromString { value =>
   Topic(value).asRight[FailureReason]
@@ -25,6 +32,16 @@ given ConfigReader[BootstrapServers] = ConfigReader.fromString { value =>
 
 given ConfigReader[KafkaProducerConfig] = ConfigReader.derived[KafkaProducerConfig]
 
+given ConfigReader[Region] = ConfigReader.fromString { value =>
+  Region.of(value).asRight[FailureReason]
+}
+
+given ConfigReader[Endpoint] = ConfigReader.fromString { value =>
+  Try(new URI(value)).toEither
+    .map(u => Endpoint.builder().url(u).build)
+    .leftMap(e => CannotConvert(value, "Endpoint", e.getMessage))
+}
+
 final case class OutputConfig(
   writer: Output,
   transformers: Set[OutputTransformer] = Set.empty[OutputTransformer],
diff --git a/app/src/test/scala/io/gen4s/test/OutputLoaderTest.scala b/app/src/test/scala/io/gen4s/test/OutputLoaderTest.scala
index 7383b57..53d27e8 100644
--- a/app/src/test/scala/io/gen4s/test/OutputLoaderTest.scala
+++ b/app/src/test/scala/io/gen4s/test/OutputLoaderTest.scala
@@ -14,6 +14,8 @@ import io.gen4s.outputs.StdOutput
 
 import eu.timepit.refined.types.numeric.PosInt
 import eu.timepit.refined.types.string.NonEmptyString
+import software.amazon.awssdk.endpoints.Endpoint
+import software.amazon.awssdk.regions.Region
 
 class OutputLoaderTest extends AsyncFreeSpec with AsyncIOSpec with Matchers {
 
@@ -119,6 +121,30 @@ class OutputLoaderTest extends AsyncFreeSpec with AsyncIOSpec with Matchers {
          )
        }
     }
+
+    "Load s3 output" in {
+      load[IO]("""
+        writer: {
+          type: s-3-output
+          bucket: "test-bucket"
+          key: "key-%s.json"
+          region: "us-east-1"
+          endpoint: "http://localhost:4566"
+          part-size-mb: 5
+        }
+
+        transformers = []
+        validators = []
+        """.stripMargin)
+        .asserting { out =>
+          out.writer shouldBe S3Output(
+            bucket = NonEmptyString.unsafeFrom("test-bucket"),
+            key = NonEmptyString.unsafeFrom("key-%s.json"),
+            region = Region.of("us-east-1"),
+            endpoint = Some(Endpoint.builder().url(new java.net.URI("http://localhost:4566")).build())
+          )
+        }
+    }
   }
 }
diff --git a/build.sbt b/build.sbt
index 9e90fca..14d3472 100644
--- a/build.sbt
+++ b/build.sbt
@@ -62,6 +62,7 @@ lazy val outputs = project
       Dependencies.Fs2,
       Dependencies.Fs2Kafka,
       Dependencies.Fs2Io,
+      Dependencies.Fs2S3,
       Dependencies.Sttp,
       Dependencies.ApacheCommons,
       Dependencies.Enumeratum,
diff --git a/outputs/src/main/scala/io/gen4s/outputs/Output.scala b/outputs/src/main/scala/io/gen4s/outputs/Output.scala
index 8c900f7..0917902 100644
--- a/outputs/src/main/scala/io/gen4s/outputs/Output.scala
+++ b/outputs/src/main/scala/io/gen4s/outputs/Output.scala
@@ -10,15 +10,24 @@ import io.gen4s.core.Domain.*
 import enumeratum.EnumEntry
 import eu.timepit.refined.types.numeric.PosInt
 import eu.timepit.refined.types.string.NonEmptyString
+import fs2.aws.s3.models.Models.PartSizeMB
+import software.amazon.awssdk.endpoints.Endpoint
+import software.amazon.awssdk.regions.Region
 
 sealed trait Output {
   def description(): String
 }
 
+/**
+ * Represents the standard output.
+ */
 final case class StdOutput() extends Output {
   override def description(): String = "std-output"
 }
 
+/**
+ * Base trait for Kafka outputs.
+ */
 trait KafkaOutputBase {
   val topic: Topic
   val headers: Map[String, String]
@@ -26,6 +35,16 @@ trait KafkaOutputBase {
   val decodeInputAsKeyValue: Boolean
 }
 
+/**
+ * Represents a Kafka output.
+ *
+ * @param topic the topic to which the data will be published.
+ * @param bootstrapServers the bootstrap servers for the Kafka cluster.
+ * @param decodeInputAsKeyValue whether to decode the input as key-value pairs.
+ * @param headers the headers to be included in the Kafka message.
+ * @param batchSize the batch size for publishing messages.
+ * @param producerConfig the configuration for the Kafka producer.
+ */
 final case class KafkaOutput(
   topic: Topic,
   bootstrapServers: BootstrapServers,
@@ -41,6 +60,15 @@ final case class KafkaOutput(
   override def description(): String = s"Kafka output: topic: $topic, bootstrap-servers: $bootstrapServers"
 }
 
+/**
+ * Represents the configuration for Avro serialization in Kafka.
+ *
+ * @param schemaRegistryUrl The URL of the schema registry. This is where Avro schemas are stored and retrieved.
+ * @param keySchema An optional file that contains the Avro schema for the key of the Kafka message. If not provided, the key is assumed to be a string.
+ * @param valueSchema An optional file that contains the Avro schema for the value of the Kafka message. If not provided, the value is assumed to be a string.
+ * @param autoRegisterSchemas A boolean flag that indicates whether to automatically register new schemas with the schema registry. If set to false, all schemas must be pre-registered.
+ * @param registryClientMaxCacheSize The maximum number of schemas that the client will cache from the schema registry. This can help improve performance by reducing the number of requests to the schema registry.
+ */
 final case class AvroConfig(
   schemaRegistryUrl: String,
   keySchema: Option[File] = None,
@@ -49,6 +77,17 @@ final case class AvroConfig(
   registryClientMaxCacheSize: Int = 1000
 )
 
+/**
+ * Represents a Kafka output with Avro serialization.
+ *
+ * @param topic the topic to which the data will be published.
+ * @param bootstrapServers the bootstrap servers for the Kafka cluster.
+ * @param avroConfig the configuration for Avro serialization.
+ * @param decodeInputAsKeyValue whether to decode the input as key-value pairs.
+ * @param headers the headers to be included in the Kafka message.
+ * @param batchSize the batch size for publishing messages.
+ * @param producerConfig the configuration for the Kafka producer.
+ */
 final case class KafkaAvroOutput(
   topic: Topic,
   bootstrapServers: BootstrapServers,
@@ -60,16 +99,36 @@ final case class KafkaAvroOutput(
     extends Output
     with KafkaOutputBase {
 
+  /**
+   * Provides the Kafka producer configuration.
+   * If no configuration is provided, it defaults to `KafkaProducerConfig.default`.
+   *
+   * @return the Kafka producer configuration.
+   */
   def kafkaProducerConfig: KafkaProducerConfig = producerConfig.getOrElse(KafkaProducerConfig.default)
 
   override def description(): String = s"Kafka avro output: topic: $topic, bootstrap-servers: $bootstrapServers"
 }
 
+/**
+ * Represents the configuration for a Protobuf descriptor.
+ *
+ * @param file The file that contains the Protobuf descriptor.
+ * @param messageType The type of the message in the Protobuf descriptor (for example, Person).
+ */
 final case class ProtobufDescriptorConfig(
   file: File,
   messageType: String
 )
 
+/**
+ * Represents the configuration for Protobuf serialization in Kafka.
+ *
+ * @param schemaRegistryUrl The URL of the schema registry. This is where Protobuf schemas are stored and retrieved.
+ * @param valueDescriptor The configuration for the Protobuf descriptor.
+ * @param autoRegisterSchemas A boolean flag that indicates whether to automatically register new schemas with the schema registry. If set to false, all schemas must be pre-registered.
+ * @param registryClientMaxCacheSize The maximum number of schemas that the client will cache from the schema registry. This can help improve performance by reducing the number of requests to the schema registry.
+ */
 final case class ProtobufConfig(
   schemaRegistryUrl: String,
   valueDescriptor: ProtobufDescriptorConfig,
@@ -77,6 +136,17 @@ final case class ProtobufConfig(
   registryClientMaxCacheSize: Int = 1000
 )
 
+/**
+ * Represents a Kafka output with Protobuf serialization.
+ *
+ * @param topic the topic to which the data will be published.
+ * @param bootstrapServers the bootstrap servers for the Kafka cluster.
+ * @param protoConfig the configuration for Protobuf serialization.
+ * @param decodeInputAsKeyValue whether to decode the input as key-value pairs.
+ * @param headers the headers to be included in the Kafka message.
+ * @param batchSize the batch size for publishing messages.
+ * @param producerConfig the configuration for the Kafka producer.
+ */
 final case class KafkaProtobufOutput(
   topic: Topic,
   bootstrapServers: BootstrapServers,
@@ -113,6 +183,16 @@ object HttpContentTypes extends enumeratum.Enum[HttpContentTypes] {
   case object TextXml extends HttpContentTypes("text/xml")
 }
 
+/**
+ * Represents an HTTP output.
+ *
+ * @param url the URL to which the data will be sent.
+ * @param method the HTTP method to be used (POST, PUT, etc.).
+ * @param parallelism the level of parallelism for sending data. Defaults to 1.
+ * @param headers the headers to be included in the HTTP request. Defaults to an empty map.
+ * @param contentType the content type of the HTTP request. Defaults to TextPlain.
+ * @param stopOnError a flag indicating whether to stop on error. If true, the process stops when an error occurs. Defaults to true.
+ */
 final case class HttpOutput(
   url: String,
   method: HttpMethods,
@@ -124,10 +204,31 @@ final case class HttpOutput(
   override def description(): String = s"Http output: url: $url, method: $method"
 }
 
+/**
+ * Represents a File System output.
+ *
+ * @param dir the directory where the output file will be created.
+ * @param filenamePattern the pattern for the filename. The current system time in milliseconds is used to format the filename.
+ */
 final case class FsOutput(dir: NonEmptyString, filenamePattern: NonEmptyString) extends Output {
 
+  /**
+   * Constructs the path for the output file.
+   *
+   * @return the path of the output file.
+   */
   def path(): Path =
     Paths.get(dir.value, FilenameUtils.getName(filenamePattern.value.format(System.currentTimeMillis())))
 
   override def description(): String = s"File System output: path: ${path()}"
 }
+
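+/**
+ * Represents an AWS S3 output.
+ *
+ * @param bucket the name of the S3 bucket where the output data will be written.
+ * @param key the object key pattern; the `%s` placeholder is replaced with a unique identifier.
+ * @param region the AWS region where the S3 bucket is located.
+ * @param endpoint an optional custom S3 service endpoint, useful for testing with local S3-compatible services such as LocalStack.
+ * @param partSizeMb the part size, in megabytes, used for multipart uploads. Defaults to 5.
+ */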
+final case class S3Output(
+  bucket: NonEmptyString,
+  key: NonEmptyString,
+  region: Region,
+  endpoint: Option[Endpoint] = None,
+  partSizeMb: PartSizeMB = PartSizeMB.unsafeFrom(5))
+    extends Output {
+  override def description(): String = s"S3 output: region: $region, bucket: $bucket, key: $key"
+}
diff --git a/outputs/src/main/scala/io/gen4s/outputs/OutputStreamExecutor.scala b/outputs/src/main/scala/io/gen4s/outputs/OutputStreamExecutor.scala
index 7d5ccf1..6d68194 100644
--- a/outputs/src/main/scala/io/gen4s/outputs/OutputStreamExecutor.scala
+++ b/outputs/src/main/scala/io/gen4s/outputs/OutputStreamExecutor.scala
@@ -8,7 +8,8 @@
 import cats.implicits.*
 import io.gen4s.core.templating.Template
 import io.gen4s.core.Domain.NumberOfSamplesToGenerate
 import io.gen4s.outputs.processors.*
-import io.gen4s.outputs.processors.kafka.{KafkaAvroOutputProcessor, KafkaOutputProcessor, KafkaProtobufOutputProcessor}
+import io.gen4s.outputs.processors.aws.S3OutputProcessor
+import io.gen4s.outputs.processors.kafka.*
 
 import fs2.io.file.Files
 
@@ -28,6 +29,13 @@ trait OutputStreamExecutor[F[_]] {
 
 object OutputStreamExecutor {
 
+  /**
+   * Factory method for creating an instance of OutputStreamExecutor.
+   *
+   * The returned executor is wired with a predefined processor for each supported output type.
+   *
+   * @return an instance of OutputStreamExecutor.
+   */
   def make[F[_]: Async: EffConsole: Files: Logger](): OutputStreamExecutor[F] = new OutputStreamExecutor[F] {
 
     private val stdProcessor           = new StdOutputProcessor[F]()
@@ -36,26 +44,19 @@ object OutputStreamExecutor {
     private val kafkaAvroProcessor     = new KafkaAvroOutputProcessor[F]()
     private val kafkaProtobufProcessor = new KafkaProtobufOutputProcessor[F]()
    private val httpOutputProcessor    = new HttpOutputProcessor[F]()
-
-    override def write(n: NumberOfSamplesToGenerate, flow: fs2.Stream[F, Template], output: Output): F[Unit] =
-      output match {
-        case out: StdOutput => Logger[F].info("Writing data to std-output") *> stdProcessor.process(n, flow, out)
-        case out: FsOutput  => Logger[F].info(s"Writing data file ${out.path()}") *> fsProcessor.process(n, flow, out)
-        case out: HttpOutput =>
-          Logger[F].info(s"Writing data to HTTP endpoint ${out.url}") *>
-            httpOutputProcessor.process(n, flow, out)
-
-        case out: KafkaOutput =>
-          Logger[F].info(s"Writing data to kafka brokers: ${out.bootstrapServers}, topic ${out.topic}") *>
-            kafkaProcessor.process(n, flow, out)
-
-        case out: KafkaAvroOutput =>
-          Logger[F].info(
-            s"Writing data to kafka brokers: ${out.bootstrapServers}, topic <${out.topic}>, registry: ${out.avroConfig.schemaRegistryUrl}"
-          ) *>
-            kafkaAvroProcessor.process(n, flow, out)
-
+    private val s3OutputProcessor      = new S3OutputProcessor[F]()
+
+    override def write(n: NumberOfSamplesToGenerate, flow: fs2.Stream[F, Template], output: Output): F[Unit] = {
+      val io = output match {
+        case out: StdOutput           => stdProcessor.process(n, flow, out)
+        case out: FsOutput            => fsProcessor.process(n, flow, out)
+        case out: HttpOutput          => httpOutputProcessor.process(n, flow, out)
+        case out: KafkaOutput         => kafkaProcessor.process(n, flow, out)
+        case out: KafkaAvroOutput     => kafkaAvroProcessor.process(n, flow, out)
         case out: KafkaProtobufOutput => kafkaProtobufProcessor.process(n, flow, out)
+        case out: S3Output            => s3OutputProcessor.process(n, flow, out)
       }
+      Logger[F].info(s"Writing data to ${output.description()}") *> io
+    }
   }
 }
diff --git a/outputs/src/main/scala/io/gen4s/outputs/processors/aws/S3OutputProcessor.scala b/outputs/src/main/scala/io/gen4s/outputs/processors/aws/S3OutputProcessor.scala
new file mode 100644
index 0000000..b67f0bd
--- /dev/null
+++ b/outputs/src/main/scala/io/gen4s/outputs/processors/aws/S3OutputProcessor.scala
@@ -0,0 +1,53 @@
+package io.gen4s.outputs.processors.aws
+
+import java.util.UUID
+
+import org.typelevel.log4cats.Logger
+
+import cats.effect.kernel.Async
+import io.gen4s.core.templating.Template
+import io.gen4s.core.Domain
+import io.gen4s.outputs.processors.OutputProcessor
+import io.gen4s.outputs.S3Output
+import io.laserdisc.pure.s3.tagless.{Interpreter as S3Interpreter, S3AsyncClientOp}
+
+import eu.timepit.refined.types.string.NonEmptyString
+import fs2.aws.s3.models.Models.{BucketName, FileKey}
+import fs2.aws.s3.S3
+import software.amazon.awssdk.services.s3.S3AsyncClient
+
+class S3OutputProcessor[F[_]: Async: Logger] extends OutputProcessor[F, S3Output] {
+
+  override def process(
+    n: Domain.NumberOfSamplesToGenerate,
+    flow: fs2.Stream[F, Template],
+    output: S3Output): F[Unit] = {
+
+    s3Resource(output).map(S3.create[F]).use { s3 =>
+      flow
+        .flatMap { t =>
+          fs2.Stream
+            .emits(t.render().asByteArray)
+            .through(
+              s3.uploadFileMultipart(BucketName(output.bucket), formatKey(output.key), partSize = output.partSizeMb)
+            )
+            .evalMap(t => Logger[F].debug(s"eTag: $t"))
+        }
+        .compile
+        .drain
+    }
+  }
+
+  private def formatKey(key: NonEmptyString): FileKey = {
+    FileKey(NonEmptyString.unsafeFrom(key.value.format(UUID.randomUUID().toString)))
+  }
+
+  private def s3Resource(output: S3Output) = {
+    S3Interpreter[F].S3AsyncClientOpResource {
+      output.endpoint match {
+        case Some(e) => S3AsyncClient.builder().endpointOverride(e.url()).region(output.region)
+        case None    => S3AsyncClient.builder().region(output.region)
+      }
+    }
+  }
+}
diff --git a/outputs/src/test/scala/io/gen4s/test/outputs/AWSOutputStreamTest.scala b/outputs/src/test/scala/io/gen4s/test/outputs/AWSOutputStreamTest.scala
new file mode 100644
index 0000000..8e8e2a2
--- /dev/null
+++ b/outputs/src/test/scala/io/gen4s/test/outputs/AWSOutputStreamTest.scala
@@ -0,0 +1,108 @@
+package io.gen4s.test.outputs
+
+import java.net.URI
+
+import org.scalatest.funspec.AsyncFunSpec
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.OptionValues
+import org.testcontainers.containers.localstack.LocalStackContainer as JavaLocalStackContainer
+import org.typelevel.log4cats.slf4j.Slf4jLogger
+import org.typelevel.log4cats.Logger
+
+import com.dimafeng.testcontainers.scalatest.TestContainersForAll
+import com.dimafeng.testcontainers.LocalStackV2Container
+
+import cats.data.NonEmptyList
+import cats.effect.{IO, Sync}
+import cats.effect.testing.scalatest.AsyncIOSpec
+import io.gen4s.core.generators.Variable
+import io.gen4s.core.streams.GeneratorStream
+import io.gen4s.core.templating.{SourceTemplate, TemplateBuilder}
+import io.gen4s.core.Domain.NumberOfSamplesToGenerate
+import io.gen4s.generators.impl.TimestampGenerator
+import io.gen4s.outputs.{OutputStreamExecutor, S3Output}
+import io.laserdisc.pure.s3.tagless.{Interpreter as S3Interpreter, S3AsyncClientOp}
+
+import eu.timepit.refined.types.string.NonEmptyString
+import software.amazon.awssdk.endpoints.Endpoint
+import software.amazon.awssdk.services.s3.model.{CreateBucketRequest, ListObjectsRequest}
+import software.amazon.awssdk.services.s3.S3AsyncClient
+
+class AWSOutputStreamTest
+    extends AsyncFunSpec
+    with AsyncIOSpec
+    with TestContainersForAll
+    with Matchers
+    with OptionValues {
+
+  private val template = SourceTemplate("{ timestamp: {{ts}} }")
+
+  override type Containers = LocalStackV2Container
+
+  override def startContainers(): Containers = {
+    LocalStackV2Container
+      .Def(
+        services = List(JavaLocalStackContainer.Service.S3)
+      )
+      .start()
+  }
+
+  implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]
+
+  describe("AWS output stream") {
+
+    it("Send records to AWS S3 bucket") {
+      withContainers { localStack =>
+
+        val credentials = localStack.staticCredentialsProvider.resolveCredentials()
+
+        System.setProperty("aws.accessKeyId", credentials.accessKeyId())
+        System.setProperty("aws.secretAccessKey", credentials.secretAccessKey())
+
+        val executor = OutputStreamExecutor.make[IO]()
+
+        val builder = TemplateBuilder.make(
+          NonEmptyList.one(template),
+          List(TimestampGenerator(Variable("ts")))
+        )
+
+        val output = S3Output(
+          bucket = NonEmptyString.unsafeFrom("test-bucket"),
+          key = NonEmptyString.unsafeFrom("test-key-%s.json"),
+          region = localStack.region,
+          endpoint = Some(
+            Endpoint
+              .builder()
+              .url(localStack.endpointOverride(JavaLocalStackContainer.Service.S3))
+              .build()
+          )
+        )
+
+        val n = NumberOfSamplesToGenerate(5)
+
+        s3Resource(output, localStack.endpointOverride(JavaLocalStackContainer.Service.S3)).use { s3 =>
+          for {
+            _       <- createBucket(s3, output.bucket)
+            _       <- executor.write(n, GeneratorStream.stream[IO](n, builder), output)
+            objects <- listObjects(s3, output.bucket)
+          } yield objects.contents().size() shouldBe n.value
+        }
+      }
+    }
+  }
+
+  private def s3Resource(output: S3Output, endpoint: URI) = S3Interpreter[IO]
+    .S3AsyncClientOpResource(
+      S3AsyncClient
+        .builder()
+        .endpointOverride(endpoint)
+        .region(output.region)
+    )
+
+  private def createBucket(s3: S3AsyncClientOp[IO], bucket: NonEmptyString) =
+    s3.createBucket(CreateBucketRequest.builder().bucket(bucket.value).build())
+
+  private def listObjects(s3: S3AsyncClientOp[IO], bucket: NonEmptyString) =
+    s3.listObjects(ListObjectsRequest.builder().bucket(bucket.value).build())
+
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 2229d3c..30f1015 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -64,6 +64,10 @@ object Dependencies {
     "com.github.fd4s" %% "fs2-kafka-vulcan" % V.fs2Kafka
   )
 
+  val Fs2S3: Seq[ModuleID] = List(
+    "io.laserdisc" %% "fs2-aws-s3" % "6.1.2"
+  )
+
   val CirceCore    = circe("core")
   val CirceParser  = circe("parser")
   val CirceRefined = circe("refined")
@@ -113,6 +117,7 @@
 
   val AvroConverter: Seq[ModuleID] = List(
     "io.github.agolovenko" % "avro-tools-json_2.13" % "0.8.0"
+      exclude ("org.scala-lang.modules", "scala-collection-compat_2.13")
   )
 
   val Monocle: Seq[ModuleID] = List(
@@ -132,8 +137,9 @@
   val CatsEffectTest: Seq[ModuleID] = List("org.typelevel" %% "cats-effect-testing-scalatest" % "1.5.0" % Test)
 
   val TestContainers: Seq[ModuleID] = List(
-    "com.dimafeng" %% "testcontainers-scala-scalatest" % V.testContainers % Test,
-    "com.dimafeng" %% "testcontainers-scala-kafka"     % V.testContainers % Test
+    "com.dimafeng" %% "testcontainers-scala-scalatest"     % V.testContainers % Test,
+    "com.dimafeng" %% "testcontainers-scala-kafka"         % V.testContainers % Test,
+    "com.dimafeng" %% "testcontainers-scala-localstack-v2" % V.testContainers % Test
   )
 
   // Runtime