Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for Google Spanner #271

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions buildSrc/src/main/kotlin/Dependencies.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ object Dependencies {
val aws2DynamodbEnhanced = "software.amazon.awssdk:dynamodb-enhanced:2.16.99"
val dokkaGradlePlugin = "org.jetbrains.dokka:dokka-gradle-plugin:1.5.0"
val flywayGradleBuildscriptDep = "gradle.plugin.com.boxfuse.client:flyway-release:5.0.2"
val gcpSpanner = "com.google.cloud:google-cloud-spanner:6.13.0"
val guava = "com.google.guava:guava:31.0.1-jre"
val guice = "com.google.inject:guice:5.1.0"
val jCommander = "com.beust:jcommander:1.72"
Expand Down Expand Up @@ -41,6 +42,8 @@ object Dependencies {
val miskAws2Dynamodb = "com.squareup.misk:misk-aws2-dynamodb:${Versions.misk}"
val miskAws2DynamodbTesting = "com.squareup.misk:misk-aws2-dynamodb-testing:${Versions.misk}"
val miskCore = "com.squareup.misk:misk-core:${Versions.misk}"
val miskGcp = "com.squareup.misk:misk-gcp:${Versions.misk}"
val miskGcpTesting = "com.squareup.misk:misk-gcp-testing:${Versions.misk}"
val miskHibernate = "com.squareup.misk:misk-hibernate:${Versions.misk}"
val miskHibernateTesting = "com.squareup.misk:misk-hibernate-testing:${Versions.misk}"
val miskJdbc = "com.squareup.misk:misk-jdbc:${Versions.misk}"
Expand Down
45 changes: 45 additions & 0 deletions client-misk-spanner/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
apply(plugin = "kotlin")

dependencies {
implementation(Dependencies.guava)
implementation(Dependencies.moshiCore)
implementation(Dependencies.moshiKotlin)
implementation(Dependencies.wireRuntime)
implementation(Dependencies.gcpSpanner)
implementation(Dependencies.guice)
implementation(Dependencies.okio)
implementation(Dependencies.kotlinStdLib)
implementation(Dependencies.loggingApi)
implementation(Dependencies.wireMoshiAdapter)

api(project(":client"))
// We do not want to leak client-base implementation details to customers.
implementation(project(":client-base"))

implementation(Dependencies.misk)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we limit our use of misk at least in non-test? Do we really need it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking through your code I think these only need to be testImplementation dependencies. Let's move those dependencies to test, rename the module, and add a comment so they don't leak to the main implementation.

implementation(Dependencies.miskActions)
implementation(Dependencies.miskCore)
implementation(Dependencies.miskGcp)
implementation(Dependencies.miskInject)

testImplementation(Dependencies.assertj)
testImplementation(Dependencies.miskTesting)
testImplementation(Dependencies.miskGcpTesting)
testImplementation(project(":client-misk"))
testImplementation(Dependencies.kotlinTest)
testImplementation(Dependencies.junitEngine)
testImplementation(Dependencies.okHttp)

testImplementation(project(":backfila-embedded"))
testImplementation(project(":client-testing"))
}

// Configure the existing `jar` task so the produced archive uses the Backfila-prefixed base name.
val jar by tasks.getting(Jar::class) {
baseName = "backfila-client-misk-spanner"
}

// Apply the root project's hooks.gradle script, but only when that file exists.
if (rootProject.file("hooks.gradle").exists()) {
apply(from = rootProject.file("hooks.gradle"))
}

// Apply the shared script from the root directory (presumably the Maven publishing setup,
// going by its file name — confirm against the script itself).
apply(from = "$rootDir/gradle-mvn-publish.gradle")
4 changes: 4 additions & 0 deletions client-misk-spanner/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
POM_ARTIFACT_ID=client-misk-spanner
POM_NAME=client-misk-spanner
POM_DESCRIPTION=Misk spanner backfila client backend implementation library
POM_PACKAGING=jar
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package app.cash.backfila.client.misk.spanner

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import com.google.cloud.spanner.DatabaseClient
import com.google.cloud.spanner.KeyRange

/**
 * Base class for backfills whose source of work is a single Spanner table.
 *
 * Implementations describe where the data lives ([dbClient], [tableName], [primaryKeyColumns])
 * and supply the per-batch work in [runBatch].
 *
 * @param Param type carried by the [BackfillConfig] handed to [validate] and [runBatch].
 */
abstract class SpannerBackfill<Param : Any> : Backfill {
  /** An already established connection to the database that owns [tableName]. */
  abstract val dbClient: DatabaseClient

  /** Name of the table that the backfill uses as its source. */
  abstract val tableName: String

  /**
   * Names of the columns that together form the table's primary key.
   *
   * Only string-typed primary key columns are supported: the operator reads each listed column
   * as a string and builds Spanner keys from those values in the order given here.
   */
  abstract val primaryKeyColumns: List<String>

  /**
   * Hook for rejecting a backfill before it is created: override it and throw an exception to
   * prevent creation. It is also a convenient place for preparation work that must happen
   * before any batch runs. The default implementation accepts every configuration.
   */
  open fun validate(config: BackfillConfig<Param>) {
    // Nothing to check by default.
  }

  /**
   * Performs the backfill operation for the rows of [tableName] whose primary keys fall inside
   * the supplied [range].
   */
  abstract fun runBatch(range: KeyRange, config: BackfillConfig<Param>)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package app.cash.backfila.client.misk.spanner

import app.cash.backfila.client.spi.BackfillBackend
import app.cash.backfila.client.misk.spanner.internal.SpannerBackend
import com.google.inject.Binder
import com.google.inject.Provides
import com.google.inject.Singleton
import com.google.inject.TypeLiteral
import com.google.inject.multibindings.MapBinder
import com.google.inject.multibindings.Multibinder
import com.squareup.moshi.Moshi
import com.squareup.moshi.kotlin.reflect.KotlinJsonAdapterFactory
import javax.inject.Qualifier
import kotlin.reflect.KClass
import misk.inject.KAbstractModule
import kotlin.reflect.jvm.jvmName

/**
 * Guice module that registers a single [SpannerBackfill] implementation with the Spanner backend.
 *
 * The constructor is private; build instances through one of the [create] factories.
 */
class SpannerBackfillModule<T : SpannerBackfill<*>> private constructor(
  private val backfillClass: KClass<T>
) : KAbstractModule() {
  override fun configure() {
    install(SpannerBackfillBackendModule)

    // Asking for a provider ensures that the backfill class is injectable. If this check fails
    // you probably want to add an @Inject annotation to your class, or check that all of its
    // dependencies are provided.
    val backfillJavaClass = backfillClass.java
    binder().getProvider(backfillJavaClass)

    // Register the backfill class in the backend's map, keyed by its JVM class name.
    val registrations = mapBinder(binder())
    registrations.addBinding(backfillClass.jvmName).toInstance(backfillClass)
  }

  companion object {
    /** Creates a module for the backfill type given as the reified type argument. */
    inline fun <reified T : SpannerBackfill<*>> create(): SpannerBackfillModule<T> {
      return create(T::class)
    }

    /** Creates a module for the backfill identified by its Kotlin class. */
    @JvmStatic
    fun <T : SpannerBackfill<*>> create(backfillClass: KClass<T>): SpannerBackfillModule<T> =
      SpannerBackfillModule(backfillClass)

    /** Creates a module for the backfill identified by its Java class. */
    @JvmStatic
    fun <T : SpannerBackfill<*>> create(backfillClass: Class<T>): SpannerBackfillModule<T> =
      SpannerBackfillModule(backfillClass.kotlin)
  }
}

/**
 * Bindings shared by every Spanner backfill registration.
 *
 * Declared as a Kotlin object so that these dependencies are only installed once.
 */
private object SpannerBackfillBackendModule : KAbstractModule() {
  override fun configure() {
    // Contribute the Spanner backend to the set of available backfill backends.
    val backends = Multibinder.newSetBinder(binder(), BackfillBackend::class.java)
    backends.addBinding().to(SpannerBackend::class.java)
  }

  /** Provides the Moshi instance, with Kotlin reflection support, qualified for this backend. */
  @Provides
  @Singleton
  @ForSpannerBackend
  fun provideSpannerMoshi(): Moshi {
    val builder = Moshi.Builder()
    builder.add(KotlinJsonAdapterFactory())
    return builder.build()
  }
}

/**
 * Returns the [MapBinder] that maps a name to a [SpannerBackfill] class, qualified with
 * [ForSpannerBackend], for the given [binder].
 */
private fun mapBinder(binder: Binder): MapBinder<String, KClass<out SpannerBackfill<*>>> {
  // Anonymous TypeLiteral subclasses capture the full generic key and value types.
  val keyType = object : TypeLiteral<String>() {}
  val valueType = object : TypeLiteral<KClass<out SpannerBackfill<*>>>() {}
  return MapBinder.newMapBinder(binder, keyType, valueType, ForSpannerBackend::class.java)
}

/** Qualifier that marks dependencies intended specifically for the Spanner backend. */
@Qualifier
annotation class ForSpannerBackend
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package app.cash.backfila.client.misk.spanner.internal

import app.cash.backfila.client.DeleteBy
import app.cash.backfila.client.Description
import app.cash.backfila.client.misk.spanner.ForSpannerBackend
import app.cash.backfila.client.misk.spanner.SpannerBackfill
import app.cash.backfila.client.parseDeleteByDate
import app.cash.backfila.client.spi.BackfillBackend
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.client.spi.BackfillRegistration
import com.google.cloud.spanner.Spanner
import com.google.inject.Injector
import com.google.inject.TypeLiteral
import com.squareup.moshi.Moshi
import com.squareup.moshi.Types
import java.lang.reflect.ParameterizedType
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.reflect.KClass
import kotlin.reflect.full.findAnnotation

@Singleton
internal class SpannerBackend @Inject constructor(
jdm-square marked this conversation as resolved.
Show resolved Hide resolved
private val injector: Injector,
@ForSpannerBackend private val backfills: MutableMap<String, KClass<out SpannerBackfill<*>>>,
@ForSpannerBackend internal val moshi: Moshi,
internal val spanner: Spanner,
) : BackfillBackend {

/**
 * Returns an injector-created instance of the backfill registered under [name], or `null`
 * when no backfill is registered under that name.
 */
private fun getBackfill(name: String): SpannerBackfill<*>? =
  backfills[name]?.let { registered ->
    injector.getInstance(registered.java) as SpannerBackfill<*>
  }

/**
 * Wraps [backfill] in a [SpannerBackfillOperator], pairing it with a parameters operator
 * built from the backfill's parameter class and with this backend.
 */
private fun <Param : Any> createSpannerOperator(
  backfill: SpannerBackfill<Param>
): SpannerBackfillOperator<Param> {
  val parametersOperator = BackfilaParametersOperator(parametersClass(backfill::class))
  return SpannerBackfillOperator(backfill, parametersOperator, this)
}

/**
 * Creates an operator for the backfill registered under [backfillName], or returns `null`
 * when this backend has no such backfill. [backfillId] is not used here.
 */
override fun create(backfillName: String, backfillId: String): BackfillOperator? {
  val backfill = getBackfill(backfillName) ?: return null

  @Suppress("UNCHECKED_CAST") // We don't know the types statically, so fake them.
  return createSpannerOperator(backfill as SpannerBackfill<Any>)
}

/**
 * Describes every registered backfill: its name, the text of its [Description] annotation
 * (if any), its parameters class, and the parsed [DeleteBy] date (if annotated).
 */
override fun backfills(): Set<BackfillRegistration> =
  backfills.mapTo(mutableSetOf()) { (registeredName, backfillClass) ->
    BackfillRegistration(
      name = registeredName,
      description = backfillClass.findAnnotation<Description>()?.text,
      parametersClass = parametersClass(backfillClass as KClass<SpannerBackfill<Any>>),
      deleteBy = backfillClass.findAnnotation<DeleteBy>()?.parseDeleteByDate(),
    )
  }

/**
 * Resolves the class used as the `Param` type argument of [SpannerBackfill] by the given
 * backfill class.
 */
private fun <T : Any> parametersClass(backfillClass: KClass<out SpannerBackfill<T>>): KClass<T> {
  // The concrete backfill type, like MyBackfill.
  val concreteType = TypeLiteral.get(backfillClass.java)

  // The same type viewed as its generic base, like SpannerBackfill<MyDataClass>.
  val genericBase =
    concreteType.getSupertype(SpannerBackfill::class.java).type as ParameterizedType

  // The sole type argument, like MyDataClass.
  val parameterType = genericBase.actualTypeArguments[0]
  return (Types.getRawType(parameterType) as Class<T>).kotlin
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package app.cash.backfila.client.misk.spanner.internal

import app.cash.backfila.client.misk.spanner.SpannerBackfill
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.PrepareBackfillRequest
import app.cash.backfila.protos.clientservice.PrepareBackfillResponse
import app.cash.backfila.protos.clientservice.RunBatchRequest
import app.cash.backfila.protos.clientservice.RunBatchResponse
import com.google.cloud.spanner.Key
import com.google.cloud.spanner.KeyRange
import com.google.cloud.spanner.KeySet
import com.squareup.moshi.Moshi
import misk.moshi.adapter
import okio.ByteString
import okio.ByteString.Companion.encodeUtf8

class SpannerBackfillOperator<Param : Any> internal constructor(
override val backfill: SpannerBackfill<Param>,
private val parametersOperator: BackfilaParametersOperator<Param>,
backend: SpannerBackend,
) : BackfillOperator {
// Moshi instance taken from the backend; it is never reassigned, so it is a read-only property.
private val moshi: Moshi = backend.moshi

// Adapter for List<String>; this class uses it to encode and decode primary keys as JSON.
private val adapter = moshi.adapter<List<String>>()

/** Returns the string form of the backfill's runtime Java class. */
override fun name(): String {
  return backfill.javaClass.toString()
}

override fun prepareBackfill(request: PrepareBackfillRequest): PrepareBackfillResponse {
val config =
parametersOperator.constructBackfillConfig(request.parameters, request.dry_run)
backfill.validate(config)

val partitions = listOf(
PrepareBackfillResponse.Partition.Builder()
.backfill_range(request.range)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you requiring range to be passed in? In other implementations we compute the ranges if you don't pass it in

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The range is actually completely ignored. Spanner is unlike many other DBs, where for optimal performance primary keys really can't be in anything like a monotonic increasing range. I don't know how to compute a range without doing a full table scan, which seems... suboptimal.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can't ask for min/max primary key value?

Copy link
Collaborator Author

@jdm-square jdm-square Aug 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Primary keys are often random values like UUIDs and unordered for optimal performance. Min/max aren't valid concepts, as far as I can tell. Source: https://cloud.google.com/spanner/docs/schema-design#primary-key-prevent-hotspots

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Backfila requires ordered key values to operate. I'm curious how you would use it if that's not the case. I haven't used Spanner, but my understanding was that it's ordered; you just want to avoid sequential writes.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And to answer the original question - we don’t require a range to be passed in. That’s optional.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'm well aware how primary key design works in spanner, and you can have items added within the range. That's true even in auto increment, technically. It doesn't matter since the expectation is you are inserting new items that don't need backfilling.

It sounds like you are able to just ask Spanner for records and it will give them in some order; that should be fine, I guess.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this work like dynamo backfills? Dynamo is somewhat different, but has a scan mechanism we use, and I believe we don't do ranges on it either? You could check that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that you will essentially run your backfill single threaded?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there must be some distributed way to process the whole data set in bulk? In Dynamo it is this idea of segments.

.partition_name("partition")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer something like single or only. This is exposed to the customer.

.build()
)

return PrepareBackfillResponse.Builder()
.partitions(partitions)
.build()
}

override fun getNextBatchRange(request: GetNextBatchRangeRequest): GetNextBatchRangeResponse {
// Establish a range to scan - either we want to start at the first key,
// or start from (and exclude) the last key that was scanned.
val range = if (request.previous_end_key == null) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we're not using the backfill_range at all, that's what would be passed in by the user (or I missed it somewhere)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. If I'm not mistaken, the DynamoDB backend also ignores it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DynamoDb is pretty limited because of dynamo itself, the hibernate one is pretty good to copy from. Obviously, build whatever features you want, I won't be using it :P

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need some guarantees around the end key otherwise you may be missing items, no? This was tricky with DynamoDb as well. We figured out some optimizations but since they weren't really documented we didn't add those to the client. In Dynamo we split up by segment but then don't complete the "batch" until the range is completed. Maybe Google has some better guarantees?

KeySet.all()
} else {
val previousEndKey = adapter.fromJson(request.previous_end_key.utf8())!!
KeySet.range(
KeyRange.openClosed(
Key.of(*previousEndKey.toTypedArray()),
Key.of(),
)
)
}

// Query the table with the desired range, only fetching the components of the primary key.
val query = backfill.dbClient.singleUseReadOnlyTransaction()
.read(backfill.tableName, range, backfill.primaryKeyColumns)
jdm-square marked this conversation as resolved.
Show resolved Hide resolved

val keys = mutableListOf<ByteString>()

// For each result, until we reach the maximum scan size, create a key representation that
// can be used to uniquely identify a result row: the row's primary key column values, read
// as strings, serialized as a JSON list and encoded as UTF-8 bytes.
var numberToScan = request.scan_size
try {
  while (numberToScan > 0 && query.next()) {
    val primaryKeyValues = backfill.primaryKeyColumns.map { query.getString(it) }
    keys.add(adapter.toJson(primaryKeyValues).encodeUtf8())
    numberToScan -= 1
  }
} finally {
  // Always release the result set, even when reading or encoding a row throws; previously
  // it was closed only on the success path.
  query.close()
}

// Return the starting and ending keys obtained from the scan.
val batches = keys.chunked(request.batch_size.toInt()).map {
GetNextBatchRangeResponse.Batch.Builder()
.batch_range(
app.cash.backfila.protos.clientservice.KeyRange.Builder()
.start(it.first())
.end(it.last())
.build()
)
.scanned_record_count(it.size.toLong())
.matching_record_count(it.size.toLong())
.build()
}

return GetNextBatchRangeResponse.Builder()
.batches(batches)
.build()
}

/**
 * Runs one batch: decodes the batch's start and end keys from their JSON form, builds a
 * key range that includes both ends, and hands it to the backfill with the request's config.
 */
override fun runBatch(request: RunBatchRequest): RunBatchResponse {
  val config = parametersOperator.constructBackfillConfig(request.parameters, request.dry_run)

  // Decode both ends of the batch before building any Spanner keys.
  val batchRange = request.batch_range
  val decodedStart = adapter.fromJson(batchRange.start.utf8())!!
  val decodedEnd = adapter.fromJson(batchRange.end.utf8())!!

  // Create a range that encompasses the batch's starting and ending keys.
  val firstKey = Key.of(*decodedStart.toTypedArray())
  val lastKey = Key.of(*decodedEnd.toTypedArray())
  val inclusiveRange = KeyRange.closedClosed(firstKey, lastKey)

  // Let the backfill do whatever it wants for the given batch.
  backfill.runBatch(inclusiveRange, config)

  val response = RunBatchResponse.Builder()
  return response.build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package app.cash.backfila.client.misk.spanner

import app.cash.backfila.client.BackfilaHttpClientConfig
import app.cash.backfila.client.misk.MiskBackfillModule
import misk.inject.KAbstractModule

/**
 * Test wiring: installs the Misk backfill module with a placeholder HTTP client configuration
 * and registers the Spanner backfill used by the tests.
 */
class BackfillsModule : KAbstractModule() {
  override fun configure() {
    // Placeholder endpoint and Slack channel values.
    val clientConfig = BackfilaHttpClientConfig(url = "test.url", slack_channel = "#test")
    val miskBackfills = MiskBackfillModule(clientConfig, dependsOn = listOf())
    install(miskBackfills)

    install(SpannerBackfillModule.create<SpannerBackfillTest.MakeTracksExplicitBackfill>())
  }
}
Loading