Skip to content

Commit

Permalink
Update PipeTabletEventTsFileBatch.java
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveYurongSu committed Jun 19, 2024
1 parent 6f8b5a6 commit d1f25d0
Showing 1 changed file with 101 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -52,103 +53,66 @@
public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventTsFileBatch.class);
private static final String TS_FILE_PREFIX = "sender_batch";
protected final AtomicReference<File> batchFileDirWithIdSuffix = new AtomicReference<>();

// Used to generate transfer id, which is used to identify a tsfile batch instance
private static final AtomicReference<FolderManager> FOLDER_MANAGER = new AtomicReference<>();
private static final AtomicLong BATCH_ID_GENERATOR = new AtomicLong(0);
protected final AtomicLong batchId = new AtomicLong(0);
private final AtomicLong tsFileId = new AtomicLong(0);
private static final List<String> BATCH_FILE_BASE_DIRS =
Arrays.stream(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDirs())
.map(fileDir -> fileDir + File.separator + ".batch")
.collect(Collectors.toList());
private static FolderManager folderManager = null;
private final AtomicLong currentBatchId = new AtomicLong(BATCH_ID_GENERATOR.incrementAndGet());
private final File batchFileBaseDir;

private static final String TS_FILE_PREFIX = "tablet_batch";
private final AtomicLong tsFileIdGenerator = new AtomicLong(0);

private final long maxSizeInBytes;
private TsFileWriter fileWriter;

private final Map<String, Double> pipeName2WeightMap = new HashMap<>();

static {
try {
folderManager =
new FolderManager(BATCH_FILE_BASE_DIRS, DirectoryStrategyType.SEQUENCE_STRATEGY);
} catch (final DiskSpaceInsufficientException e) {
LOGGER.error(
"Fail to create pipe receiver file folders allocation strategy because all disks of folders are full.",
e);
}
}
private volatile TsFileWriter fileWriter;

PipeTabletEventTsFileBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) {
super(maxDelayInMs);
this.maxSizeInBytes = requestMaxBatchSizeInBytes;

batchId.set(BATCH_ID_GENERATOR.incrementAndGet());
this.maxSizeInBytes = requestMaxBatchSizeInBytes;
try {
this.batchFileBaseDir = getNextBaseDir();
} catch (final Exception e) {
throw new PipeException(
String.format("Failed to create file dir for batch: %s", e.getMessage()));
}
}

// Clear the original batch file dir if exists
if (batchFileDirWithIdSuffix.get() != null) {
if (batchFileDirWithIdSuffix.get().exists()) {
try {
FileUtils.deleteDirectory(batchFileDirWithIdSuffix.get());
LOGGER.info(
"Batch id = {}: Original batch file dir {} was deleted.",
batchId.get(),
batchFileDirWithIdSuffix.get().getPath());
} catch (Exception e) {
LOGGER.warn(
"Batch id = {}: Failed to delete original batch file dir {}, because {}.",
batchId.get(),
batchFileDirWithIdSuffix.get().getPath(),
e.getMessage(),
e);
}
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Batch id = {}: Original batch file dir {} is not existed. No need to delete.",
batchId.get(),
batchFileDirWithIdSuffix.get().getPath());
private File getNextBaseDir() throws DiskSpaceInsufficientException {
if (FOLDER_MANAGER.get() == null) {
synchronized (FOLDER_MANAGER) {
if (FOLDER_MANAGER.get() == null) {
FOLDER_MANAGER.set(
new FolderManager(
Arrays.stream(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDirs())
.map(fileDir -> fileDir + File.separator + ".batch")
.collect(Collectors.toList()),
DirectoryStrategyType.SEQUENCE_STRATEGY));
}
}
batchFileDirWithIdSuffix.set(null);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Batch id = {}: Current batch file dir is null. No need to delete.", batchId.get());
}
}

final String batchFileBaseDir;
final String errorMsg =
String.format(
"Batch id = %s: Failed to init pipe batch file folder manager because all disks of folders are full.",
batchId.get());
try {
batchFileBaseDir = Objects.isNull(folderManager) ? null : folderManager.getNextFolder();
if (Objects.isNull(batchFileBaseDir)) {
throw new PipeException(errorMsg);
}
} catch (final Exception e) {
throw new PipeException(errorMsg, e);
}
final String folder = FOLDER_MANAGER.get().getNextFolder();
final File baseDir = new File(folder, Long.toString(currentBatchId.get()));

// Create a new batch file dir
final File newBatchDir = new File(batchFileBaseDir, Long.toString(batchId.get()));
if (!newBatchDir.exists() && !newBatchDir.mkdirs()) {
if (baseDir.exists()) {
FileUtils.deleteQuietly(baseDir);
}
if (!baseDir.exists() && !baseDir.mkdirs()) {
LOGGER.warn(
"Batch id = {}: Failed to create batch file dir {}.",
batchId.get(),
newBatchDir.getPath());
currentBatchId.get(),
baseDir.getPath());
throw new PipeException(
String.format("Failed to create batch file dir %s.", newBatchDir.getPath()));
String.format("Failed to create batch file dir %s.", baseDir.getPath()));
}
batchFileDirWithIdSuffix.set(newBatchDir);

