Configuring persistence plugins at runtime #194

Open · wants to merge 19 commits into base: main
12 changes: 12 additions & 0 deletions .github/workflows/build-test.yml
@@ -82,6 +82,10 @@ jobs:
# TODO: could we poll the port instead of sleep?
sleep 10
docker exec -i docker-postgres-db-1 psql -U postgres -t < ddl-scripts/create_tables_postgres.sql
docker exec -i docker-postgres-db-1 psql -U postgres -t -c 'CREATE DATABASE database1;'
docker exec -i docker-postgres-db-1 psql -U postgres -t -d database1 < ddl-scripts/create_tables_postgres.sql
docker exec -i docker-postgres-db-1 psql -U postgres -t -c 'CREATE DATABASE database2;'
docker exec -i docker-postgres-db-1 psql -U postgres -t -d database2 < ddl-scripts/create_tables_postgres.sql

- name: test
run: sbt ++${{ matrix.SCALA_VERSION }} test
@@ -128,6 +132,10 @@ jobs:
# TODO: could we poll the port instead of sleep?
sleep 10
docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h yb-tserver-n1 -t < ddl-scripts/create_tables_yugabyte.sql
docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h yb-tserver-n1 -t -c 'CREATE DATABASE database1;'
docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h yb-tserver-n1 -t -d database1 < ddl-scripts/create_tables_yugabyte.sql
docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h yb-tserver-n1 -t -c 'CREATE DATABASE database2;'
docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h yb-tserver-n1 -t -d database2 < ddl-scripts/create_tables_yugabyte.sql

- name: test
run: sbt -Dpekko.persistence.r2dbc.dialect=yugabyte -Dpekko.projection.r2dbc.dialect=yugabyte ++${{ matrix.SCALA_VERSION }} test
@@ -183,6 +191,10 @@ jobs:
run: |-
docker compose -f docker/docker-compose-mysql.yml up -d --wait
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=mysql < ddl-scripts/create_tables_mysql.sql
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root -e 'CREATE SCHEMA database1;'
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=database1 < ddl-scripts/create_tables_mysql.sql
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root -e 'CREATE SCHEMA database2;'
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=database2 < ddl-scripts/create_tables_mysql.sql

- name: test
run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} ${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}
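The `# TODO: could we poll the port instead of sleep?` comment kept above suggests a readiness check. A minimal sketch for the Postgres job, assuming the official postgres image (which ships `pg_isready`) and the container name used in the step above; the 30-attempt limit is illustrative:

# Poll until the server accepts connections instead of sleeping a fixed 10s.
for i in $(seq 1 30); do
  docker exec docker-postgres-db-1 pg_isready -U postgres && break
  sleep 1
done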
78 changes: 45 additions & 33 deletions core/src/main/resources/reference.conf
@@ -2,6 +2,7 @@

// #journal-settings
pekko.persistence.r2dbc {
journal = ${pekko.persistence.r2dbc}
journal {
class = "org.apache.pekko.persistence.r2dbc.journal.R2dbcJournal"

@@ -13,34 +14,33 @@ pekko.persistence.r2dbc {

# event replay is using pekko.persistence.r2dbc.query.buffer-size

# Enable this to reduce latency of eventsBySlices. The persisted events will be
# published as Pekko messages and consumed directly by running eventsBySlices
# queries. The tradeoff is that more CPU and network resources are used. The
# events must still be retrieved from the database, but at a lower polling
# frequency, because delivery of published messages is not guaranteed.
publish-events = off

# replay filter not needed for this plugin
replay-filter.mode = off

use-connection-factory = "pekko.persistence.r2dbc.connection-factory"
}
}
// #journal-settings
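The new `journal = ${pekko.persistence.r2dbc}` self-reference is what makes additional plugin instances cheap to declare: a second journal section can inherit every default and override only where it connects. A hypothetical application.conf sketch (the `second-journal` and `second-connection-factory` names and the `database1` value are illustrative, not part of this PR):

# A separate connection factory, e.g. for the database1 created in CI above.
second-connection-factory = ${pekko.persistence.r2dbc.connection-factory}
second-connection-factory.database = "database1"

# A second journal plugin inheriting all defaults from pekko.persistence.r2dbc.journal.
second-journal = ${pekko.persistence.r2dbc.journal}
second-journal.use-connection-factory = "second-connection-factory"

An entity could then opt in with `EventSourcedBehavior`'s `withJournalPluginId("second-journal")`.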

// #snapshot-settings
pekko.persistence.r2dbc {
snapshot = ${pekko.persistence.r2dbc}
snapshot {
class = "org.apache.pekko.persistence.r2dbc.snapshot.R2dbcSnapshotStore"
table = "snapshot"

# Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058
plugin-dispatcher = "pekko.actor.default-dispatcher"

use-connection-factory = "pekko.persistence.r2dbc.connection-factory"
}
}
// #snapshot-settings

// #durable-state-settings
pekko.persistence.r2dbc {
# Durable state store
state = ${pekko.persistence.r2dbc}
state {
class = "org.apache.pekko.persistence.r2dbc.state.R2dbcDurableStateStoreProvider"

@@ -50,41 +50,19 @@
# previous revision. There might be a small performance gain if
# this is disabled.
assert-single-writer = on

use-connection-factory = "pekko.persistence.r2dbc.connection-factory"
}
}
// #durable-state-settings

// #query-settings
pekko.persistence.r2dbc {
query = ${pekko.persistence.r2dbc}
query {
class = "org.apache.pekko.persistence.r2dbc.query.R2dbcReadJournalProvider"

# When live queries return no results or <= 10% of buffer-size, the next query
# to the db will be delayed for this duration.
# When the number of rows from the previous query is >= 90% of buffer-size, the
# next query will be emitted immediately.
# Otherwise, between 10% and 90% of buffer-size, the next query will be delayed
# for half of this duration.
refresh-interval = 3s

# Live queries read events up to this duration from the current database time.
behind-current-time = 100 millis

backtracking {
enabled = on
# Backtracking queries will look back for this amount of time. It should
# not be larger than the pekko.projection.r2dbc.offset-store.time-window.
window = 2 minutes
# Backtracking queries read events up to this duration from the current database time.
behind-current-time = 10 seconds
}

# In-memory buffer holding events when reading from database.
buffer-size = 1000

persistence-ids {
buffer-size = 1000
}
table = ${pekko.persistence.r2dbc.journal.table}

# When journal publish-events is enabled, best-effort deduplication can be enabled
# by setting this property to the size of the deduplication buffer in the
# `eventsBySlices` query.
@@ -94,6 +72,7 @@
# the backtracking queries.
deduplicate-capacity = 0

use-connection-factory = "pekko.persistence.r2dbc.connection-factory"
}
}
// #query-settings
@@ -192,5 +171,38 @@ pekko.persistence.r2dbc {
# Set to 0 to log all calls.
log-db-calls-exceeding = 300 ms

# Enable this to reduce latency of eventsBySlices. The persisted events will be
# published as Pekko messages and consumed directly by running eventsBySlices
# queries. The tradeoff is that more CPU and network resources are used. The
# events must still be retrieved from the database, but at a lower polling
# frequency, because delivery of published messages is not guaranteed.
publish-events = off

# In-memory buffer holding events when reading from database.
buffer-size = 1000

# When live queries return no results or <= 10% of buffer-size, the next query
# to the db will be delayed for this duration.
# When the number of rows from the previous query is >= 90% of buffer-size, the
# next query will be emitted immediately.
# Otherwise, between 10% and 90% of buffer-size, the next query will be delayed
# for half of this duration.
refresh-interval = 3s

# Live queries read events up to this duration from the current database time.
behind-current-time = 100 millis

backtracking {
enabled = on
# Backtracking queries will look back for this amount of time. It should
# not be larger than the pekko.projection.r2dbc.offset-store.time-window.
window = 2 minutes
# Backtracking queries read events up to this duration from the current database time.
behind-current-time = 10 seconds
}

persistence-ids {
buffer-size = 1000
}
}
// #connection-settings
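Because each plugin section now starts from `${pekko.persistence.r2dbc}`, the connection settings above double as shared defaults: a value set once at the top level is inherited by every plugin section, while a per-plugin override still wins. A brief application.conf sketch under that assumption:

pekko.persistence.r2dbc {
  # Inherited by every plugin section that reads buffer-size (e.g. query).
  buffer-size = 500

  # Per-plugin override; only the query plugin polls every second.
  query.refresh-interval = 1s
}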
core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
@@ -18,16 +18,8 @@ import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success }
import com.typesafe.config.Config
import io.r2dbc.pool.ConnectionPool
import io.r2dbc.pool.ConnectionPoolConfiguration
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
import io.r2dbc.postgresql.client.SSLMode
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions
import io.r2dbc.spi.Option
import scala.util.Failure
import scala.util.Success
import org.apache.pekko
import pekko.Done
import pekko.actor.CoordinatedShutdown
@@ -36,8 +28,18 @@ import pekko.actor.typed.Extension
import pekko.actor.typed.ExtensionId
import pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer
import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps
import pekko.util.ccompat.JavaConverters._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import io.r2dbc.pool.ConnectionPool
import io.r2dbc.pool.ConnectionPoolConfiguration
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
import io.r2dbc.postgresql.client.SSLMode
import io.r2dbc.spi.ConnectionFactories
import io.r2dbc.spi.ConnectionFactory
import io.r2dbc.spi.ConnectionFactoryOptions
import io.r2dbc.spi.Option

object ConnectionFactoryProvider extends ExtensionId[ConnectionFactoryProvider] {
def createExtension(system: ActorSystem[_]): ConnectionFactoryProvider = new ConnectionFactoryProvider(system)
@@ -75,7 +77,6 @@ object ConnectionFactoryProvider extends ExtensionId[ConnectionFactoryProvider]

class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension {

import R2dbcExecutor.PublisherOps
private val sessions = new ConcurrentHashMap[String, ConnectionPool]

CoordinatedShutdown(system)
@@ -86,15 +87,20 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension {
.map(_ => Done)
}

def connectionFactoryFor(configLocation: String): ConnectionFactory = {
def connectionFactoryFor(configPath: String): ConnectionFactory = {
connectionFactoryFor(configPath, ConfigFactory.empty())
}

def connectionFactoryFor(configPath: String, config: Config): ConnectionFactory = {
sessions
.computeIfAbsent(
configLocation,
configLocation => {
val config = system.settings.config.getConfig(configLocation)
val settings = new ConnectionFactorySettings(config)
configPath,
_ => {
val fullConfig = config.withFallback(system.settings.config)
val settings =
new ConnectionFactorySettings(fullConfig.getConfig(configPath))
val customizer = createConnectionFactoryOptionsCustomizer(settings)
createConnectionPoolFactory(settings, customizer, config)
createConnectionPoolFactory(settings, customizer, fullConfig)
})
.asInstanceOf[ConnectionFactory]
}
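The new `connectionFactoryFor(configPath, config)` overload is the runtime entry point: callers can hand in config assembled on the fly, resolved with the actor system's config as fallback. A minimal Scala sketch (the per-database config path and helper name are illustrative; the diff does not show whether the overload expects a pre-resolved config, so this sketch resolves eagerly):

import com.typesafe.config.ConfigFactory
import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.persistence.r2dbc.ConnectionFactoryProvider

def factoryForDatabase(system: ActorSystem[_], database: String): ConnectionFactory = {
  // The provider caches pools per config path (computeIfAbsent above),
  // so each database gets its own path.
  val path = s"connection-factory-$database"
  // Inherit every connection-factory default, overriding only the database name.
  val runtimeConfig = ConfigFactory
    .parseString(
      s"""
      $path = $${pekko.persistence.r2dbc.connection-factory}
      $path.database = "$database"
      """)
    .withFallback(system.settings.config)
    .resolve()
  ConnectionFactoryProvider(system).connectionFactoryFor(path, runtimeConfig)
}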