Skip to content

Commit

Permalink
INTERNAL: Limit bulk get keys size
Browse files Browse the repository at this point in the history
  • Loading branch information
cheesecrust committed Dec 6, 2024
1 parent c9473d9 commit f25bea7
Showing 1 changed file with 30 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
* Memcached node for the ASCII protocol.
*/
public final class AsciiMemcachedNodeImpl extends TCPMemcachedNodeImpl {

private static final int GET_BULK_CHUNK_SIZE = 200;

public AsciiMemcachedNodeImpl(String name,
SocketAddress sa,
int bufSize, BlockingQueue<Operation> rq,
Expand All @@ -45,31 +48,37 @@ protected void optimize() {
// make sure there are at least two get operations in a row before
// attempting to optimize them.
Operation nxtOp = writeQ.peek();
if (nxtOp instanceof GetOperation && nxtOp.getAPIType() != APIType.MGET) {
optimizedOp = writeQ.remove();
nxtOp = writeQ.peek();
if (nxtOp instanceof GetOperation && nxtOp.getAPIType() != APIType.MGET) {
OptimizedGetImpl og = new OptimizedGetImpl(
(GetOperation) optimizedOp);
optimizedOp = og;
if (!(nxtOp instanceof GetOperation) || nxtOp.getAPIType() != APIType.MGET ||
((GetOperation) nxtOp).getKeys().size() > GET_BULK_CHUNK_SIZE) {
return;
}

do {
GetOperationImpl o = (GetOperationImpl) writeQ.remove();
if (!o.isCancelled()) {
og.addOperation(o);
}
nxtOp = writeQ.peek();
} while (nxtOp instanceof GetOperation &&
nxtOp.getAPIType() != APIType.MGET);
int cnt = ((GetOperation) nxtOp).getKeys().size();
optimizedOp = writeQ.remove();
nxtOp = writeQ.peek();
OptimizedGetImpl og = null;

// Initialize the new mega get
optimizedOp.initialize();
assert optimizedOp.getState() == OperationState.WRITE_QUEUED;
ProxyCallback pcb = (ProxyCallback) og.getCallback();
getLogger().debug("Set up %s with %s keys and %s callbacks",
this, pcb.numKeys(), pcb.numCallbacks());
while (nxtOp instanceof GetOperation && nxtOp.getAPIType() != APIType.MGET) {
if (og == null) {
og = new OptimizedGetImpl((GetOperation) optimizedOp);
optimizedOp = og;
}
cnt += ((GetOperation) nxtOp).getKeys().size();
if (cnt > GET_BULK_CHUNK_SIZE) {
break;
}
GetOperationImpl currentOp = (GetOperationImpl) writeQ.remove();
if (!currentOp.isCancelled()) {
og.addOperation(currentOp);
}
nxtOp = writeQ.peek();
}
// Initialize the new mega get
optimizedOp.initialize();
assert optimizedOp.getState() == OperationState.WRITE_QUEUED;
ProxyCallback pcb = (ProxyCallback) optimizedOp.getCallback();
getLogger().debug("Set up %s with %s keys and %s callbacks",
this, pcb.numKeys(), pcb.numCallbacks());
}

}

0 comments on commit f25bea7

Please sign in to comment.