Skip to content

Commit

Permalink
Backport #511: properly name netty threads (#512)
Browse files Browse the repository at this point in the history
Co-authored-by: Mashhur <[email protected]>
  • Loading branch information
jsvd and mashhurs authored Feb 4, 2025
1 parent 5941a0f commit 176dcaf
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 6.9.2
- Name netty threads according to their purpose and the plugin id [#511](https://github.com/logstash-plugins/logstash-input-beats/pull/511)

## 6.9.1
- Upgrade netty to 4.1.115 [#507](https://github.com/logstash-plugins/logstash-input-beats/pull/507)

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6.9.1
6.9.2
2 changes: 1 addition & 1 deletion lib/logstash/inputs/beats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def register
end # def register

def create_server
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @event_loop_threads, @executor_threads)
server = org.logstash.beats.Server.new(@id, @host, @port, @client_inactivity_timeout, @event_loop_threads, @executor_threads)
server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled
server
end
Expand Down
6 changes: 3 additions & 3 deletions spec/inputs/beats_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
let(:port) { 9001 }

it "sends the required options to the server" do
expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, event_loop_threads, executor_threads)
expect(org.logstash.beats.Server).to receive(:new).with(plugin.id, host, port, client_inactivity_timeout, event_loop_threads, executor_threads)
subject.register
end
end
Expand Down Expand Up @@ -531,8 +531,8 @@
subject(:plugin) { LogStash::Inputs::Beats.new(config) }

before do
@server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, event_loop_threads, executor_threads)
expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, event_loop_threads, executor_threads).and_return @server
@server = org.logstash.beats.Server.new(plugin.id, host, port, client_inactivity_timeout, event_loop_threads, executor_threads)
expect( org.logstash.beats.Server ).to receive(:new).with(plugin.id, host, port, client_inactivity_timeout, event_loop_threads, executor_threads).and_return @server
expect( @server ).to receive(:listen)

subject.register
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/logstash/beats/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ static public void main(String[] args) throws Exception {
// Check for leaks.
// ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, 0, Runtime.getRuntime().availableProcessors());
Server server = new Server("test", "0.0.0.0", DEFAULT_PORT, 15, 0, Runtime.getRuntime().availableProcessors());

if(args.length > 0 && args[0].equals("ssl")) {
logger.debug("Using SSL");
Expand Down
21 changes: 14 additions & 7 deletions src/main/java/org/logstash/beats/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
import org.apache.logging.log4j.Logger;
import org.logstash.netty.SslHandlerProvider;

import static org.logstash.beats.util.DaemonThreadFactory.daemonThreadFactory;

public class Server {
private final static Logger logger = LogManager.getLogger(Server.class);

private final int port;

private final String id;
private final String host;
private final int eventLoopThreadCount;
private final int executorThreadCount;
Expand All @@ -33,7 +37,8 @@ public class Server {

private final int clientInactivityTimeoutSeconds;

public Server(String host, int port, int clientInactivityTimeoutSeconds, int eventLoopThreadCount, int executorThreadCount) {
public Server(String id, String host, int port, int clientInactivityTimeoutSeconds, int eventLoopThreadCount, int executorThreadCount) {
this.id = id;
this.host = host;
this.port = port;
this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
Expand All @@ -54,12 +59,12 @@ public Server listen() throws InterruptedException {
logger.error("Could not shut down worker group before starting", e);
}
}
bossGroup = new NioEventLoopGroup(eventLoopThreadCount); // TODO: add a config to make it adjustable, no need many threads
workGroup = new NioEventLoopGroup(eventLoopThreadCount);
bossGroup = new NioEventLoopGroup(eventLoopThreadCount, daemonThreadFactory(id + "-bossGroup")); // TODO: add a config to make it adjustable, no need many threads
workGroup = new NioEventLoopGroup(eventLoopThreadCount, daemonThreadFactory(id + "-workGroup"));
try {
logger.info("Starting server on port: {}", this.port);

beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, executorThreadCount);
beatsInitializer = new BeatsInitializer(id, messageListener, clientInactivityTimeoutSeconds, executorThreadCount);

ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workGroup)
Expand Down Expand Up @@ -143,12 +148,14 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
private final IMessageListener localMessageListener;
private final int localClientInactivityTimeoutSeconds;

BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) {
BeatsInitializer(String pluginId, IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThreadCount) {
// Keeps a local copy of Server settings, so they can't be modified once it starts listening
this.localMessageListener = messageListener;
this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD,
daemonThreadFactory(pluginId + "-idleStateHandler"));
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThreadCount,
daemonThreadFactory(pluginId + "-beatsHandler"));
}

public void initChannel(SocketChannel socket) {
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/org/logstash/beats/util/DaemonThreadFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.logstash.beats.util;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonThreadFactory implements ThreadFactory {

final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;

DaemonThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
group = Thread.currentThread().getThreadGroup();
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
0);
t.setDaemon(true);
return t;
}

public static ThreadFactory daemonThreadFactory(String namePrefix) {
return new DaemonThreadFactory(namePrefix);
}

}
6 changes: 3 additions & 3 deletions src/test/java/org/logstash/beats/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testServerShouldTerminateConnectionWhenExceptionHappen() throws Inte

final CountDownLatch latch = new CountDownLatch(concurrentConnections);

final Server server = new Server(host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount);
final Server server = new Server("testServer", host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount);

final AtomicBoolean otherCause = new AtomicBoolean(false);

Expand Down Expand Up @@ -118,7 +118,7 @@ public void testServerShouldTerminateConnectionIdleForTooLong() throws Interrupt

final CountDownLatch latch = new CountDownLatch(concurrentConnections);
final AtomicBoolean exceptionClose = new AtomicBoolean(false);
final Server server = new Server(host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount);
final Server server = new Server("testServer", host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount);
server.setMessageListener(new MessageListener() {
@Override
public void onNewConnection(ChannelHandlerContext ctx) {
Expand Down Expand Up @@ -174,7 +174,7 @@ public void run() {

@Test
public void testServerShouldAcceptConcurrentConnection() throws InterruptedException {
final Server server = new Server(host, randomPort, 30, eventLoopThreadCount, executorThreadCount);
final Server server = new Server("testServer", host, randomPort, 30, eventLoopThreadCount, executorThreadCount);
SpyListener listener = new SpyListener();
server.setMessageListener(listener);
Runnable serverTask = new Runnable() {
Expand Down

0 comments on commit 176dcaf

Please sign in to comment.