From ae5cfb1e3a04d4ec1d1a8268d214ac0782c7942a Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Thu, 30 Nov 2023 12:39:30 +0800 Subject: [PATCH] feat(s3stream/wal): pre-check the real capacity of the block device (#771) Signed-off-by: Ning Yu --- .../stream/s3/wal/benchmark/WriteBench.java | 2 +- .../s3/wal/util/WALBlockDeviceChannel.java | 17 ++++++++++++-- .../automq/stream/s3/wal/util/WALChannel.java | 3 ++- .../automq/stream/s3/wal/util/WALUtil.java | 23 +++++++++++++++++++ .../automq/stream/utils/CommandResult.java | 7 ++++++ 5 files changed, 48 insertions(+), 4 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java index 37ef853467..23c992e929 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/benchmark/WriteBench.java @@ -89,7 +89,7 @@ public static void main(String[] args) throws IOException { private static void resetWALHeader(String path) throws IOException { System.out.println("Resetting WAL header"); - if (path.startsWith(WALChannel.WALChannelBuilder.DEVICE_PREFIX)) { + if (path.startsWith(WALChannel.DEVICE_PREFIX)) { // block device int capacity = BlockWALService.WAL_HEADER_TOTAL_CAPACITY; WALChannel channel = WALChannel.builder(path).capacity(capacity).build(); diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java index a6cb7c759c..6ba3369747 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java @@ -23,14 +23,18 @@ import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOUtils; import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectRandomAccessFile; import io.netty.buffer.ByteBuf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET; public class WALBlockDeviceChannel implements WALChannel { + private static final Logger LOGGER = LoggerFactory.getLogger(WALBlockDeviceChannel.class); final String path; final long capacityWant; final boolean recoveryMode; @@ -104,10 +108,19 @@ public static String checkAvailable() { @Override public void open(CapacityReader reader) throws IOException { - if (!path.startsWith(WALChannelBuilder.DEVICE_PREFIX)) { + if (!path.startsWith(WALChannel.DEVICE_PREFIX)) { openAndCheckFile(); } else { - // We could not get the real capacity of the block device, so we just use the `capacityWant` as the capacity here + try { + long capacity = WALUtil.getBlockDeviceCapacity(path); + if (!recoveryMode && capacityWant > capacity) { + // the real capacity of the block device is smaller than requested + throw new WALCapacityMismatchException(path, capacityWant, capacity); + } + } catch (ExecutionException e) { + LOGGER.warn("failed to get the real capacity of the block device {}, just skip checking", path, e); + } + // We could not get the real capacity of the WAL in block device, so we just use the `capacityWant` as the capacity here // It will be checked and updated in `checkCapacity` later capacityFact = capacityWant; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java index b770273885..614c183691 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java @@ -30,6 +30,8 @@ */ public interface WALChannel { + String DEVICE_PREFIX = "/dev/"; + static WALChannelBuilder builder(String path) { return new WALChannelBuilder(path); } @@ -98,7 +100,6 @@ interface CapacityReader { } class WALChannelBuilder { - public static final String DEVICE_PREFIX = "/dev/"; private final String path; private Boolean direct; private long capacity; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java index dcc41832ef..802f0601d1 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java @@ -17,12 +17,15 @@ package com.automq.stream.s3.wal.util; +import com.automq.stream.utils.CommandResult; +import com.automq.stream.utils.CommandUtils; import io.netty.buffer.ByteBuf; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import java.util.zip.CRC32; public class WALUtil { @@ -100,4 +103,24 @@ public static void createFile(String path, long length) throws IOException { raf.setLength(length); } } + + /** + * Get the capacity of the given block device. + */ + public static long getBlockDeviceCapacity(String path) throws ExecutionException { + String[] cmd = new String[]{ + "lsblk", + "--bytes", + "--nodeps", + "--output", "SIZE", + "--noheadings", + "--raw", + path + }; + CommandResult result = CommandUtils.run(cmd); + if (!result.success()) { + throw new ExecutionException("get block device capacity fail: " + result, null); + } + return Long.parseLong(result.stdout().trim()); + } } diff --git a/s3stream/src/main/java/com/automq/stream/utils/CommandResult.java b/s3stream/src/main/java/com/automq/stream/utils/CommandResult.java index b4dbe3ce8b..f7c8c6246f 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/CommandResult.java +++ b/s3stream/src/main/java/com/automq/stream/utils/CommandResult.java @@ -28,6 +28,13 @@ public CommandResult(int code, String stdout, String stderr) { this.stderr = stderr; } + /** + * Returns true if the command exited with a zero exit code. + */ + public boolean success() { + return code == 0; + } + public int code() { return code; }