Skip to content

Commit

Permalink
fix: fix response earlier release (#555)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Dec 13, 2023
1 parent 20fd237 commit 59c6044
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ public void setSend(NetworkSend send) {

public NetworkSend maybeCompleteSend() {
if (send != null && send.completed()) {
send.release();
midWrite = false;
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
NetworkSend result = send;
Expand Down Expand Up @@ -432,7 +433,12 @@ public long write() throws IOException {
return 0;

midWrite = true;
return send.writeTo(transportLayer);
try {
return send.writeTo(transportLayer);
} catch (IOException e) {
send.release();
throw e;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ public void release() {
if (!freed) {
pack.release();
freed = true;
} else {
LOGGER.warn("PooledMemoryRecords[{}] has been freed", this, new RuntimeException());
}
}

Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1146,7 +1146,6 @@ private[kafka] class Processor(
if (response == null) {
throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
}
send.release()
updateRequestMetrics(response)

// Invoke send completion callback
Expand Down Expand Up @@ -1184,7 +1183,6 @@ private[kafka] class Processor(
}.remoteHost
inflightResponses.entrySet().removeIf(e => {
val remove = connectionId.equals(e.getValue.request.context.connectionId)
e.getKey.release()
remove
})
// the channel has been closed by the selector but the quotas still need to be updated
Expand Down

0 comments on commit 59c6044

Please sign in to comment.