diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
index f4f72ed63..2d6ab4511 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java
@@ -43,7 +43,8 @@ public class DefaultS3BlockCache implements S3BlockCache {
     private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3BlockCache.class);
-    private final Map<ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTasks = new ConcurrentHashMap<>();
+    private final Map<ReadAheadTaskKey, ReadAheadTaskContext> inflightReadAheadTasks = new ConcurrentHashMap<>();
+    private final Map<ReadTaskKey, ReadTaskContext> inflightReadStatusMap = new ConcurrentHashMap<>();
     private final BlockCache cache;
     private final ExecutorService mainExecutor;
     private final ReadAheadManager readAheadManager;
@@ -81,14 +82,17 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
         CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
         ReadAheadAgent agent = this.readAheadManager.getOrCreateReadAheadAgent(streamId, startOffset);
         UUID uuid = UUID.randomUUID();
+        ReadTaskKey key = new ReadTaskKey(streamId, startOffset, endOffset, maxBytes, uuid);
+        ReadTaskContext context = new ReadTaskContext(agent, ReadBlockCacheStatus.INIT);
+        this.inflightReadStatusMap.put(key, context);
         // submit read task to mainExecutor to avoid read slower the caller thread.
         mainExecutor.execute(() -> {
-            FutureUtil.exec(() -> {
-                read0(streamId, startOffset, endOffset, maxBytes, agent, uuid).whenComplete((ret, ex) -> {
+            try {
+                FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, uuid, context).whenComplete((ret, ex) -> {
                     if (ex != null) {
                         LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail", streamId, startOffset, endOffset, maxBytes, ex);
-                        readCf.completeExceptionally(ex);
                         this.inflightReadThrottle.release(uuid);
+                        this.inflightReadStatusMap.remove(key);
                         return;
                     }
                     int totalReturnedSize = ret.getRecords().stream().mapToInt(StreamRecordBatch::size).sum();
@@ -102,15 +106,22 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
                         LOGGER.debug("[S3BlockCache] read data complete, cache hit: {}, stream={}, {}-{}, total bytes: {} ",
                             ret.getCacheAccessType() == CacheAccessType.BLOCK_CACHE_HIT, streamId, startOffset, endOffset, totalReturnedSize);
                     }
-                    readCf.complete(ret);
                     this.inflightReadThrottle.release(uuid);
-                });
-            }, readCf, LOGGER, "read");
+                    this.inflightReadStatusMap.remove(key);
+                }), readCf);
+            } catch (Exception e) {
+                LOGGER.error("read {} [{}, {}), maxBytes: {} from block cache fail, {}", streamId, startOffset, endOffset, maxBytes, e);
+                this.inflightReadThrottle.release(uuid);
+                this.inflightReadStatusMap.remove(key);
+                readCf.completeExceptionally(e);
+            }
         });
         return readCf;
     }
 
-    public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, ReadAheadAgent agent, UUID uuid) {
+    public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, UUID uuid, ReadTaskContext context) {
+        ReadAheadAgent agent = context.agent;
+
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("[S3BlockCache] read0, stream={}, {}-{}, total bytes: {}, uuid: {} ", streamId, startOffset, endOffset, maxBytes, uuid);
         }
@@ -122,15 +133,17 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
         long nextStartOffset = startOffset;
         int nextMaxBytes = maxBytes;
 
-        CompletableFuture<Void> inflightReadAheadTask = inflightReadAheadTasks.get(new ReadAheadTaskKey(streamId, nextStartOffset));
-        if (inflightReadAheadTask != null) {
+        ReadAheadTaskContext inflightReadAheadTaskContext = inflightReadAheadTasks.get(new ReadAheadTaskKey(streamId, nextStartOffset));
+        if (inflightReadAheadTaskContext != null) {
             CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
-            inflightReadAheadTask.whenComplete((nil, ex) -> FutureUtil.exec(() -> FutureUtil.propagate(
-                read0(streamId, startOffset, endOffset, maxBytes, agent, uuid), readCf), readCf, LOGGER, "read0"));
+            context.setStatus(ReadBlockCacheStatus.WAIT_INFLIGHT_RA);
+            inflightReadAheadTaskContext.cf.whenComplete((nil, ex) -> FutureUtil.exec(() -> FutureUtil.propagate(
+                read0(streamId, startOffset, endOffset, maxBytes, uuid, context), readCf), readCf, LOGGER, "read0"));
             return readCf;
         }
 
         // 1. get from cache
+        context.setStatus(ReadBlockCacheStatus.GET_FROM_CACHE);
         BlockCache.GetCacheResult cacheRst = cache.get(streamId, nextStartOffset, endOffset, nextMaxBytes);
         List<StreamRecordBatch> cacheRecords = cacheRst.getRecords();
         if (!cacheRecords.isEmpty()) {
@@ -148,7 +161,7 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
                 if (LOGGER.isDebugEnabled()) {
                     LOGGER.debug("[S3BlockCache] read data partially hit cache, stream={}, {}-{}, total bytes: {} ", streamId, nextStartOffset, endOffset, nextMaxBytes);
                 }
-                return read0(streamId, nextStartOffset, endOffset, nextMaxBytes, agent, uuid).thenApply(rst -> {
+                return read0(streamId, nextStartOffset, endOffset, nextMaxBytes, uuid, context).thenApply(rst -> {
                     List<StreamRecordBatch> records = new ArrayList<>(cacheRecords);
                     records.addAll(rst.getRecords());
                     return new ReadDataBlock(records, CacheAccessType.BLOCK_CACHE_MISS);
@@ -157,6 +170,7 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
         }
 
         // 2. get from s3
+        context.setStatus(ReadBlockCacheStatus.GET_FROM_S3);
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("[S3BlockCache] read data cache miss, stream={}, {}-{}, total bytes: {} ", streamId, startOffset, endOffset, maxBytes);
         }
@@ -190,7 +204,61 @@ public record ReadAheadTaskKey(long streamId, long startOffset) {
     }
 
+    public static class ReadAheadTaskContext {
+        final CompletableFuture<Void> cf;
+        ReadBlockCacheStatus status;
+
+        public ReadAheadTaskContext(CompletableFuture<Void> cf, ReadBlockCacheStatus status) {
+            this.cf = cf;
+            this.status = status;
+        }
+
+        void setStatus(ReadBlockCacheStatus status) {
+            this.status = status;
+        }
+    }
+
+    public record ReadTaskKey(long streamId, long startOffset, long endOffset, int maxBytes, UUID uuid) {
+        @Override
+        public String toString() {
+            return "ReadTaskKey{" +
+                "streamId=" + streamId +
+                ", startOffset=" + startOffset +
+                ", endOffset=" + endOffset +
+                ", maxBytes=" + maxBytes +
+                ", uuid=" + uuid +
+                '}';
+        }
+    }
+
+    public static class ReadTaskContext {
+        final ReadAheadAgent agent;
+        ReadBlockCacheStatus status;
+
+        public ReadTaskContext(ReadAheadAgent agent, ReadBlockCacheStatus status) {
+            this.agent = agent;
+            this.status = status;
+        }
+
+        void setStatus(ReadBlockCacheStatus status) {
+            this.status = status;
+        }
+    }
+
     public record ReadAheadRecord(long nextRAOffset) {
     }
 
+    public enum ReadBlockCacheStatus {
+        /* Status for read request */
+        INIT,
+        WAIT_INFLIGHT_RA,
+        GET_FROM_CACHE,
+        GET_FROM_S3,
+
+        /* Status for read ahead request */
+        WAIT_DATA_INDEX,
+        WAIT_FETCH_DATA,
+        WAIT_THROTTLE,
+    }
 }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
index 83114090f..7263120a7 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java
@@ -54,7 +54,7 @@ public class StreamReader {
     private final ObjectReaderLRUCache objectReaders;
     private final DataBlockReadAccumulator dataBlockReadAccumulator;
     private final BlockCache blockCache;
-    private final Map<DefaultS3BlockCache.ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTaskMap;
+    private final Map<DefaultS3BlockCache.ReadAheadTaskKey, DefaultS3BlockCache.ReadAheadTaskContext> inflightReadAheadTaskMap;
     private final InflightReadThrottle inflightReadThrottle;
     private final ExecutorService streamReaderExecutor = Threads.newFixedThreadPoolWithMonitor(
         2,
@@ -73,7 +73,7 @@ public class StreamReader {
         LOGGER);
 
     public StreamReader(S3Operator operator, ObjectManager objectManager, BlockCache blockCache,
-        Map<DefaultS3BlockCache.ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTaskMap,
+        Map<DefaultS3BlockCache.ReadAheadTaskKey, DefaultS3BlockCache.ReadAheadTaskContext> inflightReadAheadTaskMap,
         InflightReadThrottle inflightReadThrottle) {
         this.s3Operator = operator;
         this.objectManager = objectManager;
@@ -86,7 +86,7 @@ public StreamReader(S3Operator operator, ObjectManager objectManager, BlockCache
     // for test
     public StreamReader(S3Operator operator, ObjectManager objectManager, BlockCache blockCache, ObjectReaderLRUCache objectReaders,
-        DataBlockReadAccumulator dataBlockReadAccumulator, Map<DefaultS3BlockCache.ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTaskMap,
+        DataBlockReadAccumulator dataBlockReadAccumulator, Map<DefaultS3BlockCache.ReadAheadTaskKey, DefaultS3BlockCache.ReadAheadTaskContext> inflightReadAheadTaskMap,
         InflightReadThrottle inflightReadThrottle) {
         this.s3Operator = operator;
         this.objectManager = objectManager;
@@ -112,15 +112,19 @@ public CompletableFuture<List<StreamRecordBatch>> syncReadAhead(long streamId, l
         TimerUtil timer = new TimerUtil();
         DefaultS3BlockCache.ReadAheadTaskKey readAheadTaskKey = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset);
         // put a placeholder task at start offset to prevent next cache miss request spawn duplicated read ahead task
-        inflightReadAheadTaskMap.putIfAbsent(readAheadTaskKey, new CompletableFuture<>());
-        context.taskKeySet.add(readAheadTaskKey);
+        DefaultS3BlockCache.ReadAheadTaskContext readAheadTaskContext = new DefaultS3BlockCache.ReadAheadTaskContext(new CompletableFuture<>(),
+            DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_DATA_INDEX);
+        if (inflightReadAheadTaskMap.putIfAbsent(readAheadTaskKey, readAheadTaskContext) == null) {
+            context.taskKeySet.add(readAheadTaskKey);
+        }
         return getDataBlockIndices(streamId, endOffset, context)
             .thenComposeAsync(v ->
                 handleSyncReadAhead(streamId, startOffset, endOffset, maxBytes, agent, uuid, timer, context), streamReaderExecutor)
             .whenComplete((nil, ex) -> {
                 for (DefaultS3BlockCache.ReadAheadTaskKey key : context.taskKeySet) {
-                    completeInflightTask(key, ex);
+                    completeInflightTask0(key, ex);
                 }
+                context.taskKeySet.clear();
             });
     }
@@ -167,7 +171,14 @@ CompletableFuture<List<StreamRecordBatch>> handleSyncReadAhead(long streamId, lo
             sortedDataBlockKeyList.add(dataBlockKey);
             DataBlockReadAccumulator.ReserveResult reserveResult = reserveResults.get(i);
             DefaultS3BlockCache.ReadAheadTaskKey taskKey = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, streamDataBlock.getStartOffset());
-            int readIndex = i;
+            if (context.taskKeySet.contains(taskKey)) {
+                setInflightReadAheadStatus(taskKey, DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA);
+            }
+            boolean isNotAlignedFirstBlock = i == 0 && startOffset != streamDataBlock.getStartOffset();
+            if (isNotAlignedFirstBlock && context.taskKeySet.contains(taskKey)) {
+                setInflightReadAheadStatus(new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset),
+                    DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA);
+            }
             cfList.add(reserveResult.cf().thenApplyAsync(dataBlock -> {
                if (dataBlock.records().isEmpty()) {
                    return new ArrayList<StreamRecordBatch>();
@@ -187,13 +198,10 @@ CompletableFuture<List<StreamRecordBatch>> handleSyncReadAhead(long streamId, lo
                    LOGGER.error("[S3BlockCache] sync ra fail to read data block, stream={}, {}-{}, data block: {}",
                        streamId, startOffset, endOffset, streamDataBlock, ex);
                }
-               completeInflightTask(taskKey, ex);
-               context.taskKeySet.remove(taskKey);
-               if (readIndex == 0) {
+               completeInflightTask(context, taskKey, ex);
+               if (isNotAlignedFirstBlock) {
                    // in case of first data block and startOffset is not aligned with start of data block
-                   DefaultS3BlockCache.ReadAheadTaskKey key = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset);
-                   completeInflightTask(key, ex);
-                   context.taskKeySet.remove(key);
+                   completeInflightTask(context, new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset), ex);
                }
            }));
            if (reserveResult.reserveSize() > 0) {
@@ -259,15 +267,18 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int
        TimerUtil timer = new TimerUtil();
        DefaultS3BlockCache.ReadAheadTaskKey readAheadTaskKey = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset);
        // put a placeholder task at start offset to prevent next cache miss request spawn duplicated read ahead task
-       inflightReadAheadTaskMap.putIfAbsent(readAheadTaskKey, new CompletableFuture<>());
+       DefaultS3BlockCache.ReadAheadTaskContext readAheadTaskContext = new DefaultS3BlockCache.ReadAheadTaskContext(new CompletableFuture<>(),
+           DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_DATA_INDEX);
+       inflightReadAheadTaskMap.putIfAbsent(readAheadTaskKey, readAheadTaskContext);
        context.taskKeySet.add(readAheadTaskKey);
 
        getDataBlockIndices(streamId, endOffset, context)
            .thenAcceptAsync(v ->
                handleAsyncReadAhead(streamId, startOffset, endOffset, maxBytes, agent, timer, context), streamReaderExecutor)
            .whenComplete((nil, ex) -> {
                for (DefaultS3BlockCache.ReadAheadTaskKey key : context.taskKeySet) {
-                   completeInflightTask(key, ex);
+                   completeInflightTask0(key, ex);
                }
+               context.taskKeySet.clear();
            });
    }
@@ -299,6 +310,7 @@ CompletableFuture<Void> handleAsyncReadAhead(long streamId, long startOffset, lo
            DefaultS3BlockCache.ReadAheadTaskKey taskKey = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, streamDataBlock.getStartOffset());
            DataBlockReadAccumulator.ReserveResult reserveResult = dataBlockReadAccumulator.reserveDataBlock(List.of(pair)).get(0);
            int readIndex = i;
+           boolean isNotAlignedFirstBlock = i == 0 && startOffset != streamDataBlock.getStartOffset();
            CompletableFuture<Void> cf = reserveResult.cf().thenAcceptAsync(dataBlock -> {
                if (dataBlock.records().isEmpty()) {
                    return;
@@ -318,13 +330,10 @@ CompletableFuture<Void> handleAsyncReadAhead(long streamId, long startOffset, lo
                        streamId, startOffset, endOffset, streamDataBlock, ex);
                }
                inflightReadThrottle.release(uuid);
-               completeInflightTask(taskKey, ex);
-               context.taskKeySet.remove(taskKey);
-               if (readIndex == 0) {
+               completeInflightTask(context, taskKey, ex);
+               if (isNotAlignedFirstBlock) {
                    // in case of first data block and startOffset is not aligned with start of data block
-                   DefaultS3BlockCache.ReadAheadTaskKey key = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset);
-                   completeInflightTask(key, ex);
-                   context.taskKeySet.remove(key);
+                   completeInflightTask(context, new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset), ex);
                }
            });
            cfList.add(cf);
@@ -336,6 +345,13 @@ CompletableFuture<Void> handleAsyncReadAhead(long streamId, long startOffset, lo
            if (reserveResult.reserveSize() > 0) {
                inflightReadThrottle.acquire(uuid, reserveResult.reserveSize()).thenAcceptAsync(nil -> {
                    // read data block
+                   if (context.taskKeySet.contains(taskKey)) {
+                       setInflightReadAheadStatus(taskKey, DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA);
+                   }
+                   if (isNotAlignedFirstBlock && context.taskKeySet.contains(taskKey)) {
+                       setInflightReadAheadStatus(new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset),
+                           DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA);
+                   }
                    dataBlockReadAccumulator.readDataBlock(objectReader, streamDataBlock.dataBlockIndex());
                }, streamReaderExecutor).exceptionally(ex -> {
                    cf.completeExceptionally(ex);
@@ -402,8 +418,21 @@ CompletableFuture<Void> getDataBlockIndices(long streamId, long endOffset, ReadC
            for (StreamDataBlock streamDataBlock : streamDataBlocks) {
                DefaultS3BlockCache.ReadAheadTaskKey taskKey = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, streamDataBlock.getStartOffset());
-               inflightReadAheadTaskMap.putIfAbsent(taskKey, new CompletableFuture<>());
-               context.taskKeySet.add(taskKey);
+               DefaultS3BlockCache.ReadAheadTaskContext readAheadContext = new DefaultS3BlockCache.ReadAheadTaskContext(new CompletableFuture<>(),
+                   DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_THROTTLE);
+               if (inflightReadAheadTaskMap.putIfAbsent(taskKey, readAheadContext) == null) {
+                   context.taskKeySet.add(taskKey);
+               }
+               if (context.isFirstDataBlock && streamDataBlock.getStartOffset() != context.nextStartOffset) {
+                   context.isFirstDataBlock = false;
+                   DefaultS3BlockCache.ReadAheadTaskKey key = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, context.nextStartOffset);
+                   if (context.taskKeySet.contains(key)) {
+                       inflightReadAheadTaskMap.computeIfPresent(key, (k, v) -> {
+                           v.status = DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_THROTTLE;
+                           return v;
+                       });
+                   }
+               }
            }
 
            S3ObjectMetadata objectMetadata = context.objects.get(context.objectIndex);
@@ -421,13 +450,28 @@ CompletableFuture<Void> getDataBlockIndices(long streamId, long endOffset, ReadC
        }, streamReaderExecutor);
    }
 
-   private void completeInflightTask(DefaultS3BlockCache.ReadAheadTaskKey key, Throwable ex) {
-       CompletableFuture<Void> cf = inflightReadAheadTaskMap.remove(key);
-       if (cf != null) {
+   private void setInflightReadAheadStatus(DefaultS3BlockCache.ReadAheadTaskKey key, DefaultS3BlockCache.ReadBlockCacheStatus status) {
+       inflightReadAheadTaskMap.computeIfPresent(key, (k, readAheadContext) -> {
+           readAheadContext.status = status;
+           return readAheadContext;
+       });
+   }
+
+   private void completeInflightTask(ReadContext readContext, DefaultS3BlockCache.ReadAheadTaskKey key, Throwable ex) {
+       if (!readContext.taskKeySet.contains(key)) {
+           return;
+       }
+       completeInflightTask0(key, ex);
+       readContext.taskKeySet.remove(key);
+   }
+
+   private void completeInflightTask0(DefaultS3BlockCache.ReadAheadTaskKey key, Throwable ex) {
+       DefaultS3BlockCache.ReadAheadTaskContext context = inflightReadAheadTaskMap.remove(key);
+       if (context != null) {
            if (ex != null) {
-               cf.completeExceptionally(ex);
+               context.cf.completeExceptionally(ex);
            } else {
-               cf.complete(null);
+               context.cf.complete(null);
            }
        }
    }
@@ -460,6 +504,7 @@ static class ReadContext {
        List<Pair<Long, List<StreamDataBlock>>> streamDataBlocksPair;
        Map<Long, ObjectReader> objectReaderMap;
        Set<DefaultS3BlockCache.ReadAheadTaskKey> taskKeySet;
+       boolean isFirstDataBlock = true;
        int objectIndex;
        long nextStartOffset;
        int nextMaxBytes;
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java
index b4edda4bd..94b5003fe 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/StreamReaderTest.java
@@ -96,7 +96,7 @@ public void testSyncReadAheadInflight() {
        S3Operator s3Operator = Mockito.mock(S3Operator.class);
        ObjectManager objectManager = Mockito.mock(ObjectManager.class);
        BlockCache blockCache = Mockito.mock(BlockCache.class);
-       Map<ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTasks = new HashMap<>();
+       Map<ReadAheadTaskKey, DefaultS3BlockCache.ReadAheadTaskContext> inflightReadAheadTasks = new HashMap<>();
        StreamReader streamReader = Mockito.spy(new StreamReader(s3Operator, objectManager, blockCache, cache, accumulator, inflightReadAheadTasks, new InflightReadThrottle()));
 
        long streamId = 233L;
@@ -118,8 +118,12 @@ public void testSyncReadAheadInflight() {
        streamReader.syncReadAhead(streamId, startOffset, endOffset, maxBytes, Mockito.mock(ReadAheadAgent.class), UUID.randomUUID());
        Threads.sleep(1000);
        Assertions.assertEquals(2, inflightReadAheadTasks.size());
-       Assertions.assertTrue(inflightReadAheadTasks.containsKey(new ReadAheadTaskKey(233L, startOffset)));
-       Assertions.assertTrue(inflightReadAheadTasks.containsKey(new ReadAheadTaskKey(233L, 64)));
+       ReadAheadTaskKey key1 = new ReadAheadTaskKey(233L, startOffset);
+       ReadAheadTaskKey key2 = new ReadAheadTaskKey(233L, 64);
+       Assertions.assertTrue(inflightReadAheadTasks.containsKey(key1));
+       Assertions.assertTrue(inflightReadAheadTasks.containsKey(key2));
+       Assertions.assertEquals(DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA, inflightReadAheadTasks.get(key1).status);
+       Assertions.assertEquals(DefaultS3BlockCache.ReadBlockCacheStatus.WAIT_FETCH_DATA, inflightReadAheadTasks.get(key2).status);
    }
 
    @Test
@@ -173,7 +177,7 @@ public StreamRecordBatch next() {
        cf.whenComplete((rst, ex) -> {
            Assertions.assertNull(ex);
            Assertions.assertEquals(1, rst.size());
-           Assertions.assertTrue(record1.equals(rst.get(0)));
+           Assertions.assertEquals(record1, rst.get(0));
            Assertions.assertEquals(2, record1.getPayload().refCnt());
            Assertions.assertEquals(1, record2.getPayload().refCnt());
        }).join();
@@ -186,7 +190,7 @@ public void testSyncReadAheadNotAlign() {
        S3Operator s3Operator = Mockito.mock(S3Operator.class);
        ObjectManager objectManager = Mockito.mock(ObjectManager.class);
        BlockCache blockCache = Mockito.mock(BlockCache.class);
-       Map<ReadAheadTaskKey, CompletableFuture<Void>> inflightReadAheadTasks = new HashMap<>();
+       Map<ReadAheadTaskKey, DefaultS3BlockCache.ReadAheadTaskContext> inflightReadAheadTasks = new HashMap<>();
        StreamReader streamReader = new StreamReader(s3Operator, objectManager, blockCache, cache, accumulator, inflightReadAheadTasks, new InflightReadThrottle());
 
        long startOffset = 32;
@@ -226,7 +230,9 @@ public StreamRecordBatch next() {
        });
        Mockito.when(reader.read(index1)).thenReturn(CompletableFuture.completedFuture(dataBlock1));
        context.objectReaderMap = new HashMap<>(Map.of(1L, reader));
-       inflightReadAheadTasks.put(new ReadAheadTaskKey(233L, startOffset), new CompletableFuture<>());
+       ReadAheadTaskKey key = new ReadAheadTaskKey(233L, startOffset);
+       context.taskKeySet.add(key);
+       inflightReadAheadTasks.put(key, new DefaultS3BlockCache.ReadAheadTaskContext(new CompletableFuture<>(), DefaultS3BlockCache.ReadBlockCacheStatus.INIT));
        CompletableFuture<List<StreamRecordBatch>> cf = streamReader.handleSyncReadAhead(233L, startOffset, 999, 64,
            Mockito.mock(ReadAheadAgent.class), UUID.randomUUID(), new TimerUtil(), context);
@@ -234,7 +240,7 @@ public StreamRecordBatch next() {
            Assertions.assertNull(ex);
            Assertions.assertTrue(inflightReadAheadTasks.isEmpty());
            Assertions.assertEquals(1, rst.size());
-           Assertions.assertTrue(record1.equals(rst.get(0)));
+           Assertions.assertEquals(record1, rst.get(0));
            Assertions.assertEquals(2, record1.getPayload().refCnt());
            Assertions.assertEquals(1, record2.getPayload().refCnt());
        }).join();
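
Note: the core pattern this patch introduces is pairing every in-flight read / read-ahead future with a mutable status field, so a stuck request can be diagnosed by inspecting the in-flight maps. Below is a minimal standalone sketch of that pattern; InflightStatusSketch, TaskKey, and TaskContext are illustrative names (not s3stream API), and only the status-enum idea mirrors the patch.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class InflightStatusSketch {
    enum Status {INIT, GET_FROM_CACHE, GET_FROM_S3}

    record TaskKey(long streamId, long startOffset) {
    }

    static final class TaskContext {
        final CompletableFuture<Void> cf = new CompletableFuture<>();
        volatile Status status;

        TaskContext(Status status) {
            this.status = status;
        }
    }

    private final Map<TaskKey, TaskContext> inflight = new ConcurrentHashMap<>();

    CompletableFuture<Void> read(long streamId, long startOffset) {
        TaskKey key = new TaskKey(streamId, startOffset);
        TaskContext ctx = new TaskContext(Status.INIT);
        inflight.put(key, ctx);             // register before any async work starts
        ctx.status = Status.GET_FROM_CACHE; // each stage updates the observable status
        // ... cache lookup elided; assume a miss and fall through to object storage ...
        ctx.status = Status.GET_FROM_S3;
        CompletableFuture.runAsync(() -> { /* fetch from object storage */ })
            .whenComplete((nil, ex) -> {
                inflight.remove(key);       // always deregister, on success or failure
                if (ex != null) {
                    ctx.cf.completeExceptionally(ex);
                } else {
                    ctx.cf.complete(null);
                }
            });
        return ctx.cf;
    }

    // Dumping the map shows where every outstanding request currently is.
    void dump() {
        inflight.forEach((k, v) -> System.out.println(k + " -> " + v.status));
    }

    public static void main(String[] args) {
        InflightStatusSketch sketch = new InflightStatusSketch();
        CompletableFuture<Void> cf = sketch.read(233L, 0L);
        sketch.dump(); // may print: TaskKey[streamId=233, startOffset=0] -> GET_FROM_S3
        cf.join();
    }
}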