diff --git a/CHANGELOG.md b/CHANGELOG.md index 49f72257..61090581 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 7.0.1 + - Name netty threads according to their purpose and the plugin id [#511](https://github.com/logstash-plugins/logstash-input-beats/pull/511) + ## 7.0.0 - Remove deprecated SSL settings - SSL settings that were marked deprecated in version `6.6.0` are now marked obsolete, and will prevent the plugin from starting. diff --git a/VERSION b/VERSION index 66ce77b7..9fe9ff9d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -7.0.0 +7.0.1 diff --git a/lib/logstash/inputs/beats.rb b/lib/logstash/inputs/beats.rb index a84d94f4..4585fc8f 100644 --- a/lib/logstash/inputs/beats.rb +++ b/lib/logstash/inputs/beats.rb @@ -205,7 +205,7 @@ def register end # def register def create_server - server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @event_loop_threads, @executor_threads) + server = org.logstash.beats.Server.new(@id, @host, @port, @client_inactivity_timeout, @event_loop_threads, @executor_threads) server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled server end diff --git a/spec/inputs/beats_spec.rb b/spec/inputs/beats_spec.rb index 6d8aab04..4c465c67 100644 --- a/spec/inputs/beats_spec.rb +++ b/spec/inputs/beats_spec.rb @@ -38,7 +38,7 @@ let(:port) { 9001 } it "sends the required options to the server" do - expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, event_loop_threads, executor_threads) + expect(org.logstash.beats.Server).to receive(:new).with(plugin.id, host, port, client_inactivity_timeout, event_loop_threads, executor_threads) subject.register end end @@ -386,8 +386,8 @@ subject(:plugin) { LogStash::Inputs::Beats.new(config) } before do - @server = org.logstash.beats.Server.new(host, port, client_inactivity_timeout, event_loop_threads, executor_threads) - expect( org.logstash.beats.Server ).to receive(:new).with(host, port, client_inactivity_timeout, event_loop_threads, executor_threads).and_return @server + @server = org.logstash.beats.Server.new(plugin.id, host, port, client_inactivity_timeout, event_loop_threads, executor_threads) + expect( org.logstash.beats.Server ).to receive(:new).with(plugin.id, host, port, client_inactivity_timeout, event_loop_threads, executor_threads).and_return @server expect( @server ).to receive(:listen) subject.register diff --git a/spec/integration/filebeat_spec.rb b/spec/integration/filebeat_spec.rb index e2e13d8c..386e1fff 100644 --- a/spec/integration/filebeat_spec.rb +++ b/spec/integration/filebeat_spec.rb @@ -51,7 +51,7 @@ before :each do FileUtils.rm_rf(File.join(File.dirname(__FILE__), "..", "..", "vendor", "filebeat", "data")) start_client - raise 'Filebeat did not start in allocated time' unless is_alive + raise "Filebeat did not start in allocated time, due to: #{@execution_output}" unless is_alive sleep(20) # give some time to FB to send something end diff --git a/src/main/java/org/logstash/beats/Runner.java b/src/main/java/org/logstash/beats/Runner.java index 32134a32..c2ac49cb 100644 --- a/src/main/java/org/logstash/beats/Runner.java +++ b/src/main/java/org/logstash/beats/Runner.java @@ -17,7 +17,7 @@ static public void main(String[] args) throws Exception { // Check for leaks. // ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); - Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, 0, Runtime.getRuntime().availableProcessors()); + Server server = new Server("test", "0.0.0.0", DEFAULT_PORT, 15, 0, Runtime.getRuntime().availableProcessors()); if(args.length > 0 && args[0].equals("ssl")) { logger.debug("Using SSL"); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index fb4b4b1b..49c2a627 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -18,10 +18,14 @@ import org.apache.logging.log4j.Logger; import org.logstash.netty.SslHandlerProvider; +import static org.logstash.beats.util.DaemonThreadFactory.daemonThreadFactory; + public class Server { private final static Logger logger = LogManager.getLogger(Server.class); private final int port; + + private final String id; private final String host; private final int eventLoopThreadCount; private final int executorThreadCount; @@ -33,7 +37,8 @@ public class Server { private final int clientInactivityTimeoutSeconds; - public Server(String host, int port, int clientInactivityTimeoutSeconds, int eventLoopThreadCount, int executorThreadCount) { + public Server(String id, String host, int port, int clientInactivityTimeoutSeconds, int eventLoopThreadCount, int executorThreadCount) { + this.id = id; this.host = host; this.port = port; this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; @@ -54,12 +59,12 @@ public Server listen() throws InterruptedException { logger.error("Could not shut down worker group before starting", e); } } - bossGroup = new NioEventLoopGroup(eventLoopThreadCount); // TODO: add a config to make it adjustable, no need many threads - workGroup = new NioEventLoopGroup(eventLoopThreadCount); + bossGroup = new NioEventLoopGroup(eventLoopThreadCount, daemonThreadFactory(id + "-bossGroup")); // TODO: add a config to make it adjustable, no need many threads + workGroup = new NioEventLoopGroup(eventLoopThreadCount, daemonThreadFactory(id + "-workGroup")); try { logger.info("Starting server on port: {}", this.port); - beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, executorThreadCount); + beatsInitializer = new BeatsInitializer(id, messageListener, clientInactivityTimeoutSeconds, executorThreadCount); ServerBootstrap server = new ServerBootstrap(); server.group(bossGroup, workGroup) @@ -143,12 +148,14 @@ private class BeatsInitializer extends ChannelInitializer { private final IMessageListener localMessageListener; private final int localClientInactivityTimeoutSeconds; - BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) { + BeatsInitializer(String pluginId, IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThreadCount) { // Keeps a local copy of Server settings, so they can't be modified once it starts listening this.localMessageListener = messageListener; this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; - idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD); - beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread); + idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD, + daemonThreadFactory(pluginId + "-idleStateHandler")); + beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThreadCount, + daemonThreadFactory(pluginId + "-beatsHandler")); } public void initChannel(SocketChannel socket) { diff --git a/src/main/java/org/logstash/beats/util/DaemonThreadFactory.java b/src/main/java/org/logstash/beats/util/DaemonThreadFactory.java new file mode 100644 index 00000000..dcfe7e8d --- /dev/null +++ b/src/main/java/org/logstash/beats/util/DaemonThreadFactory.java @@ -0,0 +1,30 @@ +package org.logstash.beats.util; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class DaemonThreadFactory implements ThreadFactory { + + final ThreadGroup group; + final AtomicInteger threadNumber = new AtomicInteger(1); + final String namePrefix; + + DaemonThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + group = Thread.currentThread().getThreadGroup(); + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", + 0); + t.setDaemon(true); + return t; + } + + public static ThreadFactory daemonThreadFactory(String namePrefix) { + return new DaemonThreadFactory(namePrefix); + } + +} diff --git a/src/test/java/org/logstash/beats/ServerTest.java b/src/test/java/org/logstash/beats/ServerTest.java index 4e604778..73462f22 100644 --- a/src/test/java/org/logstash/beats/ServerTest.java +++ b/src/test/java/org/logstash/beats/ServerTest.java @@ -52,7 +52,7 @@ public void testServerShouldTerminateConnectionWhenExceptionHappen() throws Inte final CountDownLatch latch = new CountDownLatch(concurrentConnections); - final Server server = new Server(host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount); + final Server server = new Server("testServer", host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount); final AtomicBoolean otherCause = new AtomicBoolean(false); @@ -118,7 +118,7 @@ public void testServerShouldTerminateConnectionIdleForTooLong() throws Interrupt final CountDownLatch latch = new CountDownLatch(concurrentConnections); final AtomicBoolean exceptionClose = new AtomicBoolean(false); - final Server server = new Server(host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount); + final Server server = new Server("testServer", host, randomPort, inactivityTime, eventLoopThreadCount, executorThreadCount); server.setMessageListener(new MessageListener() { @Override public void onNewConnection(ChannelHandlerContext ctx) { @@ -174,7 +174,7 @@ public void run() { @Test public void testServerShouldAcceptConcurrentConnection() throws InterruptedException { - final Server server = new Server(host, randomPort, 30, eventLoopThreadCount, executorThreadCount); + final Server server = new Server("testServer", host, randomPort, 30, eventLoopThreadCount, executorThreadCount); SpyListener listener = new SpyListener(); server.setMessageListener(listener); Runnable serverTask = new Runnable() {