Commit

add support for generic kafka producer configuration

SnuK87 committed Jun 23, 2021
1 parent e48ffca · commit 3911555

Showing 4 changed files with 234 additions and 123 deletions.
KafkaEventListenerProvider.java

@@ -2,8 +2,11 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;

@@ -19,75 +22,76 @@
 public class KafkaEventListenerProvider implements EventListenerProvider {
 
     private static final Logger LOG = Logger.getLogger(KafkaEventListenerProvider.class);
 
     private String topicEvents;
 
     private List<EventType> events;
 
     private String topicAdminEvents;
 
     private Producer<String, String> producer;
 
     private ObjectMapper mapper;
 
     public KafkaEventListenerProvider(String bootstrapServers, String clientId, String topicEvents, String[] events,
-            String topicAdminEvents) {
+            String topicAdminEvents, Map<String, Object> kafkaProducerProperties) {
         this.topicEvents = topicEvents;
         this.events = new ArrayList<>();
         this.topicAdminEvents = topicAdminEvents;
 
-        for (int i = 0; i < events.length; i++) {
+        for (String event : events) {
             try {
-                EventType eventType = EventType.valueOf(events[i].toUpperCase());
+                EventType eventType = EventType.valueOf(event.toUpperCase());
                 this.events.add(eventType);
             } catch (IllegalArgumentException e) {
-                LOG.debug("Ignoring event >" + events[i] + "<. Event does not exist.");
+                LOG.debug("Ignoring event >" + event + "<. Event does not exist.");
             }
         }
 
-        producer = KafkaProducerFactory.createProducer(clientId, bootstrapServers);
+        producer = KafkaProducerFactory.createProducer(clientId, bootstrapServers, kafkaProducerProperties);
         mapper = new ObjectMapper();
     }
 
-    private void produceEvent(String eventAsString, String topic) throws InterruptedException, ExecutionException {
+    private void produceEvent(String eventAsString, String topic)
+            throws InterruptedException, ExecutionException, TimeoutException {
         LOG.debug("Produce to topic: " + topicEvents + " ...");
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, eventAsString);
         Future<RecordMetadata> metaData = producer.send(record);
-        RecordMetadata recordMetadata = metaData.get();
+        RecordMetadata recordMetadata = metaData.get(30, TimeUnit.SECONDS);
         LOG.debug("Produced to topic: " + recordMetadata.topic());
     }
 
     @Override
     public void onEvent(Event event) {
         if (events.contains(event.getType())) {
             try {
                 produceEvent(mapper.writeValueAsString(event), topicEvents);
-            } catch (JsonProcessingException | ExecutionException e) {
+            } catch (JsonProcessingException | ExecutionException | TimeoutException e) {
                 LOG.error(e.getMessage(), e);
             } catch (InterruptedException e) {
                 LOG.error(e.getMessage(), e);
                 Thread.currentThread().interrupt();
             }
         }
     }
 
     @Override
     public void onEvent(AdminEvent event, boolean includeRepresentation) {
         if (topicAdminEvents != null) {
             try {
                 produceEvent(mapper.writeValueAsString(event), topicAdminEvents);
-            } catch (JsonProcessingException | ExecutionException e) {
+            } catch (JsonProcessingException | ExecutionException | TimeoutException e) {
                 LOG.error(e.getMessage(), e);
             } catch (InterruptedException e) {
                 LOG.error(e.getMessage(), e);
                 Thread.currentThread().interrupt();
             }
         }
     }
 
     @Override
     public void close() {
         // ignore
     }
 }
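For orientation, the new constructor can be wired up directly. A minimal sketch, assuming the repository classes are on the classpath; the broker address, client id, topic names, and event list are illustrative placeholders rather than values taken from this commit:

import java.util.HashMap;
import java.util.Map;

public class ProviderWiringExample {

    public static void main(String[] args) {
        // Generic producer options that could not be configured before this commit.
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("acks", "all");
        producerProps.put("compression.type", "gzip");

        // New constructor signature: the property map is the added sixth argument.
        KafkaEventListenerProvider provider = new KafkaEventListenerProvider(
                "localhost:9092", "keycloak", "keycloak-events",
                new String[] { "REGISTER", "LOGIN" }, "keycloak-admin-events", producerProps);
    }
}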
KafkaEventListenerProviderFactory.java

@@ -1,5 +1,7 @@
 package com.github.snuk87.keycloak.kafka;
 
+import java.util.Map;
+
 import org.jboss.logging.Logger;
 import org.keycloak.Config.Scope;
 import org.keycloak.events.EventListenerProvider;

@@ -9,71 +11,74 @@
 public class KafkaEventListenerProviderFactory implements EventListenerProviderFactory {
 
     private static final Logger LOG = Logger.getLogger(KafkaEventListenerProviderFactory.class);
     private static final String ID = "kafka";
 
     private KafkaEventListenerProvider instance;
 
     private String bootstrapServers;
     private String topicEvents;
     private String topicAdminEvents;
     private String clientId;
     private String[] events;
+    private Map<String, Object> kafkaProducerProperties;
 
     @Override
     public EventListenerProvider create(KeycloakSession session) {
         if (instance == null) {
-            instance = new KafkaEventListenerProvider(bootstrapServers, clientId, topicEvents, events,
-                    topicAdminEvents);
+            instance = new KafkaEventListenerProvider(bootstrapServers, clientId, topicEvents, events, topicAdminEvents,
+                    kafkaProducerProperties);
         }
 
         return instance;
     }
 
     @Override
     public String getId() {
         return ID;
     }
 
     @Override
     public void init(Scope config) {
         LOG.info("Init kafka module ...");
         topicEvents = config.get("topicEvents");
         clientId = config.get("clientId", "keycloak");
         bootstrapServers = config.get("bootstrapServers");
         topicAdminEvents = config.get("topicAdminEvents");
 
         String eventsString = config.get("events");
 
         if (eventsString != null) {
             events = eventsString.split(",");
         }
 
         if (topicEvents == null) {
             throw new NullPointerException("topic must not be null.");
         }
 
         if (clientId == null) {
             throw new NullPointerException("clientId must not be null.");
         }
 
         if (bootstrapServers == null) {
             throw new NullPointerException("bootstrapServers must not be null");
         }
 
         if (events == null || events.length == 0) {
             events = new String[1];
             events[0] = "REGISTER";
         }
+
+        kafkaProducerProperties = KafkaProducerConfig.init(config);
     }
 
     @Override
     public void postInit(KeycloakSessionFactory arg0) {
         // ignore
     }
 
     @Override
     public void close() {
         // ignore
     }
 }
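Because init(Scope) now ends by handing the whole Scope to KafkaProducerConfig.init, every recognised producer key can be set alongside the existing provider options. A hedged configuration sketch for a WildFly-era Keycloak standalone.xml; the eventsListener SPI name follows Keycloak's usual convention and all values are placeholders, none of which appear in the commit itself:

<spi name="eventsListener">
    <provider name="kafka" enabled="true">
        <properties>
            <property name="topicEvents" value="keycloak-events"/>
            <property name="clientId" value="keycloak"/>
            <property name="bootstrapServers" value="kafka1:9092,kafka2:9092"/>
            <property name="events" value="REGISTER,LOGIN,LOGOUT"/>
            <!-- Generic producer keys enabled by this commit; values are examples. -->
            <property name="acks" value="all"/>
            <property name="security.protocol" value="SSL"/>
            <property name="ssl.truststore.location" value="/opt/keycloak/truststore.jks"/>
        </properties>
    </provider>
</spi>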
KafkaProducerConfig.java (new file)

@@ -0,0 +1,98 @@
package com.github.snuk87.keycloak.kafka;

import java.util.HashMap;
import java.util.Map;

import org.keycloak.Config.Scope;

public class KafkaProducerConfig {

    // https://kafka.apache.org/documentation/#producerconfigs

    public static Map<String, Object> init(Scope scope) {
        Map<String, Object> propertyMap = new HashMap<>();
        KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values();

        for (KafkaProducerProperty property : producerProperties) {
            if (property.getName() != null) {
                propertyMap.put(property.getName(), scope.get(property.getName()));
            }
        }

        return propertyMap;
    }

    enum KafkaProducerProperty {
        ACKS("acks"), //
        BUFFER_MEMORY("buffer.memory"), //
        COMPRESSION_TYPE("compression.type"), //
        RETRIES("retries"), //
        SSL_KEY_PASSWORD("ssl.key.password"), //
        SSL_KEYSTORE_LOCATION("ssl.keystore.location"), //
        SSL_KEYSTORE_PASSWORD("ssl.keystore.password"), //
        SSL_TRUSTSTORE_LOCATION("ssl.truststore.location"), //
        SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password"), //
        BATCH_SIZE("batch.size"), //
        CLIENT_DNS_LOOKUP("client.dns.lookup"), //
        CONNECTION_MAX_IDLE_MS("connections.max.idle.ms"), //
        DELIVERY_TIMEOUT_MS("delivery.timeout.ms"), //
        LINGER_MS("linger.ms"), //
        MAX_BLOCK_MS("max.block.ms"), //
        MAX_REQUEST_SIZE("max.request.size"), //
        PARTITIONER_CLASS("partitioner.class"), //
        RECEIVE_BUFFER_BYTES("receive.buffer.bytes"), //
        REQUEST_TIMEOUT_MS("request.timeout.ms"), //
        SASL_CLIENT_CALLBACK_HANDLER_CLASS("sasl.client.callback.handler.class"), //
        SASL_JAAS_CONFIG("sasl.jaas.config"), //
        SASL_KERBEROS_SERVICE_NAME("sasl.kerberos.service.name"), //
        SASL_LOGIN_CALLBACK_HANDLER_CLASS("sasl.login.callback.handler.class"), //
        SASL_LOGIN_CLASS("sasl.login.class"), //
        SASL_MECHANISM("sasl.mechanism"), //
        SECURITY_PROTOCOL("security.protocol"), //
        SEND_BUFFER_BYTES("send.buffer.bytes"), //
        SSL_ENABLED_PROTOCOLS("ssl.enabled.protocols"), //
        SSL_KEYSTORE_TYPE("ssl.keystore.type"), //
        SSL_PROTOCOL("ssl.protocol"), //
        SSL_PROVIDER("ssl.provider"), //
        SSL_TRUSTSTORE_TYPE("ssl.truststore.type"), //
        ENABLE_IDEMPOTENCE("enable.idempotence"), //
        INTERCEPTOR_CLASS("interceptor.classes"), //
        MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION("max.in.flight.requests.per.connection"), //
        METADATA_MAX_AGE_MS("metadata.max.age.ms"), //
        METADATA_MAX_IDLE_MS("metadata.max.idle.ms"), //
        METRIC_REPORTERS("metric.reporters"), //
        METRIC_NUM_SAMPLES("metrics.num.samples"), //
        METRICS_RECORDING_LEVEL("metrics.recording.level"), //
        METRICS_SAMPLE_WINDOW_MS("metrics.sample.window.ms"), //
        RECONNECT_BACKOFF_MAX_MS("reconnect.backoff.max.ms"), //
        RECONNECT_BACKOFF_MS("reconnect.backoff.ms"), //
        RETRY_BACKOFF_MS("retry.backoff.ms"), //
        SASL_KERBEROS_KINIT_CMD("sasl.kerberos.kinit.cmd"), //
        SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN("sasl.kerberos.min.time.before.relogin"), //
        SASL_KERBEROS_TICKET_RENEW_JITTER("sasl.kerberos.ticket.renew.jitter"), //
        SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR("sasl.kerberos.ticket.renew.window.factor"), //
        SASL_LOGIN_REFRESH_BUFFER_SECONDS("sasl.login.refresh.buffer.seconds"), //
        SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS("sasl.login.refresh.min.period.seconds"), //
        SASL_LOGIN_REFRESH_WINDOW_FACTOR("sasl.login.refresh.window.factor"), //
        SASL_LOGIN_REFRESH_WINDOW_JITTER("sasl.login.refresh.window.jitter"), //
        SECURITY_PROVIDERS("security.providers"), //
        SSL_CIPHER_SUITES("ssl.cipher.suites"), //
        SSL_ENDPOINT_IDENTIFICATION_ALGORITHM("ssl.endpoint.identification.algorithm"), //
        SSL_KEYMANAGER_ALGORITHM("ssl.keymanager.algorithm"), //
        SSL_SECURE_RANDOM_IMPLEMENTATION("ssl.secure.random.implementation"), //
        SSL_TRUSTMANAGER_ALGORITHM("ssl.trustmanager.algorithm"), //
        TRANSACTION_TIMEOUT_MS("transaction.timeout.ms"), //
        TRANSACTION_ID("transactional.id");

        private String name;

        private KafkaProducerProperty(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

}
[Diff of the fourth changed file did not load.]
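The call sites above now pass the property map into KafkaProducerFactory.createProducer, so the file that did not load is presumably that factory. A minimal sketch of what the new three-argument method could look like, an assumption rather than the commit's actual code; note that KafkaProducerConfig.init maps every known key, including unset ones whose value is null, so the sketch copies only non-null entries:

package com.github.snuk87.keycloak.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public final class KafkaProducerFactory {

    private KafkaProducerFactory() {
    }

    public static Producer<String, String> createProducer(String clientId, String bootstrapServers,
            Map<String, Object> optionalProperties) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Skip keys the admin never set; KafkaProducerConfig.init leaves their values null.
        optionalProperties.forEach((key, value) -> {
            if (value != null) {
                props.put(key, value);
            }
        });

        return new KafkaProducer<>(props);
    }
}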
