Skip to content

Commit

Permalink
feat(s3stream/wal): pre-check the real capacity of the block device (#…
Browse files Browse the repository at this point in the history
…771)

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored and ShadowySpirits committed Mar 14, 2024
1 parent 5796e04 commit ae5cfb1
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static void main(String[] args) throws IOException {

private static void resetWALHeader(String path) throws IOException {
System.out.println("Resetting WAL header");
if (path.startsWith(WALChannel.WALChannelBuilder.DEVICE_PREFIX)) {
if (path.startsWith(WALChannel.DEVICE_PREFIX)) {
// block device
int capacity = BlockWALService.WAL_HEADER_TOTAL_CAPACITY;
WALChannel channel = WALChannel.builder(path).capacity(capacity).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@
import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOUtils;
import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectRandomAccessFile;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;

import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET;

public class WALBlockDeviceChannel implements WALChannel {
private static final Logger LOGGER = LoggerFactory.getLogger(WALBlockDeviceChannel.class);
final String path;
final long capacityWant;
final boolean recoveryMode;
Expand Down Expand Up @@ -104,10 +108,19 @@ public static String checkAvailable() {

@Override
public void open(CapacityReader reader) throws IOException {
if (!path.startsWith(WALChannelBuilder.DEVICE_PREFIX)) {
if (!path.startsWith(WALChannel.DEVICE_PREFIX)) {
openAndCheckFile();
} else {
// We could not get the real capacity of the block device, so we just use the `capacityWant` as the capacity here
try {
long capacity = WALUtil.getBlockDeviceCapacity(path);
if (!recoveryMode && capacityWant > capacity) {
// the real capacity of the block device is smaller than requested
throw new WALCapacityMismatchException(path, capacityWant, capacity);
}
} catch (ExecutionException e) {
LOGGER.warn("failed to get the real capacity of the block device {}, just skip checking", path, e);
}
// We could not get the real capacity of the WAL in block device, so we just use the `capacityWant` as the capacity here
// It will be checked and updated in `checkCapacity` later
capacityFact = capacityWant;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
public interface WALChannel {

String DEVICE_PREFIX = "/dev/";

static WALChannelBuilder builder(String path) {
return new WALChannelBuilder(path);
}
Expand Down Expand Up @@ -98,7 +100,6 @@ interface CapacityReader {
}

class WALChannelBuilder {
public static final String DEVICE_PREFIX = "/dev/";
private final String path;
private Boolean direct;
private long capacity;
Expand Down
23 changes: 23 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

package com.automq.stream.s3.wal.util;

import com.automq.stream.utils.CommandResult;
import com.automq.stream.utils.CommandUtils;
import io.netty.buffer.ByteBuf;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.zip.CRC32;

public class WALUtil {
Expand Down Expand Up @@ -100,4 +103,24 @@ public static void createFile(String path, long length) throws IOException {
raf.setLength(length);
}
}

/**
* Get the capacity of the given block device.
*/
public static long getBlockDeviceCapacity(String path) throws ExecutionException {
String[] cmd = new String[]{
"lsblk",
"--bytes",
"--nodeps",
"--output", "SIZE",
"--noheadings",
"--raw",
path
};
CommandResult result = CommandUtils.run(cmd);
if (!result.success()) {
throw new ExecutionException("get block device capacity fail: " + result, null);
}
return Long.parseLong(result.stdout().trim());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ public CommandResult(int code, String stdout, String stderr) {
this.stderr = stderr;
}

/**
* Returns true if the command exited with a zero exit code.
*/
public boolean success() {
return code == 0;
}

public int code() {
return code;
}
Expand Down

0 comments on commit ae5cfb1

Please sign in to comment.