Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing Finalize Backfill Hook for All Backfila Clients #358

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,14 @@ class BackfilaClientServiceHandler @Inject constructor(
return loggingSetupProvider.withBackfillRunLogging(request.backfill_name, request.backfill_id) {
logger.info { "Finalizing backfill `${request.backfill_name}::${request.backfill_id}`" }

// This is a stub for now
return@withBackfillRunLogging FinalizeBackfillResponse()
val operator = operatorFactory.create(request.backfill_name, request.backfill_id)
try {
return@withBackfillRunLogging operator.finalizeBackfill(request)
} catch (exception: Exception) {
return@withBackfillRunLogging FinalizeBackfillResponse.Builder()
.error_message(exception.message)
.build()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import app.cash.backfila.client.BackfilaDefault
import app.cash.backfila.client.BackfilaRequired
import app.cash.backfila.client.BackfillConfig
import app.cash.backfila.client.Description
import app.cash.backfila.client.FinalizeBackfillConfig
import app.cash.backfila.client.PrepareBackfillConfig
import app.cash.backfila.protos.clientservice.FinalizeBackfillRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.PrepareBackfillRequest
import app.cash.backfila.protos.clientservice.RunBatchRequest
Expand Down Expand Up @@ -60,6 +62,13 @@ class BackfilaParametersOperator<T : Any>(
request.dry_run,
)

fun constructBackfillConfig(request: FinalizeBackfillRequest): FinalizeBackfillConfig<T> =
FinalizeBackfillConfig(
constructParameters(request.parameters),
request.backfill_id,
request.dry_run,
)

fun constructParameters(
parameters: MutableMap<String, ByteString>,
): T {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package app.cash.backfila.client.spi

import app.cash.backfila.client.Backfill
import app.cash.backfila.protos.clientservice.FinalizeBackfillRequest
import app.cash.backfila.protos.clientservice.FinalizeBackfillResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.PrepareBackfillRequest
Expand All @@ -17,4 +19,5 @@ interface BackfillOperator {
fun prepareBackfill(request: PrepareBackfillRequest): PrepareBackfillResponse
fun getNextBatchRange(request: GetNextBatchRangeRequest): GetNextBatchRangeResponse
fun runBatch(request: RunBatchRequest): RunBatchResponse
fun finalizeBackfill(request: FinalizeBackfillRequest): FinalizeBackfillResponse
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app.cash.backfila.client.fixedset

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import app.cash.backfila.client.FinalizeBackfillConfig
import app.cash.backfila.client.ValidateResult

abstract class FixedSetBackfill<Param : Any> : Backfill {
Expand All @@ -10,4 +11,9 @@ abstract class FixedSetBackfill<Param : Any> : Backfill {
}

abstract fun runOne(row: FixedSetRow, backfillConfig: BackfillConfig<Param>)

/**
* Override this to do any work after the backfill completes.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI This is a test only implementation.

*/
open fun finalize(config: FinalizeBackfillConfig<Param>) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package app.cash.backfila.client.fixedset
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.client.spi.parametersToBytes
import app.cash.backfila.protos.clientservice.FinalizeBackfillRequest
import app.cash.backfila.protos.clientservice.FinalizeBackfillResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.KeyRange
Expand Down Expand Up @@ -89,4 +91,12 @@ class FixedSetBackfillOperator<Param : Any>(
.end(end.toString().encodeUtf8())
.build()
}

override fun finalizeBackfill(request: FinalizeBackfillRequest): FinalizeBackfillResponse {
val config = parametersOperator.constructBackfillConfig(request)
backfill.finalize(config)

return FinalizeBackfillResponse.Builder()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the // Return empty 200 to indicate it was successful similar comment above. Does it make sense to make that more apparant?

.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app.cash.backfila.client.dynamodbv2

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import app.cash.backfila.client.FinalizeBackfillConfig
import app.cash.backfila.client.PrepareBackfillConfig
import com.google.inject.TypeLiteral
import com.squareup.moshi.Types
Expand Down Expand Up @@ -87,4 +88,9 @@ abstract class DynamoDbBackfill<I : Any, P : Any> : Backfill {

/** See [ScanRequest.setExpressionAttributeNames]. */
open fun expressionAttributeNames(config: BackfillConfig<P>): Map<String, String>? = null

/**
* Override this to do any work after the backfill completes.
*/
open fun finalize(config: FinalizeBackfillConfig<P>) {}
mminkoffs marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package app.cash.backfila.client.dynamodbv2.internal
import app.cash.backfila.client.dynamodbv2.DynamoDbBackfill
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.protos.clientservice.FinalizeBackfillRequest
import app.cash.backfila.protos.clientservice.FinalizeBackfillResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.KeyRange
Expand Down Expand Up @@ -160,6 +162,14 @@ class DynamoDbBackfillOperator<I : Any, P : Any>(
.build()
}

override fun finalizeBackfill(request: FinalizeBackfillRequest): FinalizeBackfillResponse {
val config = parametersOperator.constructBackfillConfig(request)
backfill.finalize(config)

return FinalizeBackfillResponse.Builder()
.build()
}

private fun Map<String, AttributeValue>.toKeyRange(originalRange: DynamoDbKeyRange): KeyRange {
require(originalRange.start + 1 == originalRange.end)
return keyRangeCodec.encodeKeyRange(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app.cash.backfila.client.dynamodb

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import app.cash.backfila.client.FinalizeBackfillConfig
import app.cash.backfila.client.PrepareBackfillConfig
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import com.amazonaws.services.dynamodbv2.model.ScanRequest
Expand Down Expand Up @@ -79,4 +80,9 @@ abstract class DynamoDbBackfill<I : Any, P : Any> : Backfill {

/** See [ScanRequest.setIndexName]. */
open fun indexName(config: BackfillConfig<P>): String? = null

/**
* Override this to do any work after the backfill completes.
*/
open fun finalize(config: FinalizeBackfillConfig<P>) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package app.cash.backfila.client.dynamodb.internal
import app.cash.backfila.client.dynamodb.DynamoDbBackfill
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.protos.clientservice.FinalizeBackfillRequest
import app.cash.backfila.protos.clientservice.FinalizeBackfillResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.KeyRange
Expand Down Expand Up @@ -134,6 +136,14 @@ class DynamoDbBackfillOperator<I : Any, P : Any>(
.build()
}

override fun finalizeBackfill(request: FinalizeBackfillRequest): FinalizeBackfillResponse {
val config = parametersOperator.constructBackfillConfig(request)
backfill.finalize(config)

return FinalizeBackfillResponse.Builder()
.build()
}

private fun Map<String, AttributeValue>.toKeyRange(originalRange: DynamoDbKeyRange): KeyRange {
require(originalRange.start + 1 == originalRange.end)
return keyRangeCodec.encodeKeyRange(originalRange.start, originalRange.end, originalRange.count, this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app.cash.backfila.client.jooq

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import app.cash.backfila.client.FinalizeBackfillConfig
import app.cash.backfila.client.PrepareBackfillConfig
import app.cash.backfila.protos.clientservice.KeyRange
import okio.ByteString
Expand Down Expand Up @@ -151,4 +152,9 @@ abstract class JooqBackfill<K, Param : Any> : Backfill {
fun compoundKeyComparer(): CompoundKeyComparer<K> {
return CompoundKeyComparer(compoundKeyFields)
}

/**
* Override this to do any work after the backfill completes.
*/
open fun finalize(config: FinalizeBackfillConfig<Param>) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import app.cash.backfila.client.jooq.CompoundKeyComparer
import app.cash.backfila.client.jooq.JooqBackfill
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.protos.clientservice.FinalizeBackfillRequest
import app.cash.backfila.protos.clientservice.FinalizeBackfillResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.KeyRange
Expand Down Expand Up @@ -144,4 +146,12 @@ class JooqBackfillOperator<K, Param : Any> internal constructor(
.end(rangeEnd)
.build()
}

override fun finalizeBackfill(request: FinalizeBackfillRequest): FinalizeBackfillResponse {
val config = parametersOperator.constructBackfillConfig(request)
backfill.finalize(config)

return FinalizeBackfillResponse.Builder()
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app.cash.backfila.client.misk.hibernate

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import app.cash.backfila.client.FinalizeBackfillConfig
import app.cash.backfila.client.PrepareBackfillConfig
import com.squareup.moshi.Types
import java.lang.reflect.ParameterizedType
Expand Down Expand Up @@ -83,4 +84,9 @@ abstract class HibernateBackfill<E : DbEntity<E>, Pkey : Any, Param : Any> : Bac
*/
open fun runOne(pkey: Pkey, config: BackfillConfig<Param>) {
}

/**
* Override this to do any work after the backfill completes.
*/
open fun finalizeBackfill(config: FinalizeBackfillConfig<Param>) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import app.cash.backfila.client.misk.hibernate.HibernateBackfill
import app.cash.backfila.client.misk.hibernate.PrimaryKeyCursorMapper
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.protos.clientservice.FinalizeBackfillRequest
import app.cash.backfila.protos.clientservice.FinalizeBackfillResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse.Batch
Expand Down Expand Up @@ -307,6 +309,14 @@ internal class HibernateBackfillOperator<E : DbEntity<E>, Pkey : Any, Param : An
return RunBatchResponse.Builder().build()
}

override fun finalizeBackfill(request: FinalizeBackfillRequest): FinalizeBackfillResponse {
val config = parametersOperator.constructBackfillConfig(request)
backfill.finalizeBackfill(config)

return FinalizeBackfillResponse.Builder()
.build()
}

private fun HibernateBackfill<*, *, *>.getPrimaryKeyPath(queryRoot: Root<*>): Path<Number> {
val fields = primaryKeyHibernateName().split('.')
var path = queryRoot as Path<Number>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app.cash.backfila.client.s3

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import app.cash.backfila.client.FinalizeBackfillConfig
import app.cash.backfila.client.PrepareBackfillConfig
import app.cash.backfila.client.s3.record.RecordStrategy

Expand Down Expand Up @@ -48,4 +49,9 @@ abstract class S3DatasourceBackfill<R : Any, P : Any> : Backfill {
* Produces records from the S3 file.
*/
abstract val recordStrategy: RecordStrategy<R>

/**
* Override this to do any work after the backfill completes.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to update the comment a bit:
Override this to do any final work after the backfill completes. Only one successful call is expected in your distributed system and this call must be idempotent.

Does that make sense?

*/
open fun finalize(config: FinalizeBackfillConfig<P>) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import app.cash.backfila.client.s3.S3DatasourceBackfill
import app.cash.backfila.client.s3.shim.S3Service
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.protos.clientservice.FinalizeBackfillRequest
import app.cash.backfila.protos.clientservice.FinalizeBackfillResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse.Batch
Expand Down Expand Up @@ -181,6 +183,14 @@ class S3DatasourceBackfillOperator<R : Any, P : Any>(
.build()
}

override fun finalizeBackfill(request: FinalizeBackfillRequest): FinalizeBackfillResponse {
val config = parametersOperator.constructBackfillConfig(request)
backfill.finalize(config)

return FinalizeBackfillResponse.Builder()
.build()
}

data class DecodedRange(
val start: Long,
val end: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app.cash.backfila.client.stat

import app.cash.backfila.client.Backfill
import app.cash.backfila.client.BackfillConfig
import app.cash.backfila.client.FinalizeBackfillConfig
import app.cash.backfila.client.PrepareBackfillConfig
import com.google.inject.TypeLiteral
import com.squareup.moshi.Types
Expand Down Expand Up @@ -54,4 +55,9 @@ abstract class StaticDatasourceBackfill<I : Any, P : Any> : Backfill {
* This invokes the static list of items that the backfill will iterate over.
*/
abstract val staticDatasource: List<I>

/**
* Override this to do any work after the backfill completes.
*/
open fun finalize(config: FinalizeBackfillConfig<P>) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package app.cash.backfila.client.stat.internal
import app.cash.backfila.client.spi.BackfilaParametersOperator
import app.cash.backfila.client.spi.BackfillOperator
import app.cash.backfila.client.stat.StaticDatasourceBackfill
import app.cash.backfila.protos.clientservice.FinalizeBackfillRequest
import app.cash.backfila.protos.clientservice.FinalizeBackfillResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.KeyRange
Expand Down Expand Up @@ -112,4 +114,12 @@ class StaticDatasourceBackfillOperator<I : Any, P : Any>(
companion object {
private const val PARTITION = "only"
}

override fun finalizeBackfill(request: FinalizeBackfillRequest): FinalizeBackfillResponse {
val config = parametersOperator.constructBackfillConfig(request)
backfill.finalize(config)

return FinalizeBackfillResponse.Builder()
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@ data class PrepareBackfillConfig<Param : Any>(
val parameters: Param,
val dryRun: Boolean,
)

data class FinalizeBackfillConfig<Param : Any>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we need better names and/or structure for these.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PartitionlessBackfillConfig? Or WholeBackfillConfig?

This is config that is meant to represent the whole run. WholeRunBackfillConfig? Also add the accessor above. And probably an accessor to Prepare here too.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with it as is

val parameters: Param,
val backfillId: String,
val dryRun: Boolean,
)
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,5 @@ message FinalizeBackfillRequest {
}

message FinalizeBackfillResponse {
optional string error_message = 1;
mminkoffs marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ internal class BackfillRegisteredParameters @Inject constructor(
private val queryFactory: Query.Factory,
) : HibernateBackfill<DbRegisteredBackfill, Id<DbRegisteredBackfill>, NoParameters>() {

override fun runOne(id: Id<DbRegisteredBackfill>, config: BackfillConfig<NoParameters>) {
override fun runOne(pkey: Id<DbRegisteredBackfill>, config: BackfillConfig<NoParameters>) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

undo the changes to this class

transacter.transaction { session ->
val backfill = session.load(id)
val backfill = session.load(pkey)
if (backfill.parameters.size == backfill.parameterNames().size) {
return@transaction
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package app.cash.backfila.service.selfbackfill

import app.cash.backfila.client.BackfilaClientServiceClient
import app.cash.backfila.client.spi.BackfilaClientServiceHandler
import app.cash.backfila.protos.clientservice.FinalizeBackfillRequest
import app.cash.backfila.protos.clientservice.FinalizeBackfillResponse
import app.cash.backfila.protos.clientservice.GetNextBatchRangeRequest
import app.cash.backfila.protos.clientservice.GetNextBatchRangeResponse
import app.cash.backfila.protos.clientservice.PrepareBackfillRequest
Expand All @@ -24,4 +26,8 @@ internal class LocalClientServiceClient @Inject internal constructor(
override suspend fun runBatch(request: RunBatchRequest): RunBatchResponse {
return backfilaClientServiceHandler.runBatch(request)
}

override suspend fun finalizeBackfill(request: FinalizeBackfillRequest): FinalizeBackfillResponse {
return backfilaClientServiceHandler.finalizeBackfill(request)
}
}
Loading
Loading