Skip to content

Commit

Permalink
#637 adjust unit-tests by moving code from removed stubs to currently…
Browse files Browse the repository at this point in the history
… active stubs
  • Loading branch information
mr-git committed Aug 29, 2024
1 parent 07e1caa commit 72b1576
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ package com.evolutiongaming.kafka.journal.eventual.cassandra
import cats.Parallel
import cats.effect.IO
import cats.implicits.*
import com.evolutiongaming.catshelper.DataHelper.IterableOps1DataHelper
import com.evolutiongaming.kafka.journal.*
import com.evolutiongaming.kafka.journal.Journal.DataIntegrityConfig
import com.evolutiongaming.kafka.journal.eventual.*
import com.evolutiongaming.kafka.journal.eventual.EventualJournalSpec.*
import com.evolutiongaming.kafka.journal.util.Fail
import com.evolutiongaming.kafka.journal.util.TestTemporal.*
import com.evolutiongaming.skafka.{Offset, Topic}
import com.evolutiongaming.skafka.{Partition, Topic}
import com.evolutiongaming.sstream.FoldWhile.*
import com.evolutiongaming.sstream.Stream

import java.time.{Instant, ZoneOffset}
import scala.collection.immutable.SortedSet

// TODO expiry: test purge
class EventualCassandraSpec extends EventualJournalSpec {
Expand Down Expand Up @@ -64,8 +64,17 @@ object EventualCassandraSpec {
}
}

val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (_, _) =>
none[Offset].pure[StateT]
val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (topic: Topic, partition: Partition) =>
{
StateT { state =>
val offset = state
.pointers
.getOrElse(topic, TopicPointers.empty)
.values
.get(partition)
(state, offset)
}
}
}

val selectPointer: PointerStatements.Select[StateT] = { (_, _) =>
Expand Down Expand Up @@ -339,7 +348,11 @@ object EventualCassandraSpec {
}

val selectTopics2: Pointer2Statements.SelectTopics[StateT] = { () =>
SortedSet.empty[Topic].pure[StateT]
{
StateT { state =>
(state, state.pointers.keySet.toSortedSet)
}
}
}

val metaJournal = ReplicatedCassandra.MetaJournalStatements(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,6 @@ object EventualCassandraTest {
}
}

val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (_, _) =>
none[Offset].pure[StateT]
}

val insertPointer: PointerStatements.Insert[StateT] = {
(topic: Topic, partition: Partition, offset: Offset, created: Instant, updated: Instant) =>
{
Expand All @@ -394,6 +390,20 @@ object EventualCassandraTest {
}
}

val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (topic: Topic, partition: Partition) =>
{
StateT.success { state =>
val offset = for {
pointers <- state.pointers.get(topic)
pointer <- pointers.get(partition)
} yield {
pointer.offset
}
(state, offset)
}
}
}

val updatePointer: PointerStatements.Update[StateT] = {
(topic: Topic, partition: Partition, offset: Offset, timestamp: Instant) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cats.effect.{Poll, Sync}
import cats.implicits.*
import cats.syntax.all.none
import cats.{Id, Parallel}
import com.evolutiongaming.catshelper.DataHelper.IterableOps1DataHelper
import com.evolutiongaming.kafka.journal.*
import com.evolutiongaming.kafka.journal.ExpireAfter.implicits.*
import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
Expand All @@ -20,7 +21,6 @@ import play.api.libs.json.Json

import java.time.{Instant, LocalDate, ZoneOffset}
import java.util.concurrent.TimeUnit
import scala.collection.immutable.SortedSet
import scala.concurrent.duration.*
import scala.util.Try

Expand Down Expand Up @@ -1269,8 +1269,18 @@ object ReplicatedCassandraTest {
}
}

val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (_, _) =>
none[Offset].pure[StateT]
val selectOffset2: Pointer2Statements.SelectOffset[StateT] = { (topic: Topic, partition: Partition) =>
{
StateT.success { state =>
val offset = for {
pointers <- state.pointers.get(topic)
pointer <- pointers.get(partition)
} yield {
pointer.offset
}
(state, offset)
}
}
}

val selectPointer: PointerStatements.Select[StateT] = { (_, _) =>
Expand Down Expand Up @@ -1317,7 +1327,12 @@ object ReplicatedCassandraTest {
}

val selectTopics2: Pointer2Statements.SelectTopics[StateT] = { () =>
SortedSet.empty[Topic].pure[StateT]
{
StateT.success { state =>
val topics = state.pointers.keySet.toSortedSet
(state, topics)
}
}
}

val statements: ReplicatedCassandra.Statements[StateT] = {
Expand Down

0 comments on commit 72b1576

Please sign in to comment.