Skip to content

Commit

Permalink
[client] Reuse field getters in all the CompletedFetch of same table. (
Browse files Browse the repository at this point in the history
  • Loading branch information
loserwang1024 authored Dec 23, 2024
1 parent f3b8897 commit 376ee87
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@
import com.alibaba.fluss.record.LogRecordBatch;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.InternalRow.FieldGetter;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.rpc.protocol.ApiError;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.utils.CloseableIterator;
import com.alibaba.fluss.utils.Projection;

Expand Down Expand Up @@ -95,11 +93,7 @@ public CompletedFetch(
this.logScannerStatus = logScannerStatus;
this.projection = projection;
this.nextFetchOffset = fetchOffset;
RowType rowType = readContext.getRowType();
this.fieldGetters = new FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < fieldGetters.length; i++) {
fieldGetters[i] = InternalRow.createFieldGetter(rowType.getChildren().get(i), i);
}
this.fieldGetters = readContext.getFieldGetters();
}

protected abstract ScanRecord toScanRecord(LogRecord record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.alibaba.fluss.metadata.TablePartition;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.record.LogRecords;
import com.alibaba.fluss.record.MemoryLogRecords;
import com.alibaba.fluss.remote.RemoteLogFetchInfo;
import com.alibaba.fluss.remote.RemoteLogSegment;
import com.alibaba.fluss.rpc.GatewayClientProxy;
Expand Down Expand Up @@ -301,16 +303,21 @@ private synchronized void handleFetchLogResponse(
fetchOffset,
fetchResultForBucket.getHighWatermark());
} else {
DefaultCompletedFetch completedFetch =
new DefaultCompletedFetch(
tb,
fetchResultForBucket,
readContext,
logScannerStatus,
isCheckCrcs,
fetchOffset,
projection);
logFetchBuffer.add(completedFetch);
LogRecords logRecords = fetchResultForBucket.recordsOrEmpty();
if (!MemoryLogRecords.EMPTY.equals(logRecords)) {
// In oder to not signal notEmptyCondition, add completed fetch to
// buffer until log records is not empty.
DefaultCompletedFetch completedFetch =
new DefaultCompletedFetch(
tb,
fetchResultForBucket,
readContext,
logScannerStatus,
isCheckCrcs,
fetchOffset,
projection);
logFetchBuffer.add(completedFetch);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.fluss.metadata.LogFormat;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
Expand All @@ -42,6 +43,8 @@ public class LogRecordReadContext implements LogRecordBatch.ReadContext, AutoClo
private final RowType rowType;
// the static schemaId of the table, should support dynamic schema evolution in the future
private final int schemaId;
// the fieldGetter to get the log value of the table;
private final InternalRow.FieldGetter[] fieldGetters;
// the Arrow vector schema root of the table, should be null if not ARROW log format
@Nullable private final VectorSchemaRoot vectorSchemaRoot;
// the Arrow memory buffer allocator for the table, should be null if not ARROW log format
Expand Down Expand Up @@ -137,6 +140,10 @@ private LogRecordReadContext(
this.schemaId = schemaId;
this.vectorSchemaRoot = vectorSchemaRoot;
this.bufferAllocator = bufferAllocator;
this.fieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()];
for (int i = 0; i < fieldGetters.length; i++) {
fieldGetters[i] = InternalRow.createFieldGetter(rowType.getChildren().get(i), i);
}
}

@Override
Expand All @@ -158,6 +165,10 @@ public RowType getRowType() {
return rowType;
}

public InternalRow.FieldGetter[] getFieldGetters() {
return fieldGetters;
}

@Override
public VectorSchemaRoot getVectorSchemaRoot(int schemaId) {
checkArgument(
Expand Down

0 comments on commit 376ee87

Please sign in to comment.