This is a Flink connector (source and sink) for RabbitMQ Streams.
The standard connector builds on the standard RabbitMQ client library
and enforces automatic acknowledgements.
This connector instead uses the RabbitMQ Stream Java client.
See the "develop" branch for the latest updates.
// Port is 5552 for the streams module
final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("rabbitmq")
.setPort(5552)
.setUserName("xxx")
.setPassword("xxx")
.build();
final RMQStreamsConfig streamsConfig = new RMQStreamsConfig(connectionConfig, "name of your events stream");
// we publish CloudEvents in our stream
final RMQStreamSource<CloudEvent> source = new RMQStreamSource<CloudEvent>(streamsConfig, new CloudEventSchema());
SingleOutputStreamOperator<CloudEvent> incoming = env.addSource(source)
.returns(Types.GENERIC(CloudEvent.class));
// you can filter the incoming stream by event type
incoming.filter(new EventFilter(Set.of("my.event.type")))
.map( ...