Skip to content

Commit

Permalink
Modernize WatchServiceFileSystemWatcher
Browse files Browse the repository at this point in the history
- Switch to java.nio consistently
- Allow to monitor a directory for specific files
- Monitor .env file
  • Loading branch information
gsmet committed Jul 11, 2024
1 parent 00350b7 commit 9a71a13
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,17 @@ public void handleChanges(Collection<FileChangeEvent> changes) {
periodicTestCompile();
}
};
// monitor .env as it can impact test execution
testClassChangeWatcher.watchFiles(Path.of(context.getApplicationRoot().getProjectDirectory()),
List.of(Path.of(".env")),
callback);
Set<Path> nonExistent = new HashSet<>();
for (DevModeContext.ModuleInfo module : context.getAllModules()) {
for (Path path : module.getMain().getSourcePaths()) {
testClassChangeWatcher.watchPath(path.toFile(), callback);
testClassChangeWatcher.watchDirectoryRecursively(path, callback);
}
for (Path path : module.getMain().getResourcePaths()) {
testClassChangeWatcher.watchPath(path.toFile(), callback);
testClassChangeWatcher.watchDirectoryRecursively(path, callback);
}
}
for (DevModeContext.ModuleInfo module : context.getAllModules()) {
Expand All @@ -264,14 +268,14 @@ public void handleChanges(Collection<FileChangeEvent> changes) {
if (!Files.isDirectory(path)) {
nonExistent.add(path);
} else {
testClassChangeWatcher.watchPath(path.toFile(), callback);
testClassChangeWatcher.watchDirectoryRecursively(path, callback);
}
}
for (Path path : module.getTest().get().getResourcePaths()) {
if (!Files.isDirectory(path)) {
nonExistent.add(path);
} else {
testClassChangeWatcher.watchPath(path.toFile(), callback);
testClassChangeWatcher.watchDirectoryRecursively(path, callback);
}
}
}
Expand All @@ -287,7 +291,7 @@ public void run() {
Path i = iterator.next();
if (Files.isDirectory(i)) {
iterator.remove();
testClassChangeWatcher.watchPath(i.toFile(), callback);
testClassChangeWatcher.watchDirectoryRecursively(i, callback);
added = true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.quarkus.deployment.dev.filesystem.watch;

import java.io.File;
import java.nio.file.Path;

/**
* The event object that is fired when a file system change is detected.
Expand All @@ -10,7 +10,7 @@
*/
public class FileChangeEvent {

private final File file;
private final Path file;
private final Type type;

/**
Expand All @@ -19,7 +19,7 @@ public class FileChangeEvent {
* @param file the file which is being watched
* @param type the type of event that was encountered
*/
public FileChangeEvent(File file, Type type) {
public FileChangeEvent(Path file, Type type) {
this.file = file;
this.type = type;
}
Expand All @@ -29,7 +29,7 @@ public FileChangeEvent(File file, Type type) {
*
* @return the file which was being watched
*/
public File getFile() {
public Path getFile() {
return file;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
Expand All @@ -26,6 +26,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.jboss.logging.Logger;

Expand All @@ -43,9 +45,9 @@ public class WatchServiceFileSystemWatcher implements Runnable {
private static final AtomicInteger threadIdCounter = new AtomicInteger(0);

private WatchService watchService;
private final Map<File, PathData> files = Collections.synchronizedMap(new HashMap<File, PathData>());
private final Map<Path, PathData> monitoredDirectories = Collections.synchronizedMap(new HashMap<>());
private final Map<WatchKey, PathData> pathDataByKey = Collections
.synchronizedMap(new IdentityHashMap<WatchKey, PathData>());
.synchronizedMap(new IdentityHashMap<>());

private volatile boolean stopped = false;
private final Thread watchThread;
Expand All @@ -70,19 +72,19 @@ public void run() {
try {
PathData pathData = pathDataByKey.get(key);
if (pathData != null) {
final List<FileChangeEvent> results = new ArrayList<FileChangeEvent>();
final List<FileChangeEvent> results = new ArrayList<>();
List<WatchEvent<?>> events = key.pollEvents();
final Set<File> addedFiles = new HashSet<File>();
final Set<File> deletedFiles = new HashSet<File>();
final Set<Path> addedFiles = new HashSet<>();
final Set<Path> deletedFiles = new HashSet<>();
for (WatchEvent<?> event : events) {
Path eventPath = (Path) event.context();
File targetFile = ((Path) key.watchable()).resolve(eventPath).toFile();
Path targetFile = ((Path) key.watchable()).resolve(eventPath).toAbsolutePath();
FileChangeEvent.Type type;

if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
type = FileChangeEvent.Type.ADDED;
addedFiles.add(targetFile);
if (targetFile.isDirectory()) {
if (Files.isDirectory(targetFile)) {
try {
addWatchedDirectory(pathData, targetFile);
} catch (IOException e) {
Expand All @@ -107,6 +109,12 @@ public void run() {
Iterator<FileChangeEvent> it = results.iterator();
while (it.hasNext()) {
FileChangeEvent event = it.next();

if (!pathData.isMonitored(event.getFile())) {
it.remove();
continue;
}

if (event.getType() == FileChangeEvent.Type.MODIFIED) {
if (addedFiles.contains(event.getFile()) &&
deletedFiles.contains(event.getFile())) {
Expand Down Expand Up @@ -134,15 +142,15 @@ public void run() {
}

if (!results.isEmpty()) {
for (FileChangeCallback callback : pathData.callbacks) {
for (FileChangeCallback callback : pathData.getCallbacks()) {
invokeCallback(callback, results);
}
}
}
} finally {
//if the key is no longer valid remove it from the files list
if (!key.reset()) {
files.remove(key.watchable());
monitoredDirectories.remove(key.watchable());
}
}
}
Expand All @@ -156,39 +164,59 @@ public void run() {
}
}

public synchronized void watchPath(File file, FileChangeCallback callback) {
public synchronized void watchDirectoryRecursively(Path directory, FileChangeCallback callback) {
try {
PathData data = files.get(file);
Path absoluteDirectory = directory.toAbsolutePath();
PathData data = monitoredDirectories.get(absoluteDirectory);
if (data == null) {
Set<File> allDirectories = doScan(file).keySet();
Path path = Paths.get(file.toURI());
data = new PathData(path);
for (File dir : allDirectories) {
Set<Path> allDirectories = doScan(absoluteDirectory).keySet();
data = new PathData(absoluteDirectory, List.of());
for (Path dir : allDirectories) {
addWatchedDirectory(data, dir);
}
files.put(file, data);
monitoredDirectories.put(absoluteDirectory, data);
}
data.addCallback(callback);

} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* @param directory a directory that will be watched
* @param monitoredFiles list of monitored files relative to directory. An empty list will monitor all files.
* @param callback callback called when a file is changed
*/
public synchronized void watchFiles(Path directory, List<Path> monitoredFiles, FileChangeCallback callback) {
try {
Path absoluteDirectory = directory.toAbsolutePath();
PathData data = monitoredDirectories.get(absoluteDirectory);
if (data == null) {
data = new PathData(absoluteDirectory, monitoredFiles);
addWatchedDirectory(data, absoluteDirectory);
monitoredDirectories.put(absoluteDirectory, data);
}
data.callbacks.add(callback);
data.addCallback(callback);

} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void addWatchedDirectory(PathData data, File dir) throws IOException {
Path path = Paths.get(dir.toURI());
WatchKey key = path.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
private void addWatchedDirectory(PathData data, Path dir) throws IOException {
WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
pathDataByKey.put(key, data);
data.keys.add(key);
data.addWatchKey(key);
}

public synchronized void unwatchPath(File file, final FileChangeCallback callback) {
PathData data = files.get(file);
public synchronized void unwatchPath(Path directory, final FileChangeCallback callback) {
PathData data = monitoredDirectories.get(directory);
if (data != null) {
data.callbacks.remove(callback);
if (data.callbacks.isEmpty()) {
files.remove(file);
for (WatchKey key : data.keys) {
data.removeCallback(callback);
if (data.getCallbacks().isEmpty()) {
monitoredDirectories.remove(directory);
for (WatchKey key : data.getWatchKeys()) {
key.cancel();
pathDataByKey.remove(key);
}
Expand All @@ -205,20 +233,21 @@ public void close() throws IOException {
}
}

private static Map<File, Long> doScan(File file) {
final Map<File, Long> results = new HashMap<File, Long>();
private static Map<Path, Long> doScan(Path directory) {
final Map<Path, Long> results = new HashMap<>();

final Deque<File> toScan = new ArrayDeque<File>();
toScan.add(file);
final Deque<Path> toScan = new ArrayDeque<>();
toScan.add(directory);
while (!toScan.isEmpty()) {
File next = toScan.pop();
if (next.isDirectory()) {
results.put(next, next.lastModified());
File[] list = next.listFiles();
if (list != null) {
for (File f : list) {
toScan.push(new File(f.getAbsolutePath()));
Path next = toScan.pop();
if (Files.isDirectory(next)) {
try {
results.put(next, Files.getLastModifiedTime(directory).toMillis());
try (Stream<Path> list = Files.list(next)) {
list.forEach(p -> toScan.push(p.toAbsolutePath()));
}
} catch (IOException e) {
throw new UncheckedIOException("Unable to scan: " + next, e);
}
}
}
Expand All @@ -234,12 +263,52 @@ private static void invokeCallback(FileChangeCallback callback, List<FileChangeE
}

private class PathData {
final Path path;
final List<FileChangeCallback> callbacks = new ArrayList<FileChangeCallback>();
final List<WatchKey> keys = new ArrayList<WatchKey>();

private PathData(Path path) {
private final Path path;
private final List<FileChangeCallback> callbacks = new ArrayList<>();
private final List<WatchKey> watchKeys = new ArrayList<>();
private final List<Path> monitoredFiles;

private PathData(Path path, List<Path> monitoredFiles) {
this.path = path;
this.monitoredFiles = monitoredFiles.stream().map(p -> path.resolve(p).toAbsolutePath())
.collect(Collectors.toList());
}

private void addWatchKey(WatchKey key) {
this.watchKeys.add(key);
}

private void addCallback(FileChangeCallback callback) {
this.callbacks.add(callback);
}

private void removeCallback(FileChangeCallback callback) {
this.callbacks.remove(callback);
}

private List<FileChangeCallback> getCallbacks() {
return callbacks;
}

private List<WatchKey> getWatchKeys() {
return watchKeys;
}

private boolean isMonitored(Path file) {
if (monitoredFiles.isEmpty()) {
return true;
}

Path absolutePath = file.isAbsolute() ? file : file.toAbsolutePath();

for (Path monitoredFile : monitoredFiles) {
if (monitoredFile.equals(absolutePath)) {
return true;
}
}

return false;
}
}

Expand Down
Loading

0 comments on commit 9a71a13

Please sign in to comment.