Skip to content

Commit

Permalink
Smart async writer in webserver (#9191) (#9292)
Browse files Browse the repository at this point in the history
New type of writer that can switch from async to sync writing dynamically based on average async queue size heuristics.

Co-authored-by: Santiago Pericas-Geertsen <[email protected]>
  • Loading branch information
barchetta and spericas authored Sep 27, 2024
1 parent 32d42c7 commit 16e708b
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.common.socket;

import java.util.concurrent.ExecutorService;

import io.helidon.common.buffers.BufferData;

/**
* A special socket write that starts async but may switch to sync mode if it
* detects that the async queue size is below {@link #QUEUE_SIZE_THRESHOLD}.
* If it switches to sync mode, it shall never return back to async mode.
*/
public class SmartSocketWriter extends SocketWriter {
private static final long WINDOW_SIZE = 1000;
private static final double QUEUE_SIZE_THRESHOLD = 2.0;

private final SocketWriterAsync asyncWriter;
private volatile long windowIndex;
private volatile boolean asyncMode;

SmartSocketWriter(ExecutorService executor, HelidonSocket socket, int writeQueueLength) {
super(socket);
this.asyncWriter = new SocketWriterAsync(executor, socket, writeQueueLength);
this.asyncMode = true;
this.windowIndex = 0L;
}

@Override
public void write(BufferData... buffers) {
for (BufferData buffer : buffers) {
write(buffer);
}
}

@Override
public void write(BufferData buffer) {
if (asyncMode) {
asyncWriter.write(buffer);
if (++windowIndex % WINDOW_SIZE == 0 && asyncWriter.avgQueueSize() < QUEUE_SIZE_THRESHOLD) {
asyncMode = false;
}
} else {
asyncWriter.drainQueue();
writeNow(buffer); // blocking write
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,15 +44,19 @@ protected SocketWriter(HelidonSocket socket) {
* @param socket socket to write to
* @param writeQueueLength maximal number of queued writes, write operation will block if the queue is full; if set to
* {code 1} or lower, write queue is disabled and writes are direct to socket (blocking)
* @param smartAsyncWrites flag to enable smart async writes, see {@link io.helidon.common.socket.SmartSocketWriter}
* @return a new socket writer
*/
public static SocketWriter create(ExecutorService executor,
HelidonSocket socket,
int writeQueueLength) {
int writeQueueLength,
boolean smartAsyncWrites) {
if (writeQueueLength <= 1) {
return new SocketWriterDirect(socket);
} else {
return new SocketWriterAsync(executor, socket, writeQueueLength);
return smartAsyncWrites
? new SmartSocketWriter(executor, socket, writeQueueLength)
: new SocketWriterAsync(executor, socket, writeQueueLength);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,13 +33,15 @@
class SocketWriterAsync extends SocketWriter implements DataWriter {
private static final System.Logger LOGGER = System.getLogger(SocketWriterAsync.class.getName());
private static final BufferData CLOSING_TOKEN = BufferData.empty();

private final ExecutorService executor;
private final ArrayBlockingQueue<BufferData> writeQueue;
private final CountDownLatch cdl = new CountDownLatch(1);
private final AtomicBoolean started = new AtomicBoolean(false);
private volatile Throwable caught;
private volatile boolean run = true;
private Thread thread;
private double avgQueueSize;

/**
* A new socket writer.
Expand Down Expand Up @@ -116,14 +118,16 @@ private void run() {
CompositeBufferData toWrite = BufferData.createComposite(writeQueue.take()); // wait if the queue is empty
// we only want to read a certain amount of data, if somebody writes huge amounts
// we could spin here forever and run out of memory
for (int i = 0; i < 1000; i++) {
int queueSize = 1;
for (; queueSize <= 1000; queueSize++) {
BufferData newBuf = writeQueue.poll(); // drain ~all elements from the queue, don't wait.
if (newBuf == null) {
break;
}
toWrite.add(newBuf);
}
writeNow(toWrite);
avgQueueSize = (avgQueueSize + queueSize) / 2.0;
}
cdl.countDown();
} catch (Throwable e) {
Expand All @@ -141,4 +145,15 @@ private void checkRunning() {
throw new SocketWriterException(caught);
}
}

void drainQueue() {
BufferData buffer;
while ((buffer = writeQueue.poll()) != null) {
writeNow(buffer);
}
}

double avgQueueSize() {
return avgQueueSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ public final void run() {
}

reader = new DataReader(new MapExceptionDataSupplier(helidonSocket));
writer = SocketWriter.create(listenerContext.executor(), helidonSocket,
listenerContext.config().writeQueueLength());
writer = SocketWriter.create(listenerContext.executor(),
helidonSocket,
listenerConfig.writeQueueLength(),
listenerConfig.smartAsyncWrites());
} catch (Exception e) {
throw e instanceof RuntimeException re ? re : new RuntimeException(e); // see ServerListener
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,17 @@ interface ListenerConfigBlueprint {
@Option.DefaultInt(0)
int writeQueueLength();

/**
* If enabled and {@link #writeQueueLength()} is greater than 1, then
* start with async writes but possibly switch to sync writes if
* async queue size is always below a certain threshold.
*
* @return smart async setting
*/
@Option.Configured
@Option.DefaultBoolean(false)
boolean smartAsyncWrites();

/**
* Initial buffer size in bytes of {@link java.io.BufferedOutputStream} created internally to
* write data to a socket connection. Default is {@code 4096}.
Expand Down

0 comments on commit 16e708b

Please sign in to comment.