Skip to content

Commit

Permalink
improve rabbit error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
burdoto committed Mar 29, 2024
1 parent 3b445ee commit f5fe8c7
Showing 1 changed file with 32 additions and 29 deletions.
61 changes: 32 additions & 29 deletions src/main/java/org/comroid/api/net/Rabbit.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.comroid.api.net;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.experimental.NonFinal;
Expand All @@ -10,30 +13,27 @@
import org.comroid.api.data.seri.DataNode;
import org.comroid.api.data.seri.adp.JSON;
import org.comroid.api.func.exc.ThrowingFunction;
import org.comroid.api.func.exc.ThrowingSupplier;
import org.comroid.api.func.ext.Wrap;
import org.comroid.api.func.util.Event;
import org.comroid.api.java.Activator;
import org.comroid.api.java.SoftDepend;
import org.jetbrains.annotations.Nullable;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.URI;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

import static java.util.Collections.unmodifiableMap;
import static org.comroid.api.Polyfill.uncheckedCast;
import static org.comroid.api.func.exc.ThrowingSupplier.fallback;

@Log
@Value
public class Rabbit {
private static final ConnectionFactory factory = new ConnectionFactory();
private static final Map<URI, Rabbit> $cache = new ConcurrentHashMap<>();
public static final Map<URI, Rabbit> CACHE = unmodifiableMap($cache);

Expand All @@ -45,26 +45,24 @@ public static Wrap<Rabbit> of(@Nullable String uri) {
}

URI uri;
Connection connection;
@NonFinal Connection connection;
Map<String, Exchange> exchanges = new ConcurrentHashMap<>();

@SneakyThrows
private Rabbit(URI uri) {
this.uri = uri;
var connFactory = new ConnectionFactory();
connFactory.setUri(uri);
/*
Wrap.of(fallback(SSLContext::getDefault))
.ifPresent(fallback ->
connFactory.setSslContextFactory(name -> {
try {
return SSLContext.getInstance(name);
} catch (Throwable t) {
return fallback;
}
}));
*/
this.connection = connFactory.newConnection();
this.connection = touch();
}

@SneakyThrows
private synchronized Connection touch() {
if (connection != null && connection.isOpen())
return connection;
if (connection != null) try {
connection.close();
} catch (Throwable ignored) {
}
factory.setUri(uri);
return factory.newConnection();
}

public Exchange exchange(String exchange) {
Expand All @@ -88,7 +86,7 @@ private Exchange(String exchange) {
}

@SneakyThrows
private Channel touch() {
private synchronized Channel touch() {
if (channel != null) {
if (channel.isOpen())
return channel;
Expand All @@ -97,6 +95,7 @@ private Channel touch() {
} catch (Throwable ignored) {
}
}
connection = Rabbit.this.touch();
channel = connection.createChannel();
channel.exchangeDeclare(exchange, "topic");
return channel;
Expand Down Expand Up @@ -125,7 +124,7 @@ public void run() {
}

@SneakyThrows
public Channel touch() {
public synchronized Channel touch() {
var channel = Exchange.this.touch();
if (tag == null) {
var queue = Exchange.this.touch().queueDeclare().getQueue();
Expand All @@ -138,11 +137,15 @@ public Channel touch() {

@SneakyThrows
public void send(T data) {
var body = SoftDepend.type("com.fasterxml.jackson.databind.ObjectMapper")
.map(ThrowingFunction.logging(log, $$ -> new ObjectMapper().writeValueAsString(data)))
.or(data::toSerializedString)
.assertion();
touch().basicPublish(exchange, routingKey, null, body.getBytes());
try {
var body = SoftDepend.type("com.fasterxml.jackson.databind.ObjectMapper")
.map(ThrowingFunction.logging(log, $$ -> new ObjectMapper().writeValueAsString(data)))
.or(data::toSerializedString)
.assertion();
touch().basicPublish(exchange, routingKey, null, body.getBytes());
} catch (Throwable t) {
log.log(Level.FINE, "Could not send data to rabbit", t);
}
}

private void handleRabbitData(String $, Delivery content) {
Expand Down

0 comments on commit f5fe8c7

Please sign in to comment.