Skip to content

Commit

Permalink
Merge pull request #18 from emeraldpay/fix/double-slash
Browse files Browse the repository at this point in the history
  • Loading branch information
splix authored Dec 24, 2023
2 parents 61c55e5 + 1349259 commit 4dd2e08
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.emeraldpay.dshackle.archive.storage

open class BucketPath(
private val bucketPath: String,
) {

fun fullPathFor(path: String): String {
return listOf(bucketPath, path)
.filter { it != null && it.isNotEmpty() }
.joinToString("/")
.replace(Regex("//+"), "/")
}

fun forBlockchain(blockchainDir: String): BlockchainPath {
return BlockchainPath(blockchainDir, this)
}

class BlockchainPath(
private val blockchainDir: String,
private val bucketPath: BucketPath,
) {

fun fullPathFor(path: String): String {
return bucketPath.fullPathFor(
listOf(blockchainDir, path).joinToString("/"),
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ open class GSStorageAccess(
}

private val blockchainDir = filenameGenerator.parentDir
private val path = googleStorage.bucketPath + "/" + blockchainDir
private val path = googleStorage.bucketPath.forBlockchain(blockchainDir)

override fun getDirBlockSizeL1(): Long {
return filenameGenerator.dirBlockSizeL1
Expand All @@ -48,20 +48,20 @@ open class GSStorageAccess(
// it stops processing stream files when last block is reached and starts processing ranges form given height
val query = listOf(
ListQuery(
prefix = path + filenameGenerator.getLevel0(height) + "/",
rangeStart = googleStorage.bucketPath + "/" + filenameGenerator.getIndividualFilename(FileType.BLOCKS.asTypeSingle(), height),
rangeEnd = path + filenameGenerator.getLevel0(height) + "/" + filenameGenerator.maxLevelValue(), // stop at the max level value, before range-* starts
prefix = path.fullPathFor(filenameGenerator.getLevel0(height) + "/"),
rangeStart = googleStorage.bucketPath.fullPathFor(filenameGenerator.getIndividualFilename(FileType.BLOCKS.asTypeSingle(), height)),
rangeEnd = path.fullPathFor(filenameGenerator.getLevel0(height) + "/" + filenameGenerator.maxLevelValue()), // stop at the max level value, before range-* starts
),
ListQuery(
path + filenameGenerator.getLevel0(height) + "/",
googleStorage.bucketPath + "/" + filenameGenerator.getRangeFilename(FileType.BLOCKS.asTypeSingle(), Chunk(height, 0)),
path.fullPathFor(filenameGenerator.getLevel0(height) + "/"),
googleStorage.bucketPath.fullPathFor(filenameGenerator.getRangeFilename(FileType.BLOCKS.asTypeSingle(), Chunk(height, 0))),
),
)
log.debug("Query lists for for: {}", query)
return Flux.fromIterable(query)
.flatMap { query(it) }
.map {
blockchainDir + it.blobId.name.substring(this.path.length)
blockchainDir + it.blobId.name.substring(this.path.fullPathFor("").length)
}
}

Expand All @@ -71,7 +71,7 @@ open class GSStorageAccess(
override fun deleteArchives(files: List<String>): Mono<Void> {
return Flux.fromIterable(files)
.map {
BlobId.of(googleStorage.bucket, googleStorage.getBucketPath(it))
BlobId.of(googleStorage.bucket, googleStorage.bucketPath.fullPathFor(it))
}
.collectList()
.flatMap {
Expand All @@ -83,18 +83,18 @@ open class GSStorageAccess(
}

override fun getURI(file: String): String {
return "gs://${googleStorage.bucket}/${googleStorage.getBucketPath(file)}"
return "gs://${googleStorage.bucket}/${googleStorage.bucketPath.fullPathFor(file)}"
}

override fun exists(path: String): Boolean {
val blobId = BlobId.of(googleStorage.bucket, googleStorage.getBucketPath(path))
val blobId = BlobId.of(googleStorage.bucket, googleStorage.bucketPath.fullPathFor(path))
val blobInfo = BlobInfo.newBuilder(blobId)
.build()
return googleStorage.storage.get(blobInfo.blobId) != null
}

override fun createWriter(path: String): OutputStream {
val blobId = BlobId.of(googleStorage.bucket, googleStorage.getBucketPath(path))
val blobId = BlobId.of(googleStorage.bucket, googleStorage.bucketPath.fullPathFor(path))
val blobInfo = BlobInfo.newBuilder(blobId).build()
val channel = googleStorage.storage.writer(blobInfo, Storage.BlobWriteOption.disableGzipContent())
?: throw IllegalStateException("Blob ${blobId.toGsUtilUri()} cannot be created")
Expand All @@ -108,7 +108,7 @@ open class GSStorageAccess(
}

override fun createReader(path: String): SeekableInput {
val blobId = BlobId.of(googleStorage.bucket, googleStorage.getBucketPath(path))
val blobId = BlobId.of(googleStorage.bucket, googleStorage.bucketPath.fullPathFor(path))
return createReader(blobId)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.google.cloud.storage.Storage
import com.google.cloud.storage.StorageOptions
import io.emeraldpay.dshackle.archive.config.GoogleAuthProvider
import io.emeraldpay.dshackle.archive.config.RunConfig
import io.emeraldpay.dshackle.archive.storage.BucketPath
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Profile
Expand All @@ -23,7 +24,9 @@ class GoogleStorage(

private val export: RunConfig.ExportBucket = runConfig.export.bucket!!
val bucket = export.bucket
val bucketPath = export.path.let { if (it.endsWith("/")) it.substring(0, it.length - 2) else it }
val bucketPath = BucketPath(
export.path.let { if (it.endsWith("/")) it.substring(0, it.length - 2) else it },
)

lateinit var storage: Storage

Expand All @@ -36,11 +39,4 @@ class GoogleStorage(

log.info("Upload archives to Google Storage bucket $bucket into $bucketPath")
}

fun getBucketPath(path: String): String {
return listOf(
bucketPath,
path,
).filter { it.isNotEmpty() }.joinToString("/")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.emeraldpay.dshackle.archive.storage.s3

import io.emeraldpay.dshackle.archive.config.AwsAuthProvider
import io.emeraldpay.dshackle.archive.config.RunConfig
import io.emeraldpay.dshackle.archive.storage.BucketPath
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.FactoryBean
import org.springframework.beans.factory.annotation.Autowired
Expand Down Expand Up @@ -43,7 +44,10 @@ class S3Config(

private val s3opts = export.s3!!
val bucket = export.bucket
val bucketPath = export.path.let { if (it.endsWith("/")) it.substring(0, it.length - 2) else it }.trimStart('/')
val bucketPath = BucketPath(
export.path
.let { if (it.endsWith("/")) it.substring(0, it.length - 2) else it },
)

lateinit var storage: S3Client

Expand Down Expand Up @@ -84,11 +88,4 @@ class S3Config(

log.info("Upload archives to S3 bucket `$bucket` into `$bucketPath`")
}

fun getBucketPath(path: String): String {
return listOf(
bucketPath,
path,
).filter { it.isNotEmpty() }.joinToString("/")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,27 @@ open class S3StorageAccess(

private val threads = Executors.newCachedThreadPool()
private val blockchainDir = filenameGenerator.parentDir
private val path = s3Config.bucketPath + "/" + blockchainDir
private val path = s3Config.bucketPath.forBlockchain(blockchainDir)

override fun listArchiveLevel0(height: Long): Flux<String> {
// use separate filters for Stream files and Ranges,
// it stops processing stream files when last block is reached and starts processing ranges form given height
val query = listOf(
ListQuery(
prefix = path + filenameGenerator.getLevel0(height) + "/",
rangeStart = s3Config.bucketPath + "/" + filenameGenerator.getIndividualFilename(FileType.BLOCKS.asTypeSingle(), height),
rangeEnd = path + filenameGenerator.getLevel0(height) + "/" + filenameGenerator.maxLevelValue(), // stop at the max level value, before range-* starts
prefix = path.fullPathFor(filenameGenerator.getLevel0(height) + "/"),
rangeStart = s3Config.bucketPath.fullPathFor(filenameGenerator.getIndividualFilename(FileType.BLOCKS.asTypeSingle(), height)),
rangeEnd = path.fullPathFor(filenameGenerator.getLevel0(height) + "/" + filenameGenerator.maxLevelValue()), // stop at the max level value, before range-* starts
),
ListQuery(
path + filenameGenerator.getLevel0(height) + "/",
s3Config.bucketPath + "/" + filenameGenerator.getRangeFilename(FileType.BLOCKS.asTypeSingle(), Chunk(height, 0)),
path.fullPathFor(filenameGenerator.getLevel0(height) + "/"),
s3Config.bucketPath.fullPathFor(filenameGenerator.getRangeFilename(FileType.BLOCKS.asTypeSingle(), Chunk(height, 0))),
),
)
log.debug("Query lists for for: {}", query)
return Flux.fromIterable(query)
.flatMap { query(it) }
.map {
blockchainDir + it.key().substring(this.path.length)
blockchainDir + it.key().substring(this.path.fullPathFor("").length)
}
}

Expand All @@ -88,7 +88,7 @@ open class S3StorageAccess(
}

override fun getURI(file: String): String {
return "s3://${s3Config.bucket}/${s3Config.getBucketPath(file)}"
return "s3://${s3Config.bucket}/${s3Config.bucketPath.fullPathFor(file)}"
}

fun getBucket(uri: URI): String {
Expand All @@ -108,7 +108,7 @@ open class S3StorageAccess(
try {
val request = HeadObjectRequest.builder()
.bucket(s3Config.bucket)
.key(s3Config.getBucketPath(path))
.key(s3Config.bucketPath.fullPathFor(path))
.build()
val response = s3Config.storage.headObject(request)
return response != null
Expand All @@ -131,7 +131,7 @@ open class S3StorageAccess(
try {
val request = PutObjectRequest.builder()
.bucket(s3Config.bucket)
.key(s3Config.getBucketPath(path))
.key(s3Config.bucketPath.fullPathFor(path))
.build()
val body = RequestBody.fromContentProvider({ pipe }, "application/avro")
s3Config.storage.putObject(request, body)
Expand All @@ -148,7 +148,7 @@ open class S3StorageAccess(
override fun createReader(path: String): SeekableInput {
val request = GetObjectRequest.builder()
.bucket(s3Config.bucket)
.key(s3Config.getBucketPath(path))
.key(s3Config.bucketPath.fullPathFor(path))
.build()
val obj = s3Config.storage.getObject(request)
return SeekableS3Object(obj)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.emeraldpay.dshackle.archive.storage

import spock.lang.Specification

class BucketPathSpec extends Specification {

def "Removes double slash"() {
setup:
def path = new BucketPath("a/b//")
when:
def result = path.fullPathFor("/c.txt")
then:
result == "a/b/c.txt"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ class FilenameGeneratorSpec extends Specification {
"012000000/012001000/012000005.block.avro" | 12000005 | 1
"012000000/012001000/012000012.block.v20220102.avro" | 12000012 | 1
}

def "Removed duplicate slashes"() {
setup:
FilenameGenerator generator = new FilenameGenerator("", "test/", 1_000_000, 1_000)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ class GSStorageAccessSpec extends Specification {

then:
1 * storage.query(
new ListQuery("/dir/002000000/", "/dir/002000000/002000000/002000123.block.v0.avro", "/dir/002000000/999999999")
) >> Flux.fromIterable([BlobInfo.newBuilder("bucket", "/dir/2M/0.txt").build(), BlobInfo.newBuilder("bucket", "/dir/2M/1.txt").build()])
new ListQuery("dir/002000000/", "dir/002000000/002000000/002000123.block.v0.avro", "dir/002000000/999999999")
) >> Flux.fromIterable([BlobInfo.newBuilder("bucket", "dir/2M/0.txt").build(), BlobInfo.newBuilder("bucket", "dir/2M/1.txt").build()])
1 * storage.query(
new ListQuery("/dir/002000000/", "/dir/002000000/range-002000123_002000122.block.v0.avro", null)
new ListQuery("dir/002000000/", "dir/002000000/range-002000123_002000122.block.v0.avro", null)
) >> Flux.empty()
1 * storage.query(
new ListQuery("/dir/003000000/", "/dir/003000000/003000000/003000000.block.v0.avro", "/dir/003000000/999999999")
) >> Flux.fromIterable([BlobInfo.newBuilder("bucket", "/dir/3M/0.txt").build(), BlobInfo.newBuilder("bucket", "/dir/3M/1.txt").build()])
new ListQuery("dir/003000000/", "dir/003000000/003000000/003000000.block.v0.avro", "dir/003000000/999999999")
) >> Flux.fromIterable([BlobInfo.newBuilder("bucket", "dir/3M/0.txt").build(), BlobInfo.newBuilder("bucket", "dir/3M/1.txt").build()])
1 * storage.query(
new ListQuery("/dir/003000000/", "/dir/003000000/range-003000000_002999999.block.v0.avro", null)
) >> Flux.fromIterable([BlobInfo.newBuilder("bucket", "/dir/3M/range-1-2.txt").build()])
new ListQuery("dir/003000000/", "dir/003000000/range-003000000_002999999.block.v0.avro", null)
) >> Flux.fromIterable([BlobInfo.newBuilder("bucket", "dir/3M/range-1-2.txt").build()])
results == ["dir/2M/0.txt", "dir/2M/1.txt", "dir/3M/0.txt", "dir/3M/1.txt", "dir/3M/range-1-2.txt"]
}

Expand Down

0 comments on commit 4dd2e08

Please sign in to comment.