Skip to content

Commit

Permalink
Improved exception and shutdown handling
Browse files Browse the repository at this point in the history
- Added join for the CacheWriter thread, so it is not terminated hard
- Improved exception and interrupt handling, deleting files if they
  have a risk of being incomplete
  • Loading branch information
xpomul committed Aug 7, 2023
1 parent cfed903 commit 7576cc0
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,11 @@ public CacheEntry findStoredClass(final String namespace, final URL sourceFileUR
// so, take it from there
storedClass = itemsInQueue.get(new CacheItemKey(directoryString, name));
if (storedClass == null) {
// else, read it from disk
// else, read it from disk (if it exists)
final File cachedBytecodeFile = new File(cacheDirectory, name);
storedClass = lockManager.executeRead(name, () -> read(name, cachedBytecodeFile));
if (cachedBytecodeFile.exists()) {
storedClass = lockManager.executeRead(name, () -> read(name, cachedBytecodeFile));
}
}
isCached = storedClass != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
Expand All @@ -33,6 +33,12 @@
* @author Martin Lippert
*/
public class CacheWriter {
/**
* A timeout value in milliseconds to wait for the writer thread to finish after
* signalling the interrupt.
*/
private static final long JOIN_TIMEOUT = 5000L;

/**
* A map for items that are currently contained in the {@link #cacheWriterQueue}
*/
Expand Down Expand Up @@ -62,13 +68,10 @@ public CacheWriter(final BlockingQueue<CacheItem> cacheQueue, final Map<CacheIte
try {
while (true) {
final CacheItem item = cacheQueue.take();
try {
store(item);
} catch (final IOException ioe) {
// storing in cache failed, do nothing
}
store(item);
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
});
this.writerThread.setPriority(Thread.MIN_PRIORITY);
Expand All @@ -86,6 +89,13 @@ public void start() {
*/
public void stop() {
this.writerThread.interrupt();
// wait for the writer thread to finish, so we don't System.exit() in the middle of writing...
try {
this.writerThread.join(JOIN_TIMEOUT);
} catch (InterruptedException e) {
Log.error("Interrupted while joining the writerThread", e);
Thread.currentThread().interrupt();
}
}

/**
Expand All @@ -97,7 +107,7 @@ public void stop() {
* @param item the cache item to store to disc
* @throws IOException if an error occurs while writing to the cache
*/
protected void store(final CacheItem item) throws IOException {
protected void store(final CacheItem item) {
// write out generated classes first
final Map<String, byte[]> generatedClasses = item.getGeneratedClasses();
if (generatedClasses != null) {
Expand All @@ -115,22 +125,44 @@ protected void store(final CacheItem item) throws IOException {
() -> storeSingleClass(item.getName(), item.getCachedBytes(), item.getDirectory()));
}

private void storeSingleClass(final String className, final byte[] classBytes, final String cacheDirectory)
throws FileNotFoundException, IOException {
private void storeSingleClass(final String className, final byte[] classBytes, final String cacheDirectory) {
final File directory = new File(cacheDirectory);
if (!directory.exists()) {
directory.mkdirs();
}

try (FileOutputStream fosCache = new FileOutputStream(new File(directory, className));
final File outputFile = new File(directory, className);
boolean success = true;
try (FileOutputStream fosCache = new FileOutputStream(outputFile);
DataOutputStream outCache = new DataOutputStream(new BufferedOutputStream(fosCache))) {
outCache.write(classBytes);
outCache.flush();
fosCache.getFD().sync();
} catch (IOException e) {
Log.error("Failed to store class " + className + " in cache", e);
success = false;
}

// if there was an error during writing the file, or if an interrupt happened,
// we have a risk of having written an incomplete file. To be sure, we check
// the length of the file on disk. If it does not match, we delete the file
// again.
if ((!success || Thread.currentThread().isInterrupted()) && outputFile.exists()
&& outputFile.length() != classBytes.length) {
Log.debug("File " + outputFile.getAbsolutePath() + " was not completely written to disk. Removing it.");
try {
Files.delete(outputFile.toPath());
} catch (IOException e) {
Log.error("File " + outputFile.getAbsolutePath() + " is corrupted but could not be deleted.", e);
// last resort: try to delete the file when the VM terminates
outputFile.deleteOnExit();
}
}

// after writing the file, remove the item from the itemsInQueue lookup map as
// well
// well - we do this even the writing was unsuccessful to keep the queue and the
// itemsInQueue in sync. It will do no further harm, just the file is not cached and will be woven and
// cached again next time.
itemsInQueue.remove(new CacheItemKey(cacheDirectory, className));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package org.eclipse.equinox.weaving.internal.caching;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -76,11 +75,8 @@ public interface ReadOperation {
public interface WriteOperation {
/**
* The logic to write a class file to the cache.
*
* @throws IOException in case of a file system error in the logic, this can be
* propagated to the caller.
*/
void write() throws IOException;
void write();
}

/**
Expand Down Expand Up @@ -110,7 +106,7 @@ private Lock acquireLock(final String classname, final boolean writeLock) {
// existingEntry == null means there is no lock for the given classname, so we
// create one.
final LockWithCount resultingEntry = existingEntry != null ? existingEntry : new LockWithCount();
// ge the lock that we need (read or write, depending on the parameter).
// get the lock that we need (read or write, depending on the parameter).
lockWrapper[0] = writeLock ? resultingEntry.lock.writeLock() : resultingEntry.lock.readLock();
// increment the lock counter
resultingEntry.count.incrementAndGet();
Expand Down Expand Up @@ -150,11 +146,8 @@ public byte[] executeRead(final String classname, final ReadOperation readOperat
*
* @param classname the classname defining the scope of the lock to acquire
* @param writeOperation the operation to execute
*
* @throws IOException any exception thrown by the writeOperation will be
* propagated to the caller
*/
public void executeWrite(final String classname, final WriteOperation writeOperation) throws IOException {
public void executeWrite(final String classname, final WriteOperation writeOperation) {
final Lock lock = acquireLock(classname, true);
try {
writeOperation.write();
Expand Down

0 comments on commit 7576cc0

Please sign in to comment.