Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAMEL-20297: do not swallow interrupted exceptions #12794

Merged
merged 10 commits into from
Jan 15, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ public Exchange receive(long timeout) {
Future<Exchange> future = executorService.submit((Callable<Exchange>) this::receive);
try {
return future.get(timeout, TimeUnit.MILLISECONDS);
} catch (ExecutionException | InterruptedException e) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw RuntimeCamelException.wrapRuntimeCamelException(e);
} catch (ExecutionException e) {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
} catch (TimeoutException e) {
// ignore as we hit timeout then return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public void process(Exchange exchange) throws Exception {
} else {
throw new Jt400PgmCallException(getOutputMessages(pgmCall));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new Jt400PgmCallException(e);
} catch (Exception e) {
throw new Jt400PgmCallException(e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ private void refreshStatusBecomingLeader() {
Thread.sleep(delay);
} catch (InterruptedException e) {
LOG.warn("Thread interrupted", e);
Thread.currentThread().interrupt();
}

LOG.info("{} Current pod is becoming the new leader now...", logPrefix);
Expand All @@ -238,6 +239,7 @@ private void refreshStatusLosingLeadership() {
Thread.sleep(delay);
} catch (InterruptedException e) {
LOG.warn("Thread interrupted", e);
Thread.currentThread().interrupt();
}

LOG.info("{} Current pod is losing leadership now...", logPrefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public void stop() {
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// ignore
LOG.info("Interrupted while waiting for thread termination");
Thread.currentThread().interrupt();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ public void run() {
if (cursorRegenerationDelayEnabled) {
try {
Thread.sleep(cursorRegenerationDelay);
} catch (InterruptedException ignored) {
} catch (InterruptedException e) {
log.info("Interrupted while waiting for the cursor regeneration");
Thread.currentThread().interrupt();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ public void operationComplete(ChannelFuture channelFuture) throws Exception {
// but we can try to get a result with a 0 timeout, then netty will throw the caused
// exception wrapped in an outer exception
channelFuture.get(0, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
cause = e.getCause();
} catch (Exception e) {
cause = e.getCause();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public void onCanceled() {
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeCamelException(e.getMessage(), e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ public void onCanceled() {
} else {
return ApiConsumerHelper.getResultsProcessed(this, result[0], isSplitResult());
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw RuntimeCamelException.wrapRuntimeCamelException(e);
} catch (Exception t) {
throw RuntimeCamelException.wrapRuntimeCamelException(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public void onCanceled() {
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeCamelException(e.getMessage(), e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public void onCanceled() {
} else {
return ApiConsumerHelper.getResultsProcessed(this, result[0], isSplitResult());
}

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw RuntimeCamelException.wrapRuntimeCamelException(e);
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public boolean process(Exchange exchange, AsyncCallback callback) {

// wait for result
populateResult(exchange, solverJob);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
exchange.setException(e);
} catch (Exception e) {
exchange.setException(e);
} finally {
Expand All @@ -103,6 +106,9 @@ public boolean process(Exchange exchange, AsyncCallback callback) {
// synchronous or wrong type of body
callback.done(true);
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
exchange.setException(e);
} catch (Exception e) {
exchange.setException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ protected void startServer() throws Exception {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
},
Expand Down Expand Up @@ -269,6 +270,7 @@ protected void stopServer() {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
},
Expand Down Expand Up @@ -306,6 +308,7 @@ protected void stopVertx() {
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ public void process(Exchange exchange) throws Exception {
} else {
throw new Exception("Body does not contain Stanza/Stanza[] object(s)");
}
} catch (XMPPException xmppe) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeExchangeException(
"Cannot send XMPP direct: from " + endpoint.getUser() + " to: "
"Interrupted while sending XMPP direct: from " + endpoint.getUser() + " to: "
+ XmppEndpoint.getConnectionMessage(connection),
exchange, xmppe);
exchange, e);

} catch (Exception e) {
throw new RuntimeExchangeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public void process(Exchange exchange) {
if (connection == null) {
try {
connection = endpoint.createConnection();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeExchangeException("Interrupted while connecting to XMPP server.", exchange, e);
} catch (Exception e) {
throw new RuntimeExchangeException("Could not connect to XMPP server.", exchange, e);
}
Expand All @@ -60,6 +63,9 @@ public void process(Exchange exchange) {
if (chat == null) {
try {
initializeChat();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeExchangeException("Interrupted while initializing XMPP chat.", exchange, e);
} catch (Exception e) {
throw new RuntimeExchangeException("Could not initialize XMPP chat.", exchange, e);
}
Expand All @@ -84,6 +90,9 @@ public void process(Exchange exchange) {
// must invoke nextMessage to consume the response from the server
// otherwise the client local queue will fill up (CAMEL-1467)
chat.pollMessage();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeExchangeException("Interrupted while sending XMPP message: " + message, exchange, e);
} catch (Exception e) {
throw new RuntimeExchangeException("Could not send XMPP message: " + message, exchange, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public void process(Exchange exchange) {
if (!connection.isConnected()) {
this.reconnect();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeCamelException("Interrupted while connecting to XMPP server.", e);
} catch (Exception e) {
throw new RuntimeCamelException("Could not connect to XMPP server.", e);
}
Expand Down Expand Up @@ -88,6 +91,12 @@ public void process(Exchange exchange) {
LOG.debug("Sending XMPP message to {} from {} : {}", participant, endpoint.getUser(), message.getBody());
}
chat.send(message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeExchangeException(
"Interrupted while sending XMPP message to " + participant + " from " + endpoint.getUser() + " : " + message
+ " to: " + XmppEndpoint.getConnectionMessage(connection),
exchange, e);
} catch (Exception e) {
throw new RuntimeExchangeException(
"Could not send XMPP message to " + participant + " from " + endpoint.getUser() + " : " + message
Expand Down