LOGGER.info(
"Batch id = {}: Create batch dir successfully, batch file dir = {}.",
batchId.get(),
newBatchDir.getPath());
currentBatchId.get(),
baseDir.getPath());
return baseDir;
}

@Override
Expand All @@ -158,33 +122,43 @@ protected void constructBatch(final TabletInsertionEvent event)
fileWriter =
new TsFileWriter(
new File(
batchFileDirWithIdSuffix.get(),
batchFileBaseDir,
TS_FILE_PREFIX
+ "_"
+ tsFileId.getAndIncrement()
+ tsFileIdGenerator.getAndIncrement()
+ TsFileConstant.TSFILE_SUFFIX));
}

if (event instanceof PipeInsertNodeTabletInsertionEvent) {
final List<Tablet> tablets = ((PipeInsertNodeTabletInsertionEvent) event).convertToTablets();
final PipeInsertNodeTabletInsertionEvent insertNodeTabletInsertionEvent =
(PipeInsertNodeTabletInsertionEvent) event;
final List<Tablet> tablets = insertNodeTabletInsertionEvent.convertToTablets();
for (int i = 0; i < tablets.size(); ++i) {
final Tablet tablet = tablets.get(i);
if (tablet.rowSize == 0) {
continue;
}
writeTablet(
tablet,
((PipeInsertNodeTabletInsertionEvent) event).isAligned(i),
((PipeInsertNodeTabletInsertionEvent) event).getPipeName());
insertNodeTabletInsertionEvent.isAligned(i),
insertNodeTabletInsertionEvent.getPipeName());
}
} else if (event instanceof PipeRawTabletInsertionEvent) {
final PipeRawTabletInsertionEvent rawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) event;
writeTablet(
((PipeRawTabletInsertionEvent) event).convertToTablet(),
((PipeRawTabletInsertionEvent) event).isAligned(),
((PipeRawTabletInsertionEvent) event).getPipeName());
rawTabletInsertionEvent.convertToTablet(),
rawTabletInsertionEvent.isAligned(),
rawTabletInsertionEvent.getPipeName());
} else {
LOGGER.warn(
"Batch id = {}: Unsupported event {} type {} when constructing tsfile batch",
currentBatchId.get(),
event,
event.getClass());
}
}

// TODO: Change the logic in table model
private void writeTablet(final Tablet tablet, final boolean isAligned, final String pipeName)
throws IOException, WriteProcessException {
if (isAligned) {
Expand All @@ -204,12 +178,15 @@ private void writeTablet(final Tablet tablet, final boolean isAligned, final Str
}
fileWriter.write(tablet);
}

pipeName2WeightMap.compute(pipeName, (name, weight) -> Objects.nonNull(weight) ? ++weight : 1);
}

public Map<String, Double> deepCopyPipeName2WeightMap() {
final double sum =
pipeName2WeightMap.values().stream().reduce(Double::sum).orElse(Double.MIN_VALUE);
final double sum = pipeName2WeightMap.values().stream().reduce(Double::sum).orElse(0.0);
if (sum == 0.0) {
return Collections.emptyMap();
}
pipeName2WeightMap.entrySet().forEach(entry -> entry.setValue(entry.getValue() / sum));
return new HashMap<>(pipeName2WeightMap);
}
Expand All @@ -220,20 +197,15 @@ public synchronized File sealTsFile() throws IOException {
}

fileWriter.close();
return fileWriter.getIOWriter().getFile();
}

@Override
public synchronized void onSuccess() {
super.onSuccess();

if (isClosed) {
return;
final File sealedFile = fileWriter.getIOWriter().getFile();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Batch id = {}: Seal tsfile {} successfully.",
currentBatchId.get(),
sealedFile.getPath());
}

// Delete file only after this file is transferred
pipeName2WeightMap.clear();
fileWriter = null;
return fileWriter.getIOWriter().getFile();
}

@Override
Expand All @@ -246,19 +218,47 @@ protected long getMaxBatchSizeInBytes() {
return maxSizeInBytes;
}

@Override
public synchronized void onSuccess() {
super.onSuccess();

pipeName2WeightMap.clear();

// We don't need to delete the tsFile here, because the tsFile
// will be deleted after the file is transferred.
fileWriter = null;
}

@Override
public synchronized void close() {
super.close();

pipeName2WeightMap.clear();

if (Objects.nonNull(fileWriter)) {
try {
fileWriter.close();
} catch (final Exception e) {
LOGGER.info(
"Batch id = {}: Failed to close the tsfile {} when trying to close batch, because {}",
currentBatchId.get(),
fileWriter.getIOWriter().getFile().getPath(),
e.getMessage(),
e);
}

try {
FileUtils.delete(fileWriter.getIOWriter().getFile());
} catch (final IOException e) {
} catch (final Exception e) {
LOGGER.info(
"Failed to delete the tsFile when trying to close tsFile batch, may need to delete manually, because {}",
e.getMessage());
"Batch id = {}: Failed to delete the tsfile {} when trying to close batch, because {}",
currentBatchId.get(),
fileWriter.getIOWriter().getFile().getPath(),
e.getMessage(),
e);
}

fileWriter = null;
}
}
}

0 comments on commit d1f25d0

Please sign in to comment.