Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveYurongSu committed Jun 19, 2024
1 parent d1f25d0 commit abb64f3
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public abstract class PipeTabletEventBatch implements AutoCloseable {
private final int maxDelayInMs;
private long firstEventProcessingTime = Long.MIN_VALUE;

protected long totalBufferSize = 0;

protected volatile boolean isClosed = false;

protected PipeTabletEventBatch(final int maxDelayInMs) {
Expand Down Expand Up @@ -75,20 +77,20 @@ synchronized boolean onEvent(final TabletInsertionEvent event)
}
}

return getTotalBufferSize() >= getMaxBatchSizeInBytes()
return totalBufferSize >= getMaxBatchSizeInBytes()
|| System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs;
}

protected abstract void constructBatch(final TabletInsertionEvent event)
throws WALPipeException, IOException, WriteProcessException;

protected abstract long getTotalBufferSize();

protected abstract long getMaxBatchSizeInBytes();

public synchronized void onSuccess() {
events.clear();

totalBufferSize = 0;

firstEventProcessingTime = Long.MIN_VALUE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {

// limit in buffer size
private final PipeMemoryBlock allocatedMemoryBlock;
private long totalBufferSize = 0;

// Used to rate limit when transferring data
private final Map<String, Long> pipeName2BytesAccumulated = new HashMap<>();
Expand Down Expand Up @@ -103,7 +102,6 @@ public synchronized void onSuccess() {
insertNodeBuffers.clear();
tabletBuffers.clear();

totalBufferSize = 0;
pipeName2BytesAccumulated.clear();
}

Expand All @@ -117,11 +115,6 @@ protected long getMaxBatchSizeInBytes() {
return allocatedMemoryBlock.getMemoryUsageInBytes();
}

@Override
protected long getTotalBufferSize() {
return totalBufferSize;
}

public Map<String, Long> deepCopyPipeName2BytesAccumulated() {
return new HashMap<>(pipeName2BytesAccumulated);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
Expand Down Expand Up @@ -94,9 +95,8 @@ private File getNextBaseDir() throws DiskSpaceInsufficientException {
}
}

final String folder = FOLDER_MANAGER.get().getNextFolder();
final File baseDir = new File(folder, Long.toString(currentBatchId.get()));

final File baseDir =
new File(FOLDER_MANAGER.get().getNextFolder(), Long.toString(currentBatchId.get()));
if (baseDir.exists()) {
FileUtils.deleteQuietly(baseDir);
}
Expand All @@ -106,7 +106,9 @@ private File getNextBaseDir() throws DiskSpaceInsufficientException {
currentBatchId.get(),
baseDir.getPath());
throw new PipeException(
String.format("Failed to create batch file dir %s.", baseDir.getPath()));
String.format(
"Failed to create batch file dir %s. (Batch id = %s)",
baseDir.getPath(), currentBatchId.get()));
}
LOGGER.info(
"Batch id = {}: Create batch dir successfully, batch file dir = {}.",
Expand Down Expand Up @@ -179,6 +181,7 @@ private void writeTablet(final Tablet tablet, final boolean isAligned, final Str
fileWriter.write(tablet);
}

totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
pipeName2WeightMap.compute(pipeName, (name, weight) -> Objects.nonNull(weight) ? ++weight : 1);
}

Expand Down Expand Up @@ -208,11 +211,6 @@ public synchronized File sealTsFile() throws IOException {
return fileWriter.getIOWriter().getFile();
}

@Override
protected long getTotalBufferSize() {
return fileWriter.getIOWriter().getFile().length();
}

@Override
protected long getMaxBatchSizeInBytes() {
return maxSizeInBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
Expand Down Expand Up @@ -120,14 +120,15 @@ public TsFileInsertionDataContainer(
} else {
// We need to create these objects here and remove them later.
deviceIsAlignedMap = readDeviceIsAlignedMap();
memoryRequiredInBytes += PipeMemoryWeighUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
memoryRequiredInBytes += PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);

measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
memoryRequiredInBytes += PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);

deviceMeasurementsMap = tsFileSequenceReader.getDeviceMeasurementsMap();
memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
}
allocatedMemoryBlock = PipeResourceManager.memory().forceAllocate(memoryRequiredInBytes);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -110,7 +107,8 @@ public PipeTabletMemoryBlock forceAllocateWithRetry(Tablet tablet)

synchronized (this) {
final PipeTabletMemoryBlock block =
(PipeTabletMemoryBlock) forceAllocate(calculateTabletSizeInBytes(tablet), true);
(PipeTabletMemoryBlock)
forceAllocate(PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet), true);
usedMemorySizeInBytesOfTablets += block.getMemoryUsageInBytes();
return block;
}
Expand Down Expand Up @@ -183,63 +181,6 @@ public synchronized PipeMemoryBlock forceAllocateIfSufficient(
return null;
}

