Skip to content

Commit

Permalink
remove reading of pointer table, but keep writing (#637) (#658)
Browse files Browse the repository at this point in the history
closes #637
  • Loading branch information
mr-git authored Aug 30, 2024
1 parent df9ecc4 commit 4d40d42
Show file tree
Hide file tree
Showing 10 changed files with 520 additions and 773 deletions.
134 changes: 130 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,138 @@ lazy val commonSettings = Seq(
},
)

// TODO MR remove after 3.5.0 release
import com.typesafe.tools.mima.core.*
ThisBuild / mimaBinaryIssueFilters ++= Seq(
ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.kafka.journal.eventual.cassandra.CreateSchema.apply"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("com.evolutiongaming.kafka.journal.eventual.cassandra.CreateSchema.apply"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.kafka.journal.eventual.cassandra.SetupSchema.migrate"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("com.evolutiongaming.kafka.journal.eventual.cassandra.SetupSchema.migrate"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#Statements.selectOffset",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#Statements.copy",
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#Statements.copy$default$3",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#Statements.copy$default$4",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#Statements.this",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#Statements.apply",
),
ProblemFilters.exclude[IncompatibleSignatureProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandra#Statements.unapply",
),
ProblemFilters.exclude[MissingClassProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.PointerStatements$SelectOffset",
),
ProblemFilters.exclude[MissingClassProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.PointerStatements$SelectOffset$",
),
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.PointerStatements#SelectTopics.of",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.selectOffset",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.selectTopics",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy",
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$10",
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$11",
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$12",
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$13",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$14",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$15",
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$5",
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$6",
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$7",
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$8",
),
ProblemFilters.exclude[IncompatibleResultTypeProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$9",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.this",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.apply",
),
ProblemFilters.exclude[IncompatibleSignatureProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.unapply",
),
ProblemFilters.exclude[MissingClassProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.PointerStatements$SelectTopics",
),
ProblemFilters.exclude[MissingClassProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.PointerStatements$SelectTopics$",
),
ProblemFilters.exclude[MissingClassProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.Pointer2Statements$UpdateCreated",
),
ProblemFilters.exclude[MissingClassProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.Pointer2Statements$UpdateCreated$",
),
ProblemFilters.exclude[MissingClassProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.PointerStatements$Select",
),
ProblemFilters.exclude[MissingClassProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.PointerStatements$Select$",
),
ProblemFilters.exclude[MissingClassProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.PointerStatements$Select$Result",
),
ProblemFilters.exclude[MissingClassProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.PointerStatements$Select$Result$",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.selectPointer",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.updatePointerCreated2",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$12",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.cassandra.ReplicatedCassandra#Statements.copy$default$13",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.ReplicatedJournal#Metrics.topicsFallback",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.ReplicatedJournal#Metrics.selectOffsetFallback",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.ReplicatedJournal#Metrics.selectPointerFallback",
),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.kafka.journal.eventual.ReplicatedJournal#Metrics.updatePointerCreated2Fallback",
),
)

val alias: Seq[sbt.Def.Setting[?]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,18 +226,14 @@ object EventualCassandra {
}

def offset(topic: Topic, partition: Partition): F[Option[Offset]] = {
for {
offset <- statements.selectOffset2(topic, partition)
offset <- offset.fold { statements.selectOffset(topic, partition) } { _.some.pure[F] }
} yield offset
statements.selectOffset2(topic, partition)
}
}
}

final case class Statements[F[_]](
records: JournalStatements.SelectRecords[F],
metaJournal: MetaJournalStatements[F],
selectOffset: PointerStatements.SelectOffset[F],
selectOffset2: Pointer2Statements.SelectOffset[F],
)

