From 5e5d3797991a1d352be0f7552054b4762fb8a42e Mon Sep 17 00:00:00 2001 From: Georgios Andrianakis Date: Wed, 4 Dec 2024 11:54:55 +0200 Subject: [PATCH] Allow project to build with Java 17 and 21 The formatting changes and a result of the new version of Google Java Format being used, which in turn is needed for spotless to build on Java 17 and 21. This change makes no guarantee that project actually runs on 17 or 21, so it's only a very small first step --- .../client/scanner/log/BucketScanStatus.java | 1 + .../fluss/client/write/RecordAccumulator.java | 1 + .../fluss/client/write/WriterClient.java | 1 + .../fluss/client/table/FlussTableITCase.java | 5 ++++- .../com/alibaba/fluss/cluster/Cluster.java | 4 +++- .../fluss/memory/MemorySegmentPool.java | 4 +++- .../com/alibaba/fluss/metrics/MeterView.java | 4 ++++ .../fluss/record/DefaultValueRecordBatch.java | 4 +++- .../fluss/record/send/SendWritableOutput.java | 4 +++- .../fluss/remote/RemoteLogSegment.java | 8 +++++-- .../alibaba/fluss/utils/Preconditions.java | 1 + .../com/alibaba/fluss/utils/Projection.java | 3 +++ .../com/alibaba/fluss/utils/types/Either.java | 8 +++++-- .../com/alibaba/fluss/utils/types/Tuple2.java | 1 + .../connector/flink/catalog/FlinkCatalog.java | 12 +++++++---- .../split/PaimonSnapshotSplitState.java | 1 + .../reader/FlinkRecordsWithSplitIds.java | 1 + .../split/HybridSnapshotLogSplitState.java | 1 + .../connector/flink/utils/PushdownUtils.java | 1 + .../transform/XmlResponsesSaxParser.java | 21 ++++++++++++++----- .../reader/FlinkRecordsWithSplitIds.java | 1 + .../fluss/protogen/tests/BytesTest.java | 6 ++---- .../fluss/protogen/tests/StringsTest.java | 3 +-- .../alibaba/fluss/rpc/netty/NettyUtils.java | 4 +++- .../fluss/server/kv/KvSnapshotResource.java | 1 + .../server/kv/snapshot/KvSnapshotHandle.java | 1 + .../kv/snapshot/PeriodicSnapshotManager.java | 1 + .../kv/snapshot/SharedKvFileRegistry.java | 1 + .../fluss/server/log/AbstractIndex.java | 1 + .../fluss/server/log/ListOffsetsParam.java | 1 + .../alibaba/fluss/server/log/LogSegments.java | 16 ++++++++++---- .../fluss/server/log/WriterStateManager.java | 1 + .../log/remote/FsRemoteLogOutputStream.java | 1 + .../server/log/remote/LogSegmentFiles.java | 1 + .../alibaba/fluss/server/replica/Replica.java | 2 ++ .../server/replica/delay/DelayedWrite.java | 2 ++ .../replica/fetcher/RemoteLeaderEndpoint.java | 2 ++ .../fluss/server/utils/timer/TimingWheel.java | 1 + .../server/kv/rocksdb/RocksDBExtension.java | 1 + .../fetcher/TestingLeaderEndpoint.java | 2 ++ .../testutils/common/FlussAssertions.java | 4 +++- pom.xml | 6 +++--- 42 files changed, 112 insertions(+), 33 deletions(-) diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/BucketScanStatus.java b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/BucketScanStatus.java index 21d7dc06a..08fce2961 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/BucketScanStatus.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/scanner/log/BucketScanStatus.java @@ -23,6 +23,7 @@ class BucketScanStatus { private long offset; // last consumed position private long highWatermark; // the high watermark from last fetch + // TODO add resetStrategy and nextAllowedRetryTimeMs. public BucketScanStatus() { diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java index 2aed98c12..4119d511d 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java @@ -816,6 +816,7 @@ private void insertInSequenceOrder(Deque deque, WriteBatch batch) { public static final class RecordAppendResult { public final boolean batchIsFull; public final boolean newBatchCreated; + /** Whether this record was abort because the new batch created in record accumulator. */ public final boolean abortRecordForNewBatch; diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java index 086dcbdcb..3dfb6509e 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java @@ -68,6 +68,7 @@ public class WriterClient { private static final Logger LOG = LoggerFactory.getLogger(WriterClient.class); public static final String SENDER_THREAD_PREFIX = "fluss-write-sender"; + /** * {@link ConfigOptions#CLIENT_WRITER_MAX_INFLIGHT_REQUESTS_PER_BUCKET} should be less than or * equal to this value when idempotence producer enabled to ensure message ordering. diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index 9d26a9b3e..d1f2e12b8 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -322,7 +322,10 @@ void testLimitScanLogTable() throws Exception { expectedRows.set(i, new Object[] {expectedRows.get(i)[1]}); } actualRows = - table.limitScan(tb, limitSize, projectedFields).get().stream() + table + .limitScan(new TableBucket(tableId, 0), limitSize, projectedFields) + .get() + .stream() .map(ScanRecord::getRow) .collect(Collectors.toList()); assertThat(actualRows.size()).isEqualTo(limitSize); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/cluster/Cluster.java b/fluss-common/src/main/java/com/alibaba/fluss/cluster/Cluster.java index f6a630cce..45aacfed0 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/cluster/Cluster.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/cluster/Cluster.java @@ -143,7 +143,9 @@ public ServerNode getCoordinatorServer() { return coordinatorServer; } - /** @return The known set of alive tablet servers. */ + /** + * @return The known set of alive tablet servers. + */ public Map getAliveTabletServers() { return aliveTabletServersById; } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegmentPool.java b/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegmentPool.java index 2b778311b..ecb7cc0a9 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegmentPool.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/memory/MemorySegmentPool.java @@ -52,7 +52,9 @@ public interface MemorySegmentPool extends MemorySegmentSource { */ void returnAll(List memory); - /** @return Free page number. */ + /** + * @return Free page number. + */ int freePages(); void close(); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metrics/MeterView.java b/fluss-common/src/main/java/com/alibaba/fluss/metrics/MeterView.java index 56378c7db..2f83cd2df 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metrics/MeterView.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metrics/MeterView.java @@ -44,12 +44,16 @@ public class MeterView implements Meter, MetricView { /** The underlying counter maintaining the count. */ private final Counter counter; + /** The time-span over which the average is calculated. */ private final int timeSpanInSeconds; + /** Circular array containing the history of values. */ private final long[] values; + /** The index in the array for the current time. */ private int time = 0; + /** The last rate we computed. */ private double currentRate = 0; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultValueRecordBatch.java b/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultValueRecordBatch.java index 42af88cc6..706b5b84f 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultValueRecordBatch.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/DefaultValueRecordBatch.java @@ -274,7 +274,9 @@ public void append(short schemaId, BinaryRow row) throws IOException { currentRecordNumber++; } - /** @param valueBytes consisted of schema id and the row encoded in the value bytes */ + /** + * @param valueBytes consisted of schema id and the row encoded in the value bytes + */ public void append(byte[] valueBytes) throws IOException { if (isClosed) { throw new IllegalStateException( diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/send/SendWritableOutput.java b/fluss-common/src/main/java/com/alibaba/fluss/record/send/SendWritableOutput.java index a85d0dcf4..18918e880 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/record/send/SendWritableOutput.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/record/send/SendWritableOutput.java @@ -33,7 +33,9 @@ public class SendWritableOutput extends ByteBufWritableOutput { /** The current reader index of the underlying {@link #buf} for building next {@link Send}. */ private int currentReaderIndex = 0; - /** @param buf The ByteBuf that has capacity of data size excluding zero-copy. */ + /** + * @param buf The ByteBuf that has capacity of data size excluding zero-copy. + */ public SendWritableOutput(ByteBuf buf) { super(buf); this.sends = new ArrayDeque<>(1); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/remote/RemoteLogSegment.java b/fluss-common/src/main/java/com/alibaba/fluss/remote/RemoteLogSegment.java index 14e5f0145..306f30195 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/remote/RemoteLogSegment.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/remote/RemoteLogSegment.java @@ -91,12 +91,16 @@ public UUID remoteLogSegmentId() { return remoteLogSegmentId; } - /** @return Remote log start offset of this segment (inclusive). */ + /** + * @return Remote log start offset of this segment (inclusive). + */ public long remoteLogStartOffset() { return remoteLogStartOffset; } - /** @return Remote log end offset of this segment (inclusive). */ + /** + * @return Remote log end offset of this segment (inclusive). + */ public long remoteLogEndOffset() { return remoteLogEndOffset; } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/Preconditions.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/Preconditions.java index f50b57e77..02b128e3d 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/Preconditions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/Preconditions.java @@ -128,6 +128,7 @@ public static void checkArgument( throw new IllegalArgumentException(format(errorMessageTemplate, errorMessageArgs)); } } + // ------------------------------------------------------------------------ // Boolean Condition Checking (State) // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/Projection.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/Projection.java index 0aade6d22..5d383cd85 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/Projection.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/Projection.java @@ -41,10 +41,13 @@ public class Projection { /** the projection indexes including both selected fields and reordering them. */ final int[] projection; + /** the projection indexes that only select fields but not reordering them. */ final int[] projectionInOrder; + /** the indexes to reorder the fields of {@link #projectionInOrder} to {@link #projection}. */ final int[] reorderingIndexes; + /** the flag to indicate whether reordering is needed. */ final boolean reorderingNeeded; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/types/Either.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/types/Either.java index 1506de952..ce645d25e 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/types/Either.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/types/Either.java @@ -59,12 +59,16 @@ public static Either right(R value) { */ public abstract R right() throws IllegalStateException; - /** @return true if this is a Left value, false if this is a Right value */ + /** + * @return true if this is a Left value, false if this is a Right value + */ public final boolean isLeft() { return getClass() == Left.class; } - /** @return true if this is a Right value, false if this is a Left value */ + /** + * @return true if this is a Right value, false if this is a Left value + */ public final boolean isRight() { return getClass() == Right.class; } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/types/Tuple2.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/types/Tuple2.java index db284494b..796ab8d88 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/types/Tuple2.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/types/Tuple2.java @@ -55,6 +55,7 @@ public class Tuple2 extends Tuple { /** Field 0 of the tuple. */ public T0 f0; + /** Field 1 of the tuple. */ public T1 f1; diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java index ddbc8a5f1..aaf4d1ca2 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java @@ -366,8 +366,10 @@ public List listPartitions(ObjectPath objectPath) @Override public List listPartitions( ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec) - throws TableNotExistException, TableNotPartitionedException, - PartitionSpecInvalidException, CatalogException { + throws TableNotExistException, + TableNotPartitionedException, + PartitionSpecInvalidException, + CatalogException { throw new UnsupportedOperationException(); } @@ -397,8 +399,10 @@ public void createPartition( CatalogPartitionSpec catalogPartitionSpec, CatalogPartition catalogPartition, boolean b) - throws TableNotExistException, TableNotPartitionedException, - PartitionSpecInvalidException, PartitionAlreadyExistsException, + throws TableNotExistException, + TableNotPartitionedException, + PartitionSpecInvalidException, + PartitionAlreadyExistsException, CatalogException { throw new UnsupportedOperationException(); } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/lakehouse/paimon/split/PaimonSnapshotSplitState.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/lakehouse/paimon/split/PaimonSnapshotSplitState.java index 30ae5f525..7074bbbbe 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/lakehouse/paimon/split/PaimonSnapshotSplitState.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/lakehouse/paimon/split/PaimonSnapshotSplitState.java @@ -24,6 +24,7 @@ public class PaimonSnapshotSplitState extends SourceSplitState { private final PaimonSnapshotSplit paimonSnapshotSplit; + /** The records to skip while reading a snapshot. */ private long recordsToSkip; diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkRecordsWithSplitIds.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkRecordsWithSplitIds.java index ba9f8013a..7f2c5baee 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkRecordsWithSplitIds.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/reader/FlinkRecordsWithSplitIds.java @@ -45,6 +45,7 @@ public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds splitIterator; + /** The table buckets of the split in splitIterator. */ private final Iterator tableBucketIterator; diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplitState.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplitState.java index c73a025b8..afb5986af 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplitState.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/split/HybridSnapshotLogSplitState.java @@ -21,6 +21,7 @@ public class HybridSnapshotLogSplitState extends SourceSplitState { /** The records to skip while reading a snapshot. */ private long recordsToSkip; + /** Whether the snapshot reading is finished. */ private boolean snapshotFinished; diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/PushdownUtils.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/PushdownUtils.java index 586e84659..3e62eae7a 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/PushdownUtils.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/PushdownUtils.java @@ -451,6 +451,7 @@ public static long countLogTable(TablePath tablePath, Configuration flussConfig) throw new FlussRuntimeException(e); } } + // ------------------------------------------------------------------------------------------ /** A structure represents a source field equal literal expression. */ diff --git a/fluss-filesystems/fluss-fs-s3/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java b/fluss-filesystems/fluss-fs-s3/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java index 699850334..8927be8ae 100644 --- a/fluss-filesystems/fluss-fs-s3/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java +++ b/fluss-filesystems/fluss-fs-s3/src/main/java/com/amazonaws/services/s3/model/transform/XmlResponsesSaxParser.java @@ -875,12 +875,16 @@ public static class ListAllMyBucketsHandler extends AbstractHandler { private Bucket currentBucket = null; - /** @return the buckets listed in the document. */ + /** + * @return the buckets listed in the document. + */ public List getBuckets() { return buckets; } - /** @return the owner of the buckets. */ + /** + * @return the owner of the buckets. + */ public Owner getOwner() { return bucketsOwner; } @@ -937,7 +941,9 @@ public static class AccessControlListHandler extends AbstractHandler { private Grantee currentGrantee = null; private Permission currentPermission = null; - /** @return an object representing the ACL document. */ + /** + * @return an object representing the ACL document. + */ public AccessControlList getAccessControlList() { return accessControlList; } @@ -1018,7 +1024,9 @@ public static class BucketLoggingConfigurationHandler extends AbstractHandler { private final BucketLoggingConfiguration bucketLoggingConfiguration = new BucketLoggingConfiguration(); - /** @return an object representing the bucket's LoggingStatus document. */ + /** + * @return an object representing the bucket's LoggingStatus document. + */ public BucketLoggingConfiguration getBucketLoggingConfiguration() { return bucketLoggingConfiguration; } @@ -1048,7 +1056,9 @@ public static class BucketLocationHandler extends AbstractHandler { private String location = null; - /** @return the bucket's location. */ + /** + * @return the bucket's location. + */ public String getLocation() { return location; } @@ -1527,6 +1537,7 @@ public static class CompleteMultipartUploadHandler extends AbstractSSEHandler protected ServerSideEncryptionResult sseResult() { return result; } + /** * @see com.amazonaws.services.s3.model.CompleteMultipartUploadResult#getExpirationTime() */ diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/reader/FlinkRecordsWithSplitIds.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/reader/FlinkRecordsWithSplitIds.java index 99af246a1..d9c5be2cb 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/reader/FlinkRecordsWithSplitIds.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/reader/FlinkRecordsWithSplitIds.java @@ -43,6 +43,7 @@ public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds splitIterator; + /** The table buckets of the split in splitIterator. */ private final Iterator tableBucketIterator; diff --git a/fluss-protogen/fluss-protogen-tests/src/test/java/com/alibaba/fluss/protogen/tests/BytesTest.java b/fluss-protogen/fluss-protogen-tests/src/test/java/com/alibaba/fluss/protogen/tests/BytesTest.java index 14f0aa393..a19695180 100644 --- a/fluss-protogen/fluss-protogen-tests/src/test/java/com/alibaba/fluss/protogen/tests/BytesTest.java +++ b/fluss-protogen/fluss-protogen-tests/src/test/java/com/alibaba/fluss/protogen/tests/BytesTest.java @@ -61,8 +61,7 @@ public void testBytes() throws Exception { // test binary equals to protobuf Bytes.B pbb = - Bytes.B - .newBuilder() + Bytes.B.newBuilder() .setPayload(ByteString.copyFrom(new byte[] {1, 2, 3, 4, 5})) .build(); @@ -254,8 +253,7 @@ public void testRepeatedBytes() throws Exception { assertThat(lpb.getExtraItemAt(1)).isEqualTo(new byte[] {4, 5, 6, 7}); Bytes.B pbb = - Bytes.B - .newBuilder() + Bytes.B.newBuilder() .addExtraItems(ByteString.copyFrom(new byte[] {1, 2, 3})) .addExtraItems(ByteString.copyFrom(new byte[] {4, 5, 6, 7})) .build(); diff --git a/fluss-protogen/fluss-protogen-tests/src/test/java/com/alibaba/fluss/protogen/tests/StringsTest.java b/fluss-protogen/fluss-protogen-tests/src/test/java/com/alibaba/fluss/protogen/tests/StringsTest.java index 47a503984..3490cf0ac 100644 --- a/fluss-protogen/fluss-protogen-tests/src/test/java/com/alibaba/fluss/protogen/tests/StringsTest.java +++ b/fluss-protogen/fluss-protogen-tests/src/test/java/com/alibaba/fluss/protogen/tests/StringsTest.java @@ -57,8 +57,7 @@ public void testStrings() throws Exception { assertThat(lps.getNameAt(2)).isEqualTo("c"); Strings.S pbs = - Strings.S - .newBuilder() + Strings.S.newBuilder() .setId("id") .addNames("a") .addNames("b") diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/NettyUtils.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/NettyUtils.java index 570f20271..d9ba3fe86 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/NettyUtils.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/NettyUtils.java @@ -34,7 +34,9 @@ /** Utils of netty. */ public class NettyUtils { - /** @return an EventLoopGroup suitable for the current platform */ + /** + * @return an EventLoopGroup suitable for the current platform + */ public static EventLoopGroup newEventLoopGroup(int nThreads, String threadNamePrefix) { if (Epoll.isAvailable()) { // Regular Epoll based event loop diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvSnapshotResource.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvSnapshotResource.java index 41926d1cd..0d034a6f6 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvSnapshotResource.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvSnapshotResource.java @@ -42,6 +42,7 @@ public class KvSnapshotResource { /** A scheduler to schedule kv snapshot. */ private final ScheduledExecutorService kvSnapshotScheduler; + /** Thread pool for async snapshot workers. */ private final ExecutorService asyncOperationsThreadPool; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/KvSnapshotHandle.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/KvSnapshotHandle.java index 042bff09b..f9b7199c7 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/KvSnapshotHandle.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/KvSnapshotHandle.java @@ -36,6 +36,7 @@ public class KvSnapshotHandle { /** The shared file(like data file) handles of the kv snapshot. */ private final List sharedFileHandles; + /** The private file(like meta file) handles of the kv snapshot. */ private final List privateFileHandles; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/PeriodicSnapshotManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/PeriodicSnapshotManager.java index 747a2b54c..e1444f935 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/PeriodicSnapshotManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/PeriodicSnapshotManager.java @@ -81,6 +81,7 @@ public class PeriodicSnapshotManager implements Closeable { private volatile boolean started = false; private final long initialDelay; + /** The table bucket that the snapshot manager is for. */ private final TableBucket tableBucket; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/SharedKvFileRegistry.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/SharedKvFileRegistry.java index 00c6b4bd5..1a1212dad 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/SharedKvFileRegistry.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/SharedKvFileRegistry.java @@ -202,6 +202,7 @@ private static final class SharedKvEntry { private final long createdBySnapshotID; private long lastUsedSnapshotID; + /** The shared kv file handle. */ KvFileHandle kvFileHandle; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java index a7ac697d5..674eb7c60 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java @@ -69,6 +69,7 @@ public abstract class AbstractIndex implements Closeable { /** The maximum number of entries this index can hold. */ private volatile int maxEntries; + /** The number of entries in this index. */ private volatile int entries; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/ListOffsetsParam.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/ListOffsetsParam.java index 7bb713cfa..bee56ea76 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/ListOffsetsParam.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/ListOffsetsParam.java @@ -28,6 +28,7 @@ public class ListOffsetsParam { * from follower, it represents listing LocalLogStartOffset. */ public static final int EARLIEST_OFFSET_TYPE = 0; + /** * Latest offset type. If the list offsets request come from client, it represents listing * HighWatermark. otherwise, the request come from follower, it represents listing diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogSegments.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogSegments.java index 18c032b25..2ed83aa66 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogSegments.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/LogSegments.java @@ -174,22 +174,30 @@ public Optional floorSegment(long offset) { return floorEntry(offset).map(Map.Entry::getValue); } - /** @return the entry associated with the greatest offset, if it exists. */ + /** + * @return the entry associated with the greatest offset, if it exists. + */ public Optional> lastEntry() { return Optional.ofNullable(segments.lastEntry()); } - /** @return the log segment with the greatest offset, if it exists. */ + /** + * @return the log segment with the greatest offset, if it exists. + */ public Optional lastSegment() { return Optional.ofNullable(segments.lastEntry()).map(Map.Entry::getValue); } - /** @return the entry associated with the greatest offset, if it exists. */ + /** + * @return the entry associated with the greatest offset, if it exists. + */ public Optional> firstEntry() { return Optional.ofNullable(segments.firstEntry()); } - /** @return the log segment with the greatest offset, if it exists. */ + /** + * @return the log segment with the greatest offset, if it exists. + */ public Optional firstSegment() { return Optional.ofNullable(segments.firstEntry()).map(Map.Entry::getValue); } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java index e6e01f648..90c2c48c5 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/WriterStateManager.java @@ -82,6 +82,7 @@ public class WriterStateManager { private final Map writers = new HashMap<>(); private final File logTabletDir; + /** The same as writers#size, but for lock-free access. */ private volatile int writerIdCount = 0; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/FsRemoteLogOutputStream.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/FsRemoteLogOutputStream.java index 2465350d4..1c4d58233 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/FsRemoteLogOutputStream.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/FsRemoteLogOutputStream.java @@ -36,6 +36,7 @@ public class FsRemoteLogOutputStream extends FSDataOutputStream { private final FsPath basePath; private final FileSystem fs; private final byte[] writeBuffer; + /** The file path can be log file, index file or remote log metadata file. */ private final FsPath remoteLogFilePath; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/LogSegmentFiles.java b/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/LogSegmentFiles.java index 4e614e298..92be27073 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/LogSegmentFiles.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/log/remote/LogSegmentFiles.java @@ -38,6 +38,7 @@ public class LogSegmentFiles { private final Path offsetIndex; private final Path timeIndex; private final @Nullable Path writerIdIndex; + // TODO add leader epoch index after introduce leader epoch. public LogSegmentFiles( diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index 4b9e9e43c..a07db0e6b 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -143,6 +143,7 @@ public final class Replica { private final LogManager logManager; private final LogTablet logTablet; private final long replicaMaxLagTime; + /** A closeable registry to register all registered {@link Closeable}s. */ private final CloseableRegistry closeableRegistry; @@ -157,6 +158,7 @@ public final class Replica { private final int localTabletServerId; private final DelayedOperationManager> delayedWriteManager; + /** The manger to manger the isr expand and shrink. */ private final AdjustIsrManager adjustIsrManager; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedWrite.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedWrite.java index eecb7b6b0..4b28171f4 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedWrite.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/delay/DelayedWrite.java @@ -205,8 +205,10 @@ public String toString() { public static class DelayedBucketStatus { private final long requiredOffset; private final T writeResultForBucket; + /** Whether this bucket is waiting acks. */ private volatile boolean acksPending; + /** The error code of the delayed operation. */ private volatile Errors delayedError; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java index f4f1145e7..dec14b486 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpoint.java @@ -42,8 +42,10 @@ final class RemoteLeaderEndpoint implements LeaderEndpoint { private final int followerServerId; private final ServerNode remoteNode; private final TabletServerGateway tabletServerGateway; + /** The max size for the fetch response. */ private final int maxFetchSize; + /** The max fetch size for a bucket in bytes. */ private final int maxFetchSizeForBucket; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimingWheel.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimingWheel.java index 472e3bd92..92f73318c 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimingWheel.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimingWheel.java @@ -129,6 +129,7 @@ final class TimingWheel { private final AtomicInteger taskCounter; private final DelayQueue queue; + /** The upper level timing wheel. */ private volatile TimingWheel overflowWheel; diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBExtension.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBExtension.java index 9fc9a7ebc..0d5764f8c 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBExtension.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBExtension.java @@ -38,6 +38,7 @@ public class RocksDBExtension implements BeforeEachCallback, AfterEachCallback { private File rockDbDir; private RocksDBResourceContainer rocksDBResourceContainer; + /** The RocksDB instance object. */ private RocksDBKv rocksDBKv; diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java index 3318cd5e9..af5290766 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/TestingLeaderEndpoint.java @@ -44,8 +44,10 @@ public class TestingLeaderEndpoint implements LeaderEndpoint { private final ReplicaManager replicaManager; private final ServerNode localNode; + /** The max size for the fetch response. */ private final int maxFetchSize; + /** The max fetch size for a bucket in bytes. */ private final int maxFetchSizeForBucket; diff --git a/fluss-test-utils/src/main/java/com/alibaba/fluss/testutils/common/FlussAssertions.java b/fluss-test-utils/src/main/java/com/alibaba/fluss/testutils/common/FlussAssertions.java index 0b0315a4c..2ed3f83ce 100644 --- a/fluss-test-utils/src/main/java/com/alibaba/fluss/testutils/common/FlussAssertions.java +++ b/fluss-test-utils/src/main/java/com/alibaba/fluss/testutils/common/FlussAssertions.java @@ -34,7 +34,9 @@ public class FlussAssertions { private FlussAssertions() {} - /** @see #chainOfCauses(Throwable) */ + /** + * @see #chainOfCauses(Throwable) + */ @SuppressWarnings({"rawtypes", "unused"}) public static final InstanceOfAssertFactory> STREAM_THROWABLE = new InstanceOfAssertFactory<>(Stream.class, Assertions::assertThat); diff --git a/pom.xml b/pom.xml index 2e22e16b8..cc237f3b3 100644 --- a/pom.xml +++ b/pom.xml @@ -112,7 +112,7 @@ 2.22.2.Final 1.27 - 2.27.1 + 2.43.0 3.4.3 package @@ -918,7 +918,7 @@ - 1.7 + 1.17.0 @@ -1002,4 +1002,4 @@ - \ No newline at end of file +