public static long calculateTabletSizeInBytes(Tablet tablet) {
long totalSizeInBytes = 0;

if (tablet == null) {
return totalSizeInBytes;
}

// timestamps
if (tablet.timestamps != null) {
totalSizeInBytes += tablet.timestamps.length * 8L;
}

// values
final List<MeasurementSchema> timeseries = tablet.getSchemas();
if (timeseries != null) {
for (int column = 0; column < timeseries.size(); column++) {
final MeasurementSchema measurementSchema = timeseries.get(column);
if (measurementSchema == null) {
continue;
}

final TSDataType tsDataType = measurementSchema.getType();
if (tsDataType == null) {
continue;
}

if (tsDataType.isBinary()) {
if (tablet.values == null || tablet.values.length <= column) {
continue;
}
final Binary[] values = ((Binary[]) tablet.values[column]);
if (values == null) {
continue;
}
for (Binary value : values) {
totalSizeInBytes +=
value == null ? 0 : (value.getLength() == -1 ? 0 : value.getLength());
}
} else {
totalSizeInBytes += (long) tablet.timestamps.length * tsDataType.getDataTypeSize();
}
}
}

// bitMaps
if (tablet.bitMaps != null) {
for (int i = 0; i < tablet.bitMaps.length; i++) {
totalSizeInBytes += tablet.bitMaps[i] == null ? 0 : tablet.bitMaps[i].getSize();
}
}

// estimate other dataStructures size
totalSizeInBytes += 100;

return totalSizeInBytes;
}

public synchronized PipeMemoryBlock tryAllocate(long sizeInBytes) {
return tryAllocate(sizeInBytes, currentSize -> currentSize * 2 / 3);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;

import java.util.List;
import java.util.Map;

public class PipeMemoryWeighUtil {
public class PipeMemoryWeightUtil {

/** Estimates memory usage of a {@link Map}<{@link IDeviceID}, {@link Boolean}>. */
public static long memoryOfIDeviceId2Bool(Map<IDeviceID, Boolean> map) {
long usageInBytes = 0L;
Expand Down Expand Up @@ -57,4 +61,61 @@ public static long memoryOfIDeviceID2StrList(Map<IDeviceID, List<String>> map) {
}
return usageInBytes + 16L; // add the overhead of map
}

public static long calculateTabletSizeInBytes(Tablet tablet) {
long totalSizeInBytes = 0;

if (tablet == null) {
return totalSizeInBytes;
}

// timestamps
if (tablet.timestamps != null) {
totalSizeInBytes += tablet.timestamps.length * 8L;
}

// values
final List<MeasurementSchema> timeseries = tablet.getSchemas();
if (timeseries != null) {
for (int column = 0; column < timeseries.size(); column++) {
final MeasurementSchema measurementSchema = timeseries.get(column);
if (measurementSchema == null) {
continue;
}

final TSDataType tsDataType = measurementSchema.getType();
if (tsDataType == null) {
continue;
}

if (tsDataType.isBinary()) {
if (tablet.values == null || tablet.values.length <= column) {
continue;
}
final Binary[] values = ((Binary[]) tablet.values[column]);
if (values == null) {
continue;
}
for (Binary value : values) {
totalSizeInBytes +=
value == null ? 0 : (value.getLength() == -1 ? 0 : value.getLength());
}
} else {
totalSizeInBytes += (long) tablet.timestamps.length * tsDataType.getDataTypeSize();
}
}
}

// bitMaps
if (tablet.bitMaps != null) {
for (int i = 0; i < tablet.bitMaps.length; i++) {
totalSizeInBytes += tablet.bitMaps[i] == null ? 0 : tablet.bitMaps[i].getSize();
}
}

// estimate other dataStructures size
totalSizeInBytes += 100;

return totalSizeInBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;

Expand Down Expand Up @@ -205,7 +205,8 @@ synchronized boolean cacheObjectsIfAbsent() throws IOException {
try (TsFileSequenceReader sequenceReader =
new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, true)) {
deviceMeasurementsMap = sequenceReader.getDeviceMeasurementsMap();
memoryRequiredInBytes += PipeMemoryWeighUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);

deviceIsAlignedMap = new HashMap<>();
final TsFileDeviceIterator deviceIsAlignedIterator =
Expand All @@ -214,10 +215,10 @@ synchronized boolean cacheObjectsIfAbsent() throws IOException {
final Pair<IDeviceID, Boolean> deviceIsAlignedPair = deviceIsAlignedIterator.next();
deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(), deviceIsAlignedPair.getRight());
}
memoryRequiredInBytes += PipeMemoryWeighUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);
memoryRequiredInBytes += PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(deviceIsAlignedMap);

measurementDataTypeMap = sequenceReader.getFullPathDataTypeMap();
memoryRequiredInBytes += PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
memoryRequiredInBytes += PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
}
// Release memory of TsFileSequenceReader.
allocatedMemoryBlock.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEventBinaryCache;
import org.apache.iotdb.pipe.api.event.Event;
Expand Down Expand Up @@ -158,7 +158,7 @@ private void prefetchOnce() {
tablets.addAll(currentTablets);
calculatedTabletsSizeInBytes +=
currentTablets.stream()
.map((PipeMemoryManager::calculateTabletSizeInBytes))
.map((PipeMemoryWeightUtil::calculateTabletSizeInBytes))
.reduce(Long::sum)
.orElse(0L);
enrichedEvents.add((EnrichedEvent) event);
Expand All @@ -172,7 +172,7 @@ private void prefetchOnce() {
tablets.addAll(currentTablets);
calculatedTabletsSizeInBytes +=
currentTablets.stream()
.map((PipeMemoryManager::calculateTabletSizeInBytes))
.map((PipeMemoryWeightUtil::calculateTabletSizeInBytes))
.reduce(Long::sum)
.orElse(0L);
}
Expand Down

0 comments on commit abb64f3

Please sign in to comment.