Skip to content

Commit

Permalink
Revert "Make pipeline version per Organism" (#3702)
Browse files Browse the repository at this point in the history
* Revert "Make pipeline version per Organism (#3534)"

This reverts commit 7166188.

* Update schema documentation based on migration changes

* dummy for ci

---------

Co-authored-by: GitHub Action <[email protected]>
Co-authored-by: Cornelius Roemer <[email protected]>
  • Loading branch information
3 people authored Feb 17, 2025
1 parent 946f44e commit 0f24409
Show file tree
Hide file tree
Showing 18 changed files with 125 additions and 435 deletions.
86 changes: 42 additions & 44 deletions backend/docs/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
-- PostgreSQL database dump
--

-- Dumped from database version 15.10 (Debian 15.10-1.pgdg120+1)
-- Dumped by pg_dump version 16.6 (Debian 16.6-1.pgdg120+1)
-- Dumped from database version 15.11 (Debian 15.11-1.pgdg120+1)
-- Dumped by pg_dump version 16.7 (Debian 16.7-1.pgdg120+1)

SET statement_timeout = 0;
SET lock_timeout = 0;
Expand Down Expand Up @@ -171,8 +171,7 @@ ALTER SEQUENCE public.audit_log_id_seq OWNED BY public.audit_log.id;

CREATE TABLE public.current_processing_pipeline (
version bigint NOT NULL,
started_using_at timestamp without time zone NOT NULL,
organism text NOT NULL
started_using_at timestamp without time zone NOT NULL
);


Expand All @@ -193,28 +192,6 @@ CREATE TABLE public.data_use_terms_table (

ALTER TABLE public.data_use_terms_table OWNER TO postgres;

--
-- Name: sequence_entries; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE public.sequence_entries (
accession text NOT NULL,
version bigint NOT NULL,
organism text NOT NULL,
submission_id text NOT NULL,
submitter text NOT NULL,
approver text,
group_id integer NOT NULL,
submitted_at timestamp without time zone NOT NULL,
released_at timestamp without time zone,
is_revocation boolean DEFAULT false NOT NULL,
original_data jsonb,
version_comment text
);


ALTER TABLE public.sequence_entries OWNER TO postgres;

--
-- Name: sequence_entries_preprocessed_data; Type: TABLE; Schema: public; Owner: postgres
--
Expand Down Expand Up @@ -246,18 +223,18 @@ CREATE VIEW public.external_metadata_view AS
WHEN (all_external_metadata.external_metadata IS NULL) THEN jsonb_build_object('metadata', (cpd.processed_data -> 'metadata'::text))
ELSE jsonb_build_object('metadata', ((cpd.processed_data -> 'metadata'::text) || all_external_metadata.external_metadata))
END AS joint_metadata
FROM (( SELECT sepd.accession,
sepd.version,
sepd.pipeline_version,
sepd.processed_data,
sepd.errors,
sepd.warnings,
sepd.processing_status,
sepd.started_processing_at,
sepd.finished_processing_at
FROM ((public.sequence_entries_preprocessed_data sepd
JOIN public.sequence_entries se ON (((sepd.accession = se.accession) AND (sepd.version = se.version))))
JOIN public.current_processing_pipeline cpp ON (((se.organism = cpp.organism) AND (sepd.pipeline_version = cpp.version))))) cpd
FROM (( SELECT sequence_entries_preprocessed_data.accession,
sequence_entries_preprocessed_data.version,
sequence_entries_preprocessed_data.pipeline_version,
sequence_entries_preprocessed_data.processed_data,
sequence_entries_preprocessed_data.errors,
sequence_entries_preprocessed_data.warnings,
sequence_entries_preprocessed_data.processing_status,
sequence_entries_preprocessed_data.started_processing_at,
sequence_entries_preprocessed_data.finished_processing_at
FROM public.sequence_entries_preprocessed_data
WHERE (sequence_entries_preprocessed_data.pipeline_version = ( SELECT current_processing_pipeline.version
FROM public.current_processing_pipeline))) cpd
LEFT JOIN public.all_external_metadata ON (((all_external_metadata.accession = cpd.accession) AND (all_external_metadata.version = cpd.version))));


Expand Down Expand Up @@ -444,6 +421,28 @@ CREATE TABLE public.seqsets (

ALTER TABLE public.seqsets OWNER TO postgres;

--
-- Name: sequence_entries; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE public.sequence_entries (
accession text NOT NULL,
version bigint NOT NULL,
organism text NOT NULL,
submission_id text NOT NULL,
submitter text NOT NULL,
approver text,
group_id integer NOT NULL,
submitted_at timestamp without time zone NOT NULL,
released_at timestamp without time zone,
is_revocation boolean DEFAULT false NOT NULL,
original_data jsonb,
version_comment text
);


ALTER TABLE public.sequence_entries OWNER TO postgres;

--
-- Name: sequence_entries_view; Type: VIEW; Schema: public; Owner: postgres
--
Expand All @@ -467,8 +466,7 @@ CREATE VIEW public.sequence_entries_view AS
(sepd.processed_data || em.joint_metadata) AS joint_metadata,
CASE
WHEN se.is_revocation THEN ( SELECT current_processing_pipeline.version
FROM public.current_processing_pipeline
WHERE (current_processing_pipeline.organism = se.organism))
FROM public.current_processing_pipeline)
ELSE sepd.pipeline_version
END AS pipeline_version,
sepd.errors,
Expand All @@ -486,9 +484,9 @@ CREATE VIEW public.sequence_entries_view AS
WHEN ((sepd.warnings IS NOT NULL) AND (jsonb_array_length(sepd.warnings) > 0)) THEN 'HAS_WARNINGS'::text
ELSE 'NO_ISSUES'::text
END AS processing_result
FROM (((public.sequence_entries se
LEFT JOIN public.sequence_entries_preprocessed_data sepd ON (((se.accession = sepd.accession) AND (se.version = sepd.version))))
LEFT JOIN public.current_processing_pipeline ccp ON (((se.organism = ccp.organism) AND (sepd.pipeline_version = ccp.version))))
FROM ((public.sequence_entries se
LEFT JOIN public.sequence_entries_preprocessed_data sepd ON (((se.accession = sepd.accession) AND (se.version = sepd.version) AND (sepd.pipeline_version = ( SELECT current_processing_pipeline.version
FROM public.current_processing_pipeline)))))
LEFT JOIN public.external_metadata_view em ON (((se.accession = em.accession) AND (se.version = em.version))));


Expand Down Expand Up @@ -603,7 +601,7 @@ ALTER TABLE ONLY public.audit_log
--

ALTER TABLE ONLY public.current_processing_pipeline
ADD CONSTRAINT current_processing_pipeline_pkey PRIMARY KEY (organism);
ADD CONSTRAINT current_processing_pipeline_pkey PRIMARY KEY (version);


--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ import org.flywaydb.core.Flyway
import org.jetbrains.exposed.spring.autoconfigure.ExposedAutoConfiguration
import org.jetbrains.exposed.sql.DatabaseConfig
import org.jetbrains.exposed.sql.Slf4jSqlDebugLogger
import org.jetbrains.exposed.sql.transactions.transaction
import org.loculus.backend.controller.LoculusCustomHeaders
import org.loculus.backend.log.REQUEST_ID_HEADER_DESCRIPTION
import org.loculus.backend.service.submission.CurrentProcessingPipelineTable
import org.loculus.backend.utils.DateProvider
import org.springdoc.core.customizers.OperationCustomizer
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.ImportAutoConfiguration
Expand Down Expand Up @@ -70,25 +67,13 @@ class BackendSpringConfig {

@Bean
@Profile("!test")
fun getFlyway(dataSource: DataSource, backendConfig: BackendConfig, dateProvider: DateProvider): Flyway {
fun getFlyway(dataSource: DataSource): Flyway {
val configuration = Flyway.configure()
.baselineOnMigrate(true)
.dataSource(dataSource)
.validateMigrationNaming(true)
val flyway = Flyway(configuration)
flyway.migrate()

// Since migration V1.10 we need to initialize the CurrentProcessingPipelineTable
// in code, because the configured organisms are not known in the SQL table definitions.
logger.info("Initializing CurrentProcessingPipelineTable")
transaction {
val insertedRows = CurrentProcessingPipelineTable.setV1ForOrganismsIfNotExist(
backendConfig.organisms.keys,
dateProvider.getCurrentDateTime(),
)
logger.info("$insertedRows inserted.")
}

return flyway
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ open class SubmissionController(
@RequestParam pipelineVersion: Long,
@RequestHeader(value = HttpHeaders.IF_NONE_MATCH, required = false) ifNoneMatch: String?,
): ResponseEntity<StreamingResponseBody> {
val currentProcessingPipelineVersion = submissionDatabaseService.getCurrentProcessingPipelineVersion(organism)
val currentProcessingPipelineVersion = submissionDatabaseService.getCurrentProcessingPipelineVersion()
if (pipelineVersion < currentProcessingPipelineVersion) {
throw UnprocessableEntityException(
"The processing pipeline version $pipelineVersion is not accepted " +
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.loculus.backend.service.debug

import org.jetbrains.exposed.sql.deleteAll
import org.loculus.backend.config.BackendConfig
import org.jetbrains.exposed.sql.insert
import org.loculus.backend.service.datauseterms.DataUseTermsTable
import org.loculus.backend.service.submission.CurrentProcessingPipelineTable
import org.loculus.backend.service.submission.MetadataUploadAuxTable
Expand All @@ -13,7 +13,7 @@ import org.springframework.stereotype.Component
import org.springframework.transaction.annotation.Transactional

@Component
class DeleteSequenceDataService(private val dateProvider: DateProvider, private val config: BackendConfig) {
class DeleteSequenceDataService(private val dateProvider: DateProvider) {
@Transactional
fun deleteAllSequenceData() {
SequenceEntriesTable.deleteAll()
Expand All @@ -22,9 +22,9 @@ class DeleteSequenceDataService(private val dateProvider: DateProvider, private
SequenceUploadAuxTable.deleteAll()
DataUseTermsTable.deleteAll()
CurrentProcessingPipelineTable.deleteAll()
CurrentProcessingPipelineTable.setV1ForOrganismsIfNotExist(
config.organisms.keys,
dateProvider.getCurrentDateTime(),
)
CurrentProcessingPipelineTable.insert {
it[versionColumn] = 1
it[startedUsingAtColumn] = dateProvider.getCurrentDateTime()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,53 +1,11 @@
package org.loculus.backend.service.submission

import kotlinx.datetime.LocalDateTime
import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.andWhere
import org.jetbrains.exposed.sql.batchInsert
import org.jetbrains.exposed.sql.kotlin.datetime.datetime
import org.jetbrains.exposed.sql.selectAll
import org.jetbrains.exposed.sql.update

const val CURRENT_PROCESSING_PIPELINE_TABLE_NAME = "current_processing_pipeline"

/**
 * Exposed table definition for the `current_processing_pipeline` table, which stores one
 * pipeline version together with the time it came into use, keyed by organism.
 */
object CurrentProcessingPipelineTable : Table(CURRENT_PROCESSING_PIPELINE_TABLE_NAME) {
    val organismColumn = varchar("organism", 255)
    val versionColumn = long("version")
    val startedUsingAtColumn = datetime("started_using_at")

    /**
     * Batch-inserts a row with version 1 and `started_using_at = now` for each of the given [organisms].
     *
     * The insert is issued with `ignore = true`, so organisms that already have a row are meant to be
     * left untouched. NOTE(review): this relies on a uniqueness constraint on the organism column
     * in the database; confirm against the migration.
     *
     * @param organisms the organisms that need an initial pipeline version
     * @param now the timestamp written to `started_using_at` of every inserted row
     * @return the result of the underlying batch insert
     */
    fun setV1ForOrganismsIfNotExist(organisms: Collection<String>, now: LocalDateTime) =
        CurrentProcessingPipelineTable.batchInsert(organisms, ignore = true) { organismName ->
            this[organismColumn] = organismName
            this[versionColumn] = 1
            this[startedUsingAtColumn] = now
        }

    /**
     * Tells whether the version stored for [organism] is older than [maybeNewerVersion].
     *
     * When this returns `true`, the pipeline needs to "update", i.e. older entries have to be reprocessed.
     *
     * @param maybeNewerVersion a version that was found and may be newer than the stored current one
     * @param organism the organism whose stored version is compared
     * @return `true` if a row exists for [organism] whose version is less than [maybeNewerVersion];
     * `false` otherwise, including when no row exists for [organism]
     */
    fun pipelineNeedsUpdate(maybeNewerVersion: Long, organism: String): Boolean {
        val rowsWithOlderVersion = CurrentProcessingPipelineTable
            .selectAll()
            .where { versionColumn less maybeNewerVersion }
            .andWhere { organismColumn eq organism }
        return !rowsWithOlderVersion.empty()
    }

    /**
     * Overwrites the stored pipeline version of [organism] with [newVersion].
     *
     * @param organism the organism whose row is updated
     * @param newVersion the version to store
     * @param startedUsingAt the timestamp to store as `started_using_at`
     * @return the number of updated rows
     */
    fun updatePipelineVersion(organism: String, newVersion: Long, startedUsingAt: LocalDateTime): Int =
        CurrentProcessingPipelineTable.update({ organismColumn eq organism }) { statement ->
            statement[versionColumn] = newVersion
            statement[startedUsingAtColumn] = startedUsingAt
        }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,6 @@ object SequenceEntriesTable : Table(SEQUENCE_ENTRIES_TABLE_NAME) {
)
}

/**
 * Selects the distinct values of the organism column of this table.
 *
 * The result is a lazy [Sequence]; rows are only read when it is consumed.
 * NOTE(review): presumably it has to be consumed while the surrounding transaction is still open; confirm at call sites.
 */
fun distinctOrganisms() = SequenceEntriesTable
    .select(organismColumn)
    .withDistinct()
    .asSequence()
    .map { row -> row[organismColumn] }

/**
 * Builds the condition that the (accession, version) column pair of a row
 * equals one of the pairs derived from [accessionVersions].
 */
fun accessionVersionIsIn(accessionVersions: List<AccessionVersionInterface>) =
    (accessionColumn to versionColumn).inList(accessionVersions.toPairs())

Expand Down
Loading

0 comments on commit 0f24409

Please sign in to comment.