Commit
[fix][offload] Fix OOM in tiered storage, caused by unbounded offsets cache (apache#22679)

Co-authored-by: Jiwei Guo <[email protected]>
lhotari and Technoboy- authored May 9, 2024
1 parent bd4c57d commit 566330c
Showing 10 changed files with 185 additions and 31 deletions.
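The core of the change is the new OffsetsCache class (added below): previously every BlobStoreBackedReadHandleImpl built its own Guava cache of entry offsets with a 30-minute TTL and no size limit, so a broker reading from many offloaded ledgers could accumulate offsets without bound. The offsets now live in a single cache owned by the JCloudLedgerOffloaderFactory, shared by every offloader and read handle that factory creates, keyed by (ledgerId, entryId), capped at 1,000,000 entries (roughly 60 MB of heap) with a 5-minute expire-after-access TTL, and released when the factory is closed, which is why LedgerOffloaderFactory now extends AutoCloseable. A minimal, self-contained sketch of the cache's lifecycle (illustrative only, not code from this commit; it assumes the tiered-storage-jcloud classes are on the classpath, and the class name OffsetsCacheSketch is made up):

import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache;

public class OffsetsCacheSketch {
    public static void main(String[] args) {
        // One cache per offloader factory; in the real code it is created by
        // JCloudLedgerOffloaderFactory and passed down to every read handle.
        try (OffsetsCache cache = new OffsetsCache()) {
            long ledgerId = 42L;
            long entryId = 7L;
            long streamPosition = 1024L;

            // While scanning forward, a read handle records where each entry
            // starts in the offloaded object...
            cache.put(ledgerId, entryId, streamPosition);

            // ...so a later read of the same entry can seek straight to that
            // offset instead of re-scanning the stream.
            Long knownOffset = cache.getIfPresent(ledgerId, entryId);
            System.out.println("cached offset: " + knownOffset); // prints 1024

            // Misses return null; entries also age out after the TTL or are
            // evicted once the size bound is reached.
            System.out.println(cache.getIfPresent(ledgerId, 8L)); // prints null
        }
        // Closing the cache (done by the factory's close() in the real code)
        // shuts down the scheduled cleanUp() task.
    }
}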
@@ -31,7 +31,7 @@
*/
@LimitedPrivate
@Evolving
public interface LedgerOffloaderFactory<T extends LedgerOffloader> {
public interface LedgerOffloaderFactory<T extends LedgerOffloader> extends AutoCloseable {

/**
* Check whether the provided driver <tt>driverName</tt> is supported.
@@ -111,4 +111,9 @@ default T create(OffloadPoliciesImpl offloadPolicies,
throws IOException {
return create(offloadPolicies, userMetadata, scheduler, offloaderStats);
}

@Override
default void close() throws Exception {
// no-op
}
}
@@ -46,6 +46,12 @@ public LedgerOffloaderFactory getOffloaderFactory(String driverName) throws IOEx
@Override
public void close() throws Exception {
offloaders.forEach(offloader -> {
try {
offloader.getRight().close();
} catch (Exception e) {
log.warn("Failed to close offloader '{}': {}",
offloader.getRight().getClass(), e.getMessage());
}
try {
offloader.getLeft().close();
} catch (IOException e) {
@@ -25,6 +25,7 @@
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
import org.apache.bookkeeper.mledger.LedgerOffloaderStatsDisable;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.jcloud.impl.OffsetsCache;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -33,12 +34,7 @@
* A jcloud based offloader factory.
*/
public class JCloudLedgerOffloaderFactory implements LedgerOffloaderFactory<BlobStoreManagedLedgerOffloader> {

public static JCloudLedgerOffloaderFactory of() {
return INSTANCE;
}

private static final JCloudLedgerOffloaderFactory INSTANCE = new JCloudLedgerOffloaderFactory();
private final OffsetsCache entryOffsetsCache = new OffsetsCache();

@Override
public boolean isDriverSupported(String driverName) {
@@ -58,6 +54,12 @@ public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicie

TieredStorageConfiguration config =
TieredStorageConfiguration.create(offloadPolicies.toProperties());
return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, offloaderStats);
return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, offloaderStats,
entryOffsetsCache);
}

@Override
public void close() throws Exception {
entryOffsetsCache.close();
}
}
@@ -19,8 +19,6 @@
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import java.io.DataInputStream;
import java.io.IOException;
@@ -56,19 +54,13 @@

public class BlobStoreBackedReadHandleImpl implements ReadHandle {
private static final Logger log = LoggerFactory.getLogger(BlobStoreBackedReadHandleImpl.class);
private static final int CACHE_TTL_SECONDS =
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 30 * 60);

private final long ledgerId;
private final OffloadIndexBlock index;
private final BackedInputStream inputStream;
private final DataInputStream dataStream;
private final ExecutorService executor;
// this Cache is accessed only by one thread
private final Cache<Long, Long> entryOffsets = CacheBuilder
.newBuilder()
.expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
.build();
private final OffsetsCache entryOffsetsCache;
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

enum State {
@@ -79,12 +71,14 @@ enum State {
private volatile State state = null;

private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index,
BackedInputStream inputStream, ExecutorService executor) {
BackedInputStream inputStream, ExecutorService executor,
OffsetsCache entryOffsetsCache) {
this.ledgerId = ledgerId;
this.index = index;
this.inputStream = inputStream;
this.dataStream = new DataInputStream(inputStream);
this.executor = executor;
this.entryOffsetsCache = entryOffsetsCache;
state = State.Opened;
}

@@ -109,7 +103,6 @@ public CompletableFuture<Void> closeAsync() {
try {
index.close();
inputStream.close();
entryOffsets.invalidateAll();
state = State.Closed;
promise.complete(null);
} catch (IOException t) {
@@ -164,7 +157,7 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
long entryId = dataStream.readLong();

if (entryId == nextExpectedId) {
entryOffsets.put(entryId, currentPosition);
entryOffsetsCache.put(ledgerId, entryId, currentPosition);
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(length, length);
entries.add(LedgerEntryImpl.create(ledgerId, entryId, length, buf));
int toWrite = length;
@@ -215,7 +208,7 @@ public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntr
}

private void seekToEntry(long nextExpectedId) throws IOException {
Long knownOffset = entryOffsets.getIfPresent(nextExpectedId);
Long knownOffset = entryOffsetsCache.getIfPresent(ledgerId, nextExpectedId);
if (knownOffset != null) {
inputStream.seek(knownOffset);
} else {
@@ -269,7 +262,8 @@ public static ReadHandle open(ScheduledExecutorService executor,
BlobStore blobStore, String bucket, String key, String indexKey,
VersionCheck versionCheck,
long ledgerId, int readBufferSize,
LedgerOffloaderStats offloaderStats, String managedLedgerName)
LedgerOffloaderStats offloaderStats, String managedLedgerName,
OffsetsCache entryOffsetsCache)
throws IOException, BKException.BKNoSuchLedgerExistsException {
int retryCount = 3;
OffloadIndexBlock index = null;
@@ -310,7 +304,7 @@ public static ReadHandle open(ScheduledExecutorService executor,
BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key,
versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName);

return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor);
return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor, entryOffsetsCache);
}

// for testing
@@ -108,6 +108,7 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
private AtomicLong bufferLength = new AtomicLong(0);
private AtomicLong segmentLength = new AtomicLong(0);
private final long maxBufferLength;
private final OffsetsCache entryOffsetsCache;
private final ConcurrentLinkedQueue<Entry> offloadBuffer = new ConcurrentLinkedQueue<>();
private CompletableFuture<OffloadResult> offloadResult;
private volatile PositionImpl lastOfferedPosition = PositionImpl.LATEST;
@@ -123,13 +124,16 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config,
Map<String, String> userMetadata,
OrderedScheduler scheduler,
LedgerOffloaderStats offloaderStats) throws IOException {
LedgerOffloaderStats offloaderStats,
OffsetsCache entryOffsetsCache)
throws IOException {

return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata, offloaderStats);
return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata, offloaderStats, entryOffsetsCache);
}

BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, OrderedScheduler scheduler,
Map<String, String> userMetadata, LedgerOffloaderStats offloaderStats) {
Map<String, String> userMetadata, LedgerOffloaderStats offloaderStats,
OffsetsCache entryOffsetsCache) {

this.scheduler = scheduler;
this.userMetadata = userMetadata;
@@ -140,6 +144,7 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration
this.minSegmentCloseTimeMillis = Duration.ofSeconds(config.getMinSegmentTimeInSecond()).toMillis();
//ensure buffer can have enough content to fill a block
this.maxBufferLength = Math.max(config.getWriteBufferSizeInBytes(), config.getMinBlockSizeInBytes());
this.entryOffsetsCache = entryOffsetsCache;
this.segmentBeginTimeMillis = System.currentTimeMillis();
if (!Strings.isNullOrEmpty(config.getRegion())) {
this.writeLocation = new LocationBuilder()
@@ -555,7 +560,8 @@ public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uid,
readBucket, key, indexKey,
DataBlockUtils.VERSION_CHECK,
ledgerId, config.getReadBufferSizeInBytes(),
this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME)));
this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME),
this.entryOffsetsCache));
} catch (Throwable t) {
log.error("Failed readOffloaded: ", t);
promise.completeExceptionally(t);
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OffsetsCache implements AutoCloseable {
private static final int CACHE_TTL_SECONDS =
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds", 5 * 60);
// limit the cache size to avoid OOM
// 1 million entries consumes about 60MB of heap space
private static final int CACHE_MAX_SIZE =
Integer.getInteger("pulsar.jclouds.readhandleimpl.offsetsscache.max.size", 1_000_000);
private final ScheduledExecutorService cacheEvictionExecutor;

record Key(long ledgerId, long entryId) {

}

private final Cache<OffsetsCache.Key, Long> entryOffsetsCache;

public OffsetsCache() {
if (CACHE_MAX_SIZE > 0) {
entryOffsetsCache = CacheBuilder
.newBuilder()
.expireAfterAccess(CACHE_TTL_SECONDS, TimeUnit.SECONDS)
.maximumSize(CACHE_MAX_SIZE)
.build();
cacheEvictionExecutor =
Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("jcloud-offsets-cache-eviction"));
int period = Math.max(CACHE_TTL_SECONDS / 2, 1);
cacheEvictionExecutor.scheduleAtFixedRate(() -> {
entryOffsetsCache.cleanUp();
}, period, period, TimeUnit.SECONDS);
} else {
cacheEvictionExecutor = null;
entryOffsetsCache = null;
}
}

public void put(long ledgerId, long entryId, long currentPosition) {
if (entryOffsetsCache != null) {
entryOffsetsCache.put(new Key(ledgerId, entryId), currentPosition);
}
}

public Long getIfPresent(long ledgerId, long entryId) {
return entryOffsetsCache != null ? entryOffsetsCache.getIfPresent(new Key(ledgerId, entryId)) : null;
}

public void clear() {
if (entryOffsetsCache != null) {
entryOffsetsCache.invalidateAll();
}
}

@Override
public void close() {
if (cacheEvictionExecutor != null) {
cacheEvictionExecutor.shutdownNow();
}
}
}
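The new bounds are read once, at class initialization, via Integer.getInteger, so they are tuned with JVM system properties on the process that hosts the offloader (normally the broker). A hedged example of overriding the defaults, for instance through PULSAR_EXTRA_OPTS or directly on the java command line (the property names come from the code above; the values here are arbitrary):

-Dpulsar.jclouds.readhandleimpl.offsetsscache.ttl.seconds=300
-Dpulsar.jclouds.readhandleimpl.offsetsscache.max.size=500000

Setting pulsar.jclouds.readhandleimpl.offsetsscache.max.size to zero or a negative value disables the cache entirely: the constructor skips building both the Guava cache and the eviction executor, put() becomes a no-op, and getIfPresent() always returns null.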
@@ -33,6 +33,7 @@
import org.jclouds.blobstore.BlobStore;
import org.jclouds.domain.Credentials;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;

public abstract class BlobStoreManagedLedgerOffloaderBase {
@@ -46,6 +47,7 @@ public abstract class BlobStoreManagedLedgerOffloaderBase {
protected final JCloudBlobStoreProvider provider;
protected TieredStorageConfiguration config;
protected BlobStore blobStore = null;
protected final OffsetsCache entryOffsetsCache = new OffsetsCache();

protected BlobStoreManagedLedgerOffloaderBase() throws Exception {
scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(5).name("offloader").build();
@@ -56,6 +58,13 @@ protected BlobStoreManagedLedgerOffloaderBase() throws Exception {
@AfterMethod(alwaysRun = true)
public void cleanupMockBookKeeper() {
bk.getLedgerMap().clear();
entryOffsetsCache.clear();
}

@AfterClass(alwaysRun = true)
public void cleanup() throws Exception {
entryOffsetsCache.close();
scheduler.shutdownNow();
}

protected static MockManagedLedger createMockManagedLedger() {
@@ -82,7 +82,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, Map<String,
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
.create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats);
.create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats, entryOffsetsCache);
return offloader;
}

@@ -91,7 +91,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mo
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig)));
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader
.create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats);
.create(mockedConfig, new HashMap<String, String>(), scheduler, this.offloaderStats, entryOffsetsCache);
return offloader;
}

@@ -98,14 +98,16 @@ private BlobStoreManagedLedgerOffloader getOffloader(BlobStore mockedBlobStore)
private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws IOException {
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket)));
Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats);
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats,
entryOffsetsCache);
return offloader;
}

private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mockedBlobStore) throws IOException {
mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket)));
Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore();
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats);
BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap<String,String>(), scheduler, this.offloaderStats,
entryOffsetsCache);
return offloader;
}

Expand Down
Loading

0 comments on commit 566330c

Please sign in to comment.