From ace5d4856b4c0c2fcc9bb591b3af642268061abe Mon Sep 17 00:00:00 2001 From: zhujt Date: Thu, 22 Aug 2024 10:01:04 +0800 Subject: [PATCH 1/8] add encrypt related codes --- .../fast/reader/CompactionAlignedChunkReader.java | 10 +++++++--- .../executor/fast/reader/CompactionChunkReader.java | 7 ++++++- .../executor/readchunk/loader/InstantChunkLoader.java | 3 ++- .../executor/readchunk/loader/InstantPageLoader.java | 9 +++++++-- .../compaction/repair/RepairDataFileScanUtil.java | 6 ++++-- .../read/filescan/impl/DiskAlignedChunkHandleImpl.java | 9 ++++++++- .../read/filescan/impl/DiskChunkHandleImpl.java | 8 +++++++- .../dataregion/utils/SharedTimeDataBuffer.java | 6 +++++- pom.xml | 2 +- 9 files changed, 47 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java index 282035ee6619..7b7c5d9ac723 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionAlignedChunkReader.java @@ -24,6 +24,7 @@ import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.encrypt.IDecryptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.header.ChunkHeader; import org.apache.tsfile.file.header.PageHeader; @@ -46,6 +47,8 @@ public class CompactionAlignedChunkReader { private final List valueChunkHeaderList = new ArrayList<>(); private final IUnCompressor timeUnCompressor; + + private final IDecryptor decryptor; private final Decoder timeDecoder = Decoder.getDecoderByType( TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), @@ -61,7 +64,7 @@ public class CompactionAlignedChunkReader { public CompactionAlignedChunkReader(Chunk timeChunk, List valueChunkList) { ChunkHeader timeChunkHeader = timeChunk.getHeader(); this.timeUnCompressor = IUnCompressor.getUnCompressor(timeChunkHeader.getCompressionType()); - + this.decryptor = timeChunk.getDecryptor(); valueChunkList.forEach( chunk -> { this.valueChunkHeaderList.add(chunk == null ? null : chunk.getHeader()); @@ -108,7 +111,7 @@ public AlignedPageReader getAlignedPageReader( // uncompress time page data ByteBuffer uncompressedTimePageData = - uncompressPageData(timePageHeader, timeUnCompressor, compressedTimePageData); + uncompressPageData(timePageHeader, timeUnCompressor, compressedTimePageData, decryptor); // uncompress value page datas List uncompressedValuePageDatas = new ArrayList<>(); List valueTypes = new ArrayList<>(); @@ -124,7 +127,8 @@ public AlignedPageReader getAlignedPageReader( uncompressPageData( valuePageHeaders.get(i), IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()), - compressedValuePageDatas.get(i))); + compressedValuePageDatas.get(i), + decryptor)); TSDataType valueType = valueChunkHeader.getDataType(); valueDecoders.add(Decoder.getDecoderByType(valueChunkHeader.getEncodingType(), valueType)); valueTypes.add(valueType); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java index 81836b43ea49..e84b176115e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java @@ -22,6 +22,7 @@ import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.encrypt.IDecryptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.MetaMarker; import org.apache.tsfile.file.header.ChunkHeader; @@ -46,6 +47,8 @@ public class CompactionChunkReader { private final ChunkHeader chunkHeader; private ByteBuffer chunkDataBuffer; private final IUnCompressor unCompressor; + + private final IDecryptor decryptor; private final Decoder timeDecoder = Decoder.getDecoderByType( TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), @@ -66,6 +69,7 @@ public CompactionChunkReader(Chunk chunk) { this.unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()); this.deleteIntervalList = chunk.getDeleteIntervalList(); this.chunkStatistic = chunk.getChunkStatistic(); + this.decryptor = chunk.getDecryptor(); } /** @@ -127,7 +131,8 @@ public static ByteBuffer readCompressedPageData(PageHeader pageHeader, ByteBuffe public TsBlock readPageData(PageHeader pageHeader, ByteBuffer compressedPageData) throws IOException { // uncompress page data - ByteBuffer pageData = uncompressPageData(pageHeader, unCompressor, compressedPageData); + ByteBuffer pageData = + uncompressPageData(pageHeader, unCompressor, compressedPageData, decryptor); // decode page data TSDataType dataType = chunkHeader.getDataType(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/loader/InstantChunkLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/loader/InstantChunkLoader.java index 41d88fc3d79b..9f82ba0623bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/loader/InstantChunkLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/loader/InstantChunkLoader.java @@ -93,7 +93,8 @@ public List getPages() { chunk.getHeader().getDataType(), chunk.getHeader().getEncodingType(), chunkMetadata, - pageModifiedStatus)); + pageModifiedStatus, + chunk.getDecryptor())); } return pageList; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/loader/InstantPageLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/loader/InstantPageLoader.java index 8da39abda78c..66ee0197fd29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/loader/InstantPageLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/readchunk/loader/InstantPageLoader.java @@ -22,6 +22,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus; import org.apache.tsfile.compress.IUnCompressor; +import org.apache.tsfile.encrypt.IDecryptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.header.PageHeader; @@ -39,6 +40,8 @@ public class InstantPageLoader extends PageLoader { private ByteBuffer pageData; + private IDecryptor decryptor; + public InstantPageLoader() {} public InstantPageLoader( @@ -49,9 +52,11 @@ public InstantPageLoader( TSDataType dataType, TSEncoding encoding, ChunkMetadata chunkMetadata, - ModifiedStatus modifiedStatus) { + ModifiedStatus modifiedStatus, + IDecryptor decryptor) { super(file, pageHeader, compressionType, dataType, encoding, chunkMetadata, modifiedStatus); this.pageData = pageData; + this.decryptor = decryptor; } @Override @@ -62,7 +67,7 @@ public ByteBuffer getCompressedData() { @Override public ByteBuffer getUnCompressedData() throws IOException { IUnCompressor unCompressor = IUnCompressor.getUnCompressor(compressionType); - return uncompressPageData(pageHeader, unCompressor, pageData); + return uncompressPageData(pageHeader, unCompressor, pageData, decryptor); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java index e94bd9c78352..cb76b852cd6e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java @@ -138,7 +138,8 @@ private void checkAlignedDeviceSeries( uncompressPageData( pageHeader, IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()), - pageData); + pageData, + timeChunk.getDecryptor()); Decoder decoder = Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()); while (decoder.hasNext(uncompressedPageData)) { @@ -196,7 +197,8 @@ private void checkSingleNonAlignedSeries( uncompressPageData( pageHeader, IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()), - pageData); + pageData, + chunk.getDecryptor()); ByteBuffer timeBuffer = getTimeBufferFromNonAlignedPage(uncompressedPageData); Decoder timeDecoder = Decoder.getDecoderByType( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskAlignedChunkHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskAlignedChunkHandleImpl.java index b231a4bc1185..f2e28c24f517 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskAlignedChunkHandleImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskAlignedChunkHandleImpl.java @@ -21,6 +21,7 @@ import org.apache.iotdb.db.storageengine.dataregion.utils.SharedTimeDataBuffer; +import org.apache.tsfile.encrypt.IDecryptor; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.TsFileSequenceReader; @@ -37,6 +38,8 @@ public class DiskAlignedChunkHandleImpl extends DiskChunkHandleImpl { private final SharedTimeDataBuffer sharedTimeDataBuffer; private int pageIndex = 0; + private IDecryptor decryptor; + public DiskAlignedChunkHandleImpl( IDeviceID deviceID, String measurement, @@ -53,13 +56,17 @@ public DiskAlignedChunkHandleImpl( protected void init(TsFileSequenceReader reader) throws IOException { sharedTimeDataBuffer.init(reader); super.init(reader); + this.decryptor = reader.getDecryptor(); } @Override public long[] getDataTime() throws IOException { ByteBuffer currentPageDataBuffer = ChunkReader.deserializePageData( - this.currentPageHeader, this.currentChunkDataBuffer, this.currentChunkHeader); + this.currentPageHeader, + this.currentChunkDataBuffer, + this.currentChunkHeader, + this.decryptor); int size = ReadWriteIOUtils.readInt(currentPageDataBuffer); byte[] bitmap = new byte[(size + 7) / 8]; currentPageDataBuffer.get(bitmap); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskChunkHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskChunkHandleImpl.java index 68ad659d90f5..63a63251d6b0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskChunkHandleImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/DiskChunkHandleImpl.java @@ -24,6 +24,7 @@ import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.encrypt.IDecryptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.MetaMarker; import org.apache.tsfile.file.header.ChunkHeader; @@ -46,6 +47,7 @@ public class DiskChunkHandleImpl implements IChunkHandle { private final IDeviceID deviceID; private final String measurement; private final String filePath; + private IDecryptor decryptor; protected ChunkHeader currentChunkHeader; protected PageHeader currentPageHeader; protected ByteBuffer currentChunkDataBuffer; @@ -81,6 +83,7 @@ protected void init(TsFileSequenceReader reader) throws IOException { Chunk chunk = reader.readMemChunk(offset); this.currentChunkDataBuffer = chunk.getData(); this.currentChunkHeader = chunk.getHeader(); + this.decryptor = chunk.getDecryptor(); } // Check if there is more pages to be scanned in Chunk. @@ -127,7 +130,10 @@ public long[] getPageStatisticsTime() { public long[] getDataTime() throws IOException { ByteBuffer currentPageDataBuffer = ChunkReader.deserializePageData( - currentPageHeader, this.currentChunkDataBuffer, this.currentChunkHeader); + currentPageHeader, + this.currentChunkDataBuffer, + this.currentChunkHeader, + this.decryptor); int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(currentPageDataBuffer); ByteBuffer timeBuffer = currentPageDataBuffer.slice(); timeBuffer.limit(timeBufferLength); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/SharedTimeDataBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/SharedTimeDataBuffer.java index 8e01e85757eb..0813ceb6e0f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/SharedTimeDataBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/SharedTimeDataBuffer.java @@ -21,6 +21,7 @@ import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.encrypt.IDecryptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.header.ChunkHeader; import org.apache.tsfile.file.header.PageHeader; @@ -45,6 +46,8 @@ public class SharedTimeDataBuffer { TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), TSDataType.INT64); + private IDecryptor decryptor; + public SharedTimeDataBuffer(IChunkMetadata timeChunkMetaData) { this.timeChunkMetaData = timeChunkMetaData; this.timeData = new ArrayList<>(); @@ -58,6 +61,7 @@ public void init(TsFileSequenceReader reader) throws IOException { Chunk timeChunk = reader.readMemChunk(timeChunkMetaData.getOffsetOfChunkHeader()); timeChunkHeader = timeChunk.getHeader(); timeBuffer = timeChunk.getData(); + decryptor = timeChunk.getDecryptor(); } public long[] getPageTime(int pageId) throws IOException { @@ -82,7 +86,7 @@ private void loadPageData() throws IOException { ? PageHeader.deserializeFrom(timeBuffer, timeChunkMetaData.getStatistics()) : PageHeader.deserializeFrom(timeBuffer, timeChunkHeader.getDataType()); ByteBuffer timePageData = - ChunkReader.deserializePageData(timePageHeader, timeBuffer, timeChunkHeader); + ChunkReader.deserializePageData(timePageHeader, timeBuffer, timeChunkHeader, decryptor); long[] pageData = new long[(int) timePageHeader.getNumOfValues()]; int index = 0; while (defaultTimeDecoder.hasNext(timePageData)) { diff --git a/pom.xml b/pom.xml index a9ad6a420872..e5257f3a54d0 100644 --- a/pom.xml +++ b/pom.xml @@ -166,7 +166,7 @@ 0.14.1 1.9 1.5.6-3 - 1.2.0-b31fb57c-SNAPSHOT + 1.2.0-SNAPSHOT @@ -166,7 +167,7 @@ 0.14.1 1.9 1.5.6-3 - 1.2.0-240830-SNAPSHOT + 1.2.0-SNAPSHOT