Skip to content

Commit

Permalink
apacheGH-3015: Close files after TDB2 compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
afs committed Feb 26, 2025
1 parent 36364d7 commit a3983b2
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.atlas.lib.tuple.Tuple;
import org.apache.jena.dboe.base.file.Location;
import org.apache.jena.dboe.storage.StoragePrefixes;
import org.apache.jena.dboe.storage.system.DatasetGraphStorage;
import org.apache.jena.dboe.trans.bplustree.BPlusTree;
import org.apache.jena.dboe.transaction.txn.TransactionalSystem;
Expand All @@ -42,15 +41,17 @@
public class DatasetGraphTDB extends DatasetGraphStorage
{
private final StorageTDB storageTDB;
private final StoragePrefixesTDB storagePrefixes;
private final Location location;
private final TransactionalSystem txnSystem;
private final StoreParams storeParams;
private final ReorderTransformation reorderTransformation;
private boolean isClosed = false;

public DatasetGraphTDB(Location location, StoreParams params, ReorderTransformation reorderTransformation,
StorageTDB storage, StoragePrefixes prefixes, TransactionalSystem txnSystem) {
StorageTDB storage, StoragePrefixesTDB prefixes, TransactionalSystem txnSystem) {
super(storage, prefixes, txnSystem);
this.storagePrefixes = prefixes;
this.storageTDB = storage;
this.location = location;
this.storeParams = params;
Expand Down Expand Up @@ -96,13 +97,17 @@ public ReorderTransformation getReorderTransform() {

@Override
public void close() {
if ( isClosed )
return;
isClosed = true;
super.close();
storageTDB.close();
storagePrefixes.close();
}

public void shutdown() {
close();
txnSystem.getTxnMgr().shutdown();
close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@

public class StoragePrefixesTDB implements StoragePrefixes {

static final RecordFactory factory = new RecordFactory(3*NodeId.SIZE, 0);
/*package*/ static final RecordFactory factory = new RecordFactory(3*NodeId.SIZE, 0);
private TransactionalSystem txnSystem;
private NodeTupleTable prefixTable;
private boolean closed = false;

public StoragePrefixesTDB(TransactionalSystem txnSystem, NodeTupleTable prefixTable) {
this.txnSystem = txnSystem;
Expand Down Expand Up @@ -149,4 +150,14 @@ private void ensureWriteTxn() {
throw new TransactionException("Not in a transaction");
txn.ensureWriteTxn();
}

// This does not need to be synchronized.
// The caller is responsible for a quiet system (e.g. no transactions active)
/*package*/ void close() {
if ( closed )
return;
closed = true;
prefixTable.close();
}

}
12 changes: 10 additions & 2 deletions jena-tdb2/src/main/java/org/apache/jena/tdb2/store/StorageTDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@

/** {@link StorageRDF} for TDB2 */
public class StorageTDB implements StorageRDF {
// SWITCHING. This could be the switch point, not the DatasetGraph. Probably makes little difference.
private TripleTable tripleTable;
private QuadTable quadTable;
private TransactionalSystem txnSystem;
// SWITCHING.

// In notifyAdd and notifyDelete, check whether the change is a real change or not.
// e.g. Adding a quad already present is not a real change.
Expand Down Expand Up @@ -202,4 +200,14 @@ private void ensureWriteTxn() {
throw new TransactionException("Not in a write transaction");
txn.ensureWriteTxn();
}

// This does not need to be synchronized.
// The caller is responsible for a quiet system (e.g. no transactions active)
/*package*/ void close() {
if ( closed )
return;
closed = true;
tripleTable.close();
quadTable.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.jena.dboe.index.Index;
import org.apache.jena.dboe.index.RangeIndex;
import org.apache.jena.dboe.storage.DatabaseRDF;
import org.apache.jena.dboe.storage.StoragePrefixes;
import org.apache.jena.dboe.sys.Names;
import org.apache.jena.dboe.trans.bplustree.BPlusTree;
import org.apache.jena.dboe.trans.bplustree.BPlusTreeFactory;
Expand Down Expand Up @@ -95,7 +94,7 @@ public static DatasetGraphTDB build(Location location, StoreParams params, Reord

TDB2StorageBuilder builder = new TDB2StorageBuilder(txnSystem, location, params, new ComponentIdMgr(UUID.randomUUID()));
StorageTDB storage = builder.buildStorage();
StoragePrefixes prefixes = builder.buildPrefixes();
StoragePrefixesTDB prefixes = builder.buildPrefixes();

// Finalize.
builder.components.forEach(txnCoord::add);
Expand Down Expand Up @@ -193,7 +192,7 @@ private StorageTDB buildStorage() {
return dsg;
}

private StoragePrefixes buildPrefixes() {
private StoragePrefixesTDB buildPrefixes() {
NodeTable nodeTablePrefixes = buildNodeTable(params.getPrefixTableBaseName(), false);
StoragePrefixesTDB prefixes = buildPrefixTable(nodeTablePrefixes);
return prefixes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ public static void compact(DatasetGraphSwitchable container) {
public static void compact(DatasetGraphSwitchable container, boolean shouldDeleteOld) {
if ( Sys.isWindows) {
// Windows does not support Files.move when the directory contains memory mapped files.
// https://github.com/apache/jena/issues/2315
// MS Windows: 2024-03-08 https://github.com/apache/jena/issues/2315
// Moving the temporary directory does not work.
DatabaseOpsWindows.compact_win(container, shouldDeleteOld);
return;
}
Expand Down Expand Up @@ -342,12 +343,11 @@ public static void compact(DatasetGraphSwitchable container, boolean shouldDelet
LOG.debug(String.format("Compact %s -> %s\n", db1.getFileName(), db2.getFileName()));
if ( Files.exists(db2) )
throw new TDBException("Inconsistent : "+db2+" already exists");

// MS Windows: 2024-03-08 https://github.com/apache/jena/issues/2315
// Moving the temporary directory does not work.
// End checks

// Location of the storage area for the compacted database.
// This is a temporary directory that is atomically moved into place when complete.
// This is not supported by MS Windows.

Path tmpDir = makeTempDirName(db2);
if ( Files.exists(tmpDir) )
Expand All @@ -358,6 +358,7 @@ public static void compact(DatasetGraphSwitchable container, boolean shouldDelet
try {
compaction(container, loc1, loc2tmp, db2);
// Container now using the new location.
// The original database is not in use.
} catch (RuntimeIOException ex) {
// Clear up - disk problems.
try { IO.deleteAll(tmpDir); } catch (Throwable th) { /* Continue with original error. */ }
Expand Down Expand Up @@ -473,7 +474,9 @@ private static void compaction(DatasetGraphSwitchable container, Location loc1,
// This call is not undone.
// Database1 is no longer in use.
txnMgr1.startExclusiveMode();

// Clean-up.
// Includes dsgBase.shutdown() which closes files.
StoreConnection.release(dsgBase.getLocation());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.apache.jena.atlas.io.IO;
import org.apache.jena.atlas.io.IOX;
import org.apache.jena.atlas.lib.InternalErrorException;
import org.apache.jena.atlas.lib.Lib;
import org.apache.jena.atlas.logging.FmtLog;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.dboe.DBOpEnvException;
Expand All @@ -49,9 +48,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Old DatabaseOps code for compaction (up to Jena 5.0.0-rc1).
* Needed for MS Windows.
*
/**
* Old DatabaseOps code for compaction (up to Jena 5.0.0-rc1).
* Needed for MS Windows:
* (1) does not allow directories with memory -mapped file to be renamed/moved
* (2) does not release memory-mapped files until the JVM exits
*/
class DatabaseOpsWindows {
private static Logger LOG = LoggerFactory.getLogger(DatabaseOpsWindows.class);
Expand Down Expand Up @@ -101,8 +102,9 @@ class DatabaseOpsWindows {
LOG.debug(String.format("Compact %s -> %s\n", db1.getFileName(), db2.getFileName()));

try {
compaction(container, loc1, loc2);
compaction_win(container, loc1, loc2);
// Container now using the new location.
// The original database is not in use.
} catch (RuntimeIOException ex) {
// Clear up - disk problems.
try { IO.deleteAll(db2); } catch (Throwable th) { /* Continue with original error. */ }
Expand All @@ -123,25 +125,16 @@ class DatabaseOpsWindows {
if ( shouldDeleteOld ) {
// Compact put each of the databases into exclusive mode to do the switchover.
// There are no previous transactions on the old database at this point.
// MS Windows. Does not take effect until the JVM exits.
Path loc1Path = IO_DB.asPath(loc1);
LOG.warn("Database will not be fully deleted until after a server restart: (old db path='" + loc1Path + "')");
deleteDatabase(loc1Path);
}
}
}

private static void deleteDatabase(Path locationPath) {
try (Stream<Path> walk = Files.walk(locationPath)){
walk.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
} catch (IOException ex) {
throw IOX.exception(ex);
}
}

/** Copy the latest version from one location to another. */
private static void compaction(DatasetGraphSwitchable container, Location loc1, Location loc2) {
private static void compaction_win(DatasetGraphSwitchable container, Location loc1, Location loc2) {
if ( loc1.isMem() || loc2.isMem() )
throw new TDBException("Compact involves a memory location: "+loc1+" : "+loc2);

Expand Down Expand Up @@ -180,13 +173,6 @@ private static void compaction(DatasetGraphSwitchable container, Location loc1,
DatasetGraphTDB dsgCompact = StoreConnection.connectCreate(loc2).getDatasetGraphTDB();
CopyDSG.copy(dsgBase, dsgCompact);

if ( false ) {
// DEVELOMENT. FAke a long copy time in state copy.
System.err.println("-- Inside compact 1");
Lib.sleep(3_000);
System.err.println("-- Inside compact 2");
}

TransactionCoordinator txnMgr2 = dsgCompact.getTxnSystem().getTxnMgr();
// Update TransactionCoordinator and switch over.
txnMgr2.execExclusive(()->{
Expand All @@ -213,7 +199,6 @@ private static void compaction(DatasetGraphSwitchable container, Location loc1,
// New database running.
// New transactions go to this database.
// Old readers continue on db1.

});


Expand All @@ -222,10 +207,22 @@ private static void compaction(DatasetGraphSwitchable container, Location loc1,
// This call is not undone.
// Database1 is no longer in use.
txnMgr1.startExclusiveMode();

// Clean-up.
// MS Windows does not fully release memory mapped until the JVM exits.
StoreConnection.release(dsgBase.getLocation());
}

private static void deleteDatabase(Path locationPath) {
try (Stream<Path> walk = Files.walk(locationPath)){
walk.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
} catch (IOException ex) {
throw IOX.exception(ex);
}
}

/** Copy certain configuration files from {@code loc1} to {@code loc2}. */
private static void copyConfigFiles(Location loc1, Location loc2) {
FileFilter copyFiles = (pathname)->{
Expand Down Expand Up @@ -302,7 +299,6 @@ private static List<Path> scanForDirByPatternX(Path directory, String namebase,
return paths;
}


private static Pattern numberPattern = Pattern.compile("[\\d]+");
/** Given a filename in "base-NNNN(-text)" format, return the value of NNNN */
private static int extractIndexX(String name, String namebase, String nameSep) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,9 @@ public static synchronized void internalExpel(Location location, boolean force)

// No transactions at this point
// (or we don't care and are clearing up forcefully.)
// This closes open files.

sConn.getDatasetGraphTDB().shutdown();
// Done by DatasetGraphTDB()
//txnCoord.shutdown();

sConn.isValid = false;
cache.remove(location);
Expand Down

0 comments on commit a3983b2

Please sign in to comment.