Skip to content

Commit

Permalink
Update README examples
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Meisel committed Jan 23, 2025
1 parent 38a4b87 commit fffe286
Showing 1 changed file with 68 additions and 14 deletions.
82 changes: 68 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,18 @@ object MyApp {

## Configuring the KPL
```scala
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider}
import software.amazon.awssdk.auth.credentials.{AwsCredentials,AwsCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.kinesis.producer._

object MyApp {
// A mock credentials provider
final case class AwsCreds(accessKey: String, secretKey: String)
extends AWSCredentials
with AWSCredentialsProvider {
override def getAWSAccessKeyId: String = accessKey
override def getAWSSecretKey: String = secretKey
override def getCredentials: AWSCredentials = this
override def refresh(): Unit = ()
extends AwsCredentials
with AwsCredentialsProvider {
override def accessKeyId(): String = accessKey
override def secretAccessKey(): String = secretKey
override def resolveCredentials(): AwsCredentials = this
}

object AwsCreds {
Expand All @@ -185,7 +186,7 @@ object MyApp {
val kplProducer = new KinesisProducer(
new KinesisProducerConfiguration()
.setCredentialsProvider(AwsCreds.LocalCreds)
.setRegion(Regions.US_EAST_1.getName)
.setRegion(Region.US_EAST_1.id())
.setKinesisEndpoint("localhost")
.setKinesisPort(4567L)
.setCloudwatchEndpoint("localhost")
Expand All @@ -212,6 +213,10 @@ import software.amazon.kinesis.checkpoint.CheckpointConfig
import software.amazon.kinesis.common._
import software.amazon.kinesis.coordinator.{CoordinatorConfig, Scheduler}
import software.amazon.kinesis.leases.LeaseManagementConfig
import software.amazon.kinesis.leases.dynamodb.{
DynamoDBLeaseManagementFactory,
DynamoDBLeaseSerializer
}
import software.amazon.kinesis.lifecycle.LifecycleConfig
import software.amazon.kinesis.lifecycle.events._
import software.amazon.kinesis.metrics.MetricsConfig
Expand Down Expand Up @@ -297,18 +302,67 @@ object MyApp {
val streamName = "some-stream-name"
// kinesis-mock only supports polling consumers today
val retrievalSpecificConfig = new PollingConfig(streamName, kinesisClient)
// Local setups require us to tweak the lease management configuration
val defaultLeaseManagement = new LeaseManagementConfig(
appName,
appName,
dynamoClient,
kinesisClient,
workerId
).shardSyncIntervalMillis(1000L)
.failoverTimeMillis(1000L)
val leaseManagementConfig = defaultLeaseManagement.leaseManagementFactory(
new DynamoDBLeaseManagementFactory(
defaultLeaseManagement.kinesisClient(),
defaultLeaseManagement.dynamoDBClient(),
defaultLeaseManagement.tableName(),
defaultLeaseManagement.workerIdentifier(),
defaultLeaseManagement.executorService(),
defaultLeaseManagement.failoverTimeMillis(),
defaultLeaseManagement.enablePriorityLeaseAssignment(),
defaultLeaseManagement.epsilonMillis(),
defaultLeaseManagement.maxLeasesForWorker(),
defaultLeaseManagement.maxLeasesToStealAtOneTime(),
defaultLeaseManagement.maxLeaseRenewalThreads(),
defaultLeaseManagement.cleanupLeasesUponShardCompletion(),
defaultLeaseManagement.ignoreUnexpectedChildShards(),
defaultLeaseManagement.shardSyncIntervalMillis(),
defaultLeaseManagement.consistentReads(),
defaultLeaseManagement.listShardsBackoffTimeInMillis(),
defaultLeaseManagement.maxListShardsRetryAttempts(),
defaultLeaseManagement.maxCacheMissesBeforeReload(),
defaultLeaseManagement.listShardsCacheAllowedAgeInSeconds(),
defaultLeaseManagement.cacheMissWarningModulus(),
defaultLeaseManagement.initialLeaseTableReadCapacity().toLong,
defaultLeaseManagement.initialLeaseTableWriteCapacity().toLong,
defaultLeaseManagement.tableCreatorCallback(),
defaultLeaseManagement.dynamoDbRequestTimeout(),
defaultLeaseManagement.billingMode(),
defaultLeaseManagement.leaseTableDeletionProtectionEnabled(),
defaultLeaseManagement.leaseTablePitrEnabled(),
defaultLeaseManagement.tags(),
new DynamoDBLeaseSerializer(),
defaultLeaseManagement.customShardDetectorProvider(),
false,
LeaseCleanupConfig
.builder()
.completedLeaseCleanupIntervalMillis(500L)
.garbageLeaseCleanupIntervalMillis(500L)
.leaseCleanupIntervalMillis(10.seconds.toMillis)
.build(),
defaultLeaseManagement
.workerUtilizationAwareAssignmentConfig()
.disableWorkerMetrics(true),
defaultLeaseManagement.gracefulLeaseHandoffConfig()
)
)

// Consumer can be executed from this by running scheduler.run()
val scheduler = new Scheduler(
new CheckpointConfig(),
new CoordinatorConfig(appName)
.parentShardPollIntervalMillis(1000L),
new LeaseManagementConfig(
appName,
dynamoClient,
kinesisClient,
workerId
).shardSyncIntervalMillis(1000L),
leaseManagementConfig,
new LifecycleConfig(),
new MetricsConfig(cloudwatchClient, appName),
new ProcessorConfig(KCLRecordProcessorFactory),
Expand Down

0 comments on commit fffe286

Please sign in to comment.