Skip to content

Commit

Permalink
GEOMESA-3395 Confluent - remove unused datastore parameters (#3169)
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Sep 16, 2024
1 parent 774f115 commit 0dc435b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,18 @@ object ConfluentKafkaDataStoreFactory extends GeoMesaDataStoreInfo with LazyLogg
largeText = true
)

private val UnusedParams =
Seq(
KafkaDataStoreParams.Brokers, // note: added separately so it's first in the list
KafkaDataStoreParams.Catalog,
KafkaDataStoreParams.Zookeepers,
KafkaDataStoreParams.ZkPath,
KafkaDataStoreParams.SerializationType,
)

override val ParameterInfo: Array[GeoMesaParam[_ <: AnyRef]] =
SchemaRegistryUrl +: KafkaDataStoreFactory.ParameterInfo :+ SchemaOverrides
Array(KafkaDataStoreParams.Brokers, SchemaRegistryUrl, SchemaOverrides) ++
KafkaDataStoreFactory.ParameterInfo.filterNot(UnusedParams.contains)

override def canProcess(params: java.util.Map[String, _]): Boolean =
KafkaDataStoreParams.Brokers.exists(params) && SchemaRegistryUrl.exists(params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ConfluentKafkaDataStoreTest extends ConfluentContainerTest {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
// note: serializer will registry schemas with the registry automatically
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer])
props.put("schema.registry.url", schemaRegistryUrl)
new KafkaProducer[String, GenericRecord](props)
Expand All @@ -58,7 +59,6 @@ class ConfluentKafkaDataStoreTest extends ConfluentContainerTest {
val params = Map(
"kafka.schema.registry.url" -> schemaRegistryUrl,
"kafka.brokers" -> brokers,
"kafka.zookeepers" -> zookeepers,
"kafka.topic.partitions" -> 1,
"kafka.topic.replication" -> 1,
"kafka.consumer.read-back" -> "Inf",
Expand Down

0 comments on commit 0dc435b

Please sign in to comment.