Skip to content

Commit

Permalink
KAFKA-17310 locking the offline dir can destroy the broker exceptiona…
Browse files Browse the repository at this point in the history
…lly (apache#16856)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
brandboat authored Aug 12, 2024
1 parent bbdf79e commit 5d115fe
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,10 @@ public class FileLock {
private final FileChannel channel;
private java.nio.channels.FileLock flock;

public FileLock(File file) {
public FileLock(File file) throws IOException {
this.file = file;
try {
this.channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
StandardOpenOption.WRITE);
} catch (IOException e) {
throw new RuntimeException(e);
}
this.channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
StandardOpenOption.WRITE);
}

public File file() {
Expand All @@ -54,19 +50,15 @@ public File file() {
/**
* Lock the file or throw an exception if the lock is already held
*/
public synchronized void lock() {
public synchronized void lock() throws IOException {
LOGGER.trace("Acquiring lock on {}", file.getAbsolutePath());
try {
flock = channel.lock();
} catch (IOException e) {
throw new RuntimeException(e);
}
flock = channel.lock();
}

/**
* Try to lock the file and return true if the locking succeeds
*/
public synchronized boolean tryLock() {
public synchronized boolean tryLock() throws IOException {
LOGGER.trace("Acquiring lock on {}", file.getAbsolutePath());
try {
// weirdly this method will return null if the lock is held by another
Expand All @@ -76,37 +68,27 @@ public synchronized boolean tryLock() {
return flock != null;
} catch (OverlappingFileLockException e) {
return false;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Unlock the lock if it is held
*/
public synchronized void unlock() {
public synchronized void unlock() throws IOException {
LOGGER.trace("Releasing lock on {}", file.getAbsolutePath());
if (flock != null) {
try {
flock.release();
} catch (IOException e) {
throw new RuntimeException(e);
}
flock.release();
}
}

/**
* Destroy this lock, closing the associated FileChannel
*/
public synchronized void destroy() {
public synchronized void destroy() throws IOException {
unlock();
try {
if (file.exists() && file.delete()) {
LOGGER.trace("Deleted {}", file.getAbsolutePath());
}
channel.close();
} catch (IOException e) {
throw new RuntimeException(e);
if (file.exists() && file.delete()) {
LOGGER.trace("Deleted {}", file.getAbsolutePath());
}
channel.close();
}
}
5 changes: 3 additions & 2 deletions shell/src/main/java/org/apache/kafka/shell/MetadataShell.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -107,7 +108,7 @@ static File parentParent(File file) {
* this hole would require the parent directory to always be writable when loading a
* snapshot so that we could create our .lock file there.
*/
static FileLock takeDirectoryLockIfExists(File directory) {
static FileLock takeDirectoryLockIfExists(File directory) throws IOException {
if (new File(directory, ".lock").exists()) {
return takeDirectoryLock(directory);
} else {
Expand All @@ -118,7 +119,7 @@ static FileLock takeDirectoryLockIfExists(File directory) {
/**
* Take the FileLock in the given directory.
*/
static FileLock takeDirectoryLock(File directory) {
static FileLock takeDirectoryLock(File directory) throws IOException {
FileLock fileLock = new FileLock(new File(directory, ".lock"));
try {
if (!fileLock.tryLock()) {
Expand Down

0 comments on commit 5d115fe

Please sign in to comment.