Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #SB-7857 chore: Implement multipart uploads for large files #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 49 additions & 36 deletions src/main/scala/org/sunbird/cloud/storage/BaseStorageService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import collection.JavaConverters._
import org.jclouds.blobstore.options.ListContainerOptions.Builder.prefix
import org.sunbird.cloud.storage.Model.Blob
import org.jclouds.blobstore.options.CopyOptions
import org.jclouds.blobstore.options.PutOptions.Builder.multipart
import org.sunbird.cloud.storage.conf.AppConf

trait BaseStorageService extends IStorageService {
Expand All @@ -25,41 +26,43 @@ trait BaseStorageService extends IStorageService {
var maxContentLength = 0
val tika = new Tika()

override def upload(container: String, file: String, objectKey: String, isPublic: Option[Boolean] = Option(false), isDirectory: Option[Boolean] = Option(false), ttl: Option[Int] = None, retryCount: Option[Int] = None): String = {
override def upload(container: String, file: String, objectKey: String, isPublic: Boolean = false,
isDirectory: Boolean = false, ttl: Option[Int] = None,
retryCount: Option[Int] = None): String = {

try {
if(isDirectory.get) {
if(isDirectory) {
val d = new File(file)
val files = if (d.exists && d.isDirectory) {
d.listFiles.filter(_.isFile).toList;
d.listFiles.filter(_.isFile).toList
} else {
List[File]();
List[File]()
}
val list = files.map {f =>
val key = objectKey + f.getName.split("/").last
upload(container, f.getAbsolutePath, key)
}
list.toString()
}
else {
} else {
if (attempt == retryCount.getOrElse(maxRetries)) {
val message = s"Failed to upload. file: $file, key: $objectKey, attempt: $attempt, maxAttempts: $retryCount. Exceeded maximum number of retries"
val message = s"Failed to upload. file: $file, key: $objectKey, attempt: $attempt, " +
s"maxAttempts: $retryCount. Exceeded maximum number of retries"
throw new StorageServiceException(message)
}

blobStore.createContainerInLocation(null, container)
val fileObj = new File(file)
val payload = Files.asByteSource(fileObj)
val contentType = tika.detect(fileObj)
val blob = blobStore.blobBuilder(objectKey).payload(payload).contentType(contentType).contentLength(payload.size()).build()
blobStore.putBlob(container, blob)
if (isPublic.get) {
val contentType = tika.detect(fileObj)
val blob = blobStore.blobBuilder(objectKey).payload(payload)
.contentType(contentType).contentLength(payload.size()).build()
blobStore.putBlob(container, blob, multipart())
if (isPublic) {
getSignedURL(container, objectKey, Option(ttl.getOrElse(maxSignedurlTTL)))
}
else blobStore.getBlob(container, objectKey).getMetadata.getUri.toString
}
}
catch {
} catch {
case e: Exception => {
Thread.sleep(attempt*2000)
attempt += 1
Expand All @@ -68,12 +71,15 @@ trait BaseStorageService extends IStorageService {
}
}

override def put(container: String, content: Array[Byte], objectKey: String, isPublic: Option[Boolean] = Option(false), isDirectory: Option[Boolean] = Option(false), ttl: Option[Int] = None, retryCount: Option[Int] = None): String = {
override def put(container: String, content: Array[Byte], objectKey: String,
isPublic: Option[Boolean] = Option(false), isDirectory: Option[Boolean] = Option(false),
ttl: Option[Int] = None, retryCount: Option[Int] = None): String = {

try {

if (attempt == retryCount.getOrElse(maxRetries)) {
val message = s"Failed to upload. key: $objectKey, attempt: $attempt, maxAttempts: $retryCount. Exceeded maximum number of retries"
val message = s"Failed to upload. key: $objectKey, attempt: $attempt, " +
s"maxAttempts: $retryCount. Exceeded maximum number of retries"
throw new StorageServiceException(message)
}

Expand All @@ -94,37 +100,39 @@ trait BaseStorageService extends IStorageService {
}
}

override def getSignedURL(container: String, objectKey: String, ttl: Option[Int] = None, permission: Option[String] = Option("r")): String = {
override def getSignedURL(container: String, objectKey: String, ttl: Option[Int] = None,
permission: Option[String] = Option("r")): String = {
if (permission.getOrElse("").equalsIgnoreCase("w")) {
context.getSigner.signPutBlob(container, blobStore.blobBuilder(objectKey).forSigning().contentLength(maxContentLength).build(), 600l).getEndpoint.toString
context.getSigner.signPutBlob(container, blobStore.blobBuilder(objectKey).forSigning().
contentLength(maxContentLength).build(), 600l).getEndpoint.toString
} else {
context.getSigner.signGetBlob(container, objectKey, ttl.getOrElse(maxSignedurlTTL)).getEndpoint.toString
}
}

override def download(container: String, objectKey: String, localPath: String, isDirectory: Option[Boolean] = Option(false)) = {
override def download(container: String, objectKey: String, localPath: String, isDirectory: Boolean = false): Unit = {
try {
if(isDirectory.get) {
if(isDirectory) {
val objects = listObjectKeys(container, objectKey, isDirectory)
for (obj <- objects) {
val file = FilenameUtils.getName(obj);
val file = FilenameUtils.getName(obj)
val fileObj = blobStore.getBlob(container, obj)
val downloadPath = localPath + FilenameUtils.getPath(obj).split("/").last + "/";
CommonUtil.copyFile(fileObj.getPayload.getInput, downloadPath.replaceAll("//", "/"), file);
val downloadPath = localPath + FilenameUtils.getPath(obj).split("/").last + "/"
CommonUtil.copyFile(fileObj.getPayload.getInput, downloadPath.replaceAll("//", "/"), file)
}
}
else {
val inStream = blobStore.getBlob(container, objectKey).getPayload.getInput
val fileName = objectKey.split("/").last
CommonUtil.copyFile(inStream, localPath, fileName);
CommonUtil.copyFile(inStream, localPath, fileName)
}
} catch {
case e: Exception =>
throw new StorageServiceException(e.getMessage)
}
}

override def deleteObject(container: String, objectKey: String, isDirectory: Option[Boolean] = Option(false)) = {
override def deleteObject(container: String, objectKey: String, isDirectory: Option[Boolean] = Option(false)): Unit = {
try {
deleteObjects(container, List((objectKey, isDirectory.get)))
} catch {
Expand All @@ -133,7 +141,7 @@ trait BaseStorageService extends IStorageService {
}
}

override def deleteObjects(container: String, objectKeys: List[(String, Boolean)]) = {
override def deleteObjects(container: String, objectKeys: List[(String, Boolean)]): Unit = {
try {
for (obj <- objectKeys) {
if(obj._2) {
Expand Down Expand Up @@ -176,15 +184,17 @@ trait BaseStorageService extends IStorageService {
}
}

override def listObjectKeys(container: String, _prefix: String, isDirectory: Option[Boolean] = Option(false)): List[String] = {
if(isDirectory.get)
override def listObjectKeys(container: String, _prefix: String, isDirectory: Boolean = false): List[String] = {
if(isDirectory)
blobStore.list(container, prefix(_prefix).recursive()).asScala.map(f => f.getName).toList
else
blobStore.list(container, prefix(_prefix)).asScala.map(f => f.getName).toList
}

override def searchObjects(container: String, prefix: String, fromDate: Option[String] = None, toDate: Option[String] = None, delta: Option[Int] = None, pattern: String = "yyyy-MM-dd"): List[Blob] = {
val from = if (delta.nonEmpty) CommonUtil.getStartDate(toDate, delta.get) else fromDate;
override def searchObjects(container: String, prefix: String, fromDate: Option[String] = None,
toDate: Option[String] = None, delta: Option[Int] = None,
pattern: String = "yyyy-MM-dd"): List[Blob] = {
val from = if (delta.nonEmpty) CommonUtil.getStartDate(toDate, delta.get) else fromDate
if (from.nonEmpty) {
val dates = CommonUtil.getDatesBetween(from.get, toDate, pattern);
val paths = for (date <- dates) yield {
Expand All @@ -196,9 +206,11 @@ trait BaseStorageService extends IStorageService {
}
}

override def searchObjectkeys(container: String, prefix: String, fromDate: Option[String] = None, toDate: Option[String] = None, delta: Option[Int] = None, pattern: String = "yyyy-MM-dd"): List[String] = {
// val objectList = searchObjects(container, prefix, fromDate, toDate, delta, pattern)
// getPaths(container, objectList);
override def searchObjectkeys(container: String, prefix: String, fromDate: Option[String] = None,
toDate: Option[String] = None, delta: Option[Int] = None,
pattern: String = "yyyy-MM-dd"): List[String] = {
// val objectList = searchObjects(container, prefix, fromDate, toDate, delta, pattern)
// getPaths(container, objectList);
val from = if (delta.nonEmpty) CommonUtil.getStartDate(toDate, delta.get) else fromDate;
if (from.nonEmpty) {
val dates = CommonUtil.getDatesBetween(from.get, toDate, pattern);
Expand All @@ -211,8 +223,9 @@ trait BaseStorageService extends IStorageService {
}
}

override def copyObjects(fromContainer: String, fromKey: String, toContainer: String, toKey: String, isDirectory: Option[Boolean] = Option(false)): Unit = {
if(isDirectory.get) {
override def copyObjects(fromContainer: String, fromKey: String,
toContainer: String, toKey: String, isDirectory: Boolean = false): Unit = {
if(isDirectory) {
val updatedFromKey = if(fromKey.endsWith("/")) fromKey else fromKey+"/"
val updatedToKey = if(toKey.endsWith("/")) toKey else toKey+"/"
val objectKeys = listObjectKeys(fromContainer, updatedFromKey, isDirectory)
Expand All @@ -227,10 +240,10 @@ trait BaseStorageService extends IStorageService {
override def extractArchive(container: String, objectKey: String, toKey: String): Unit = {
try {
val localPath = AppConf.getConfig("local_extract_path")
download(container, objectKey, localPath, Option(false))
download(container, objectKey, localPath, isDirectory = false)
val localFolder = localPath + "/" + toKey.split("/").last
CommonUtil.unZip(localPath + "/" + objectKey.split("/").last, localFolder)
upload(container, localFolder, toKey, None, Option(true))
upload(container, localFolder, toKey, isDirectory = true)
}
catch {
case e: Exception =>
Expand Down
29 changes: 14 additions & 15 deletions src/main/scala/org/sunbird/cloud/storage/IStorageService.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ trait IStorageService {
* @param container String - The container/bucket to upload the file to.
* @param file String - The file path.
* @param objectKey String - The destination key/path to upload the file to. If the path exists it will be overwritten.
* @param isPublic Option[Boolean] - Whether the file should have public read access? Optional and defaults to false.
* @param isDirectory Option[Boolean] - Whether the file is a directory and need to upload folder recursively?
* @param isPublic Boolean - Whether the file should have public read access? Optional and defaults to false.
* @param isDirectory Boolean - Whether the file is a directory and need to upload folder recursively?
* @param ttl Option[Int] - The ttl/expiry for the file. Optional and default is never expires
* @param retryCount Option[Int] - Number of times the upload will be retried before failing. Defaults to global configuration "max.retries"
*
* @return String - The url of the file/folder uploaded
*/
def upload(container: String, file: String, objectKey: String, isPublic: Option[Boolean] = Option(false), isDirectory: Option[Boolean] = Option(false),
def upload(container: String, file: String, objectKey: String, isPublic: Boolean = false, isDirectory: Boolean = false,
ttl: Option[Int] = None, retryCount: Option[Int] = None): String

/**
Expand Down Expand Up @@ -54,9 +54,9 @@ trait IStorageService {
* @param container String - The container/bucket of the file
* @param objectKey String - The key/path of the file to download from
* @param localPath String - The local destination path to download to
* @param isDirectory Option[Boolean] - Whether the file is a directory and need to be downloaded recursively? Optional and defaults to false.
* @param isDirectory Boolean - Whether the file is a directory and need to be downloaded recursively? Optional and defaults to false.
*/
def download(container: String, objectKey: String, localPath: String, isDirectory: Option[Boolean] = Option(false))
def download(container: String, objectKey: String, localPath: String, isDirectory: Boolean = false)

/**
* Delete an object from the cloud store
Expand Down Expand Up @@ -84,7 +84,7 @@ trait IStorageService {
* @param toKey String - The object prefix to copy to.
* @param isDirectory Option[Boolean] - Whether the copy is a file or folder? Defaults to false i.e copy one file.
*/
def copyObjects(fromContainer: String, fromKey: String, toContainer: String, toKey: String, isDirectory: Option[Boolean] = Option(false))
def copyObjects(fromContainer: String, fromKey: String, toContainer: String, toKey: String, isDirectory: Boolean = false)


/**
Expand Down Expand Up @@ -118,15 +118,14 @@ trait IStorageService {
*/
def listObjects(container: String, prefix: String, withPayload: Option[Boolean] = Option(false)): List[Blob]

/**
* List object keys from cloud storage for a given prefix. Similar to <code>listObjects()</code>
*
* @param container String - The container/bucket
* @param _prefix String - The object prefix to list objects. The prefix can be folder or pattern.
*
* @return List[Blob] - The blob objects for the given prefix.
*/
def listObjectKeys(container: String, _prefix: String, isDirectory: Option[Boolean] = Option(false)): List[String]
/**
* List object keys from cloud storage for a given prefix. Similar to <code>listObjects()</code>
* @param container - The container/bucket
* @param _prefix - The object prefix to list objects. The prefix can be folder or pattern.
* @param isDirectory - specify if the prefix is a directory
* @return
*/
def listObjectKeys(container: String, _prefix: String, isDirectory: Boolean = false): List[String]

/**
* Search for objects for a given prefix and return only keys. Specifically used for telemetry files as the files are prefixed by sync date.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@ class TestAzureStorageService extends FlatSpec with Matchers {
azureService.download(storageContainer, "testUpload/test-blob.log", "src/test/resources/test-azure/")

// upload directory
println("url of folder", azureService.upload(storageContainer, "src/test/resources/1234/", "testUpload/1234/", None, Option(true)))
println("url of folder", azureService.upload(container = storageContainer, file = "src/test/resources/1234/",
objectKey = "testUpload/1234/", isDirectory = true))

// downlaod directory
azureService.download(storageContainer, "testUpload/1234/", "src/test/resources/test-azure/", Option(true))
azureService.download(storageContainer, "testUpload/1234/", "src/test/resources/test-azure/", isDirectory = true)

println("azure signed url", azureService.getSignedURL(storageContainer, "testUpload/test-blob.log", Option(600)))

val blob = azureService.getObject(storageContainer, "testUpload/test-blob.log")
println("blob details: ", blob)

println("upload public url", azureService.upload(storageContainer, "src/test/resources/test-data.log", "testUpload/test-data-public.log", Option(true)))
println("upload public with expiry url", azureService.upload(storageContainer, "src/test/resources/test-data.log", "testUpload/test-data-with-expiry.log", Option(true), Option(false), Option(600)))
println("upload public url", azureService.upload(container = storageContainer, file = "src/test/resources/test-data.log",
objectKey = "testUpload/test-data-public.log", isPublic = true))
println("upload public with expiry url", azureService.upload(container = storageContainer,
file ="src/test/resources/test-data.log", objectKey = "testUpload/test-data-with-expiry.log",
isPublic = true, isDirectory = false, Option(600)))
println("signed path to upload from external client", azureService.getSignedURL(storageContainer, "testUpload/test-data-public1.log", Option(600), Option("w")))

val keys = azureService.searchObjectkeys(storageContainer, "testUpload/1234/")
Expand All @@ -48,7 +52,7 @@ class TestAzureStorageService extends FlatSpec with Matchers {

azureService.upload(storageContainer, "src/test/resources/test-extract.zip", "testUpload/test-extract.zip")
azureService.copyObjects(storageContainer, "testUpload/test-extract.zip", storageContainer, "testDuplicate/test-extract.zip")
azureService.copyObjects(storageContainer, "testUpload/1234/", storageContainer, "testDuplicate/1234/", Option(true))
azureService.copyObjects(storageContainer, "testUpload/1234/", storageContainer, "testDuplicate/1234/", isDirectory = true)

azureService.extractArchive(storageContainer, "testUpload/test-extract.zip", "testUpload/test-extract/")

Expand Down
Loading