Expand All @@ -254,10 +250,9 @@ object EventualCassandra {
for {
selectRecords <- JournalStatements.SelectRecords.of[F](schema.journal, consistencyConfig)
metaJournal <- MetaJournalStatements.of(schema, segmentNrsOf, segments, consistencyConfig)
selectOffset <- PointerStatements.SelectOffset.of[F](schema.pointer, consistencyConfig)
selectOffset2 <- Pointer2Statements.SelectOffset.of[F](schema.pointer2, consistencyConfig)
} yield {
Statements(selectRecords, metaJournal, selectOffset, selectOffset2)
Statements(selectRecords, metaJournal, selectOffset2)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,39 +200,4 @@ object Pointer2Statements {
}
}
}

// TODO MR remove with next major release
trait UpdateCreated[F[_]] {

def apply(topic: Topic, partition: Partition, offset: Offset, created: Instant, updated: Instant): F[Unit]
}

object UpdateCreated {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Write,
): F[UpdateCreated[F]] = {
s"""
|UPDATE ${name.toCql}
|SET offset = ?, created = ?, updated = ?
|WHERE topic = ?
|AND partition = ?
|"""
.stripMargin
.prepare
.map { prepared => (topic: Topic, partition: Partition, offset: Offset, created: Instant, updated: Instant) =>
prepared
.bind()
.encode("topic", topic)
.encode("partition", partition)
.encode("offset", offset)
.encode("created", created)
.encode("updated", updated)
.setConsistencyLevel(consistencyConfig.value)
.first
.void
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package com.evolutiongaming.kafka.journal.eventual.cassandra

import cats.Monad
import cats.syntax.all.*
import com.datastax.driver.core.GettableByNameData
import com.evolutiongaming.catshelper.DataHelper.*
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraHelper.*
import com.evolutiongaming.kafka.journal.util.SkafkaHelper.*
import com.evolutiongaming.scassandra.TableName
import com.evolutiongaming.scassandra.syntax.*
import com.evolutiongaming.scassandra.{DecodeRow, TableName}
import com.evolutiongaming.skafka.{Offset, Partition, Topic}

import java.time.Instant
import scala.collection.immutable.SortedSet

object PointerStatements {

Expand Down Expand Up @@ -97,106 +94,4 @@ object PointerStatements {
}
}
}

trait Select[F[_]] {

def apply(topic: Topic, partition: Partition): F[Option[Select.Result]]
}

object Select {

final case class Result(created: Option[Instant])

object Result {
implicit val decodeResult: DecodeRow[Result] = { (row: GettableByNameData) =>
{
Result(row.decode[Option[Instant]]("created"))
}
}
}

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read,
): F[Select[F]] = {
s"""
|SELECT created FROM ${name.toCql}
|WHERE topic = ?
|AND partition = ?
|"""
.stripMargin
.prepare
.map { prepared => (topic: Topic, partition: Partition) =>
prepared
.bind()
.encode("topic", topic)
.encode("partition", partition)
.setConsistencyLevel(consistencyConfig.value)
.first
.map { _.map { _.decode[Result] } }
}
}
}

trait SelectOffset[F[_]] {

def apply(topic: Topic, partition: Partition): F[Option[Offset]]
}

object SelectOffset {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read,
): F[SelectOffset[F]] = {

val query =
s"""
|SELECT offset FROM ${name.toCql}
|WHERE topic = ?
|AND partition = ?
|""".stripMargin

query
.prepare
.map { prepared => (topic: Topic, partition: Partition) =>
prepared
.bind()
.encode("topic", topic)
.encode("partition", partition)
.setConsistencyLevel(consistencyConfig.value)
.first
.map { _.map { _.decode[Offset]("offset") } }
}
}
}

trait SelectTopics[F[_]] {
def apply(): F[SortedSet[Topic]]
}

object SelectTopics {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read,
): F[SelectTopics[F]] = {

val query = s"""SELECT DISTINCT topic FROM ${name.toCql}""".stripMargin
query
.prepare
.map { prepared => () =>
{
prepared
.bind()
.setConsistencyLevel(consistencyConfig.value)
.execute
.toList
.map { records =>
records.map { _.decode[Topic]("topic") }.toSortedSet
}
}
}
}
}
}
Loading

0 comments on commit 4d40d42

Please sign in to comment.