From f919244daa6ce5968fbcea6e6842c6117e7ad6fe Mon Sep 17 00:00:00 2001 From: Anand Date: Thu, 4 Oct 2018 13:27:27 +0530 Subject: [PATCH] Issue #SB-7857 chore: Implement multipart uploads for large files --- .../cloud/storage/BaseStorageService.scala | 85 +++++++++++-------- .../cloud/storage/IStorageService.scala | 29 +++---- .../service/TestAzureStorageService.scala | 14 +-- .../service/TestS3StorageService.scala | 13 ++- 4 files changed, 81 insertions(+), 60 deletions(-) diff --git a/src/main/scala/org/sunbird/cloud/storage/BaseStorageService.scala b/src/main/scala/org/sunbird/cloud/storage/BaseStorageService.scala index 6f20140..7a2d710 100644 --- a/src/main/scala/org/sunbird/cloud/storage/BaseStorageService.scala +++ b/src/main/scala/org/sunbird/cloud/storage/BaseStorageService.scala @@ -13,6 +13,7 @@ import collection.JavaConverters._ import org.jclouds.blobstore.options.ListContainerOptions.Builder.prefix import org.sunbird.cloud.storage.Model.Blob import org.jclouds.blobstore.options.CopyOptions +import org.jclouds.blobstore.options.PutOptions.Builder.multipart import org.sunbird.cloud.storage.conf.AppConf trait BaseStorageService extends IStorageService { @@ -25,41 +26,43 @@ trait BaseStorageService extends IStorageService { var maxContentLength = 0 val tika = new Tika() - override def upload(container: String, file: String, objectKey: String, isPublic: Option[Boolean] = Option(false), isDirectory: Option[Boolean] = Option(false), ttl: Option[Int] = None, retryCount: Option[Int] = None): String = { + override def upload(container: String, file: String, objectKey: String, isPublic: Boolean = false, + isDirectory: Boolean = false, ttl: Option[Int] = None, + retryCount: Option[Int] = None): String = { try { - if(isDirectory.get) { + if(isDirectory) { val d = new File(file) val files = if (d.exists && d.isDirectory) { - d.listFiles.filter(_.isFile).toList; + d.listFiles.filter(_.isFile).toList } else { - List[File](); + List[File]() } val list = files.map {f => val key = objectKey + f.getName.split("/").last upload(container, f.getAbsolutePath, key) } list.toString() - } - else { + } else { if (attempt == retryCount.getOrElse(maxRetries)) { - val message = s"Failed to upload. file: $file, key: $objectKey, attempt: $attempt, maxAttempts: $retryCount. Exceeded maximum number of retries" + val message = s"Failed to upload. file: $file, key: $objectKey, attempt: $attempt, " + + s"maxAttempts: $retryCount. Exceeded maximum number of retries" throw new StorageServiceException(message) } blobStore.createContainerInLocation(null, container) val fileObj = new File(file) val payload = Files.asByteSource(fileObj) - val contentType = tika.detect(fileObj) - val blob = blobStore.blobBuilder(objectKey).payload(payload).contentType(contentType).contentLength(payload.size()).build() - blobStore.putBlob(container, blob) - if (isPublic.get) { + val contentType = tika.detect(fileObj) + val blob = blobStore.blobBuilder(objectKey).payload(payload) + .contentType(contentType).contentLength(payload.size()).build() + blobStore.putBlob(container, blob, multipart()) + if (isPublic) { getSignedURL(container, objectKey, Option(ttl.getOrElse(maxSignedurlTTL))) } else blobStore.getBlob(container, objectKey).getMetadata.getUri.toString } - } - catch { + } catch { case e: Exception => { Thread.sleep(attempt*2000) attempt += 1 @@ -68,12 +71,15 @@ trait BaseStorageService extends IStorageService { } } - override def put(container: String, content: Array[Byte], objectKey: String, isPublic: Option[Boolean] = Option(false), isDirectory: Option[Boolean] = Option(false), ttl: Option[Int] = None, retryCount: Option[Int] = None): String = { + override def put(container: String, content: Array[Byte], objectKey: String, + isPublic: Option[Boolean] = Option(false), isDirectory: Option[Boolean] = Option(false), + ttl: Option[Int] = None, retryCount: Option[Int] = None): String = { try { if (attempt == retryCount.getOrElse(maxRetries)) { - val message = s"Failed to upload. key: $objectKey, attempt: $attempt, maxAttempts: $retryCount. Exceeded maximum number of retries" + val message = s"Failed to upload. key: $objectKey, attempt: $attempt, " + + s"maxAttempts: $retryCount. Exceeded maximum number of retries" throw new StorageServiceException(message) } @@ -94,29 +100,31 @@ trait BaseStorageService extends IStorageService { } } - override def getSignedURL(container: String, objectKey: String, ttl: Option[Int] = None, permission: Option[String] = Option("r")): String = { + override def getSignedURL(container: String, objectKey: String, ttl: Option[Int] = None, + permission: Option[String] = Option("r")): String = { if (permission.getOrElse("").equalsIgnoreCase("w")) { - context.getSigner.signPutBlob(container, blobStore.blobBuilder(objectKey).forSigning().contentLength(maxContentLength).build(), 600l).getEndpoint.toString + context.getSigner.signPutBlob(container, blobStore.blobBuilder(objectKey).forSigning(). + contentLength(maxContentLength).build(), 600l).getEndpoint.toString } else { context.getSigner.signGetBlob(container, objectKey, ttl.getOrElse(maxSignedurlTTL)).getEndpoint.toString } } - override def download(container: String, objectKey: String, localPath: String, isDirectory: Option[Boolean] = Option(false)) = { + override def download(container: String, objectKey: String, localPath: String, isDirectory: Boolean = false): Unit = { try { - if(isDirectory.get) { + if(isDirectory) { val objects = listObjectKeys(container, objectKey, isDirectory) for (obj <- objects) { - val file = FilenameUtils.getName(obj); + val file = FilenameUtils.getName(obj) val fileObj = blobStore.getBlob(container, obj) - val downloadPath = localPath + FilenameUtils.getPath(obj).split("/").last + "/"; - CommonUtil.copyFile(fileObj.getPayload.getInput, downloadPath.replaceAll("//", "/"), file); + val downloadPath = localPath + FilenameUtils.getPath(obj).split("/").last + "/" + CommonUtil.copyFile(fileObj.getPayload.getInput, downloadPath.replaceAll("//", "/"), file) } } else { val inStream = blobStore.getBlob(container, objectKey).getPayload.getInput val fileName = objectKey.split("/").last - CommonUtil.copyFile(inStream, localPath, fileName); + CommonUtil.copyFile(inStream, localPath, fileName) } } catch { case e: Exception => @@ -124,7 +132,7 @@ trait BaseStorageService extends IStorageService { } } - override def deleteObject(container: String, objectKey: String, isDirectory: Option[Boolean] = Option(false)) = { + override def deleteObject(container: String, objectKey: String, isDirectory: Option[Boolean] = Option(false)): Unit = { try { deleteObjects(container, List((objectKey, isDirectory.get))) } catch { @@ -133,7 +141,7 @@ trait BaseStorageService extends IStorageService { } } - override def deleteObjects(container: String, objectKeys: List[(String, Boolean)]) = { + override def deleteObjects(container: String, objectKeys: List[(String, Boolean)]): Unit = { try { for (obj <- objectKeys) { if(obj._2) { @@ -176,15 +184,17 @@ trait BaseStorageService extends IStorageService { } } - override def listObjectKeys(container: String, _prefix: String, isDirectory: Option[Boolean] = Option(false)): List[String] = { - if(isDirectory.get) + override def listObjectKeys(container: String, _prefix: String, isDirectory: Boolean = false): List[String] = { + if(isDirectory) blobStore.list(container, prefix(_prefix).recursive()).asScala.map(f => f.getName).toList else blobStore.list(container, prefix(_prefix)).asScala.map(f => f.getName).toList } - override def searchObjects(container: String, prefix: String, fromDate: Option[String] = None, toDate: Option[String] = None, delta: Option[Int] = None, pattern: String = "yyyy-MM-dd"): List[Blob] = { - val from = if (delta.nonEmpty) CommonUtil.getStartDate(toDate, delta.get) else fromDate; + override def searchObjects(container: String, prefix: String, fromDate: Option[String] = None, + toDate: Option[String] = None, delta: Option[Int] = None, + pattern: String = "yyyy-MM-dd"): List[Blob] = { + val from = if (delta.nonEmpty) CommonUtil.getStartDate(toDate, delta.get) else fromDate if (from.nonEmpty) { val dates = CommonUtil.getDatesBetween(from.get, toDate, pattern); val paths = for (date <- dates) yield { @@ -196,9 +206,11 @@ trait BaseStorageService extends IStorageService { } } - override def searchObjectkeys(container: String, prefix: String, fromDate: Option[String] = None, toDate: Option[String] = None, delta: Option[Int] = None, pattern: String = "yyyy-MM-dd"): List[String] = { -// val objectList = searchObjects(container, prefix, fromDate, toDate, delta, pattern) -// getPaths(container, objectList); + override def searchObjectkeys(container: String, prefix: String, fromDate: Option[String] = None, + toDate: Option[String] = None, delta: Option[Int] = None, + pattern: String = "yyyy-MM-dd"): List[String] = { + // val objectList = searchObjects(container, prefix, fromDate, toDate, delta, pattern) + // getPaths(container, objectList); val from = if (delta.nonEmpty) CommonUtil.getStartDate(toDate, delta.get) else fromDate; if (from.nonEmpty) { val dates = CommonUtil.getDatesBetween(from.get, toDate, pattern); @@ -211,8 +223,9 @@ trait BaseStorageService extends IStorageService { } } - override def copyObjects(fromContainer: String, fromKey: String, toContainer: String, toKey: String, isDirectory: Option[Boolean] = Option(false)): Unit = { - if(isDirectory.get) { + override def copyObjects(fromContainer: String, fromKey: String, + toContainer: String, toKey: String, isDirectory: Boolean = false): Unit = { + if(isDirectory) { val updatedFromKey = if(fromKey.endsWith("/")) fromKey else fromKey+"/" val updatedToKey = if(toKey.endsWith("/")) toKey else toKey+"/" val objectKeys = listObjectKeys(fromContainer, updatedFromKey, isDirectory) @@ -227,10 +240,10 @@ trait BaseStorageService extends IStorageService { override def extractArchive(container: String, objectKey: String, toKey: String): Unit = { try { val localPath = AppConf.getConfig("local_extract_path") - download(container, objectKey, localPath, Option(false)) + download(container, objectKey, localPath, isDirectory = false) val localFolder = localPath + "/" + toKey.split("/").last CommonUtil.unZip(localPath + "/" + objectKey.split("/").last, localFolder) - upload(container, localFolder, toKey, None, Option(true)) + upload(container, localFolder, toKey, isDirectory = true) } catch { case e: Exception => diff --git a/src/main/scala/org/sunbird/cloud/storage/IStorageService.scala b/src/main/scala/org/sunbird/cloud/storage/IStorageService.scala index 22b64a2..c111ed0 100644 --- a/src/main/scala/org/sunbird/cloud/storage/IStorageService.scala +++ b/src/main/scala/org/sunbird/cloud/storage/IStorageService.scala @@ -10,14 +10,14 @@ trait IStorageService { * @param container String - The container/bucket to upload the file to. * @param file String - The file path. * @param objectKey String - The destination key/path to upload the file to. If the path exists it will be overwritten. - * @param isPublic Option[Boolean] - Whether the file should have public read access? Optional and defaults to false. - * @param isDirectory Option[Boolean] - Whether the file is a directory and need to upload folder recursively? + * @param isPublic Boolean - Whether the file should have public read access? Optional and defaults to false. + * @param isDirectory Boolean - Whether the file is a directory and need to upload folder recursively? * @param ttl Option[Int] - The ttl/expiry for the file. Optional and default is never expires * @param retryCount Option[Int] - Number of times the upload will be retried before failing. Defaults to global configuration "max.retries" * * @return String - The url of the file/folder uploaded */ - def upload(container: String, file: String, objectKey: String, isPublic: Option[Boolean] = Option(false), isDirectory: Option[Boolean] = Option(false), + def upload(container: String, file: String, objectKey: String, isPublic: Boolean = false, isDirectory: Boolean = false, ttl: Option[Int] = None, retryCount: Option[Int] = None): String /** @@ -54,9 +54,9 @@ trait IStorageService { * @param container String - The container/bucket of the file * @param objectKey String - The key/path of the file to download from * @param localPath String - The local destination path to download to - * @param isDirectory Option[Boolean] - Whether the file is a directory and need to be downloaded recursively? Optional and defaults to false. + * @param isDirectory Boolean - Whether the file is a directory and need to be downloaded recursively? Optional and defaults to false. */ - def download(container: String, objectKey: String, localPath: String, isDirectory: Option[Boolean] = Option(false)) + def download(container: String, objectKey: String, localPath: String, isDirectory: Boolean = false) /** * Delete an object from the cloud store @@ -84,7 +84,7 @@ trait IStorageService { * @param toKey String - The object prefix to copy to. * @param isDirectory Option[Boolean] - Whether the copy is a file or folder? Defaults to false i.e copy one file. */ - def copyObjects(fromContainer: String, fromKey: String, toContainer: String, toKey: String, isDirectory: Option[Boolean] = Option(false)) + def copyObjects(fromContainer: String, fromKey: String, toContainer: String, toKey: String, isDirectory: Boolean = false) /** @@ -118,15 +118,14 @@ trait IStorageService { */ def listObjects(container: String, prefix: String, withPayload: Option[Boolean] = Option(false)): List[Blob] - /** - * List object keys from cloud storage for a given prefix. Similar to listObjects() - * - * @param container String - The container/bucket - * @param _prefix String - The object prefix to list objects. The prefix can be folder or pattern. - * - * @return List[Blob] - The blob objects for the given prefix. - */ - def listObjectKeys(container: String, _prefix: String, isDirectory: Option[Boolean] = Option(false)): List[String] + /** + * List object keys from cloud storage for a given prefix. Similar to listObjects() + * @param container - The container/bucket + * @param _prefix - The object prefix to list objects. The prefix can be folder or pattern. + * @param isDirectory - specify if the prefix is a directory + * @return + */ + def listObjectKeys(container: String, _prefix: String, isDirectory: Boolean = false): List[String] /** * Search for objects for a given prefix and return only keys. Specifically used for telemetry files as the files are prefixed by sync date. diff --git a/src/test/scala/org/sunbird/cloud/storage/service/TestAzureStorageService.scala b/src/test/scala/org/sunbird/cloud/storage/service/TestAzureStorageService.scala index d0590cf..ecf0f5b 100644 --- a/src/test/scala/org/sunbird/cloud/storage/service/TestAzureStorageService.scala +++ b/src/test/scala/org/sunbird/cloud/storage/service/TestAzureStorageService.scala @@ -17,18 +17,22 @@ class TestAzureStorageService extends FlatSpec with Matchers { azureService.download(storageContainer, "testUpload/test-blob.log", "src/test/resources/test-azure/") // upload directory - println("url of folder", azureService.upload(storageContainer, "src/test/resources/1234/", "testUpload/1234/", None, Option(true))) + println("url of folder", azureService.upload(container = storageContainer, file = "src/test/resources/1234/", + objectKey = "testUpload/1234/", isDirectory = true)) // downlaod directory - azureService.download(storageContainer, "testUpload/1234/", "src/test/resources/test-azure/", Option(true)) + azureService.download(storageContainer, "testUpload/1234/", "src/test/resources/test-azure/", isDirectory = true) println("azure signed url", azureService.getSignedURL(storageContainer, "testUpload/test-blob.log", Option(600))) val blob = azureService.getObject(storageContainer, "testUpload/test-blob.log") println("blob details: ", blob) - println("upload public url", azureService.upload(storageContainer, "src/test/resources/test-data.log", "testUpload/test-data-public.log", Option(true))) - println("upload public with expiry url", azureService.upload(storageContainer, "src/test/resources/test-data.log", "testUpload/test-data-with-expiry.log", Option(true), Option(false), Option(600))) + println("upload public url", azureService.upload(container = storageContainer, file = "src/test/resources/test-data.log", + objectKey = "testUpload/test-data-public.log", isPublic = true)) + println("upload public with expiry url", azureService.upload(container = storageContainer, + file ="src/test/resources/test-data.log", objectKey = "testUpload/test-data-with-expiry.log", + isPublic = true, isDirectory = false, Option(600))) println("signed path to upload from external client", azureService.getSignedURL(storageContainer, "testUpload/test-data-public1.log", Option(600), Option("w"))) val keys = azureService.searchObjectkeys(storageContainer, "testUpload/1234/") @@ -48,7 +52,7 @@ class TestAzureStorageService extends FlatSpec with Matchers { azureService.upload(storageContainer, "src/test/resources/test-extract.zip", "testUpload/test-extract.zip") azureService.copyObjects(storageContainer, "testUpload/test-extract.zip", storageContainer, "testDuplicate/test-extract.zip") - azureService.copyObjects(storageContainer, "testUpload/1234/", storageContainer, "testDuplicate/1234/", Option(true)) + azureService.copyObjects(storageContainer, "testUpload/1234/", storageContainer, "testDuplicate/1234/", isDirectory = true) azureService.extractArchive(storageContainer, "testUpload/test-extract.zip", "testUpload/test-extract/") diff --git a/src/test/scala/org/sunbird/cloud/storage/service/TestS3StorageService.scala b/src/test/scala/org/sunbird/cloud/storage/service/TestS3StorageService.scala index f214ce4..382672b 100644 --- a/src/test/scala/org/sunbird/cloud/storage/service/TestS3StorageService.scala +++ b/src/test/scala/org/sunbird/cloud/storage/service/TestS3StorageService.scala @@ -17,18 +17,23 @@ class TestS3StorageService extends FlatSpec with Matchers { s3Service.download(storageContainer, "testUpload/test-blob.log", "src/test/resources/test-s3/") // upload directory - println("url of folder", s3Service.upload(storageContainer, "src/test/resources/1234/", "testUpload/1234/", None, Option(true))) + println("url of folder", + s3Service.upload(container = storageContainer, file = "src/test/resources/1234/", + objectKey = "testUpload/1234/", isDirectory = true)) // downlaod directory - s3Service.download(storageContainer, "testUpload/1234/", "src/test/resources/test-s3/", Option(true)) + s3Service.download(storageContainer, "testUpload/1234/", "src/test/resources/test-s3/", isDirectory = true) println("azure signed url", s3Service.getSignedURL(storageContainer, "testUpload/test-blob.log", Option(600))) val blob = s3Service.getObject(storageContainer, "testUpload/test-blob.log") println("blob details: ", blob) - println("upload public url", s3Service.upload(storageContainer, "src/test/resources/test-data.log", "testUpload/test-data-public.log", Option(true))) - println("upload public with expiry url", s3Service.upload(storageContainer, "src/test/resources/test-data.log", "testUpload/test-data-with-expiry.log", Option(true), Option(false), Option(600))) + println("upload public url", s3Service.upload(storageContainer, "src/test/resources/test-data.log", + "testUpload/test-data-public.log", isPublic = true)) + println("upload public with expiry url", + s3Service.upload(container = storageContainer, file = "src/test/resources/test-data.log", + objectKey = "testUpload/test-data-with-expiry.log", isPublic = true, ttl = Option(600))) println("signed path to upload from external client", s3Service.getSignedURL(storageContainer, "testUpload/test-data-public1.log", Option(600), Option("w"))) val keys = s3Service.searchObjectkeys(storageContainer, "testUpload/1234/")