diff --git a/.github/workflows/s3-stream-e2e.yml b/.github/workflows/s3-stream-e2e.yml index c0d038218..d392d2f77 100644 --- a/.github/workflows/s3-stream-e2e.yml +++ b/.github/workflows/s3-stream-e2e.yml @@ -1,6 +1,6 @@ name: E2E-TEST for AutoMQ S3Stream on: - pull_request: + pull_request_target: types: - opened - reopened @@ -47,4 +47,4 @@ jobs: report_paths: '**/surefire-reports/TEST-*.xml' annotate_only: true include_passed: true - detailed_summary: true \ No newline at end of file + detailed_summary: true diff --git a/.gitignore b/.gitignore index 6177d8cc4..cdba9cc9f 100644 --- a/.gitignore +++ b/.gitignore @@ -40,5 +40,10 @@ target/ !**/src/test/**/target/ *.xml.versionsBackup +# Gradle +.gradle/ +build/ +gradle/wrapper/*.jar + # Docker distribution/docker/ddl.sql \ No newline at end of file diff --git a/s3stream/build.gradle b/s3stream/build.gradle new file mode 100644 index 000000000..67c52c2c5 --- /dev/null +++ b/s3stream/build.gradle @@ -0,0 +1,78 @@ +/* + * This file was generated by the Gradle 'init' task. + */ +import com.github.spotbugs.snom.Confidence +import com.github.spotbugs.snom.Effort + +plugins { + id 'java-library' + id 'maven-publish' + id("com.github.spotbugs") version "6.0.7" +} + +spotbugsMain { + reports { + html { + required = true + outputLocation = file("$buildDir/reports/spotbugs.html") + setStylesheet("fancy-hist.xsl") + } + } +} + +spotbugs { + effort = Effort.valueOf('DEFAULT') + reportLevel = Confidence.valueOf('HIGH') +} + +repositories { + mavenLocal() + maven { + url = uri('https://repo.maven.apache.org/maven2/') + } +} + +dependencies { + api 'software.amazon.awssdk:s3:2.20.127' + api 'io.netty:netty-tcnative-boringssl-static:2.0.53.Final' + api 'io.netty:netty-buffer:4.1.100.Final' + api 'com.bucket4j:bucket4j-core:8.5.0' + api 'org.apache.commons:commons-lang3:3.13.0' + api 'org.slf4j:slf4j-api:2.0.9' + api 'net.sourceforge.argparse4j:argparse4j:0.9.0' + api 'net.java.dev.jna:jna:5.2.0' + api 'com.google.guava:guava:32.0.1-jre' + api 'com.fasterxml.jackson.core:jackson-databind:2.16.0' + api 'io.opentelemetry:opentelemetry-api:1.32.0' + api 'io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:1.32.0' + api 'org.aspectj:aspectjrt:1.9.20.1' + api 'org.aspectj:aspectjweaver:1.9.20.1' + testImplementation 'org.slf4j:slf4j-simple:2.0.9' + testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0' + testImplementation 'org.mockito:mockito-core:5.5.0' + testImplementation 'org.mockito:mockito-junit-jupiter:5.5.0' +} + +group = 'com.automq.elasticstream' +description = 's3stream' +java.sourceCompatibility = '11' + +java { + withSourcesJar() +} + +publishing { + publications { + maven(MavenPublication) { + from(components.java) + } + } +} + +tasks.withType(JavaCompile) { + options.encoding = 'UTF-8' +} + +tasks.withType(Javadoc) { + options.encoding = 'UTF-8' +} diff --git a/s3stream/gradle/wrapper/gradle-wrapper.properties b/s3stream/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 000000000..37aef8d3f --- /dev/null +++ b/s3stream/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip +networkTimeout=10000 +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/s3stream/gradlew b/s3stream/gradlew new file mode 100755 index 000000000..aeb74cbb4 --- /dev/null +++ b/s3stream/gradlew @@ -0,0 +1,245 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. +# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +# This is normally unused +# shellcheck disable=SC2034 +APP_BASE_NAME=${0##*/} +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. + +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. +# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/s3stream/gradlew.bat b/s3stream/gradlew.bat new file mode 100644 index 000000000..6689b85be --- /dev/null +++ b/s3stream/gradlew.bat @@ -0,0 +1,92 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/s3stream/pom.xml b/s3stream/pom.xml index b6518b2e4..d74a54ff4 100644 --- a/s3stream/pom.xml +++ b/s3stream/pom.xml @@ -31,8 +31,8 @@ 2.20.127 3.2.0 - 17 - 17 + 11 + 11 UTF-8 1.32.0 1.9.20.1 diff --git a/s3stream/settings.gradle b/s3stream/settings.gradle new file mode 100644 index 000000000..51b17df85 --- /dev/null +++ b/s3stream/settings.gradle @@ -0,0 +1,5 @@ +/* + * This file was generated by the Gradle 'init' task. + */ + +rootProject.name = 's3stream' diff --git a/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java b/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java index 71c669d2e..830561675 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java +++ b/s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java @@ -18,12 +18,28 @@ package com.automq.stream.s3; import io.netty.buffer.ByteBuf; +import java.util.Objects; -public record DataBlockIndex(long streamId, long startOffset, int endOffsetDelta, int recordCount, long startPosition, - int size) { +public final class DataBlockIndex { public static final int BLOCK_INDEX_SIZE = 8/* streamId */ + 8 /* startOffset */ + 4 /* endOffset delta */ + 4 /* record count */ + 8 /* block position */ + 4 /* block size */; + private final long streamId; + private final long startOffset; + private final int endOffsetDelta; + private final int recordCount; + private final long startPosition; + private final int size; + + public DataBlockIndex(long streamId, long startOffset, int endOffsetDelta, int recordCount, long startPosition, + int size) { + this.streamId = streamId; + this.startOffset = startOffset; + this.endOffsetDelta = endOffsetDelta; + this.recordCount = recordCount; + this.startPosition = startPosition; + this.size = size; + } public long endOffset() { return startOffset + endOffsetDelta; @@ -41,4 +57,60 @@ public void encode(ByteBuf buf) { buf.writeLong(startPosition); buf.writeInt(size); } + + public long streamId() { + return streamId; + } + + public long startOffset() { + return startOffset; + } + + public int endOffsetDelta() { + return endOffsetDelta; + } + + public int recordCount() { + return recordCount; + } + + public long startPosition() { + return startPosition; + } + + public int size() { + return size; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (DataBlockIndex) obj; + return this.streamId == that.streamId && + this.startOffset == that.startOffset && + this.endOffsetDelta == that.endOffsetDelta && + this.recordCount == that.recordCount && + this.startPosition == that.startPosition && + this.size == that.size; + } + + @Override + public int hashCode() { + return Objects.hash(streamId, startOffset, endOffsetDelta, recordCount, startPosition, size); + } + + @Override + public String toString() { + return "DataBlockIndex[" + + "streamId=" + streamId + ", " + + "startOffset=" + startOffset + ", " + + "endOffsetDelta=" + endOffsetDelta + ", " + + "recordCount=" + recordCount + ", " + + "startPosition=" + startPosition + ", " + + "size=" + size + ']'; + } + } diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java index 5e9001250..fc6c82472 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java @@ -28,6 +28,7 @@ import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; @@ -121,10 +122,20 @@ public void close0() { } /** - * @param dataBlockSize The total size of the data blocks, which equals to index start position. - * @param indexBlock raw index data. + * */ - public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock) { + public static final class BasicObjectInfo { + private final long dataBlockSize; + private final IndexBlock indexBlock; + + /** + * @param dataBlockSize The total size of the data blocks, which equals to index start position. + * @param indexBlock raw index data. + */ + public BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock) { + this.dataBlockSize = dataBlockSize; + this.indexBlock = indexBlock; + } public static BasicObjectInfo parse(ByteBuf objectTailBuf, S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException { @@ -154,6 +165,38 @@ public int size() { void close() { indexBlock.close(); } + + public long dataBlockSize() { + return dataBlockSize; + } + + public IndexBlock indexBlock() { + return indexBlock; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (BasicObjectInfo) obj; + return this.dataBlockSize == that.dataBlockSize && + Objects.equals(this.indexBlock, that.indexBlock); + } + + @Override + public int hashCode() { + return Objects.hash(dataBlockSize, indexBlock); + } + + @Override + public String toString() { + return "BasicObjectInfo[" + + "dataBlockSize=" + dataBlockSize + ", " + + "indexBlock=" + indexBlock + ']'; + } + } public static class IndexBlock { @@ -261,8 +304,62 @@ void close() { } } - public record FindIndexResult(boolean isFulfilled, long nextStartOffset, int nextMaxBytes, - List streamDataBlocks) { + public static final class FindIndexResult { + private final boolean isFulfilled; + private final long nextStartOffset; + private final int nextMaxBytes; + private final List streamDataBlocks; + + public FindIndexResult(boolean isFulfilled, long nextStartOffset, int nextMaxBytes, + List streamDataBlocks) { + this.isFulfilled = isFulfilled; + this.nextStartOffset = nextStartOffset; + this.nextMaxBytes = nextMaxBytes; + this.streamDataBlocks = streamDataBlocks; + } + + public boolean isFulfilled() { + return isFulfilled; + } + + public long nextStartOffset() { + return nextStartOffset; + } + + public int nextMaxBytes() { + return nextMaxBytes; + } + + public List streamDataBlocks() { + return streamDataBlocks; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (FindIndexResult) obj; + return this.isFulfilled == that.isFulfilled && + this.nextStartOffset == that.nextStartOffset && + this.nextMaxBytes == that.nextMaxBytes && + Objects.equals(this.streamDataBlocks, that.streamDataBlocks); + } + + @Override + public int hashCode() { + return Objects.hash(isFulfilled, nextStartOffset, nextMaxBytes, streamDataBlocks); + } + + @Override + public String toString() { + return "FindIndexResult[" + + "isFulfilled=" + isFulfilled + ", " + + "nextStartOffset=" + nextStartOffset + ", " + + "nextMaxBytes=" + nextMaxBytes + ", " + + "streamDataBlocks=" + streamDataBlocks + ']'; + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 84bca7ac5..58280b52e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -456,11 +456,9 @@ public CompletableFuture forceUpload(long streamId) { StorageOperationStats.getInstance().forceUploadWALAwaitStats.record(MetricsLevel.DEBUG, timer.elapsedAs(TimeUnit.NANOSECONDS)); uploadDeltaWAL(streamId, true); // Wait for all tasks contains streamId complete. - List> tasksContainsStream = this.inflightWALUploadTasks.stream() - .filter(it -> it.cache.containsStream(streamId)) - .map(it -> it.cf) - .toList(); - FutureUtil.propagate(CompletableFuture.allOf(tasksContainsStream.toArray(new CompletableFuture[0])), cf); + FutureUtil.propagate(CompletableFuture.allOf(this.inflightWALUploadTasks.stream() + .filter(it -> it.cache.containsStream(streamId)) + .map(it -> it.cf).toArray(CompletableFuture[]::new)), cf); if (LogCache.MATCH_ALL_STREAMS != streamId) { callbackSequencer.tryFree(streamId); } @@ -736,7 +734,15 @@ synchronized private long calculate() { * Wrapper of {@link WalWriteRequest}. * When the {@code request} is null, it is used as a flag. */ - record WalWriteRequestWrapper(WalWriteRequest request) { + static final class WalWriteRequestWrapper { + private final WalWriteRequest request; + + /** + * + */ + WalWriteRequestWrapper(WalWriteRequest request) { + this.request = request; + } static WalWriteRequestWrapper flag() { return new WalWriteRequestWrapper(null); @@ -745,6 +751,32 @@ static WalWriteRequestWrapper flag() { public boolean isFlag() { return request == null; } + + public WalWriteRequest request() { + return request; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (WalWriteRequestWrapper) obj; + return Objects.equals(this.request, that.request); + } + + @Override + public int hashCode() { + return Objects.hash(request); + } + + @Override + public String toString() { + return "WalWriteRequestWrapper[" + + "request=" + request + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java index c2c6fddb1..a8fc11ac5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java @@ -40,6 +40,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -146,7 +148,7 @@ public void shutdown() { openedStreams.forEach((streamId, stream) -> streamCloseFutures.put(streamId, stream.close())); for (; ; ) { Threads.sleep(1000); - List closingStreams = streamCloseFutures.entrySet().stream().filter(e -> !e.getValue().isDone()).map(Map.Entry::getKey).toList(); + List closingStreams = streamCloseFutures.entrySet().stream().filter(e -> !e.getValue().isDone()).map(Map.Entry::getKey).collect(Collectors.toList()); LOGGER.info("waiting streams close, closed {} / all {}, closing[{}]", streamCloseFutures.size() - closingStreams.size(), streamCloseFutures.size(), closingStreams); if (closingStreams.isEmpty()) { break; diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java index d75ca18e6..49843f8d1 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/BlockCache.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Objects; import java.util.SortedMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -391,7 +392,45 @@ public interface CacheEvictListener { void onCacheEvict(long streamId, long startOffset, long endOffset, int size); } - record CacheBlockKey(long streamId, long startOffset) { + static final class CacheBlockKey { + private final long streamId; + private final long startOffset; + + CacheBlockKey(long streamId, long startOffset) { + this.streamId = streamId; + this.startOffset = startOffset; + } + + public long streamId() { + return streamId; + } + + public long startOffset() { + return startOffset; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (CacheBlockKey) obj; + return this.streamId == that.streamId && + this.startOffset == that.startOffset; + } + + @Override + public int hashCode() { + return Objects.hash(streamId, startOffset); + } + + @Override + public String toString() { + return "CacheBlockKey[" + + "streamId=" + streamId + ", " + + "startOffset=" + startOffset + ']'; + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java index 3162a2f0a..257ab5e85 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DataBlockReadAccumulator.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; @@ -89,6 +90,45 @@ public void readDataBlock(ObjectReader reader, DataBlockIndex blockIndex) { } } - public record ReserveResult(int reserveSize, CompletableFuture cf) { + public static final class ReserveResult { + private final int reserveSize; + private final CompletableFuture cf; + + public ReserveResult(int reserveSize, CompletableFuture cf) { + this.reserveSize = reserveSize; + this.cf = cf; + } + + public int reserveSize() { + return reserveSize; + } + + public CompletableFuture cf() { + return cf; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ReserveResult) obj; + return this.reserveSize == that.reserveSize && + Objects.equals(this.cf, that.cf); + } + + @Override + public int hashCode() { + return Objects.hash(reserveSize, cf); + } + + @Override + public String toString() { + return "ReserveResult[" + + "reserveSize=" + reserveSize + ", " + + "cf=" + cf + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java index d517dd10c..c123eddf8 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java @@ -34,6 +34,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -243,7 +244,45 @@ public enum ReadBlockCacheStatus { WAIT_THROTTLE, } - public record ReadAheadTaskKey(long streamId, long startOffset) { + public static final class ReadAheadTaskKey { + private final long streamId; + private final long startOffset; + + public ReadAheadTaskKey(long streamId, long startOffset) { + this.streamId = streamId; + this.startOffset = startOffset; + } + + public long streamId() { + return streamId; + } + + public long startOffset() { + return startOffset; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ReadAheadTaskKey) obj; + return this.streamId == that.streamId && + this.startOffset == that.startOffset; + } + + @Override + public int hashCode() { + return Objects.hash(streamId, startOffset); + } + + @Override + public String toString() { + return "ReadAheadTaskKey[" + + "streamId=" + streamId + ", " + + "startOffset=" + startOffset + ']'; + } } @@ -261,7 +300,21 @@ void setStatus(ReadBlockCacheStatus status) { } } - public record ReadTaskKey(long streamId, long startOffset, long endOffset, int maxBytes, UUID uuid) { + public static final class ReadTaskKey { + private final long streamId; + private final long startOffset; + private final long endOffset; + private final int maxBytes; + private final UUID uuid; + + public ReadTaskKey(long streamId, long startOffset, long endOffset, int maxBytes, UUID uuid) { + this.streamId = streamId; + this.startOffset = startOffset; + this.endOffset = endOffset; + this.maxBytes = maxBytes; + this.uuid = uuid; + } + @Override public String toString() { return "ReadTaskKey{" + @@ -272,6 +325,46 @@ public String toString() { ", uuid=" + uuid + '}'; } + + public long streamId() { + return streamId; + } + + public long startOffset() { + return startOffset; + } + + public long endOffset() { + return endOffset; + } + + public int maxBytes() { + return maxBytes; + } + + public UUID uuid() { + return uuid; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ReadTaskKey) obj; + return this.streamId == that.streamId && + this.startOffset == that.startOffset && + this.endOffset == that.endOffset && + this.maxBytes == that.maxBytes && + Objects.equals(this.uuid, that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(streamId, startOffset, endOffset, maxBytes, uuid); + } + } public static class ReadTaskContext { @@ -288,7 +381,38 @@ void setStatus(ReadBlockCacheStatus status) { } } - public record ReadAheadRecord(long nextRAOffset) { + public static final class ReadAheadRecord { + private final long nextRAOffset; + + public ReadAheadRecord(long nextRAOffset) { + this.nextRAOffset = nextRAOffset; + } + + public long nextRAOffset() { + return nextRAOffset; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ReadAheadRecord) obj; + return this.nextRAOffset == that.nextRAOffset; + } + + @Override + public int hashCode() { + return Objects.hash(nextRAOffset); + } + + @Override + public String toString() { + return "ReadAheadRecord[" + + "nextRAOffset=" + nextRAOffset + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java b/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java index e19a1ea30..5ed00c71f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/InflightReadThrottle.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -147,6 +148,45 @@ public void run() { } } - record InflightReadItem(int readSize, CompletableFuture cf) { + static final class InflightReadItem { + private final int readSize; + private final CompletableFuture cf; + + InflightReadItem(int readSize, CompletableFuture cf) { + this.readSize = readSize; + this.cf = cf; + } + + public int readSize() { + return readSize; + } + + public CompletableFuture cf() { + return cf; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (InflightReadItem) obj; + return this.readSize == that.readSize && + Objects.equals(this.cf, that.cf); + } + + @Override + public int hashCode() { + return Objects.hash(readSize, cf); + } + + @Override + public String toString() { + return "InflightReadItem[" + + "readSize=" + readSize + ", " + + "cf=" + cf + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java index ee4913a52..634a3f886 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java @@ -149,7 +149,7 @@ public void shutdown() { public CompletableFuture compact() { return this.objectManager.getServerObjects().thenComposeAsync(objectMetadataList -> { List streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream()) - .map(StreamOffsetRange::streamId).distinct().toList(); + .map(StreamOffsetRange::streamId).distinct().collect(Collectors.toList()); return this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> this.compact(streamMetadataList, objectMetadataList), compactThreadPool); }, compactThreadPool); @@ -289,7 +289,7 @@ public CompletableFuture forceSplitAll() { //TODO: deal with metadata delay this.compactScheduledExecutor.execute(() -> this.objectManager.getServerObjects().thenAcceptAsync(objectMetadataList -> { List streamIds = objectMetadataList.stream().flatMap(e -> e.getOffsetRanges().stream()) - .map(StreamOffsetRange::streamId).distinct().toList(); + .map(StreamOffsetRange::streamId).distinct().collect(Collectors.toList()); this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> { if (objectMetadataList.isEmpty()) { logger.info("No stream set objects to force split"); @@ -313,7 +313,7 @@ public CompletableFuture forceSplitAll() { * @param streamMetadataList metadata of opened streams * @param objectMetadata stream set object to split * @param cfs List of CompletableFuture of StreamObject - * @return true if split succeed, false otherwise + * @return true if split succeed, false otherwise */ private boolean splitStreamSetObject(List streamMetadataList, S3ObjectMetadata objectMetadata, Collection> cfs) { @@ -370,7 +370,7 @@ Collection> groupAndSplitStreamDataBlocks(S3Obje // prepare N stream objects at one time objectManager.prepareObject(batchGroup.size(), TimeUnit.MINUTES.toMillis(CompactionConstants.S3_OBJECT_TTL_MINUTES)) .thenComposeAsync(objectId -> { - List blocksToRead = batchGroup.stream().flatMap(p -> p.getLeft().stream()).toList(); + List blocksToRead = batchGroup.stream().flatMap(p -> p.getLeft().stream()).collect(Collectors.toList()); DataBlockReader reader = new DataBlockReader(objectMetadata, s3Operator, compactionBucket, bucketCallbackScheduledExecutor); // batch read reader.readBlocks(blocksToRead, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkBandwidth)); @@ -478,7 +478,7 @@ CommitStreamSetObjectRequest buildCompactRequest(List streamMeta request.setCompactedObjectIds(new ArrayList<>(compactedObjectIds)); List compactedObjectMetadata = objectsToCompact.stream() - .filter(e -> compactedObjectIds.contains(e.objectId())).toList(); + .filter(e -> compactedObjectIds.contains(e.objectId())).collect(Collectors.toList()); if (isSanityCheckFailed(streamMetadataList, compactedObjectMetadata, request)) { logger.error("Sanity check failed, compaction result is illegal"); return null; diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionStats.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionStats.java index 3c1c4ab5b..51a032045 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionStats.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactionStats.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; public class CompactionStats { @@ -60,7 +61,45 @@ public Map getS3ObjectToCompactedObjectNumMap() { return s3ObjectToCompactedObjectNumMap; } - public record CompactionStreamRecord(int streamNumInStreamSet, int streamObjectNum) { + public static final class CompactionStreamRecord { + private final int streamNumInStreamSet; + private final int streamObjectNum; + + public CompactionStreamRecord(int streamNumInStreamSet, int streamObjectNum) { + this.streamNumInStreamSet = streamNumInStreamSet; + this.streamObjectNum = streamObjectNum; + } + + public int streamNumInStreamSet() { + return streamNumInStreamSet; + } + + public int streamObjectNum() { + return streamObjectNum; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (CompactionStreamRecord) obj; + return this.streamNumInStreamSet == that.streamNumInStreamSet && + this.streamObjectNum == that.streamObjectNum; + } + + @Override + public int hashCode() { + return Objects.hash(streamNumInStreamSet, streamObjectNum); + } + + @Override + public String toString() { + return "CompactionStreamRecord[" + + "streamNumInStreamSet=" + streamNumInStreamSet + ", " + + "streamObjectNum=" + streamObjectNum + ']'; + } } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java index 5cc780ef2..775a58e57 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockReader.java @@ -189,7 +189,7 @@ private void readContinuousBlocks0(List streamDataBlocks) { private CompletableFuture rangeRead(long start, long end) { return rangeRead0(start, end).whenComplete((ret, ex) -> - CompactionStats.getInstance().compactionReadSizeStats.add(MetricsLevel.INFO, ret.readableBytes())); + CompactionStats.getInstance().compactionReadSizeStats.add(MetricsLevel.INFO, ret.readableBytes())); } private CompletableFuture rangeRead0(long start, long end) { diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java b/s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java index 7b633c84e..341a2926a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/utils/CompactionUtils.java @@ -116,7 +116,7 @@ public static Map> blockWaitObjectIndices(List sortStreamRangePositions(Map> streamDataBlocksMap) { //TODO: use merge sort @@ -135,7 +135,7 @@ public static List sortStreamRangePositions(Map> groupStreamDataBlocks(List streamDataBlocks, Predicate predicate) { @@ -156,7 +156,8 @@ public static List> groupStreamDataBlocks(List buildObjectStreamRangeFromGroup(List> streamDataBlockGroup) { + public static List buildObjectStreamRangeFromGroup( + List> streamDataBlockGroup) { List objectStreamRanges = new ArrayList<>(); for (List streamDataBlocks : streamDataBlockGroup) { @@ -174,7 +175,8 @@ public static List buildObjectStreamRangeFromGroup(List buildDataBlockIndicesFromGroup(List> streamDataBlockGroup) { + public static List buildDataBlockIndicesFromGroup( + List> streamDataBlockGroup) { List dataBlockIndices = new ArrayList<>(); long blockStartPosition = 0; diff --git a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java index d2edf99e1..725ef401f 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java @@ -167,11 +167,11 @@ public synchronized CompletableFuture> getObjects(long st .stream() .map(Pair::getRight) .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1))) - .toList(); + .collect(Collectors.toList()); List streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()) .stream() .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1))) - .toList(); + .collect(Collectors.toList()); List result = new ArrayList<>(); result.addAll(streamSetObjectList); @@ -182,7 +182,7 @@ public synchronized CompletableFuture> getObjects(long st return Long.compare(startOffset1, startOffset2); }); - return CompletableFuture.completedFuture(result.stream().limit(limit).toList()); + return CompletableFuture.completedFuture(result.stream().limit(limit).collect(Collectors.toList())); } @Override @@ -190,7 +190,7 @@ public synchronized CompletableFuture> getServerObjects() List result = streamSetObjects.values() .stream() .filter(pair -> pair.getLeft() == NODE_ID_ALLOC.get()) - .map(Pair::getRight).toList(); + .map(Pair::getRight).collect(Collectors.toList()); return CompletableFuture.completedFuture(result); } @@ -201,18 +201,18 @@ public synchronized CompletableFuture> getStreamObjects(l .stream() .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.streamId() == streamId && r.endOffset() > startOffset && (r.startOffset() < endOffset || endOffset == -1))) .limit(limit) - .toList(); + .collect(Collectors.toList()); return CompletableFuture.completedFuture(streamObjectList); } @Override public synchronized CompletableFuture> getOpeningStreams() { - return CompletableFuture.completedFuture(streams.values().stream().filter(stream -> stream.state() == StreamState.OPENED).toList()); + return CompletableFuture.completedFuture(streams.values().stream().filter(stream -> stream.state() == StreamState.OPENED).collect(Collectors.toList())); } @Override public CompletableFuture> getStreams(List streamIds) { - return CompletableFuture.completedFuture(streamIds.stream().map(streams::get).filter(Objects::nonNull).toList()); + return CompletableFuture.completedFuture(streamIds.stream().map(streams::get).filter(Objects::nonNull).collect(Collectors.toList())); } @Override diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 11dba5a2a..10d3e1c34 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -268,14 +268,14 @@ public static void registerNetworkLimiterSupplier(AsyncNetworkBandwidthLimiter.T Supplier networkAvailableBandwidthSupplier, Supplier networkLimiterQueueSizeSupplier) { switch (type) { - case INBOUND -> { + case INBOUND: S3StreamMetricsManager.networkInboundAvailableBandwidthSupplier = networkAvailableBandwidthSupplier; S3StreamMetricsManager.networkInboundLimiterQueueSizeSupplier = networkLimiterQueueSizeSupplier; - } - case OUTBOUND -> { + break; + case OUTBOUND: S3StreamMetricsManager.networkOutboundAvailableBandwidthSupplier = networkAvailableBandwidthSupplier; S3StreamMetricsManager.networkOutboundLimiterQueueSizeSupplier = networkLimiterQueueSizeSupplier; - } + break; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java index 529528751..1435c8189 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/wrapper/HistogramMetric.java @@ -25,7 +25,7 @@ public class HistogramMetric extends ConfigurableMetrics { private final LongHistogram longHistogram; - public HistogramMetric(MetricsConfig metricsConfig,LongHistogram longHistogram) { + public HistogramMetric(MetricsConfig metricsConfig, LongHistogram longHistogram) { this(metricsConfig, Attributes.empty(), longHistogram); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java index 4208f450d..8a2c36d48 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java @@ -167,10 +167,58 @@ public String getName() { } } - record BucketItem(int priority, long size, CompletableFuture cf) implements Comparable { + static final class BucketItem implements Comparable { + private final int priority; + private final long size; + private final CompletableFuture cf; + + BucketItem(int priority, long size, CompletableFuture cf) { + this.priority = priority; + this.size = size; + this.cf = cf; + } + @Override public int compareTo(BucketItem o) { return Long.compare(priority, o.priority); } + + public int priority() { + return priority; + } + + public long size() { + return size; + } + + public CompletableFuture cf() { + return cf; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (BucketItem) obj; + return this.priority == that.priority && + this.size == that.size && + Objects.equals(this.cf, that.cf); + } + + @Override + public int hashCode() { + return Objects.hash(priority, size, cf); + } + + @Override + public String toString() { + return "BucketItem[" + + "priority=" + priority + ", " + + "size=" + size + ", " + + "cf=" + cf + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java index 3c489fc2d..8de9e8cd5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/DefaultS3Operator.java @@ -47,6 +47,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -176,7 +177,8 @@ private static boolean checkPartNumbers(CompletedMultipartUpload multipartUpload private static boolean isUnrecoverable(Throwable ex) { ex = cause(ex); - if (ex instanceof S3Exception s3Ex) { + if (ex instanceof S3Exception) { + S3Exception s3Ex = (S3Exception) ex; return s3Ex.statusCode() == HttpStatusCode.FORBIDDEN || s3Ex.statusCode() == HttpStatusCode.NOT_FOUND; } return false; @@ -808,7 +810,61 @@ void handleReadCompleted(ByteBuf rst, Throwable ex) { } } - record ReadTask(String path, long start, long end, CompletableFuture cf) { + static final class ReadTask { + private final String path; + private final long start; + private final long end; + private final CompletableFuture cf; + + ReadTask(String path, long start, long end, CompletableFuture cf) { + this.path = path; + this.start = start; + this.end = end; + this.cf = cf; + } + + public String path() { + return path; + } + + public long start() { + return start; + } + + public long end() { + return end; + } + + public CompletableFuture cf() { + return cf; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ReadTask) obj; + return Objects.equals(this.path, that.path) && + this.start == that.start && + this.end == that.end && + Objects.equals(this.cf, that.cf); + } + + @Override + public int hashCode() { + return Objects.hash(path, start, end, cf); + } + + @Override + public String toString() { + return "ReadTask[" + + "path=" + path + ", " + + "start=" + start + ", " + + "end=" + end + ", " + + "cf=" + cf + ']'; + } } public static class Builder { diff --git a/s3stream/src/main/java/com/automq/stream/s3/trace/aop/S3StreamTraceAspect.java b/s3stream/src/main/java/com/automq/stream/s3/trace/aop/S3StreamTraceAspect.java index 3490797fb..a4923c5df 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/trace/aop/S3StreamTraceAspect.java +++ b/s3stream/src/main/java/com/automq/stream/s3/trace/aop/S3StreamTraceAspect.java @@ -35,7 +35,8 @@ public void trace(WithSpan withSpan) { @Around(value = "trace(withSpan) && execution(* com.automq.stream..*(..))", argNames = "joinPoint,withSpan") public Object createSpan(ProceedingJoinPoint joinPoint, WithSpan withSpan) throws Throwable { Object[] args = joinPoint.getArgs(); - if (args.length > 0 && args[0] instanceof TraceContext context) { + if (args.length > 0 && args[0] instanceof TraceContext) { + TraceContext context = (TraceContext) args[0]; return TraceUtils.trace(context, joinPoint, withSpan); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index 95b00dc23..849a72300 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -713,15 +714,56 @@ public String toString() { } } - record AppendResultImpl(long recordOffset, CompletableFuture future) implements AppendResult { + static final class AppendResultImpl implements AppendResult { + private final long recordOffset; + private final CompletableFuture future; + + AppendResultImpl(long recordOffset, CompletableFuture future) { + this.recordOffset = recordOffset; + this.future = future; + } @Override public String toString() { return "AppendResultImpl{" + "recordOffset=" + recordOffset + '}'; } + + @Override + public long recordOffset() { + return recordOffset; + } + + @Override + public CompletableFuture future() { + return future; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (AppendResultImpl) obj; + return this.recordOffset == that.recordOffset && + Objects.equals(this.future, that.future); + } + + @Override + public int hashCode() { + return Objects.hash(recordOffset, future); + } + } - record RecoverResultImpl(ByteBuf record, long recordOffset) implements RecoverResult { + static final class RecoverResultImpl implements RecoverResult { + private final ByteBuf record; + private final long recordOffset; + + RecoverResultImpl(ByteBuf record, long recordOffset) { + this.record = record; + this.recordOffset = recordOffset; + } @Override public String toString() { @@ -730,6 +772,33 @@ public String toString() { + ", recordOffset=" + recordOffset + '}'; } + + @Override + public ByteBuf record() { + return record; + } + + @Override + public long recordOffset() { + return recordOffset; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (RecoverResultImpl) obj; + return Objects.equals(this.record, that.record) && + this.recordOffset == that.recordOffset; + } + + @Override + public int hashCode() { + return Objects.hash(record, recordOffset); + } + } static class ReadRecordException extends Exception { diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java index f50042617..359e6d69a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.NavigableSet; +import java.util.Objects; import java.util.Random; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; @@ -54,6 +55,9 @@ public class WriteBench implements AutoCloseable { private final WriteAheadLog log; private final TrimOffset trimOffset = new TrimOffset(); + // Generate random payloads for this benchmark tool + private Random random = new Random(); + public WriteBench(Config config) throws IOException { BlockWALService.BlockWALServiceBuilder builder = BlockWALService.builder(config.path, config.capacity); if (config.depth != null) { @@ -176,7 +180,7 @@ private void runAppendTask(int index, AppendTaskConfig config, Stat stat) throws System.out.printf("Append task %d started\n", index); byte[] bytes = new byte[config.recordSizeBytes]; - new Random().nextBytes(bytes); + random.nextBytes(bytes); ByteBuf payload = Unpooled.wrappedBuffer(bytes).retain(); int intervalNanos = (int) TimeUnit.SECONDS.toNanos(1) / Math.max(1, config.throughputBytes / config.recordSizeBytes); long lastAppendTimeNanos = System.nanoTime(); @@ -325,7 +329,62 @@ public Result reset() { return new Result(countValue, costNanosValue, maxCostNanosValue, elapsedTimeNanos); } - public record Result(long count, long costNanos, long maxCostNanos, long elapsedTimeNanos) { + public static final class Result { + private final long count; + private final long costNanos; + private final long maxCostNanos; + private final long elapsedTimeNanos; + + public Result(long count, long costNanos, long maxCostNanos, long elapsedTimeNanos) { + this.count = count; + this.costNanos = costNanos; + this.maxCostNanos = maxCostNanos; + this.elapsedTimeNanos = elapsedTimeNanos; + } + + public long count() { + return count; + } + + public long costNanos() { + return costNanos; + } + + public long maxCostNanos() { + return maxCostNanos; + } + + public long elapsedTimeNanos() { + return elapsedTimeNanos; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (Result) obj; + return this.count == that.count && + this.costNanos == that.costNanos && + this.maxCostNanos == that.maxCostNanos && + this.elapsedTimeNanos == that.elapsedTimeNanos; + } + + @Override + public int hashCode() { + return Objects.hash(count, costNanos, maxCostNanos, elapsedTimeNanos); + } + + @Override + public String toString() { + return "Result[" + + "count=" + count + ", " + + "costNanos=" + costNanos + ", " + + "maxCostNanos=" + maxCostNanos + ", " + + "elapsedTimeNanos=" + elapsedTimeNanos + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/AsyncRateLimiter.java b/s3stream/src/main/java/com/automq/stream/utils/AsyncRateLimiter.java index af0cff6e8..9602311ea 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/AsyncRateLimiter.java +++ b/s3stream/src/main/java/com/automq/stream/utils/AsyncRateLimiter.java @@ -18,6 +18,7 @@ package com.automq.stream.utils; import com.google.common.util.concurrent.RateLimiter; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -69,7 +70,44 @@ private synchronized void tick() { } } - record Acquire(CompletableFuture cf, int size) { - } + static final class Acquire { + private final CompletableFuture cf; + private final int size; + + Acquire(CompletableFuture cf, int size) { + this.cf = cf; + this.size = size; + } + + public CompletableFuture cf() { + return cf; + } + + public int size() { + return size; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (Acquire) obj; + return Objects.equals(this.cf, that.cf) && + this.size == that.size; + } + @Override + public int hashCode() { + return Objects.hash(cf, size); + } + + @Override + public String toString() { + return "Acquire[" + + "cf=" + cf + ", " + + "size=" + size + ']'; + } + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java index d835ea18b..805ada558 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java +++ b/s3stream/src/main/java/com/automq/stream/utils/S3Utils.java @@ -69,7 +69,7 @@ public static void checkS3Access(S3Context context) { System.exit(1); } - try (MultipartObjectOperationTask task = new MultipartObjectOperationTask(context)) { + try (S3MultipartUploadTestTask task = new S3MultipartUploadTestTask(context)) { task.run(); } catch (Throwable e) { System.out.println("ERROR: " + ExceptionUtils.getRootCause(e)); @@ -109,7 +109,8 @@ public S3CheckTask(S3Context context, String taskName) { } protected static void showErrorInfo(Exception e) { - if (e.getCause() instanceof S3Exception se) { + if (e.getCause() instanceof S3Exception) { + S3Exception se = (S3Exception) e.getCause(); // Do not use system.err because automq admin tool suppress system.err System.out.println("get S3 exception: "); se.printStackTrace(System.out); @@ -134,9 +135,11 @@ public void close() { } } - private static class MultipartObjectOperationTask extends ObjectOperationTask { - public MultipartObjectOperationTask(S3Context context) { - super(context, MultipartObjectOperationTask.class.getSimpleName()); + // This task is used to test s3 multipart upload + private static class S3MultipartUploadTestTask extends ObjectOperationTask { + private Random random = new Random(); + public S3MultipartUploadTestTask(S3Context context) { + super(context, S3MultipartUploadTestTask.class.getSimpleName()); } @Override @@ -151,12 +154,12 @@ public void run() { int totalSize = data1Size + data2Size; byte[] randomBytes = new byte[data1Size]; - new Random().nextBytes(randomBytes); + random.nextBytes(randomBytes); ByteBuf data1 = Unpooled.wrappedBuffer(randomBytes); writePart(uploadId, path, bucketName, data1, 1).thenAccept(parts::add).get(); byte[] randomBytes2 = new byte[data2Size]; - new Random().nextBytes(randomBytes2); + random.nextBytes(randomBytes2); ByteBuf data2 = Unpooled.wrappedBuffer(randomBytes2); writePart(uploadId, path, bucketName, data2, 2).thenAccept(parts::add).get(); diff --git a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java index 5b8abc052..172210971 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java +++ b/s3stream/src/main/java/com/automq/stream/utils/biniarysearch/IndexBlockOrderedBytes.java @@ -19,6 +19,7 @@ import com.automq.stream.s3.DataBlockIndex; import com.automq.stream.s3.ObjectReader; +import java.util.Objects; public class IndexBlockOrderedBytes extends AbstractOrderedCollection { private final ObjectReader.IndexBlock indexBlock; @@ -37,12 +38,55 @@ protected ComparableItem get(int index) { return new ComparableStreamRange(indexBlock.get(index)); } - public record TargetStreamOffset(long streamId, long offset) { + public static final class TargetStreamOffset { + private final long streamId; + private final long offset; + + public TargetStreamOffset(long streamId, long offset) { + this.streamId = streamId; + this.offset = offset; + } + + public long streamId() { + return streamId; + } + + public long offset() { + return offset; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (TargetStreamOffset) obj; + return this.streamId == that.streamId && + this.offset == that.offset; + } + + @Override + public int hashCode() { + return Objects.hash(streamId, offset); + } + + @Override + public String toString() { + return "TargetStreamOffset[" + + "streamId=" + streamId + ", " + + "offset=" + offset + ']'; + } } - private record ComparableStreamRange(DataBlockIndex index) + private static final class ComparableStreamRange implements ComparableItem { + private final DataBlockIndex index; + + private ComparableStreamRange(DataBlockIndex index) { + this.index = index; + } public long endOffset() { return index.endOffset(); @@ -69,5 +113,31 @@ public boolean isGreaterThan(TargetStreamOffset value) { return this.index().startOffset() > value.offset; } } + + public DataBlockIndex index() { + return index; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) + return true; + if (obj == null || obj.getClass() != this.getClass()) + return false; + var that = (ComparableStreamRange) obj; + return Objects.equals(this.index, that.index); + } + + @Override + public int hashCode() { + return Objects.hash(index); + } + + @Override + public String toString() { + return "ComparableStreamRange[" + + "index=" + index + ']'; + } + } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java index 9045c956a..733f067be 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java @@ -483,7 +483,7 @@ public void testCompactWithLimit() { assertEquals(1, request.getStreamRanges().size()); Set compactedObjectIds = new HashSet<>(request.getCompactedObjectIds()); - s3ObjectMetadata = s3ObjectMetadata.stream().filter(s -> compactedObjectIds.contains(s.objectId())).toList(); + s3ObjectMetadata = s3ObjectMetadata.stream().filter(s -> compactedObjectIds.contains(s.objectId())).collect(Collectors.toList()); Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadata, request)); } diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/DefaultS3OperatorTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/DefaultS3OperatorTest.java index 5465184af..dfa4b21b8 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/DefaultS3OperatorTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/DefaultS3OperatorTest.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -70,7 +72,7 @@ void testDeleteObjectsSuccess() { .map(o -> DeletedObject.builder() .key(o.key()) .build()) - .toList()) + .collect(Collectors.toList())) .build(); return CompletableFuture.completedFuture(response); }); diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java index 64685a0f0..8668caf88 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/MultiPartWriterTest.java @@ -185,10 +185,17 @@ void testCopyWrite() throws NoSuchMethodException, InvocationTargetException, Il for (int i = 0; i < 3; i++) { int partNum = uploadPartRequests.get(i).partNumber(); switch (partNum) { - case 2 -> assertEquals(120L, writeContentLengths.get(i)); - case 3 -> assertEquals(280L, writeContentLengths.get(i)); - case 4 -> assertEquals(10L, writeContentLengths.get(i)); - default -> throw new IllegalStateException(); + case 2: + assertEquals(120L, writeContentLengths.get(i)); + break; + case 3: + assertEquals(280L, writeContentLengths.get(i)); + break; + case 4: + assertEquals(10L, writeContentLengths.get(i)); + break; + default: + throw new IllegalStateException(); } }