Skip to content

Commit

Permalink
Active Load: Check dir r/w permissions before loading from listening …
Browse files Browse the repository at this point in the history
…dirs (#13488) (#13500)

Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
YC27 and SteveYurongSu authored Sep 13, 2024
1 parent 492a7f3 commit ba3f08e
Showing 1 changed file with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService {
private final AtomicReference<String[]> listeningDirsConfig = new AtomicReference<>();
private final Set<String> listeningDirs = new CopyOnWriteArraySet<>();

private final Set<String> noPermissionDirs = new CopyOnWriteArraySet<>();

private final ActiveLoadTsFileLoader activeLoadTsFileLoader;

public ActiveLoadDirScanner(final ActiveLoadTsFileLoader activeLoadTsFileLoader) {
Expand All @@ -73,6 +75,10 @@ private void scan() throws IOException {
hotReloadActiveLoadDirs();

for (final String listeningDir : listeningDirs) {
if (!checkPermission(listeningDir)) {
continue;
}

final int currentAllowedPendingSize = activeLoadTsFileLoader.getCurrentAllowedPendingSize();
if (currentAllowedPendingSize <= 0) {
return;
Expand All @@ -93,6 +99,43 @@ private void scan() throws IOException {
}
}

private boolean checkPermission(final String listeningDir) {
try {
final Path listeningDirPath = new File(listeningDir).toPath();

if (!Files.isReadable(listeningDirPath)) {
if (!noPermissionDirs.contains(listeningDir)) {
LOGGER.error(
"Current dir path is not readable: {}."
+ "Skip scanning this dir. Please check the permission.",
listeningDirPath);
noPermissionDirs.add(listeningDir);
}
return false;
}

if (!Files.isWritable(listeningDirPath)) {
if (!noPermissionDirs.contains(listeningDir)) {
LOGGER.error(
"Current dir path is not writable: {}."
+ "Skip scanning this dir. Please check the permission.",
listeningDirPath);
noPermissionDirs.add(listeningDir);
}
return false;
}

noPermissionDirs.remove(listeningDir);
return true;
} catch (final Exception e) {
LOGGER.error(
"Error occurred during checking r/w permission of dir: {}. Skip scanning this dir.",
listeningDir,
e);
return false;
}
}

private boolean isTsFileCompleted(final String file) {
try (final TsFileSequenceReader reader = new TsFileSequenceReader(file, false)) {
return TSFileConfig.MAGIC_STRING.equals(reader.readTailMagic());
Expand Down

0 comments on commit ba3f08e

Please sign in to comment.