From c54253069d28ee93e53512b8964cbbdfb34bff7d Mon Sep 17 00:00:00 2001 From: oliviarla Date: Fri, 10 Jan 2025 18:18:48 +0900 Subject: [PATCH] INTERNAL: Limit bulk get keys size --- .../ascii/AsciiMemcachedNodeImpl.java | 44 +++++++----- .../protocol/ascii/OptimizeOperationTest.java | 70 ++++++++++++++----- 2 files changed, 81 insertions(+), 33 deletions(-) diff --git a/src/main/java/net/spy/memcached/protocol/ascii/AsciiMemcachedNodeImpl.java b/src/main/java/net/spy/memcached/protocol/ascii/AsciiMemcachedNodeImpl.java index 942222a46..7c898e013 100644 --- a/src/main/java/net/spy/memcached/protocol/ascii/AsciiMemcachedNodeImpl.java +++ b/src/main/java/net/spy/memcached/protocol/ascii/AsciiMemcachedNodeImpl.java @@ -30,6 +30,9 @@ * Memcached node for the ASCII protocol. */ public final class AsciiMemcachedNodeImpl extends TCPMemcachedNodeImpl { + + private static final int MAX_OPTIMIZE_KEY_SIZE = 100; + public AsciiMemcachedNodeImpl(String name, SocketAddress sa, int bufSize, BlockingQueue rq, @@ -43,31 +46,38 @@ public AsciiMemcachedNodeImpl(String name, protected void optimize() { // make sure there are at least two get operations in a row before // attempting to optimize them. - Operation nxtOp = writeQ.peek(); - if (nxtOp instanceof GetOperation) { + if (writeQ.peek() instanceof GetOperation) { optimizedOp = writeQ.remove(); - nxtOp = writeQ.peek(); - if (nxtOp instanceof GetOperation) { - OptimizedGetImpl og = new OptimizedGetImpl( - (GetOperation) optimizedOp, enabledMGetOp()); - optimizedOp = og; - do { - GetOperationImpl o = (GetOperationImpl) writeQ.remove(); - if (!o.isCancelled()) { - og.addOperation(o); + OptimizedGetImpl og = null; + int keySize = ((GetOperation) optimizedOp).getKeys().size(); + Operation nxtOp = writeQ.peek(); + + while (nxtOp instanceof GetOperation) { + keySize += ((GetOperation) nxtOp).getKeys().size(); + if (keySize > MAX_OPTIMIZE_KEY_SIZE) { + break; + } + GetOperationImpl currentOp = (GetOperationImpl) writeQ.remove(); + if (!currentOp.isCancelled()) { + if (og == null) { + og = new OptimizedGetImpl((GetOperation) optimizedOp, enabledMGetOp()); + optimizedOp = og; } - nxtOp = writeQ.peek(); - } while (nxtOp instanceof GetOperation); + og.addOperation(currentOp); + } + nxtOp = writeQ.peek(); + } - // Initialize the new mega get - optimizedOp.initialize(); + // Initialize the new mega get + if (og != null) { + og.initialize(); assert optimizedOp.getState() == OperationState.WRITE_QUEUED; - ProxyCallback pcb = (ProxyCallback) og.getCallback(); + ProxyCallback pcb = (ProxyCallback) optimizedOp.getCallback(); getLogger().debug("Set up %s with %s keys and %s callbacks", this, pcb.numKeys(), pcb.numCallbacks()); } } - } + } } diff --git a/src/test/java/net/spy/memcached/protocol/ascii/OptimizeOperationTest.java b/src/test/java/net/spy/memcached/protocol/ascii/OptimizeOperationTest.java index 33cdf63d2..4bb63cd8f 100644 --- a/src/test/java/net/spy/memcached/protocol/ascii/OptimizeOperationTest.java +++ b/src/test/java/net/spy/memcached/protocol/ascii/OptimizeOperationTest.java @@ -1,10 +1,16 @@ package net.spy.memcached.protocol.ascii; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import net.spy.memcached.ConnectionFactory; +import net.spy.memcached.ConnectionFactoryBuilder; import net.spy.memcached.ops.GetOperation; import net.spy.memcached.ops.OperationStatus; +import net.spy.memcached.protocol.TCPMemcachedNodeImpl; import org.junit.jupiter.api.Test; @@ -13,24 +19,25 @@ class OptimizeOperationTest { + private final GetOperation.Callback cb = new GetOperation.Callback() { + @Override + public void gotData(String key, int flags, byte[] data) { + // do nothing + } + + @Override + public void receivedStatus(OperationStatus status) { + // do nothing + } + + @Override + public void complete() { + // do nothing + } + }; + @Test void chooseGetOrMGet() { - GetOperation.Callback cb = new GetOperation.Callback() { - @Override - public void gotData(String key, int flags, byte[] data) { - // do nothing - } - - @Override - public void receivedStatus(OperationStatus status) { - // do nothing - } - - @Override - public void complete() { - // do nothing - } - }; GetOperationImpl op1 = new GetOperationImpl("key", cb); GetOperationImpl op2 = new GetOperationImpl("key2", cb); OptimizedGetImpl optimizedOpWithMGet = new OptimizedGetImpl(op1, true); @@ -53,4 +60,35 @@ public void complete() { String commandWithGet = new String(bytesWithGet, StandardCharsets.UTF_8); assertFalse(commandWithGet.contains("mget")); } + + + @Test + void doNotMergeTwoOperations() { + ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder(); + builder.setShouldOptimize(true); + ConnectionFactory cf = builder.build(); + TCPMemcachedNodeImpl node = (TCPMemcachedNodeImpl) cf.createMemcachedNode("node1", + new InetSocketAddress("localhost", 11211), 4096); + node.setVersion("1.11.0"); + + List keyList = new ArrayList<>(); + for (int i = 0; i < 250; i++) { + keyList.add("k" + i); + } + + GetOperationImpl op = new GetOperationImpl(keyList.get(0), cb); + GetOperationImpl op1 = new GetOperationImpl(keyList.subList(0, 190), cb, true); + GetOperationImpl op2 = new GetOperationImpl(keyList.subList(191, 205), cb, true); + node.addOpToWriteQ(op); + node.addOpToWriteQ(op1); + node.addOpToWriteQ(op2); + node.fillWriteBuffer(true); + + ByteBuffer buffer = node.getWbuf(); + byte[] bytesWithMGet = new byte[buffer.remaining()]; + buffer.get(bytesWithMGet); + String commandWithMGet = new String(bytesWithMGet, StandardCharsets.UTF_8); + assertTrue(commandWithMGet.contains("mget 839 190")); + assertTrue(commandWithMGet.contains("mget 69 14")); + } }