diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetcherTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetcherTest.java
index 53405b6fa..a0199c0fa 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetcherTest.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetcherTest.java
@@ -25,6 +25,7 @@
 import com.alibaba.fluss.client.scanner.ScanRecord;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.record.MemoryLogRecords;
 import com.alibaba.fluss.rpc.RpcClient;
@@ -47,6 +48,7 @@
 import static com.alibaba.fluss.client.admin.ClientToServerITCaseUtils.createTable;
 import static com.alibaba.fluss.client.admin.ClientToServerITCaseUtils.initConfig;
 import static com.alibaba.fluss.record.TestData.DATA1;
+import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
 import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
 import static com.alibaba.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
@@ -65,37 +67,56 @@ public class LogFetcherTest {
     private LogFetcher logFetcher;
     private Connection conn;
     private Admin admin;
-    private long tableId;
-    private final int bucketId0 = 0;
-    private final int bucketId1 = 1;
+    private RpcClient rpcClient;
+    private Configuration clientConf;
+    private MetadataUpdater metadataUpdater;
 
     // TODO covert this test to UT as kafka.
     @BeforeEach
     protected void setup() throws Exception {
-        Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        this.clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
         conn = ConnectionFactory.createConnection(clientConf);
         admin = conn.getAdmin();
+        this.rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
+        this.metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
+    }
+
+    @AfterEach
+    protected void teardown() throws Exception {
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
+    }
+
+    @Test
+    void testFetch() throws Exception {
         // We create table data1NonPkTablePath previously.
         TablePath tablePath = TablePath.of("test_db_1", "test_table_for_log_fetcher");
-        tableId = createTable(admin, tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);
+        long tableId = createTable(admin, tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);
         FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
-
-        RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
-        MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
         metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath));
 
         Map<TableBucket, Long> scanBuckets = new HashMap<>();
         // add bucket 0 and bucket 1 to log scanner status.
+        int bucketId0 = 0;
         scanBuckets.put(new TableBucket(tableId, bucketId0), 0L);
+        int bucketId1 = 1;
         scanBuckets.put(new TableBucket(tableId, bucketId1), 0L);
         LogScannerStatus logScannerStatus = new LogScannerStatus();
         logScannerStatus.assignScanBuckets(scanBuckets);
         TestingScannerMetricGroup scannerMetricGroup = TestingScannerMetricGroup.newInstance();
+        TableInfo tableInfo = new TableInfo(tablePath, tableId, DATA1_TABLE_DESCRIPTOR, 1);
         logFetcher =
                 new LogFetcher(
-                        DATA1_TABLE_INFO,
+                        tableInfo,
                         null,
                         rpcClient,
                         logScannerStatus,
@@ -103,23 +124,7 @@ protected void setup() throws Exception {
                         metadataUpdater,
                         scannerMetricGroup,
                         new RemoteFileDownloader(1));
-    }
-
-    @AfterEach
-    protected void teardown() throws Exception {
-        if (admin != null) {
-            admin.close();
-            admin = null;
-        }
-        if (conn != null) {
-            conn.close();
-            conn = null;
-        }
-    }
-
-    @Test
-    void testFetch() throws Exception {
         // add one batch records to tb0.
         TableBucket tb0 = new TableBucket(tableId, bucketId0);
        addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);