Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stitching Upload Components to support multiple Upload Strategies #2286

Merged
merged 9 commits into from
Dec 15, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.google.android.fhir.demo.FhirApplication
import com.google.android.fhir.sync.AcceptLocalConflictResolver
import com.google.android.fhir.sync.DownloadWorkManager
import com.google.android.fhir.sync.FhirSyncWorker
import com.google.android.fhir.sync.upload.UploadStrategy

class DemoFhirSyncWorker(appContext: Context, workerParams: WorkerParameters) :
FhirSyncWorker(appContext, workerParams) {
Expand All @@ -32,5 +33,7 @@ class DemoFhirSyncWorker(appContext: Context, workerParams: WorkerParameters) :

override fun getConflictResolver() = AcceptLocalConflictResolver

override fun getUploadStrategy(): UploadStrategy = UploadStrategy.AllChangesSquashedBundlePut

override fun getFhirEngine() = FhirApplication.fhirEngine(applicationContext)
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import com.google.android.fhir.sync.AcceptRemoteConflictResolver
import com.google.android.fhir.sync.DownloadWorkManager
import com.google.android.fhir.sync.FhirSyncWorker
import com.google.android.fhir.sync.download.DownloadRequest
import com.google.android.fhir.sync.upload.UploadStrategy
import com.google.common.truth.Truth.assertThat
import java.math.BigDecimal
import java.util.LinkedList
Expand Down Expand Up @@ -88,6 +89,8 @@ class FhirSyncWorkerBenchmark {
override fun getDownloadWorkManager(): DownloadWorkManager = BenchmarkTestDownloadManagerImpl()

override fun getConflictResolver() = AcceptRemoteConflictResolver

override fun getUploadStrategy(): UploadStrategy = UploadStrategy.AllChangesSquashedBundlePut
}

open class BenchmarkTestDownloadManagerImpl(queries: List<String> = listOf("List/sync-list")) :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import com.google.android.fhir.search.has
import com.google.android.fhir.search.include
import com.google.android.fhir.search.revInclude
import com.google.android.fhir.sync.upload.LocalChangesFetchMode
import com.google.android.fhir.sync.upload.ResourceUploadResponse
import com.google.android.fhir.sync.upload.UploadSyncResult
import com.google.android.fhir.testing.assertJsonArrayEqualsIgnoringOrder
import com.google.android.fhir.testing.assertResourceEquals
Expand Down Expand Up @@ -553,12 +554,14 @@ class DatabaseImplTest {
.first { it.resourceId == "remote-patient-3" }
.let {
UploadSyncResult.Success(
listOf(it),
listOf(
Patient().apply {
id = it.resourceId
meta = remoteMeta
},
ResourceUploadResponse(
listOf(it),
Patient().apply {
id = it.resourceId
meta = remoteMeta
},
),
),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import androidx.work.WorkManager
import androidx.work.WorkerParameters
import androidx.work.testing.WorkManagerTestInitHelper
import com.google.android.fhir.FhirEngine
import com.google.android.fhir.sync.upload.UploadStrategy
import com.google.android.fhir.testing.TestDataSourceImpl
import com.google.android.fhir.testing.TestDownloadManagerImpl
import com.google.android.fhir.testing.TestFhirEngineImpl
Expand Down Expand Up @@ -56,6 +57,8 @@ class SyncInstrumentedTest {
override fun getDownloadWorkManager(): DownloadWorkManager = TestDownloadManagerImpl()

override fun getConflictResolver() = AcceptRemoteConflictResolver

override fun getUploadStrategy(): UploadStrategy = UploadStrategy.AllChangesSquashedBundlePut
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import com.google.android.fhir.FhirEngine
import com.google.android.fhir.FhirEngineProvider
import com.google.android.fhir.OffsetDateTimeTypeAdapter
import com.google.android.fhir.sync.download.DownloaderImpl
import com.google.android.fhir.sync.upload.UploadStrategy
import com.google.android.fhir.sync.upload.Uploader
import com.google.android.fhir.sync.upload.patch.PatchGeneratorFactory
import com.google.android.fhir.sync.upload.request.UploadRequestGeneratorFactory
import com.google.gson.ExclusionStrategy
import com.google.gson.FieldAttributes
import com.google.gson.GsonBuilder
Expand All @@ -45,6 +48,8 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter

abstract fun getConflictResolver(): ConflictResolver

abstract fun getUploadStrategy(): UploadStrategy

private val gson =
GsonBuilder()
.registerTypeAdapter(OffsetDateTime::class.java, OffsetDateTimeTypeAdapter().nullSafe())
Expand All @@ -69,9 +74,18 @@ abstract class FhirSyncWorker(appContext: Context, workerParams: WorkerParameter
FhirSynchronizer(
applicationContext,
getFhirEngine(),
Uploader(dataSource),
DownloaderImpl(dataSource, getDownloadWorkManager()),
getConflictResolver(),
UploadConfiguration(
Uploader(
dataSource = dataSource,
patchGenerator = PatchGeneratorFactory.byMode(getUploadStrategy().patchGeneratorMode),
requestGenerator =
UploadRequestGeneratorFactory.byMode(getUploadStrategy().requestGeneratorMode),
),
),
DownloadConfiguration(
DownloaderImpl(dataSource, getDownloadWorkManager()),
getConflictResolver(),
),
)

val job =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,21 @@ private sealed class SyncResult {

data class ResourceSyncException(val resourceType: ResourceType, val exception: Exception)

internal data class UploadConfiguration(
val uploader: Uploader,
)

internal class DownloadConfiguration(
val downloader: Downloader,
val conflictResolver: ConflictResolver,
)

/** Class that helps synchronize the data source and save it in the local database */
internal class FhirSynchronizer(
context: Context,
private val fhirEngine: FhirEngine,
private val uploader: Uploader,
private val downloader: Downloader,
private val conflictResolver: ConflictResolver,
private val uploadConfiguration: UploadConfiguration,
private val downloadConfiguration: DownloadConfiguration,
) {

private val _syncState = MutableSharedFlow<SyncJobStatus>()
Expand Down Expand Up @@ -91,9 +99,9 @@ internal class FhirSynchronizer(

private suspend fun download(): SyncResult {
val exceptions = mutableListOf<ResourceSyncException>()
fhirEngine.syncDownload(conflictResolver) {
fhirEngine.syncDownload(downloadConfiguration.conflictResolver) {
flow {
downloader.download().collect {
downloadConfiguration.downloader.download().collect {
when (it) {
is DownloadState.Started -> {
setSyncState(SyncJobStatus.InProgress(SyncOperation.DOWNLOAD, it.total))
Expand All @@ -119,7 +127,8 @@ internal class FhirSynchronizer(
private suspend fun upload(): SyncResult {
val exceptions = mutableListOf<ResourceSyncException>()
val localChangesFetchMode = LocalChangesFetchMode.AllChanges
fhirEngine.syncUpload(localChangesFetchMode, uploader::upload).collect { progress ->
fhirEngine.syncUpload(localChangesFetchMode, uploadConfiguration.uploader::upload).collect {
progress ->
progress.uploadError?.let { exceptions.add(it) }
?: setSyncState(
SyncJobStatus.InProgress(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,11 @@

package com.google.android.fhir.sync.upload

/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import com.google.android.fhir.LocalChangeToken
import com.google.android.fhir.db.Database
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.DomainResource
import org.hl7.fhir.r4.model.ResourceType
import timber.log.Timber

/**
* Represents a mechanism to consolidate resources after they are uploaded.
Expand All @@ -62,12 +45,16 @@ internal class DefaultResourceConsolidator(private val database: Database) : Res
when (uploadSyncResult) {
is UploadSyncResult.Success -> {
database.deleteUpdates(
LocalChangeToken(uploadSyncResult.localChanges.flatMap { it.token.ids }),
LocalChangeToken(
uploadSyncResult.uploadResponses.flatMap {
it.localChanges.flatMap { localChange -> localChange.token.ids }
},
),
)
uploadSyncResult.responseResources.forEach {
uploadSyncResult.uploadResponses.forEach {
when (it) {
is Bundle -> updateVersionIdAndLastUpdated(it)
else -> updateVersionIdAndLastUpdated(it)
is BundleComponentUploadResponse -> updateVersionIdAndLastUpdated(it.output)
is ResourceUploadResponse -> updateVersionIdAndLastUpdated(it.output)
}
}
}
Expand All @@ -78,23 +65,6 @@ internal class DefaultResourceConsolidator(private val database: Database) : Res
}
}

private suspend fun updateVersionIdAndLastUpdated(bundle: Bundle) {
when (bundle.type) {
Bundle.BundleType.TRANSACTIONRESPONSE -> {
bundle.entry.forEach {
when {
it.hasResource() -> updateVersionIdAndLastUpdated(it.resource)
it.hasResponse() -> updateVersionIdAndLastUpdated(it.response)
}
}
}
else -> {
// Leave it for now.
Timber.i("Received request to update meta values for ${bundle.type}")
}
}
}

private suspend fun updateVersionIdAndLastUpdated(response: Bundle.BundleEntryResponseComponent) {
if (response.hasEtag() && response.hasLastModified() && response.hasLocation()) {
response.resourceIdAndType?.let { (id, type) ->
Expand All @@ -108,7 +78,7 @@ internal class DefaultResourceConsolidator(private val database: Database) : Res
}
}

private suspend fun updateVersionIdAndLastUpdated(resource: Resource) {
private suspend fun updateVersionIdAndLastUpdated(resource: DomainResource) {
if (resource.hasMeta() && resource.meta.hasVersionId() && resource.meta.hasLastUpdated()) {
database.updateVersionIdAndLastUpdated(
resource.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package com.google.android.fhir.sync.upload
import com.google.android.fhir.sync.upload.patch.PatchGeneratorMode
import com.google.android.fhir.sync.upload.request.UploadRequestGeneratorMode
import org.hl7.fhir.r4.model.Bundle
import org.hl7.fhir.r4.model.codesystems.HttpVerb

/**
* Strategy to define how to upload the [LocalChange]s to the FHIR server.
Expand All @@ -35,33 +34,33 @@ private constructor(
internal val patchGeneratorMode: PatchGeneratorMode,
internal val requestGeneratorMode: UploadRequestGeneratorMode,
) {
object SingleChangePut :
UploadStrategy(
LocalChangesFetchMode.EarliestChange,
PatchGeneratorMode.PerChange,
UploadRequestGeneratorMode.UrlRequest(HttpVerb.PUT, HttpVerb.PATCH),
)
// object SingleChangePut :
// UploadStrategy(
// LocalChangesFetchMode.EarliestChange,
// PatchGeneratorMode.PerChange,
// UploadRequestGeneratorMode.UrlRequest(HttpVerb.PUT, HttpVerb.PATCH),
// )

object SingleChangePost :
UploadStrategy(
LocalChangesFetchMode.EarliestChange,
PatchGeneratorMode.PerChange,
UploadRequestGeneratorMode.UrlRequest(HttpVerb.POST, HttpVerb.PATCH),
)
// object SingleChangePost :
// UploadStrategy(
// LocalChangesFetchMode.EarliestChange,
// PatchGeneratorMode.PerChange,
// UploadRequestGeneratorMode.UrlRequest(HttpVerb.POST, HttpVerb.PATCH),
// )

object SingleResourcePut :
UploadStrategy(
LocalChangesFetchMode.PerResource,
PatchGeneratorMode.PerResource,
UploadRequestGeneratorMode.UrlRequest(HttpVerb.PUT, HttpVerb.PATCH),
)
// object SingleResourcePut :
// UploadStrategy(
// LocalChangesFetchMode.PerResource,
// PatchGeneratorMode.PerResource,
// UploadRequestGeneratorMode.UrlRequest(HttpVerb.PUT, HttpVerb.PATCH),
// )

object SingleResourcePost :
UploadStrategy(
LocalChangesFetchMode.PerResource,
PatchGeneratorMode.PerResource,
UploadRequestGeneratorMode.UrlRequest(HttpVerb.POST, HttpVerb.PATCH),
)
// object SingleResourcePost :
// UploadStrategy(
// LocalChangesFetchMode.PerResource,
// PatchGeneratorMode.PerResource,
// UploadRequestGeneratorMode.UrlRequest(HttpVerb.POST, HttpVerb.PATCH),
// )
aditya-07 marked this conversation as resolved.
Show resolved Hide resolved

object AllChangesBundlePut :
UploadStrategy(
Expand All @@ -70,12 +69,12 @@ private constructor(
UploadRequestGeneratorMode.BundleRequest(Bundle.HTTPVerb.PUT, Bundle.HTTPVerb.PATCH),
)

object AllChangesBundlePost :
UploadStrategy(
LocalChangesFetchMode.AllChanges,
PatchGeneratorMode.PerChange,
UploadRequestGeneratorMode.BundleRequest(Bundle.HTTPVerb.POST, Bundle.HTTPVerb.PATCH),
)
// object AllChangesBundlePost :
// UploadStrategy(
// LocalChangesFetchMode.AllChanges,
// PatchGeneratorMode.PerChange,
// UploadRequestGeneratorMode.BundleRequest(Bundle.HTTPVerb.POST, Bundle.HTTPVerb.PATCH),
// )

object AllChangesSquashedBundlePut :
UploadStrategy(
Expand All @@ -84,10 +83,10 @@ private constructor(
UploadRequestGeneratorMode.BundleRequest(Bundle.HTTPVerb.PUT, Bundle.HTTPVerb.PATCH),
)

object AllChangesSquashedBundlePost :
UploadStrategy(
LocalChangesFetchMode.AllChanges,
PatchGeneratorMode.PerResource,
UploadRequestGeneratorMode.BundleRequest(Bundle.HTTPVerb.POST, Bundle.HTTPVerb.PATCH),
)
// object AllChangesSquashedBundlePost :
// UploadStrategy(
// LocalChangesFetchMode.AllChanges,
// PatchGeneratorMode.PerResource,
// UploadRequestGeneratorMode.BundleRequest(Bundle.HTTPVerb.POST, Bundle.HTTPVerb.PATCH),
// )
}
Loading