Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

properly name netty threads #511

Merged
merged 7 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 7.0.1
- Name netty threads according to their purpose and the plugin id [#511](https://github.com/logstash-plugins/logstash-input-beats/pull/511)

## 7.0.0
- Remove deprecated SSL settings
- SSL settings that were marked deprecated in version `6.6.0` are now marked obsolete, and will prevent the plugin from starting.
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7.0.0
7.0.1
2 changes: 1 addition & 1 deletion lib/logstash/inputs/beats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def register
end # def register

def create_server
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @event_loop_threads, @executor_threads)
server = org.logstash.beats.Server.new(@id, @host, @port, @client_inactivity_timeout, @event_loop_threads, @executor_threads)
server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled
server
end
Expand Down
6 changes: 3 additions & 3 deletions spec/inputs/beats_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
let(:port) { 9001 }

it "sends the required options to the server" do
expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, event_loop_threads, executor_threads)
expect(org.logstash.beats.Server).to receive(:new).with(plugin.id, host, port, client_inactivity_timeout, event_loop_threads, executor_threads)
subject.register
end
end
Expand Down Expand Up @@ -386,8 +386,8 @@
subject(:plugin) { LogStash::Inputs::Beats.new(config) }

before do
@server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, event_loop_threads, executor_threads)
expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, event_loop_threads, executor_threads).and_return @server
@server = org.logstash.beats.Server.new(plugin.id, host, port, client_inactivity_timeout, event_loop_threads, executor_threads)
expect( org.logstash.beats.Server ).to receive(:new).with(plugin.id, host, port, client_inactivity_timeout, event_loop_threads, executor_threads).and_return @server
expect( @server ).to receive(:listen)

subject.register
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/filebeat_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
before :each do
FileUtils.rm_rf(File.join(File.dirname(__FILE__), "..", "..", "vendor", "filebeat", "data"))
start_client
raise 'Filebeat did not start in allocated time' unless is_alive
raise "Filebeat did not start in allocated time, due to: #{@execution_output}" unless is_alive
sleep(20) # give some time to FB to send something
end

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/logstash/beats/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ static public void main(String[] args) throws Exception {
// Check for leaks.
// ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, 0, Runtime.getRuntime().availableProcessors());
Server server = new Server("test", "0.0.0.0", DEFAULT_PORT, 15, 0, Runtime.getRuntime().availableProcessors());

if(args.length > 0 && args[0].equals("ssl")) {
logger.debug("Using SSL");
Expand Down
21 changes: 14 additions & 7 deletions src/main/java/org/logstash/beats/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
import org.apache.logging.log4j.Logger;
import org.logstash.netty.SslHandlerProvider;

import static org.logstash.beats.util.DaemonThreadFactory.daemonThreadFactory;
jsvd marked this conversation as resolved.
Show resolved Hide resolved

public class Server {
private final static Logger logger = LogManager.getLogger(Server.class);

private final int port;

private final String id;
private final String host;
private final int eventLoopThreadCount;
private final int executorThreadCount;
Expand All @@ -33,7 +37,8 @@ public class Server {

private final int clientInactivityTimeoutSeconds;

public Server(String host, int port, int clientInactivityTimeoutSeconds, int eventLoopThreadCount, int executorThreadCount) {
public Server(String id, String host, int port, int clientInactivityTimeoutSeconds, int eventLoopThreadCount, int executorThreadCount) {
this.id = id;
this.host = host;
this.port = port;
this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
Expand All @@ -54,12 +59,12 @@ public Server listen() throws InterruptedException {
logger.error("Could not shut down worker group before starting", e);
}
}
bossGroup = new NioEventLoopGroup(eventLoopThreadCount); // TODO: add a config to make it adjustable, no need many threads
workGroup = new NioEventLoopGroup(eventLoopThreadCount);
bossGroup = new NioEventLoopGroup(eventLoopThreadCount, daemonThreadFactory(id + "-bossGroup")); // TODO: add a config to make it adjustable, no need many threads
workGroup = new NioEventLoopGroup(eventLoopThreadCount, daemonThreadFactory(id + "-workGroup"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, this is helpful for investigation.

try {
logger.info("Starting server on port: {}", this.port);

beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, executorThreadCount);
beatsInitializer = new BeatsInitializer(id, messageListener, clientInactivityTimeoutSeconds, executorThreadCount);

ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workGroup)
Expand Down Expand Up @@ -143,12 +148,14 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
private final IMessageListener localMessageListener;
private final int localClientInactivityTimeoutSeconds;

BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) {
BeatsInitializer(String pluginId, IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThreadCount) {
// Keeps a local copy of Server settings, so they can't be modified once it starts listening
this.localMessageListener = messageListener;
this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD,
daemonThreadFactory(pluginId + "-idleStateHandler"));
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThreadCount,
daemonThreadFactory(pluginId + "-beatsHandler"));
}

public void initChannel(SocketChannel socket) {
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/org/logstash/beats/util/DaemonThreadFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.logstash.beats.util;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonThreadFactory implements ThreadFactory {

final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;

DaemonThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
group = Thread.currentThread().getThreadGroup();
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
0);
t.setDaemon(true);
return t;
}

public static ThreadFactory daemonThreadFactory(String namePrefix) {
return new DaemonThreadFactory(namePrefix);
}

}
6 changes: 3 additions & 3 deletions src/test/java/org/logstash/beats/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testServerShouldTerminateConnectionWhenExceptionHappen() throws Inte

final CountDownLatch latch = new CountDownLatch(concurrentConnections);

final Server server = new Server(host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount);
final Server server = new Server("testServer", host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount);

final AtomicBoolean otherCause = new AtomicBoolean(false);

Expand Down Expand Up @@ -118,7 +118,7 @@ public void testServerShouldTerminateConnectionIdleForTooLong() throws Interrupt

final CountDownLatch latch = new CountDownLatch(concurrentConnections);
final AtomicBoolean exceptionClose = new AtomicBoolean(false);
final Server server = new Server(host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount);
final Server server = new Server("testServer", host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount);
server.setMessageListener(new MessageListener() {
@Override
public void onNewConnection(ChannelHandlerContext ctx) {
Expand Down Expand Up @@ -174,7 +174,7 @@ public void run() {

@Test
public void testServerShouldAcceptConcurrentConnection() throws InterruptedException {
final Server server = new Server(host, randomPort, 30, eventLoopThreadCount, executorThreadCount);
final Server server = new Server("testServer", host, randomPort, 30, eventLoopThreadCount, executorThreadCount);
SpyListener listener = new SpyListener();
server.setMessageListener(listener);
Runnable serverTask = new Runnable() {
Expand Down