diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraSpec.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraSpec.scala index 9dd3215a8..10cfd75b0 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraSpec.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraSpec.scala @@ -3,18 +3,18 @@ package com.evolutiongaming.kafka.journal.eventual.cassandra import cats.Parallel import cats.effect.IO import cats.implicits.* +import com.evolutiongaming.catshelper.DataHelper.IterableOps1DataHelper import com.evolutiongaming.kafka.journal.* import com.evolutiongaming.kafka.journal.Journal.DataIntegrityConfig import com.evolutiongaming.kafka.journal.eventual.* import com.evolutiongaming.kafka.journal.eventual.EventualJournalSpec.* import com.evolutiongaming.kafka.journal.util.Fail import com.evolutiongaming.kafka.journal.util.TestTemporal.* -import com.evolutiongaming.skafka.{Offset, Topic} +import com.evolutiongaming.skafka.{Partition, Topic} import com.evolutiongaming.sstream.FoldWhile.* import com.evolutiongaming.sstream.Stream import java.time.{Instant, ZoneOffset} -import scala.collection.immutable.SortedSet // TODO expiry: test purge class EventualCassandraSpec extends EventualJournalSpec { @@ -64,8 +64,17 @@ object EventualCassandraSpec { } } - val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (_, _) => - none[Offset].pure[StateT] + val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (topic: Topic, partition: Partition) => + { + StateT { state => + val offset = state + .pointers + .getOrElse(topic, TopicPointers.empty) + .values + .get(partition) + (state, offset) + } + } } val selectPointer: PointerStatements.Select[StateT] = { (_, _) => @@ -339,7 +348,11 @@ object EventualCassandraSpec { } val selectTopics2: Pointer2Statements.SelectTopics[StateT] = { () => - SortedSet.empty[Topic].pure[StateT] + { + StateT { state => + (state, state.pointers.keySet.toSortedSet) + } + } } val metaJournal = ReplicatedCassandra.MetaJournalStatements( diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala index 47e282f8f..dc1f76b12 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/EventualCassandraTest.scala @@ -376,10 +376,6 @@ object EventualCassandraTest { } } - val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (_, _) => - none[Offset].pure[StateT] - } - val insertPointer: PointerStatements.Insert[StateT] = { (topic: Topic, partition: Partition, offset: Offset, created: Instant, updated: Instant) => { @@ -394,6 +390,20 @@ object EventualCassandraTest { } } + val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (topic: Topic, partition: Partition) => + { + StateT.success { state => + val offset = for { + pointers <- state.pointers.get(topic) + pointer <- pointers.get(partition) + } yield { + pointer.offset + } + (state, offset) + } + } + } + val updatePointer: PointerStatements.Update[StateT] = { (topic: Topic, partition: Partition, offset: Offset, timestamp: Instant) => { diff --git a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala index 55477e0e5..054bc74a6 100644 --- a/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala +++ b/eventual-cassandra/src/test/scala/com/evolutiongaming/kafka/journal/eventual/cassandra/ReplicatedCassandraTest.scala @@ -6,6 +6,7 @@ import cats.effect.{Poll, Sync} import cats.implicits.* import cats.syntax.all.none import cats.{Id, Parallel} +import com.evolutiongaming.catshelper.DataHelper.IterableOps1DataHelper import com.evolutiongaming.kafka.journal.* import com.evolutiongaming.kafka.journal.ExpireAfter.implicits.* import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType @@ -20,7 +21,6 @@ import play.api.libs.json.Json import java.time.{Instant, LocalDate, ZoneOffset} import java.util.concurrent.TimeUnit -import scala.collection.immutable.SortedSet import scala.concurrent.duration.* import scala.util.Try @@ -1269,8 +1269,18 @@ object ReplicatedCassandraTest { } } - val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (_, _) => - none[Offset].pure[StateT] + val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (topic: Topic, partition: Partition) => + { + StateT.success { state => + val offset = for { + pointers <- state.pointers.get(topic) + pointer <- pointers.get(partition) + } yield { + pointer.offset + } + (state, offset) + } + } } val selectPointer: PointerStatements.Select[StateT] = { (_, _) => @@ -1317,7 +1327,12 @@ object ReplicatedCassandraTest { } val selectTopics2: Pointer2Statements.SelectTopics[StateT] = { () => - SortedSet.empty[Topic].pure[StateT] + { + StateT.success { state => + val topics = state.pointers.keySet.toSortedSet + (state, topics) + } + } } val statements: ReplicatedCassandra.Statements[StateT] = {