Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrating s3 put to tm upload #28

Merged
merged 3 commits into from
Dec 5, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions s3/src/main/scala/blobstore/s3/S3Store.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,19 @@ import cats.syntax.flatMap._
import fs2.{Chunk, Sink, Stream}

import scala.collection.JavaConverters._
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{CopyObjectRequest, ListObjectsRequest, ObjectListing, ObjectMetadata}
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}

import scala.concurrent.ExecutionContext

case class S3Store[F[_] : ConcurrentEffect : ContextShift](s3: AmazonS3, sse: Option[String] = None, blockingExecutionContext: ExecutionContext)(implicit F: Effect[F]) extends Store[F] {
case class S3Store[F[_] : ConcurrentEffect : ContextShift](tm: TransferManager, sse: Option[String] = None, blockingExecutionContext: ExecutionContext)(implicit F: Effect[F]) extends Store[F] {

final val s3: AmazonS3 = tm.getAmazonS3Client
loran-steinberger marked this conversation as resolved.
Show resolved Hide resolved

private def _chunk(ol: ObjectListing): Chunk[Path] = {
val dirs: Chunk[Path] = Chunk.seq(ol.getCommonPrefixes.asScala.map(s =>
Path(ol.getBucketName, s.dropRight(1), None, true, None)
Path(ol.getBucketName, s.dropRight(1), None, isDir = true, None)
))
val files: Chunk[Path] = Chunk.seq(ol.getObjectSummaries.asScala.map(o =>
Path(o.getBucketName, o.getKey, Option(o.getSize), isDir = false, Option(o.getLastModified))
Expand Down Expand Up @@ -79,7 +82,7 @@ case class S3Store[F[_] : ConcurrentEffect : ContextShift](s3: AmazonS3, sse: Op
val meta = new ObjectMetadata()
path.size.foreach(meta.setContentLength)
sse.foreach(meta.setSSEAlgorithm)
s3.putObject(path.root, path.key, ios._2, meta)
tm.upload(path.root, path.key, ios._2, meta).waitForCompletion()
()
})

Expand Down Expand Up @@ -116,17 +119,17 @@ object S3Store {
/**
* Safely initialize S3Store and shutdown Amazon S3 client upon finish.
*
* @param fa F[AmazonS3] how to connect AWS S3 client
* @param fa F[TransferManager] how to connect AWS S3 client
* @param sse Boolean true to force all writes to use SSE algorithm ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION
* @return Stream[ F, S3Store[F] ] stream with one S3Store, AmazonS3 client will disconnect once stream is done.
*/
def apply[F[_] : ContextShift](fa: F[AmazonS3], sse: Boolean, blockingExecutionContext: ExecutionContext)(implicit F: ConcurrentEffect[F]): Stream[F, S3Store[F]] = {
def apply[F[_] : ContextShift](fa: F[TransferManager], sse: Boolean, blockingExecutionContext: ExecutionContext)(implicit F: ConcurrentEffect[F]): Stream[F, S3Store[F]] = {
loran-steinberger marked this conversation as resolved.
Show resolved Hide resolved
val opt = if (sse) Option(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION) else None
apply(fa, opt, blockingExecutionContext)
}

/**
* Safely initialize S3Store using AmazonS3ClientBuilder.standard() and shutdown client upon finish.
* Safely initialize S3Store using TransferManagerBuilder.standard() and shutdown client upon finish.
*
* NOTICE: Standard S3 client builder uses the Default Credential Provider Chain, see docs on how to authenticate:
* https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
Expand All @@ -136,11 +139,11 @@ object S3Store {
*/
def apply[F[_] : ContextShift](sse: Boolean, blockingExecutionContext: ExecutionContext)(implicit F: ConcurrentEffect[F]): Stream[F, S3Store[F]] = {
val opt = if (sse) Option(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION) else None
apply(F.delay(AmazonS3ClientBuilder.standard().build()), opt, blockingExecutionContext)
apply(F.delay(TransferManagerBuilder.standard().build()), opt, blockingExecutionContext)
}

/**
* Safely initialize S3Store using AmazonS3ClientBuilder.standard() and shutdown client upon finish.
* Safely initialize S3Store using TransferManagerBuilder.standard() and shutdown client upon finish.
*
* NOTICE: Standard S3 client builder uses the Default Credential Provider Chain, see docs on how to authenticate:
* https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
Expand All @@ -149,32 +152,32 @@ object S3Store {
* @return Stream[ F, S3Store[F] ] stream with one S3Store, AmazonS3 client will disconnect once stream is done.
*/
def apply[F[_] : ContextShift](sse: String, blockingExecutionContext: ExecutionContext)(implicit F: ConcurrentEffect[F]): Stream[F, S3Store[F]] =
apply(F.delay(AmazonS3ClientBuilder.standard().build()), Option(sse), blockingExecutionContext)
apply(F.delay(TransferManagerBuilder.standard().build()), Option(sse), blockingExecutionContext)


/**
* Safely initialize S3Store using AmazonS3ClientBuilder.standard() and shutdown client upon finish.
* Safely initialize S3Store using TransferManagerBuilder.standard() and shutdown client upon finish.
*
* NOTICE: Standard S3 client builder uses the Default Credential Provider Chain, see docs on how to authenticate:
* https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html
*
* @return Stream[ F, S3Store[F] ] stream with one S3Store, AmazonS3 client will disconnect once stream is done.
*/
def apply[F[_] : ContextShift](blockingExecutionContext: ExecutionContext)(implicit F: ConcurrentEffect[F]): Stream[F, S3Store[F]] =
apply(F.delay(AmazonS3ClientBuilder.standard().build()), None, blockingExecutionContext)
apply(F.delay(TransferManagerBuilder.standard().build()), None, blockingExecutionContext)



/**
* Safely initialize S3Store and shutdown Amazon S3 client upon finish.
*
* @param fa F[AmazonS3] how to connect AWS S3 client
* @param fa F[TransferManager] how to connect AWS S3 client
* @param sse Option[String] sse algorithm
* @return Stream[ F, S3Store[F] ] stream with one S3Store, AmazonS3 client will disconnect once stream is done.
*/
def apply[F[_]: ContextShift](fa: F[AmazonS3], sse: Option[String], blockingExecutionContext: ExecutionContext)(implicit F: ConcurrentEffect[F])
def apply[F[_]: ContextShift](fa: F[TransferManager], sse: Option[String], blockingExecutionContext: ExecutionContext)(implicit F: ConcurrentEffect[F])
: Stream[F, S3Store[F]] = {
fs2.Stream.bracket(fa)(client => F.delay(client.shutdown())).flatMap {
fs2.Stream.bracket(fa)(client => F.delay(client.getAmazonS3Client.shutdown())).flatMap {
client => {
fs2.Stream.eval(F.delay(S3Store[F](client, sse, blockingExecutionContext)))
}
Expand Down
10 changes: 7 additions & 3 deletions s3/src/test/scala/blobstore/s3/S3StoreTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,28 @@ import com.amazonaws.ClientConfiguration
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.transfer.{TransferManager, TransferManagerBuilder}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

class S3StoreTest extends AbstractStoreTest {

val credentials = new BasicAWSCredentials("my_access_key", "my_secret_key")
val clientConfiguration = new ClientConfiguration()
clientConfiguration.setSignerOverride("AWSS3V4SignerType")
val minioHost = Option(System.getenv("BLOBSTORE_MINIO_HOST")).getOrElse("minio-container")
val minioPort = Option(System.getenv("BLOBSTORE_MINIO_PORT")).getOrElse("9000")
val minioHost: String = Option(System.getenv("BLOBSTORE_MINIO_HOST")).getOrElse("minio-container")
val minioPort: String = Option(System.getenv("BLOBSTORE_MINIO_PORT")).getOrElse("9000")
private val client: AmazonS3 = AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
s"http://$minioHost:$minioPort", Regions.US_EAST_1.name()))
.withPathStyleAccessEnabled(true)
.withClientConfiguration(clientConfiguration)
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.build()
private val tm: TransferManager = TransferManagerBuilder.standard()
loran-steinberger marked this conversation as resolved.
Show resolved Hide resolved
.withS3Client(client)
.build()

override val store: Store[IO] = S3Store[IO](client, blockingExecutionContext = blockingExecutionContext)
override val store: Store[IO] = S3Store[IO](tm, blockingExecutionContext = blockingExecutionContext)
override val root: String = "blobstore-test-bucket"

override def beforeAll(): Unit = {
Expand Down