From 2ccb152d42e414113a898b7eddd6e164df7bafcd Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 26 Jun 2026 14:19:52 +0800 Subject: [PATCH] Pipe: merge batched aligned chunks in scan parser (#18010) * Pipe: merge batched aligned chunks in scan parser * Test pipe batched aligned chunk memory boundaries * Pipe: fix batched aligned scan parser memory split * Update TsFileInsertionEventParserTest.java * Rename pending aligned chunk consumer (cherry picked from commit f96fc5824f62b4c637c0d9b5e9ea4adc9f8b1853) --- .../scan/SinglePageWholeChunkReader.java | 33 +- .../TsFileInsertionScanDataContainer.java | 527 +++++++++++------- .../resource/memory/PipeMemoryWeightUtil.java | 18 +- .../TsFileInsertionDataContainerTest.java | 312 +++++++++++ 4 files changed, 691 insertions(+), 199 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java index f41a6861120bd..1f741ddfc70fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan; +import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.enums.TSDataType; @@ -159,22 +160,44 @@ static List toSuffixMaxList(final List pageEstimatedMemoryUsageInByt static long estimatePageMemoryUsageInBytesWithBatchData( final PageHeader timePageHeader, final Chunk timeChunk, - final List valueDataTypeList) { + final List valueDataTypeList) + throws IOException { return estimatePageMemoryUsageInBytesWithBatchData( timePageHeader.getUncompressedSize(), getPageRowCount(timePageHeader, timeChunk), valueDataTypeList); } - static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) { + static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) throws IOException { if (isSinglePageChunk(chunk.getHeader())) { - return Objects.isNull(chunk.getChunkStatistic()) - ? 0 - : saturateToInt(chunk.getChunkStatistic().getCount()); + if (Objects.nonNull(chunk.getChunkStatistic())) { + return saturateToInt(chunk.getChunkStatistic().getCount()); + } + return isTimeChunk(chunk.getHeader()) ? countSinglePageTimeValues(chunk) : 0; } return saturateToInt(pageHeader.getNumOfValues()); } + private static int countSinglePageTimeValues(final Chunk chunk) throws IOException { + final ByteBuffer chunkDataBuffer = chunk.getData().duplicate(); + final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer, chunk.getHeader()); + final ByteBuffer pageData = deserializePageData(pageHeader, chunkDataBuffer, chunk.getHeader()); + final Decoder decoder = + Decoder.getDecoderByType(chunk.getHeader().getEncodingType(), TSDataType.INT64); + + int rowCount = 0; + while (decoder.hasNext(pageData)) { + decoder.readLong(pageData); + ++rowCount; + } + return rowCount; + } + + private static boolean isTimeChunk(final ChunkHeader chunkHeader) { + return (chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) + == TsFileConstant.TIME_COLUMN_MASK; + } + private static int saturateToInt(final long value) { return (int) Math.min(Integer.MAX_VALUE, value); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java index 6c3d341f7740a..6a0cfa4f3c3b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java @@ -67,6 +67,7 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.LocalDate; import java.util.ArrayList; import java.util.Collections; @@ -103,11 +104,11 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain // Cached time chunk private final List timeChunkList = new ArrayList<>(); private final List isMultiPageList = new ArrayList<>(); - private final List timeChunkPageMemorySizeList = new ArrayList<>(); private final Map measurementIndexMap = new HashMap<>(); - private int lastIndex = -1; - private Chunk firstChunk4NextSequentialValueChunks; + private final List pendingAlignedChunkGroups = new ArrayList<>(); + private long pendingAlignedChunkSize; + private CachedAlignedValueChunk cachedAlignedValueChunk; private byte lastMarker = Byte.MIN_VALUE; @@ -521,14 +522,14 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin private void moveToNextChunkReader() throws IOException, IllegalStateException { ChunkHeader chunkHeader; - long valueChunkSize = 0; - long valueChunkPageMemorySize = 0; - final List valueChunkList = new ArrayList<>(); currentMeasurements.clear(); modsInfos.clear(); if (lastMarker == MetaMarker.SEPARATOR) { - chunkReader = null; + if (!useNextPendingAlignedChunk(lastMarker)) { + clearCachedAlignedChunkData(); + chunkReader = null; + } return; } @@ -536,8 +537,8 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { while ((marker = lastMarker != Byte.MIN_VALUE ? lastMarker - : Objects.nonNull(firstChunk4NextSequentialValueChunks) - ? toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader()) + : Objects.nonNull(cachedAlignedValueChunk) + ? toValueChunkMarker(cachedAlignedValueChunk.chunk.getHeader()) : tsFileSequenceReader.readMarker()) != MetaMarker.SEPARATOR) { lastMarker = Byte.MIN_VALUE; @@ -550,70 +551,19 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { // Notice that the data in one chunk group is either aligned or non-aligned // There is no need to consider non-aligned chunks when there are value chunks currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER; - long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; + final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - final long nextMarkerOffset = - tsFileSequenceReader.position() + chunkHeader.getDataSize(); - - if (Objects.isNull(currentDevice)) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - - if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) - == TsFileConstant.TIME_COLUMN_MASK) { - final Chunk timeChunk = - new Chunk( - chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); - final boolean isMultiPage = marker == MetaMarker.TIME_CHUNK_HEADER; - timeChunkList.add(timeChunk); - isMultiPageList.add(isMultiPage); - timeChunkPageMemorySizeList.add( - SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes( - timeChunk)); - break; - } - if (!pattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { - tsFileSequenceReader.position(nextMarkerOffset); + if (filterChunk(currentChunkHeaderOffset, chunkHeader, false, marker)) { break; } - // Skip the chunk if it is fully deleted by mods - if (!currentModifications.isEmpty()) { - Statistics statistics = null; - try { - statistics = - findNonAlignedChunkStatistics( - tsFileSequenceReader.getIChunkMetadataList( - CompactionPathUtils.getPath( - currentDevice, chunkHeader.getMeasurementID())), - currentChunkHeaderOffset); - } catch (IllegalPathException ignore) { - LOGGER.warn( - "Failed to get chunk metadata for {}.{}", - currentDevice, - chunkHeader.getMeasurementID()); - } - - if (statistics != null - && ModsOperationUtil.isAllDeletedByMods( - currentDevice, - chunkHeader.getMeasurementID(), - statistics.getStartTime(), - statistics.getEndTime(), - currentModifications)) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - } - if (chunkHeader.getDataSize() > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedMemoryBlockForChunk, chunkHeader.getDataSize()); } - Chunk chunk = + final Chunk chunk = new Chunk( chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); final List pageEstimatedMemoryUsageInBytesList = @@ -638,49 +588,16 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { case MetaMarker.VALUE_CHUNK_HEADER: case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: { - Chunk chunk; - long currentValueChunkPageMemorySize = 0; - if (Objects.isNull(firstChunk4NextSequentialValueChunks)) { + CachedAlignedValueChunk valueChunk = cachedAlignedValueChunk; + if (Objects.isNull(valueChunk)) { final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); - final long nextMarkerOffset = - tsFileSequenceReader.position() + chunkHeader.getDataSize(); - if (Objects.isNull(currentDevice) - || !pattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { - tsFileSequenceReader.position(nextMarkerOffset); + if (filterChunk(currentChunkHeaderOffset, chunkHeader, true, marker)) { break; } - if (!currentModifications.isEmpty()) { - // Skip the chunk if it is fully deleted by mods - Statistics statistics = null; - try { - statistics = - findAlignedChunkStatistics( - tsFileSequenceReader.getIChunkMetadataList( - CompactionPathUtils.getPath( - currentDevice, chunkHeader.getMeasurementID())), - currentChunkHeaderOffset); - } catch (IllegalPathException ignore) { - LOGGER.warn( - "Failed to get chunk metadata for {}.{}", - currentDevice, - chunkHeader.getMeasurementID()); - } - if (statistics != null - && ModsOperationUtil.isAllDeletedByMods( - currentDevice, - chunkHeader.getMeasurementID(), - statistics.getStartTime(), - statistics.getEndTime(), - currentModifications)) { - tsFileSequenceReader.position(nextMarkerOffset); - break; - } - } - - // Increase value index + // Increase value index. final String measurementID = tabletStringInternPool.intern(chunkHeader.getMeasurementID()); final int valueIndex = @@ -688,86 +605,35 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { measurementID, (measurement, index) -> Objects.nonNull(index) ? index + 1 : 0); - // Emit when encountered non-sequential value chunk, or the chunk size exceeds - // certain value to avoid OOM - // Do not record or end current value chunks when there are empty chunks + // Do not record or end current value chunks when there are empty chunks. if (chunkHeader.getDataSize() == 0) { break; } - chunk = + final Chunk chunk = new Chunk( chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); - currentValueChunkPageMemorySize = calculateMaxPageMemorySize(chunk); - boolean needReturn = false; - final long timeChunkSize = - lastIndex >= 0 - ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed( - timeChunkList.get(lastIndex)) - : 0; - final long timeChunkPageMemorySize = - lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex) : 0; - if (lastIndex >= 0) { - if (valueIndex != lastIndex) { - needReturn = recordAlignedChunk(valueChunkList, marker); - } else { - final long chunkSize = timeChunkSize + valueChunkSize; - final long pageMemorySize = timeChunkPageMemorySize + valueChunkPageMemorySize; - if (chunkSize + chunkHeader.getDataSize() - > allocatedMemoryBlockForChunk.getMemoryUsageInBytes() - || timeChunkPageMemorySize > 0 - && currentValueChunkPageMemorySize > 0 - && pageMemorySize + currentValueChunkPageMemorySize - > getPageDataMemoryLimitInBytes()) { - needReturn = recordAlignedChunk(valueChunkList, marker); - } - } - } - lastIndex = valueIndex; - if (needReturn) { - firstChunk4NextSequentialValueChunks = chunk; - return; - } - resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); - resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( - valueChunkList, currentValueChunkPageMemorySize); + valueChunk = + new CachedAlignedValueChunk(valueIndex, chunk, chunkHeader.getDataSize()); } else { - chunk = firstChunk4NextSequentialValueChunks; - chunkHeader = chunk.getHeader(); - firstChunk4NextSequentialValueChunks = null; - currentValueChunkPageMemorySize = calculateMaxPageMemorySize(chunk); - resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); - resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( - valueChunkList, currentValueChunkPageMemorySize); + cachedAlignedValueChunk = null; } - valueChunkSize += chunkHeader.getDataSize(); - valueChunkPageMemorySize += currentValueChunkPageMemorySize; - valueChunkList.add(chunk); - final String measurementID = - tabletStringInternPool.intern(chunkHeader.getMeasurementID()); - currentMeasurements.add( - new MeasurementSchema(measurementID, chunkHeader.getDataType())); - modsInfos.addAll( - ModsOperationUtil.initializeMeasurementMods( - currentDevice, Collections.singletonList(measurementID), currentModifications)); + if (returnPendingAlignedChunkBeforeCaching(valueChunk)) { + return; + } + cacheAlignedValueChunk(valueChunk); break; } case MetaMarker.CHUNK_GROUP_HEADER: { - // Return before "currentDevice" changes - if (recordAlignedChunk(valueChunkList, marker)) { + // Return before "currentDevice" changes. + if (useNextPendingAlignedChunk(marker)) { return; } + clearCachedAlignedChunkData(); final String deviceID = ((PlainDeviceID) tsFileSequenceReader.readChunkGroupHeader().getDeviceID()) .toStringID(); - // Clear because the cached data will never be used in the next chunk group - lastIndex = -1; - timeChunkList.clear(); - isMultiPageList.clear(); - timeChunkPageMemorySizeList.clear(); - measurementIndexMap.clear(); - currentDevice = pattern.mayOverlapWithDevice(deviceID) ? tabletStringInternPool.intern(deviceID) @@ -785,7 +651,8 @@ > getPageDataMemoryLimitInBytes()) { } lastMarker = marker; - if (!recordAlignedChunk(valueChunkList, marker)) { + if (!useNextPendingAlignedChunk(marker)) { + clearCachedAlignedChunkData(); chunkReader = null; } } @@ -794,22 +661,106 @@ private long getPageDataMemoryLimitInBytes() { return PipeConfig.getInstance().getPipeMaxReaderChunkSize(); } - private boolean recordAlignedChunk(final List valueChunkList, final byte marker) + private long getChunkMemoryLimitInBytes() { + return PipeConfig.getInstance().getPipeMaxReaderChunkSize(); + } + + private boolean filterChunk( + final long currentChunkHeaderOffset, + final ChunkHeader chunkHeader, + final boolean isAlignedValueChunk, + final byte marker) throws IOException { - if (!valueChunkList.isEmpty()) { - final Chunk timeChunk = timeChunkList.get(lastIndex); + final long nextMarkerOffset = tsFileSequenceReader.position() + chunkHeader.getDataSize(); + + if (Objects.isNull(currentDevice)) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + + if (!isAlignedValueChunk) { + if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) + == TsFileConstant.TIME_COLUMN_MASK) { + final Chunk timeChunk = + new Chunk(chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); + timeChunkList.add(timeChunk); + final boolean isMultiPage = marker == MetaMarker.TIME_CHUNK_HEADER; + isMultiPageList.add(isMultiPage); + return true; + } + } + + if (!pattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + + // Skip the chunk if it is fully deleted by mods + if (!currentModifications.isEmpty()) { + Statistics statistics = null; + try { + statistics = + isAlignedValueChunk + ? findAlignedChunkStatistics( + tsFileSequenceReader.getIChunkMetadataList( + CompactionPathUtils.getPath(currentDevice, chunkHeader.getMeasurementID())), + currentChunkHeaderOffset) + : findNonAlignedChunkStatistics( + tsFileSequenceReader.getIChunkMetadataList( + CompactionPathUtils.getPath(currentDevice, chunkHeader.getMeasurementID())), + currentChunkHeaderOffset); + } catch (IllegalPathException ignore) { + LOGGER.warn( + "Failed to get chunk metadata for {}.", + currentDevice + "." + chunkHeader.getMeasurementID()); + } + + if (statistics != null + && ModsOperationUtil.isAllDeletedByMods( + currentDevice, + chunkHeader.getMeasurementID(), + statistics.getStartTime(), + statistics.getEndTime(), + currentModifications)) { + tsFileSequenceReader.position(nextMarkerOffset); + return true; + } + } + return false; + } + + private boolean useNextPendingAlignedChunk(final byte marker) throws IOException { + while (!pendingAlignedChunkGroups.isEmpty()) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup = pendingAlignedChunkGroups.remove(0); + pendingAlignedChunkSize = + Math.max(0, pendingAlignedChunkSize - pendingAlignedChunkGroup.chunkSize); + + if (pendingAlignedChunkGroup.valueChunkList.isEmpty()) { + continue; + } + + final Chunk timeChunk = timeChunkList.get(pendingAlignedChunkGroup.timeChunkIndex); timeChunk.getData().rewind(); - currentIsMultiPage = isMultiPageList.get(lastIndex); + for (final Chunk valueChunk : pendingAlignedChunkGroup.valueChunkList) { + valueChunk.getData().rewind(); + } + + currentMeasurements.clear(); + currentMeasurements.addAll(pendingAlignedChunkGroup.measurements); + modsInfos.clear(); + modsInfos.addAll(pendingAlignedChunkGroup.modsInfos); + + currentIsMultiPage = isMultiPageList.get(pendingAlignedChunkGroup.timeChunkIndex); if (!currentIsMultiPage) { resizePageDataMemoryIfNeeded( AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes( - timeChunk, valueChunkList)); + timeChunk, pendingAlignedChunkGroup.valueChunkList)); } final List pageEstimatedMemoryUsageInBytesList = currentIsMultiPage ? AlignedSinglePageWholeChunkReader .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList( - timeChunk, valueChunkList) + timeChunk, pendingAlignedChunkGroup.valueChunkList) : Collections.emptyList(); final long maxPageEstimatedMemoryUsageInBytes = pageEstimatedMemoryUsageInBytesList.isEmpty() @@ -819,50 +770,222 @@ private boolean recordAlignedChunk(final List valueChunkList, final byte chunkReader = currentIsMultiPage ? new MemoryControlledChunkReader( - new AlignedChunkReader(timeChunk, valueChunkList, filter), + new AlignedChunkReader( + timeChunk, pendingAlignedChunkGroup.valueChunkList, filter), pageEstimatedMemoryUsageInBytesList) - : new AlignedSinglePageWholeChunkReader(timeChunk, valueChunkList); + : new AlignedSinglePageWholeChunkReader( + timeChunk, pendingAlignedChunkGroup.valueChunkList); currentIsAligned = true; - lastMarker = marker; + if (marker != Byte.MIN_VALUE) { + lastMarker = marker; + } + return true; + } + return false; + } + + private boolean shouldReturnPendingAlignedChunkBeforeCaching( + final CachedAlignedValueChunk valueChunk) throws IOException { + validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex); + + final PendingAlignedChunkGroup pendingAlignedChunkGroup = + findPendingAlignedChunkGroup(valueChunk.timeChunkIndex); + final boolean isFirstValueChunkInGroup = + Objects.isNull(pendingAlignedChunkGroup) + || pendingAlignedChunkGroup.valueChunkList.isEmpty(); + final long timeChunkSize = + Objects.isNull(pendingAlignedChunkGroup) + ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed( + timeChunkList.get(valueChunk.timeChunkIndex)) + : 0; + final long chunkSizeAfterCaching = + pendingAlignedChunkSize + timeChunkSize + valueChunk.valueChunkSize; + + if (isFirstValueChunkInGroup) { + final long firstValueChunkGroupSize = + timeChunkSize + + (Objects.isNull(pendingAlignedChunkGroup) ? 0 : pendingAlignedChunkGroup.chunkSize) + + valueChunk.valueChunkSize; + if (firstValueChunkGroupSize > getChunkMemoryLimitInBytes()) { + return !pendingAlignedChunkGroups.isEmpty(); + } + } + + if (!pendingAlignedChunkGroups.isEmpty() + && chunkSizeAfterCaching > getChunkMemoryLimitInBytes()) { + return true; + } + + final long pageMemorySizeAfterCaching = + calculateMaxAlignedPageMemorySizeWithBatchData( + valueChunk.timeChunkIndex, pendingAlignedChunkGroup, valueChunk); + return pageMemorySizeAfterCaching > getPageDataMemoryLimitInBytes() + && (!isFirstValueChunkInGroup || !pendingAlignedChunkGroups.isEmpty()); + } + + private boolean returnPendingAlignedChunkBeforeCaching(final CachedAlignedValueChunk valueChunk) + throws IOException { + if (!shouldReturnPendingAlignedChunkBeforeCaching(valueChunk)) { + return false; + } + + cachedAlignedValueChunk = valueChunk; + if (useNextPendingAlignedChunk(Byte.MIN_VALUE)) { return true; } + cachedAlignedValueChunk = null; return false; } + private void cacheAlignedValueChunk(final CachedAlignedValueChunk valueChunk) throws IOException { + validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex); + + final PendingAlignedChunkGroup pendingAlignedChunkGroup = + getOrCreatePendingAlignedChunkGroup(valueChunk.timeChunkIndex); + resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup, valueChunk); + resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup, valueChunk); + + pendingAlignedChunkGroup.valueChunkList.add(valueChunk.chunk); + pendingAlignedChunkGroup.chunkSize += valueChunk.valueChunkSize; + pendingAlignedChunkSize += valueChunk.valueChunkSize; + + final ChunkHeader chunkHeader = valueChunk.chunk.getHeader(); + final String measurementID = tabletStringInternPool.intern(chunkHeader.getMeasurementID()); + pendingAlignedChunkGroup.measurements.add( + new MeasurementSchema(measurementID, chunkHeader.getDataType())); + pendingAlignedChunkGroup.modsInfos.addAll( + ModsOperationUtil.initializeMeasurementMods( + currentDevice, Collections.singletonList(measurementID), currentModifications)); + } + + private PendingAlignedChunkGroup getOrCreatePendingAlignedChunkGroup(final int timeChunkIndex) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup = + findPendingAlignedChunkGroup(timeChunkIndex); + if (Objects.nonNull(pendingAlignedChunkGroup)) { + return pendingAlignedChunkGroup; + } + + final PendingAlignedChunkGroup newPendingAlignedChunkGroup = + new PendingAlignedChunkGroup( + timeChunkIndex, + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(timeChunkIndex))); + pendingAlignedChunkSize += newPendingAlignedChunkGroup.chunkSize; + + for (int i = 0; i < pendingAlignedChunkGroups.size(); ++i) { + if (pendingAlignedChunkGroups.get(i).timeChunkIndex > timeChunkIndex) { + pendingAlignedChunkGroups.add(i, newPendingAlignedChunkGroup); + return newPendingAlignedChunkGroup; + } + } + pendingAlignedChunkGroups.add(newPendingAlignedChunkGroup); + return newPendingAlignedChunkGroup; + } + + private PendingAlignedChunkGroup findPendingAlignedChunkGroup(final int timeChunkIndex) { + for (final PendingAlignedChunkGroup pendingAlignedChunkGroup : pendingAlignedChunkGroups) { + if (pendingAlignedChunkGroup.timeChunkIndex == timeChunkIndex) { + return pendingAlignedChunkGroup; + } + } + return null; + } + + private void validateAlignedValueChunkTimeIndex(final int timeChunkIndex) throws IOException { + if (timeChunkIndex < 0 || timeChunkIndex >= timeChunkList.size()) { + throw new IOException( + String.format( + "Invalid aligned value chunk index %d, while there are %d time chunks.", + timeChunkIndex, timeChunkList.size())); + } + } + + private void clearCachedAlignedChunkData() { + pendingAlignedChunkGroups.clear(); + pendingAlignedChunkSize = 0; + cachedAlignedValueChunk = null; + timeChunkList.clear(); + isMultiPageList.clear(); + measurementIndexMap.clear(); + } + private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit( - final List valueChunkList, final ChunkHeader valueChunkHeader) { - if (!valueChunkList.isEmpty() || lastIndex < 0) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup, + final CachedAlignedValueChunk valueChunk) { + if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) { return; } - final long chunkSize = - PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex)) - + valueChunkHeader.getDataSize(); + final long chunkSize = pendingAlignedChunkGroup.chunkSize + valueChunk.valueChunkSize; if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, chunkSize); } } private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( - final List valueChunkList, final long valueChunkPageMemorySize) { - if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize <= 0) { - return; - } - - final long timeChunkPageMemorySize = timeChunkPageMemorySizeList.get(lastIndex); - if (timeChunkPageMemorySize <= 0) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup, + final CachedAlignedValueChunk valueChunk) + throws IOException { + if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) { return; } - final long pageMemorySize = timeChunkPageMemorySize + valueChunkPageMemorySize; + final long pageMemorySize = + calculateMaxAlignedPageMemorySizeWithBatchData( + pendingAlignedChunkGroup.timeChunkIndex, pendingAlignedChunkGroup, valueChunk); if (pageMemorySize > getPageDataMemoryLimitInBytes()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize); } } - private long calculateMaxPageMemorySize(final Chunk chunk) throws IOException { - return SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(chunk); + private long calculateMaxAlignedPageMemorySizeWithBatchData( + final int timeChunkIndex, + final PendingAlignedChunkGroup pendingAlignedChunkGroup, + final CachedAlignedValueChunk valueChunk) + throws IOException { + final List valueChunkList = + new ArrayList<>( + (Objects.isNull(pendingAlignedChunkGroup) + ? 0 + : pendingAlignedChunkGroup.valueChunkList.size()) + + 1); + if (Objects.nonNull(pendingAlignedChunkGroup)) { + valueChunkList.addAll(pendingAlignedChunkGroup.valueChunkList); + } + valueChunkList.add(valueChunk.chunk); + + final Chunk timeChunk = timeChunkList.get(timeChunkIndex); + final int timeChunkDataPosition = timeChunk.getData().position(); + final List valueChunkDataPositions = new ArrayList<>(valueChunkList.size()); + for (final Chunk chunk : valueChunkList) { + valueChunkDataPositions.add(Objects.isNull(chunk) ? 0 : chunk.getData().position()); + } + + rewindChunkData(timeChunk); + valueChunkList.forEach(this::rewindChunkData); + try { + return AlignedSinglePageWholeChunkReader + .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk, valueChunkList); + } finally { + timeChunk.getData().position(timeChunkDataPosition); + for (int i = 0; i < valueChunkList.size(); ++i) { + final Chunk chunk = valueChunkList.get(i); + if (Objects.nonNull(chunk)) { + chunk.getData().position(valueChunkDataPositions.get(i)); + } + } + } + } + + private void rewindChunkData(final Chunk chunk) { + if (Objects.isNull(chunk)) { + return; + } + + final ByteBuffer chunkData = chunk.getData(); + if (Objects.nonNull(chunkData)) { + chunkData.rewind(); + } } private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) { @@ -915,4 +1038,32 @@ private Statistics findNonAlignedChunkStatistics( } return null; } + + private static class PendingAlignedChunkGroup { + + private final int timeChunkIndex; + private final List valueChunkList = new ArrayList<>(); + private final List measurements = new ArrayList<>(); + private final List modsInfos = new ArrayList<>(); + private long chunkSize; + + private PendingAlignedChunkGroup(final int timeChunkIndex, final long timeChunkSize) { + this.timeChunkIndex = timeChunkIndex; + this.chunkSize = timeChunkSize; + } + } + + private static class CachedAlignedValueChunk { + + private final int timeChunkIndex; + private final Chunk chunk; + private final long valueChunkSize; + + private CachedAlignedValueChunk( + final int timeChunkIndex, final Chunk chunk, final long valueChunkSize) { + this.timeChunkIndex = timeChunkIndex; + this.chunk = chunk; + this.valueChunkSize = valueChunkSize; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index a22522666c2ff..b02cff26256cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -179,6 +179,14 @@ private static Pair calculateTabletRowCountAndMemoryBySize( return new Pair<>(1, 0); } + final int configuredTabletRowSize = + PipeConfig.getInstance().getPipeDataStructureTabletRowSize(); + final boolean hasTabletRowSizeLimit = configuredTabletRowSize > 0; + final double inputSizeLimit = + hasTabletRowSizeLimit && inputNum > 0 + ? 100 + inputNum * (double) rowBytesUsed * 1.2 + : Integer.MAX_VALUE; + // Calculate row number according to the max size of a pipe tablet. "100" is the estimated size // of other data structures in a pipe tablet. // "*8" converts bytes to bits, because the bitmap size is 1 bit per schema. @@ -186,17 +194,15 @@ private static Pair calculateTabletRowCountAndMemoryBySize( (int) Math.min( IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), - Math.min(Integer.MAX_VALUE, 100 + inputNum * (double) rowBytesUsed * 1.2)); + Math.min(Integer.MAX_VALUE, inputSizeLimit)); int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount); rowNumber = Math.max(1, rowNumber); - if ( // This means the row number is larger than the max row count of a pipe tablet - rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) { + // This means the row number is larger than the max row count of a pipe tablet. + if (hasTabletRowSizeLimit && rowNumber > configuredTabletRowSize) { // Bound the row number, the memory cost is rowSize * rowNumber - return new Pair<>( - PipeConfig.getInstance().getPipeDataStructureTabletRowSize(), - rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); + return new Pair<>(configuredTabletRowSize, rowBytesUsed * configuredTabletRowSize); } else { return new Pair<>(rowNumber, sizeLimit); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java index 36f56e0e6068c..55a27348e3ea2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java @@ -31,6 +31,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; +import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; @@ -49,6 +51,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.BatchData; import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.Path; import org.apache.tsfile.read.common.TimeRange; @@ -56,8 +59,11 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.writer.TsFileIOWriter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -422,6 +428,172 @@ public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() throws E } } + @Test + public void testScanParserMergesBatchedAlignedValueChunkGroups() throws Exception { + final long originalPipeMaxReaderChunkSize = + CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize(); + final int originalPipeDataStructureTabletRowSize = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize(); + + final int measurementCount = 20; + final int batchSize = 10; + final int rowCount = 4; + final File sourceTsFile = new File("aligned-source-for-batched-layout.tsfile"); + alignedTsFile = new File("aligned-batched-layout.tsfile"); + + try { + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(1024 * 1024L); + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0); + + final List schemaList = new ArrayList<>(); + for (int i = 0; i < measurementCount; ++i) { + schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64)); + } + + writeAlignedSourceTsFile(sourceTsFile, schemaList, rowCount); + rewriteAlignedTsFileWithBatchedValueChunks( + sourceTsFile, alignedTsFile, measurementCount, batchSize); + + int tabletCount = 0; + int pointCount = 0; + try (final TsFileInsertionScanDataContainer parser = + new TsFileInsertionScanDataContainer( + alignedTsFile, + new PrefixPipePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + Assert.assertTrue(tabletWithIsAligned.getRight()); + final Tablet tablet = tabletWithIsAligned.getLeft(); + ++tabletCount; + Assert.assertEquals(measurementCount, tablet.getSchemas().size()); + Assert.assertEquals(rowCount / 2, tablet.rowSize); + pointCount += getNonNullSize(tablet); + } + } + + Assert.assertEquals(measurementCount * rowCount, pointCount); + Assert.assertEquals(2, tabletCount); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize); + sourceTsFile.delete(); + } + } + + @Test + public void testScanParserFlushesBatchedAlignedValueChunkGroupsByMemoryLimit() throws Exception { + final long originalPipeMaxReaderChunkSize = + CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize(); + final int originalPipeDataStructureTabletRowSize = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize(); + + final int measurementCount = 20; + final int batchSize = 10; + final int rowCount = 4; + final File sourceTsFile = new File("aligned-source-for-batched-layout-memory-limit.tsfile"); + alignedTsFile = new File("aligned-batched-layout-memory-limit.tsfile"); + + try { + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0); + + final List schemaList = new ArrayList<>(); + for (int i = 0; i < measurementCount; ++i) { + schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64)); + } + + writeAlignedSourceTsFile(sourceTsFile, schemaList, rowCount); + rewriteAlignedTsFileWithBatchedValueChunks( + sourceTsFile, alignedTsFile, measurementCount, batchSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize( + calculateFirstBatchedAlignedValueChunkGroupMemoryLimit(alignedTsFile, batchSize)); + + int tabletCount = 0; + int maxMeasurementCount = 0; + int pointCount = 0; + try (final TsFileInsertionScanDataContainer parser = + new TsFileInsertionScanDataContainer( + alignedTsFile, + new PrefixPipePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + Assert.assertTrue(tabletWithIsAligned.getRight()); + final Tablet tablet = tabletWithIsAligned.getLeft(); + ++tabletCount; + maxMeasurementCount = Math.max(maxMeasurementCount, tablet.getSchemas().size()); + Assert.assertTrue(tablet.getSchemas().size() <= batchSize); + Assert.assertEquals(rowCount / 2, tablet.rowSize); + pointCount += getNonNullSize(tablet); + } + } + + Assert.assertEquals(batchSize, maxMeasurementCount); + Assert.assertEquals(measurementCount * rowCount, pointCount); + Assert.assertEquals(measurementCount / batchSize * 2, tabletCount); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize); + sourceTsFile.delete(); + } + } + + @Test + public void testPipeTabletRowSizeCanBeDisabledByNonPositiveValue() { + final int originalPipeDataStructureTabletRowSize = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize(); + final int originalPipeDataStructureTabletSizeInBytes = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(); + + try { + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(1024 * 1024); + + final BatchData batchData = new BatchData(TSDataType.INT64); + for (int i = 0; i < 1000; ++i) { + batchData.putAnObject(i, (long) i); + } + + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2); + final int rowCountWithLimit = + PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft(); + + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0); + final int rowCountWithoutLimit = + PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft(); + + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(-1); + final int rowCountWithNegativeLimit = + PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft(); + + Assert.assertEquals(rowCountWithoutLimit, rowCountWithNegativeLimit); + Assert.assertEquals(2, rowCountWithLimit); + Assert.assertTrue(rowCountWithoutLimit > rowCountWithLimit); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletSizeInBytes(originalPipeDataStructureTabletSizeInBytes); + } + } + public void testToTabletInsertionEvents(final boolean isQuery) throws Exception { // Test empty chunk testMixedTsFileWithEmptyChunk(isQuery); @@ -1059,4 +1231,144 @@ private long calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(final File return chunkSizeLimit; } } + + private long calculateFirstBatchedAlignedValueChunkGroupMemoryLimit( + final File tsFile, final int batchSize) throws Exception { + try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { + final IDeviceID deviceID = reader.getDeviceMeasurementsMap().keySet().iterator().next(); + final List alignedChunkMetadataList = + reader.getAlignedChunkMetadata(deviceID); + Assert.assertEquals(2, alignedChunkMetadataList.size()); + + final AlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0); + final Chunk timeChunk = + reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata()); + final List valueChunkMetadataList = + alignedChunkMetadata.getValueChunkMetadataList(); + Assert.assertTrue(valueChunkMetadataList.size() >= batchSize * 2); + + final List firstValueChunkBatch = new ArrayList<>(); + final List firstTwoValueChunkBatches = new ArrayList<>(); + long firstBatchChunkSize = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk); + long firstTwoBatchChunkSize = firstBatchChunkSize; + for (int index = 0; index < batchSize * 2; ++index) { + final Chunk valueChunk = + reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(index)); + if (index < batchSize) { + firstValueChunkBatch.add(valueChunk); + firstBatchChunkSize += valueChunk.getHeader().getDataSize(); + } + firstTwoValueChunkBatches.add(valueChunk); + firstTwoBatchChunkSize += valueChunk.getHeader().getDataSize(); + } + + final long firstBatchPageMemorySize = + AlignedSinglePageWholeChunkReader + .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData( + timeChunk, firstValueChunkBatch); + final long firstTwoBatchPageMemorySize = + AlignedSinglePageWholeChunkReader + .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData( + timeChunk, firstTwoValueChunkBatches); + Assert.assertTrue(firstTwoBatchChunkSize > firstBatchChunkSize); + Assert.assertTrue(firstTwoBatchPageMemorySize > firstBatchPageMemorySize); + return Math.max(firstBatchChunkSize, firstBatchPageMemorySize); + } + } + + private void writeAlignedSourceTsFile( + final File tsFile, final List schemaList, final int rowCount) + throws IOException { + if (tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + Assert.assertEquals(0, rowCount % 2); + + final IDeviceID deviceID = new PlainDeviceID("root.sg.d"); + try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) { + writer.startChunkGroup(deviceID); + final int rowCountPerChunk = rowCount / 2; + for (int chunkIndex = 0; chunkIndex < 2; ++chunkIndex) { + final AlignedChunkWriterImpl alignedChunkWriter = + new AlignedChunkWriterImpl(new ArrayList(schemaList)); + for (int row = 0; row < rowCountPerChunk; ++row) { + final long time = (long) chunkIndex * rowCountPerChunk + row; + alignedChunkWriter.getTimeChunkWriter().write(time); + for (int measurementIndex = 0; measurementIndex < schemaList.size(); ++measurementIndex) { + alignedChunkWriter + .getValueChunkWriterByIndex(measurementIndex) + .write(time, time * 100 + measurementIndex, false); + } + } + alignedChunkWriter.writeToFileWriter(writer); + } + writer.endChunkGroup(); + writer.endFile(); + } + } + + private void rewriteAlignedTsFileWithBatchedValueChunks( + final File sourceTsFile, + final File targetTsFile, + final int measurementCount, + final int batchSize) + throws Exception { + if (targetTsFile.exists()) { + Assert.assertTrue(targetTsFile.delete()); + } + + try (final TsFileSequenceReader reader = + new TsFileSequenceReader(sourceTsFile.getAbsolutePath())) { + final IDeviceID deviceID = reader.getDeviceMeasurementsMap().keySet().iterator().next(); + final List sourceAlignedChunkMetadataList = + reader.getAlignedChunkMetadata(deviceID); + Assert.assertEquals(2, sourceAlignedChunkMetadataList.size()); + for (final AlignedChunkMetadata sourceAlignedChunkMetadata : sourceAlignedChunkMetadataList) { + Assert.assertEquals( + measurementCount, sourceAlignedChunkMetadata.getValueChunkMetadataList().size()); + } + + try (final CompactionTsFileWriter writer = + new CompactionTsFileWriter( + targetTsFile, Long.MAX_VALUE, CompactionType.INNER_SEQ_COMPACTION)) { + writer.startChunkGroup(deviceID); + writer.markStartingWritingAligned(); + for (final AlignedChunkMetadata sourceAlignedChunkMetadata : + sourceAlignedChunkMetadataList) { + final ChunkMetadata timeChunkMetadata = + (ChunkMetadata) sourceAlignedChunkMetadata.getTimeChunkMetadata(); + writer.writeChunk(reader.readMemChunk(timeChunkMetadata), timeChunkMetadata); + } + + for (int start = 0; start < measurementCount; start += batchSize) { + writeValueChunkBatch( + reader, + writer, + sourceAlignedChunkMetadataList, + start, + Math.min(start + batchSize, measurementCount)); + } + writer.markEndingWritingAligned(); + writer.endChunkGroup(); + writer.endFile(); + } + } + } + + private void writeValueChunkBatch( + final TsFileSequenceReader reader, + final CompactionTsFileWriter writer, + final List alignedChunkMetadataList, + final int start, + final int end) + throws IOException { + for (final AlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) { + final List valueChunkMetadataList = + alignedChunkMetadata.getValueChunkMetadataList(); + for (int index = start; index < end; ++index) { + final ChunkMetadata valueChunkMetadata = (ChunkMetadata) valueChunkMetadataList.get(index); + writer.writeChunk(reader.readMemChunk(valueChunkMetadata), valueChunkMetadata); + } + } + } }