Skip to content

Commit

Permalink
#637 remove usage of selectOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-git committed Aug 29, 2024
1 parent 23fe01b commit 10be874
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,37 +137,4 @@ object PointerStatements {
}
}
}

trait SelectOffset[F[_]] {

def apply(topic: Topic, partition: Partition): F[Option[Offset]]
}

object SelectOffset {

def of[F[_]: Monad: CassandraSession](
name: TableName,
consistencyConfig: EventualCassandraConfig.ConsistencyConfig.Read,
): F[SelectOffset[F]] = {

val query =
s"""
|SELECT offset FROM ${name.toCql}
|WHERE topic = ?
|AND partition = ?
|""".stripMargin

query
.prepare
.map { prepared => (topic: Topic, partition: Partition) =>
prepared
.bind()
.encode("topic", topic)
.encode("partition", partition)
.setConsistencyLevel(consistencyConfig.value)
.first
.map { _.map { _.decode[Offset]("offset") } }
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,7 @@ object ReplicatedCassandra {
new Main with ReplicatedPartitionJournal.Offsets[F] {

def get: F[Option[Offset]] = {
for {
offset <- statements.selectOffset2(topic, partition)
offset <- offset.fold {
val meterAndLog = {
for {
_ <- metrics.traverse_(_.selectOffsetFallback(topic, partition))
message = s"`selectOffset` was called for topic $topic and partition: $partition"
_ <- log.traverse_(_.warn(message))
} yield ()
}
statements.selectOffset(topic, partition) <* meterAndLog
} { _.some.pure[F] }
} yield offset
statements.selectOffset2(topic, partition)
}

def create(offset: Offset, timestamp: Instant): F[Unit] = {
Expand Down Expand Up @@ -705,7 +693,6 @@ object ReplicatedCassandra {
deleteRecordsTo: JournalStatements.DeleteTo[F],
deleteRecords: JournalStatements.Delete[F],
metaJournal: MetaJournalStatements[F],
selectOffset: PointerStatements.SelectOffset[F],
selectOffset2: Pointer2Statements.SelectOffset[F],
selectPointer: PointerStatements.Select[F],
selectPointer2: Pointer2Statements.Select[F],
Expand All @@ -732,7 +719,6 @@ object ReplicatedCassandra {
deleteRecordsTo <- JournalStatements.DeleteTo.of[F](schema.journal, consistencyConfig.write)
deleteRecords <- JournalStatements.Delete.of[F](schema.journal, consistencyConfig.write)
metaJournal <- MetaJournalStatements.of[F](schema, consistencyConfig)
selectOffset <- PointerStatements.SelectOffset.of[F](schema.pointer, consistencyConfig.read)
selectOffset2 <- Pointer2Statements.SelectOffset.of[F](schema.pointer2, consistencyConfig.read)
selectPointer <- PointerStatements.Select.of[F](schema.pointer, consistencyConfig.read)
selectPointer2 <- Pointer2Statements.Select.of[F](schema.pointer2, consistencyConfig.read)
Expand All @@ -748,7 +734,6 @@ object ReplicatedCassandra {
deleteRecordsTo,
deleteRecords,
metaJournal,
selectOffset,
selectOffset2,
selectPointer,
selectPointer2,
Expand Down

0 comments on commit 10be874

Please sign in to comment.