diff --git a/Dockerfile b/Dockerfile index 6610f87..ee3cc86 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ -FROM jboss/keycloak:10.0.1 +FROM jboss/keycloak:14.0.0 -ADD ./keycloak-kafka-1.0.0-jar-with-dependencies.jar /opt/jboss/keycloak/standalone/deployments/ +ADD ./keycloak-kafka-1.1.0-jar-with-dependencies.jar /opt/jboss/keycloak/standalone/deployments/ -ADD kafka-module.cli /opt/jboss/startup-scripts/ +ADD add-kafka-config.cli /opt/jboss/startup-scripts/ #ADD realm-export.json /init/ @@ -17,4 +17,4 @@ EXPOSE 8443 ENTRYPOINT [ "/opt/jboss/tools/docker-entrypoint.sh" ] -CMD ["-b", "0.0.0.0", "-Dkeycloak.import=/init/realm-export.json"] \ No newline at end of file +CMD ["-b", "0.0.0.0"] \ No newline at end of file diff --git a/README.md b/README.md index 70a48f9..e33288e 100644 --- a/README.md +++ b/README.md @@ -4,38 +4,45 @@ Simple module for [Keycloak](https://www.keycloak.org/) to produce keycloak even - [Keycloak Kafka Module](#keycloak-kafka-module) * [Build](#build) * [Installation](#installation) - * [Configuration](#configuration) - + [Enable Events in keycloak](#enable-events-in-keycloak) - + [Kafka module](#kafka-module) - * [Docker Container](#configuration) + * [Module Configuration](#module-configuration) + + [Kafka client configuration](#kafka-client-configuration) + + [Kafka client using secure connection](#kafka-client-using-secure-connection) + * [Module Deployment](#module-deployment) + * [Keycloak Configuration](#keycloak-configuration) + + [Enable Events in keycloak](#enable-events-in-keycloak) + * [Docker Container](#docker-container) * [Sample Client](#sample-client) **Tested with** -Kafka version: `2.12-2.1.x`, `2.12-2.4.x`, `2.12-2.5.x` +Kafka version: `2.12-2.1.x`, `2.12-2.4.x`, `2.12-2.5.x`, `2.13-2.8` -Keycloak version: `4.8.3`, `6.0.x`, `7.0.0`, `9.0.x`, `10.0.x` +Keycloak version: `4.8.3`, `6.0.x`, `7.0.0`, `9.0.x`, `10.0.x`, `13.0.x`, `14.0.x` Java version: `11`, `13` ## Build +You can simply use Maven to build the jar file. Thanks to the assembly plugin the build process will create a fat jar that includes all dependencies and makes the deployment quite easy. +Just use the following command to build the jar file. -`mvn clean package` +```bash +mvn clean package +``` ## Installation -First you have to build or [download](https://github.com/SnuK87/keycloak-kafka/releases) the keycloak-kafka module. +First you need to build or [download](https://github.com/SnuK87/keycloak-kafka/releases) the keycloak-kafka module. -To install the module to your keycloak server first you have to configure the module and then deploy the it. -If you deploy the module without configuration your keycloak server will fail to start up with a NullPointerException. +To install the module to your keycloak server you have to configure the module and deploy it. +If you deploy the module without configuration, your keycloak server will fail to start throwing a `NullPointerException`. If you want to install the module manually as described in the initial version you can follow this [guide](https://github.com/SnuK87/keycloak-kafka/wiki/Manual-Installation). -### Module configuration -Download the [CLI script](kafka-module.cli) from this repository and edit the properties to fit your environment. Also make sure that you use the right -server config (line 1). As a default the script will change the `standalone.xml`. +## Module Configuration +Download the [CLI script](add-kafka-config.cli) from this repository and edit the properties to fit your environment. Also make sure to use the right +server config (line 1). As default the script will configure the module in the `standalone.xml`. (Be aware that the docker image uses the `standalone-ha.xml` by default) -Currently the following properties are available and should be changed to fit your environemnt: +The following properties are mandatory and can be set via environment variables (e.g. `${env.KAFKA_TOPIC}`) `topicEvents`: The name of the kafka topic to where the events will be produced to. @@ -45,16 +52,41 @@ Currently the following properties are available and should be changed to fit y `events`: (Optional; default=REGISTER) The events that will be send to kafka. -`topicAdminEvents`: (Optional) The name of the kafka topic to where the admin events will be produced to. +`topicAdminEvents`: (Optional) The name of the kafka topic to where the admin events will be produced to. No events will be produced when this property isn't set. + +A list of available events can be found [here](https://www.keycloak.org/docs/latest/server_admin/#event-types) + +Run the CLI script using the following command and check the output on the console. You should see some server logs and lines of `{"outcome" => "success"}`. -Run the CLI script using the following command and check the output on the console. You should see some server logs and 6 lines of `{"outcome" => "success"}`. ```bash -$KEYCLOAK_HOME/bin/jboss-cli.sh --file=/path/to/kafka-module.cli +$KEYCLOAK_HOME/bin/jboss-cli.sh --file=/path/to/add-kafka-config.cli +``` + +If you want to remove the configuration of the keycloak-kafka module from your server you can run [this](remove-kafka-config.cli). + +### Kafka client configuration +It's also possible to configure the kafka client by adding parameters to the cli script. This makes it possible to connect this module to a kafka broker that requires SSL/TLS connections. +For example to change the timeout of how long the producer will block the thread to 10 seconds you just have to add the following line to the cli script. + +``` +/subsystem=keycloak-server/spi=eventsListener/provider=kafka:map-put(name=properties,key=max.block.ms,value=10000) +``` + +Note the difference of `kafka:map-put` for kafka client parameters compared to `kafka:write-attribute` for module parameters. +A full list of available configurations can be found in the [official kafka docs](https://kafka.apache.org/documentation/#producerconfigs). + +### Kafka client using secure connection +As mentioned above the kafka client can be configured through the cli script. To make the kafka open a SSL/TLS secured connection you can add the following lines to the script: + +``` +/subsystem=keycloak-server/spi=eventsListener/provider=kafka:map-put(name=properties,key=security.protocol,value=SSL) +/subsystem=keycloak-server/spi=eventsListener/provider=kafka:map-put(name=properties,key=ssl.truststore.location,value=kafka.client.truststore.jks) +/subsystem=keycloak-server/spi=eventsListener/provider=kafka:map-put(name=properties,key=ssl.truststore.password,value=test1234) ``` -### Module deployment -Copy the `keycloak-kafka--jar-with-dependencies.jar` into the $KEYCLOAK_HOME/standalone/deployments folder. Keycloak will automatically -install the module with all dependencies on start up. To verify that the deployment of the module was successful you can check if a new file +## Module Deployment +Copy the `keycloak-kafka--jar-with-dependencies.jar` into the `$KEYCLOAK_HOME/standalone/deployments` folder. Keycloak will automatically +install the module with all it's dependencies on start up. To verify that the deployment of the module was successful you can check if a new file with the name `keycloak-kafka--jar-with-dependencies.jar.deployed` was created in the same folder. @@ -66,16 +98,17 @@ with the name `keycloak-kafka--jar-with-dependencies.jar.deployed` was 3. Go to Events 4. Open `Config` tab and add `kafka` to Event Listeners. +![Admin console config](images/event_config.png) ## Docker Container The simplest way to enable the kafka module in a docker container is to create a custom docker image from the [keycloak base image](https://hub.docker.com/r/jboss/keycloak/). The `keycloak-kafka--jar-with-dependencies.jar` must be added to the `/standalone/deployments` folder and the CLI script must be added to the `/opt/jboss/startup-scripts/` folder -as explained in [Installation](#installation). The only difference is that the CLI script will be executed automatically in start up and doesn't have to be executed manually. +as explained in [Installation](#installation). The only difference is that the CLI script will be executed automatically on start up and doesn't have to be executed manually. An example can be found in this [Dockerfile](Dockerfile). ## Sample Client -The following snippet shows a minimal Spring Boot Kafka client to consume keycloak events. Additional properties can be added to `KeycloakEvent`. +The following snippet shows a minimal Spring Boot Kafka client to consume keycloak events. Additional properties can be added to the `KeycloakEvent` class. ```java @SpringBootApplication diff --git a/kafka-module.cli b/add-kafka-config.cli similarity index 57% rename from kafka-module.cli rename to add-kafka-config.cli index 0cfb673..e3462ea 100644 --- a/kafka-module.cli +++ b/add-kafka-config.cli @@ -3,9 +3,10 @@ embed-server --server-config=standalone.xml --std-out=echo if (outcome != success) of /subsystem=keycloak-server/spi=eventsListener:read-resource() /subsystem=keycloak-server/spi=eventsListener:add() /subsystem=keycloak-server/spi=eventsListener/provider=kafka:add(enabled=true) - /subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.topicEvents,value=keycloak-events) - /subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.clientId,value=keycloak) - /subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.bootstrapServers,value=192.168.0.1:9092) - /subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.events,value=REGISTER) + /subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.topicEvents,value=${env.KAFKA_TOPIC}) + /subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.clientId,value=${env.KAFKA_CLIENT_ID}) + /subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.bootstrapServers,value=${env.KAFKA_BOOTSTRAP_SERVERS}) + /subsystem=keycloak-server/spi=eventsListener/provider=kafka:write-attribute(name=properties.events,value=${env.KAFKA_EVENTS}) + /subsystem=keycloak-server/spi=eventsListener/provider=kafka:map-put(name=properties,key=max.block.ms,value=10000) end-if stop-embedded-server diff --git a/images/event_config.png b/images/event_config.png new file mode 100644 index 0000000..94aa561 Binary files /dev/null and b/images/event_config.png differ diff --git a/pom.xml b/pom.xml index 9615fcb..6098557 100644 --- a/pom.xml +++ b/pom.xml @@ -4,13 +4,13 @@ 4.0.0 com.github.snuk87.keycloak keycloak-kafka - 1.0.0 + 1.1.0 11 11 - 10.0.1 - 2.5.0 + 14.0.0 + 2.8.0 diff --git a/remove-kafka-config.cli b/remove-kafka-config.cli new file mode 100644 index 0000000..582e1c3 --- /dev/null +++ b/remove-kafka-config.cli @@ -0,0 +1,6 @@ +embed-server --server-config=standalone.xml --std-out=echo + +if (outcome == success) of /subsystem=keycloak-server/spi=eventsListener:read-resource() + /subsystem=keycloak-server/spi=eventsListener:remove() +end-if +stop-embedded-server \ No newline at end of file diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProvider.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProvider.java index 1f05e29..18d8a93 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProvider.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProvider.java @@ -22,76 +22,76 @@ public class KafkaEventListenerProvider implements EventListenerProvider { - private static final Logger LOG = Logger.getLogger(KafkaEventListenerProvider.class); + private static final Logger LOG = Logger.getLogger(KafkaEventListenerProvider.class); - private String topicEvents; + private String topicEvents; - private List events; + private List events; - private String topicAdminEvents; + private String topicAdminEvents; - private Producer producer; + private Producer producer; - private ObjectMapper mapper; + private ObjectMapper mapper; - public KafkaEventListenerProvider(String bootstrapServers, String clientId, String topicEvents, String[] events, + public KafkaEventListenerProvider(String bootstrapServers, String clientId, String topicEvents, String[] events, String topicAdminEvents, Map kafkaProducerProperties) { - this.topicEvents = topicEvents; - this.events = new ArrayList<>(); - this.topicAdminEvents = topicAdminEvents; - - for (String event : events) { - try { - EventType eventType = EventType.valueOf(event.toUpperCase()); - this.events.add(eventType); - } catch (IllegalArgumentException e) { - LOG.debug("Ignoring event >" + event + "<. Event does not exist."); - } + this.topicEvents = topicEvents; + this.events = new ArrayList<>(); + this.topicAdminEvents = topicAdminEvents; + + for (String event : events) { + try { + EventType eventType = EventType.valueOf(event.toUpperCase()); + this.events.add(eventType); + } catch (IllegalArgumentException e) { + LOG.debug("Ignoring event >" + event + "<. Event does not exist."); + } + } + + producer = KafkaProducerFactory.createProducer(clientId, bootstrapServers, kafkaProducerProperties); + mapper = new ObjectMapper(); } - producer = KafkaProducerFactory.createProducer(clientId, bootstrapServers, kafkaProducerProperties); - mapper = new ObjectMapper(); - } - - private void produceEvent(String eventAsString, String topic) + private void produceEvent(String eventAsString, String topic) throws InterruptedException, ExecutionException, TimeoutException { - LOG.debug("Produce to topic: " + topicEvents + " ..."); - ProducerRecord record = new ProducerRecord<>(topic, eventAsString); - Future metaData = producer.send(record); - RecordMetadata recordMetadata = metaData.get(30, TimeUnit.SECONDS); - LOG.debug("Produced to topic: " + recordMetadata.topic()); - } - - @Override - public void onEvent(Event event) { - if (events.contains(event.getType())) { - try { - produceEvent(mapper.writeValueAsString(event), topicEvents); - } catch (JsonProcessingException | ExecutionException | TimeoutException e) { - LOG.error(e.getMessage(), e); - } catch (InterruptedException e) { - LOG.error(e.getMessage(), e); - Thread.currentThread().interrupt(); - } + LOG.debug("Produce to topic: " + topicEvents + " ..."); + ProducerRecord record = new ProducerRecord<>(topic, eventAsString); + Future metaData = producer.send(record); + RecordMetadata recordMetadata = metaData.get(30, TimeUnit.SECONDS); + LOG.debug("Produced to topic: " + recordMetadata.topic()); } - } - - @Override - public void onEvent(AdminEvent event, boolean includeRepresentation) { - if (topicAdminEvents != null) { - try { - produceEvent(mapper.writeValueAsString(event), topicAdminEvents); - } catch (JsonProcessingException | ExecutionException | TimeoutException e) { - LOG.error(e.getMessage(), e); - } catch (InterruptedException e) { - LOG.error(e.getMessage(), e); - Thread.currentThread().interrupt(); - } + + @Override + public void onEvent(Event event) { + if (events.contains(event.getType())) { + try { + produceEvent(mapper.writeValueAsString(event), topicEvents); + } catch (JsonProcessingException | ExecutionException | TimeoutException e) { + LOG.error(e.getMessage(), e); + } catch (InterruptedException e) { + LOG.error(e.getMessage(), e); + Thread.currentThread().interrupt(); + } + } } - } - @Override - public void close() { - // ignore - } + @Override + public void onEvent(AdminEvent event, boolean includeRepresentation) { + if (topicAdminEvents != null) { + try { + produceEvent(mapper.writeValueAsString(event), topicAdminEvents); + } catch (JsonProcessingException | ExecutionException | TimeoutException e) { + LOG.error(e.getMessage(), e); + } catch (InterruptedException e) { + LOG.error(e.getMessage(), e); + Thread.currentThread().interrupt(); + } + } + } + + @Override + public void close() { + // ignore + } } diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderFactory.java index ba6a752..e90474a 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderFactory.java @@ -11,74 +11,74 @@ public class KafkaEventListenerProviderFactory implements EventListenerProviderFactory { - private static final Logger LOG = Logger.getLogger(KafkaEventListenerProviderFactory.class); - private static final String ID = "kafka"; - - private KafkaEventListenerProvider instance; - - private String bootstrapServers; - private String topicEvents; - private String topicAdminEvents; - private String clientId; - private String[] events; - private Map kafkaProducerProperties; - - @Override - public EventListenerProvider create(KeycloakSession session) { - if (instance == null) { - instance = new KafkaEventListenerProvider(bootstrapServers, clientId, topicEvents, events, topicAdminEvents, - kafkaProducerProperties); + private static final Logger LOG = Logger.getLogger(KafkaEventListenerProviderFactory.class); + private static final String ID = "kafka"; + + private KafkaEventListenerProvider instance; + + private String bootstrapServers; + private String topicEvents; + private String topicAdminEvents; + private String clientId; + private String[] events; + private Map kafkaProducerProperties; + + @Override + public EventListenerProvider create(KeycloakSession session) { + if (instance == null) { + instance = new KafkaEventListenerProvider(bootstrapServers, clientId, topicEvents, events, topicAdminEvents, + kafkaProducerProperties); + } + + return instance; } - return instance; - } + @Override + public String getId() { + return ID; + } - @Override - public String getId() { - return ID; - } + @Override + public void init(Scope config) { + LOG.info("Init kafka module ..."); + topicEvents = config.get("topicEvents"); + clientId = config.get("clientId", "keycloak"); + bootstrapServers = config.get("bootstrapServers"); + topicAdminEvents = config.get("topicAdminEvents"); - @Override - public void init(Scope config) { - LOG.info("Init kafka module ..."); - topicEvents = config.get("topicEvents"); - clientId = config.get("clientId", "keycloak"); - bootstrapServers = config.get("bootstrapServers"); - topicAdminEvents = config.get("topicAdminEvents"); + String eventsString = config.get("events"); - String eventsString = config.get("events"); + if (eventsString != null) { + events = eventsString.split(","); + } - if (eventsString != null) { - events = eventsString.split(","); - } + if (topicEvents == null) { + throw new NullPointerException("topic must not be null."); + } - if (topicEvents == null) { - throw new NullPointerException("topic must not be null."); - } + if (clientId == null) { + throw new NullPointerException("clientId must not be null."); + } - if (clientId == null) { - throw new NullPointerException("clientId must not be null."); - } + if (bootstrapServers == null) { + throw new NullPointerException("bootstrapServers must not be null"); + } - if (bootstrapServers == null) { - throw new NullPointerException("bootstrapServers must not be null"); - } + if (events == null || events.length == 0) { + events = new String[1]; + events[0] = "REGISTER"; + } - if (events == null || events.length == 0) { - events = new String[1]; - events[0] = "REGISTER"; + kafkaProducerProperties = KafkaProducerConfig.init(config); } - kafkaProducerProperties = KafkaProducerConfig.init(config); - } - - @Override - public void postInit(KeycloakSessionFactory arg0) { - // ignore - } + @Override + public void postInit(KeycloakSessionFactory arg0) { + // ignore + } - @Override - public void close() { - // ignore - } + @Override + public void close() { + // ignore + } } diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java index 77cc981..b7020dc 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java @@ -7,92 +7,92 @@ public class KafkaProducerConfig { - // https://kafka.apache.org/documentation/#producerconfigs + // https://kafka.apache.org/documentation/#producerconfigs - public static Map init(Scope scope) { - Map propertyMap = new HashMap<>(); - KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values(); + public static Map init(Scope scope) { + Map propertyMap = new HashMap<>(); + KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values(); - for (KafkaProducerProperty property : producerProperties) { - if (property.getName() != null) { - propertyMap.put(property.getName(), scope.get(property.getName())); - } - } + for (KafkaProducerProperty property : producerProperties) { + if (property.getName() != null && scope.get(property.getName()) != null) { + propertyMap.put(property.getName(), scope.get(property.getName())); + } + } - return propertyMap; - } + return propertyMap; + } - enum KafkaProducerProperty { - ACKS("acks"), // - BUFFER_MEMORY("buffer.memory"), // - COMPRESSION_TYPE("compression.type"), // - RETRIES("retries"), // - SSL_KEY_PASSWORD("ssl.key.password"), // - SSL_KEYSTORE_LOCATION("ssl.keystore.location"), // - SSL_KEYSTORE_PASSWORD("ssl.keystore.password"), // - SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"), // - SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"), // - BATCH_SIZE("batch.size"), // - CLIENT_DNS_LOOKUP("client.dns.lookup"), // - CONNECTION_MAX_IDLE_MS("connections.max.idle.ms"), // - DELIVERY_TIMEOUT_MS("delivery.timeout.ms"), // - LINGER_MS("linger.ms"), // - MAX_BLOCK_MS("max.block.ms"), // - MAX_REQUEST_SIZE("max.request.size"), // - PARTITIONER_CLASS("partitioner.class"), // - RECEIVE_BUFFER_BYTES("receive.buffer.bytes"), // - REQUEST_TIMEOUT_MS("request.timeout.ms"), // - SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"), // - SASL_JAAS_CONFIG("sasl.jaas.config"), // - SASL_KERBEROS_SERVICE_NAME("sasl.kerberos.service.name"), // - SASL_LOGIN_CALLBACK_HANDLER_CLASS("sasl.login.callback.handler.class"), // - SASL_LOGIN_CLASS("sasl.login.class"), // - SASL_MECHANISM("sasl.mechanism"), // - SECURITY_PROTOCOL("security.protocol"), // - SEND_BUFFER_BYTES("send.buffer.bytes"), // - SSL_ENABLED_PROTOCOLS("ssl.enabled.protocols"), // - SSL_KEYSTORE_TYPE("ssl.keystore.type"), // - SSL_PROTOCOL("ssl.protocol"), // - SSL_PROVIDER("ssl.provider"), // - SSL_TRUSTSTORE_TYPE("ssl.truststore.type"), // - ENABLE_IDEMPOTENCE("enable.idempotence"), // - INTERCEPTOR_CLASS("interceptor.classes"), // - MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION("max.in.flight.requests.per.connection"), // - METADATA_MAX_AGE_MS("metadata.max.age.ms"), // - METADATA_MAX_IDLE_MS("metadata.max.idle.ms"), // - METRIC_REPORTERS("metric.reporters"), // - METRIC_NUM_SAMPLES("metrics.num.samples"), // - METRICS_RECORDING_LEVEL("metrics.recording.level"), // - METRICS_SAMPLE_WINDOW_MS("metrics.sample.window.ms"), // - RECONNECT_BACKOFF_MAX_MS("reconnect.backoff.max.ms"), // - RECONNECT_BACKOFF_MS("reconnect.backoff.ms"), // - RETRY_BACKOFF_MS("retry.backoff.ms"), // - SASL_KERBEROS_KINIT_CMD("sasl.kerberos.kinit.cmd"), // - SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN("sasl.kerberos.min.time.before.relogin"), // - SASL_KERBEROS_TICKET_RENEW_JITTER("sasl.kerberos.ticket.renew.jitter"), // - SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR("sasl.kerberos.ticket.renew.window.factor"), // - SASL_LOGIN_REFRESH_BUFFER_SECONDS("sasl.login.refresh.buffer.seconds"), // - SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS("sasl.login.refresh.min.period.seconds"), // - SASL_LOGIN_REFRESH_WINDOW_FACTOR("sasl.login.refresh.window.factor"), // - SASL_LOGIN_REFRESH_WINDOW_JITTER("sasl.login.refresh.window.jitter"), // - SECURITY_PROVIDERS("security.providers"), // - SSL_CIPHER_SUITES("ssl.cipher.suites"), // - SSL_ENDPOINT_IDENTIFICATION_ALGORITHM("ssl.endpoint.identification.algorithm"), // - SSL_KEYMANAGER_ALGORITHM("ssl.keymanager.algorithm"), // - SSL_SECURE_RANDOM_IMPLEMENTATION("ssl.secure.random.implementation"), // - SSL_TRUSTMANAGER_ALGORITHM("ssl.trustmanager.algorithm"), // - TRANSACTION_TIMEOUT_MS("transaction.timeout.ms"), // - TRANSACTION_ID("transactional.id"); + enum KafkaProducerProperty { + ACKS("acks"), // + BUFFER_MEMORY("buffer.memory"), // + COMPRESSION_TYPE("compression.type"), // + RETRIES("retries"), // + SSL_KEY_PASSWORD("ssl.key.password"), // + SSL_KEYSTORE_LOCATION("ssl.keystore.location"), // + SSL_KEYSTORE_PASSWORD("ssl.keystore.password"), // + SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"), // + SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"), // + BATCH_SIZE("batch.size"), // + CLIENT_DNS_LOOKUP("client.dns.lookup"), // + CONNECTION_MAX_IDLE_MS("connections.max.idle.ms"), // + DELIVERY_TIMEOUT_MS("delivery.timeout.ms"), // + LINGER_MS("linger.ms"), // + MAX_BLOCK_MS("max.block.ms"), // + MAX_REQUEST_SIZE("max.request.size"), // + PARTITIONER_CLASS("partitioner.class"), // + RECEIVE_BUFFER_BYTES("receive.buffer.bytes"), // + REQUEST_TIMEOUT_MS("request.timeout.ms"), // + SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"), // + SASL_JAAS_CONFIG("sasl.jaas.config"), // + SASL_KERBEROS_SERVICE_NAME("sasl.kerberos.service.name"), // + SASL_LOGIN_CALLBACK_HANDLER_CLASS("sasl.login.callback.handler.class"), // + SASL_LOGIN_CLASS("sasl.login.class"), // + SASL_MECHANISM("sasl.mechanism"), // + SECURITY_PROTOCOL("security.protocol"), // + SEND_BUFFER_BYTES("send.buffer.bytes"), // + SSL_ENABLED_PROTOCOLS("ssl.enabled.protocols"), // + SSL_KEYSTORE_TYPE("ssl.keystore.type"), // + SSL_PROTOCOL("ssl.protocol"), // + SSL_PROVIDER("ssl.provider"), // + SSL_TRUSTSTORE_TYPE("ssl.truststore.type"), // + ENABLE_IDEMPOTENCE("enable.idempotence"), // + INTERCEPTOR_CLASS("interceptor.classes"), // + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION("max.in.flight.requests.per.connection"), // + METADATA_MAX_AGE_MS("metadata.max.age.ms"), // + METADATA_MAX_IDLE_MS("metadata.max.idle.ms"), // + METRIC_REPORTERS("metric.reporters"), // + METRIC_NUM_SAMPLES("metrics.num.samples"), // + METRICS_RECORDING_LEVEL("metrics.recording.level"), // + METRICS_SAMPLE_WINDOW_MS("metrics.sample.window.ms"), // + RECONNECT_BACKOFF_MAX_MS("reconnect.backoff.max.ms"), // + RECONNECT_BACKOFF_MS("reconnect.backoff.ms"), // + RETRY_BACKOFF_MS("retry.backoff.ms"), // + SASL_KERBEROS_KINIT_CMD("sasl.kerberos.kinit.cmd"), // + SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN("sasl.kerberos.min.time.before.relogin"), // + SASL_KERBEROS_TICKET_RENEW_JITTER("sasl.kerberos.ticket.renew.jitter"), // + SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR("sasl.kerberos.ticket.renew.window.factor"), // + SASL_LOGIN_REFRESH_BUFFER_SECONDS("sasl.login.refresh.buffer.seconds"), // + SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS("sasl.login.refresh.min.period.seconds"), // + SASL_LOGIN_REFRESH_WINDOW_FACTOR("sasl.login.refresh.window.factor"), // + SASL_LOGIN_REFRESH_WINDOW_JITTER("sasl.login.refresh.window.jitter"), // + SECURITY_PROVIDERS("security.providers"), // + SSL_CIPHER_SUITES("ssl.cipher.suites"), // + SSL_ENDPOINT_IDENTIFICATION_ALGORITHM("ssl.endpoint.identification.algorithm"), // + SSL_KEYMANAGER_ALGORITHM("ssl.keymanager.algorithm"), // + SSL_SECURE_RANDOM_IMPLEMENTATION("ssl.secure.random.implementation"), // + SSL_TRUSTMANAGER_ALGORITHM("ssl.trustmanager.algorithm"), // + TRANSACTION_TIMEOUT_MS("transaction.timeout.ms"), // + TRANSACTION_ID("transactional.id"); - private String name; + private String name; - private KafkaProducerProperty(String name) { - this.name = name; - } + private KafkaProducerProperty(String name) { + this.name = name; + } - public String getName() { - return name; + public String getName() { + return name; + } } - } } diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java index 5cd35a8..a483d43 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java @@ -10,24 +10,24 @@ public final class KafkaProducerFactory { - private KafkaProducerFactory() { + private KafkaProducerFactory() { - } + } - public static Producer createProducer(String clientId, String bootstrapServer, + public static Producer createProducer(String clientId, String bootstrapServer, Map optionalProperties) { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - optionalProperties.forEach(props::put); + optionalProperties.forEach(props::put); - // fix Class org.apache.kafka.common.serialization.StringSerializer could not be - // found. see https://stackoverflow.com/a/50981469 - Thread.currentThread().setContextClassLoader(null); + // fix Class org.apache.kafka.common.serialization.StringSerializer could not be + // found. see https://stackoverflow.com/a/50981469 + Thread.currentThread().setContextClassLoader(null); - return new KafkaProducer<>(props); - } + return new KafkaProducer<>(props); + } }