From 9b10eb4bbcdfa22ae14bf237783bf4757d5e1798 Mon Sep 17 00:00:00 2001 From: mengnalin Date: Fri, 20 Mar 2026 12:37:50 -0700 Subject: [PATCH 1/9] Add level-aware getCompressor default method to CompressionCodecFactory --- .../compression/CompressionCodecFactory.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java index 561dcb899c..187c74a0ed 100644 --- a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java +++ b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java @@ -46,6 +46,30 @@ public interface CompressionCodecFactory { */ BytesInputCompressor getCompressor(CompressionCodecName codecName); + /** + * Returns a {@link BytesInputCompressor} instance for the specified codec name and compression level. + *

+ * <p>The compression level controls the trade-off between compression speed and ratio. The valid range + * and meaning of the level are codec-specific (e.g. ZSTD 1 to 22, GZIP 0 to 9 or -1, BROTLI 0 to 11). + *

+ * <p>Implementations that do not support compression levels should ignore the {@code level} parameter + * and delegate to {@link #getCompressor(CompressionCodecName)}, which is what this default implementation does. + *

+ * The compressor is not thread-safe, so one instance for each working thread is required. + * + * @param codecName the codec name which the compressor instance is to be returned + * @param level the compression level; codec-specific, ignored if the codec does not support levels + * @return the compressor instance for the specified codec name and level + * @see BytesInputCompressor#release() + */ + default BytesInputCompressor getCompressor(CompressionCodecName codecName, int level) { + return getCompressor(codecName); + } + /** * Returns a {@link BytesInputDecompressor} instance for the specified codec name to be used for decompressing page * data. From 979144285a48d8b5a913f5d1b21d233a98a988dc Mon Sep 17 00:00:00 2001 From: mengnalin Date: Mon, 23 Mar 2026 13:43:15 -0700 Subject: [PATCH 2/9] Add level-aware getCompressor override to CodecFactory --- .../apache/parquet/hadoop/CodecFactory.java | 136 +++++++++++++++++- .../hadoop/TestDirectCodecFactory.java | 99 +++++++++++++ 2 files changed, 228 insertions(+), 7 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index eee5fa6083..8a56ac8ef5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -29,9 +29,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor; import org.apache.hadoop.util.ReflectionUtils; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferAllocator; @@ -45,10 
+48,15 @@ public class CodecFactory implements CompressionCodecFactory { + private static final Logger LOG = LoggerFactory.getLogger(CodecFactory.class); + protected static final Map CODEC_BY_NAME = Collections.synchronizedMap(new HashMap()); - private final Map compressors = new HashMap<>(); + static final String GZIP_COMPRESS_LEVEL = "zlib.compress.level"; + static final String BROTLI_COMPRESS_QUALITY = "compression.brotli.quality"; + + private final Map compressors = new HashMap<>(); private final Map decompressors = new HashMap<>(); protected final ParquetConfiguration conf; @@ -250,10 +258,22 @@ public CompressionCodecName getCodecName() { @Override public BytesCompressor getCompressor(CompressionCodecName codecName) { - BytesCompressor comp = compressors.get(codecName); + String key = cacheKey(codecName); + BytesCompressor comp = compressors.get(key); if (comp == null) { comp = createCompressor(codecName); - compressors.put(codecName, comp); + compressors.put(key, comp); + } + return comp; + } + + @Override + public BytesCompressor getCompressor(CompressionCodecName codecName, int level) { + String key = cacheKey(codecName, level); + BytesCompressor comp = compressors.get(key); + if (comp == null) { + comp = createCompressorAtLevel(codecName, level); + compressors.put(key, comp); } return comp; } @@ -269,8 +289,7 @@ public BytesDecompressor getDecompressor(CompressionCodecName codecName) { } protected BytesCompressor createCompressor(CompressionCodecName codecName) { - CompressionCodec codec = getCodec(codecName); - return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + return compressorForCodec(codecName, getCodec(codecName)); } protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { @@ -278,6 +297,104 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { return codec == null ? 
NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); } + private BytesCompressor createCompressorAtLevel(CompressionCodecName codecName, int level) { + return compressorForCodec(codecName, getCodecAtLevel(codecName, level)); + } + + private BytesCompressor compressorForCodec(CompressionCodecName codecName, CompressionCodec codec) { + return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + } + + private static void validateZstdLevel(int level) { + if (level < 1 || level > 22) { + throw new BadConfigurationException("Unsupported ZSTD compression level: " + level + + ". Valid range is 1 (fastest) to 22 (best compression)."); + } + } + + private static void validateBrotliLevel(int level) { + if (level < 0 || level > 11) { + throw new BadConfigurationException("Unsupported Brotli compression level: " + level + + ". Valid range is 0 (fastest) to 11 (best compression)."); + } + } + + private static void validateGzipLevel(int level) { + if (level != -1 && (level < 0 || level > 9)) { + throw new BadConfigurationException("Unsupported GZIP compression level: " + level + + ". 
Valid range is 0 (no compression) to 9 (best compression), or -1 for default."); + } + } + + private static ZlibCompressor.CompressionLevel zlibCompressionLevel(int level) { + switch (level) { + case -1: return ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION; + case 0: return ZlibCompressor.CompressionLevel.NO_COMPRESSION; + case 1: return ZlibCompressor.CompressionLevel.BEST_SPEED; + case 2: return ZlibCompressor.CompressionLevel.TWO; + case 3: return ZlibCompressor.CompressionLevel.THREE; + case 4: return ZlibCompressor.CompressionLevel.FOUR; + case 5: return ZlibCompressor.CompressionLevel.FIVE; + case 6: return ZlibCompressor.CompressionLevel.SIX; + case 7: return ZlibCompressor.CompressionLevel.SEVEN; + case 8: return ZlibCompressor.CompressionLevel.EIGHT; + case 9: return ZlibCompressor.CompressionLevel.BEST_COMPRESSION; + default: throw new BadConfigurationException("Unsupported GZIP compression level: " + level + + ". Valid range is 0 (no compression) to 9 (best compression), or -1 for default."); + } + } + + /** + * Returns a {@link CompressionCodec} instance configured at the specified compression level. + * A level-specific {@link Configuration} snapshot is built so that the codec is initialized + * with the requested level rather than the global configuration value. + * For codecs that do not support levels the method falls back to {@link #getCodec(CompressionCodecName)}. + */ + private CompressionCodec getCodecAtLevel(CompressionCodecName codecName, int level) { + String codecClassName = codecName.getHadoopCompressionCodecClassName(); + if (codecClassName == null) { + return null; + } + String key = cacheKey(codecName, level); + CompressionCodec codec = CODEC_BY_NAME.get(key); + if (codec != null) { + return codec; + } + // Build a Configuration snapshot with the level explicitly set for this codec. 
+ Configuration levelConf = new Configuration(ConfigurationUtil.createHadoopConfiguration(conf)); + switch (codecName) { + case ZSTD: + validateZstdLevel(level); + levelConf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, level); + break; + case GZIP: + validateGzipLevel(level); + levelConf.setEnum(GZIP_COMPRESS_LEVEL, zlibCompressionLevel(level)); + break; + case BROTLI: + validateBrotliLevel(level); + levelConf.setInt(BROTLI_COMPRESS_QUALITY, level); + break; + default: + // Codec does not support levels; fall back to the default codec instance. + LOG.warn("Compression level {} is not supported for codec {} and will be ignored.", level, codecName); + return getCodec(codecName); + } + try { + Class codecClass; + try { + codecClass = Class.forName(codecClassName); + } catch (ClassNotFoundException e) { + codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName); + } + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, levelConf); + CODEC_BY_NAME.put(key, codec); + return codec; + } catch (ClassNotFoundException e) { + throw new BadConfigurationException("Class " + codecClassName + " was not found", e); + } + } + /** * @param codecName the requested codec * @return the corresponding hadoop codec. null if UNCOMPRESSED @@ -314,10 +431,10 @@ private String cacheKey(CompressionCodecName codecName) { String level = null; switch (codecName) { case GZIP: - level = conf.get("zlib.compress.level"); + level = conf.get(GZIP_COMPRESS_LEVEL); break; case BROTLI: - level = conf.get("compression.brotli.quality"); + level = conf.get(BROTLI_COMPRESS_QUALITY); break; case ZSTD: level = conf.get("parquet.compression.codec.zstd.level"); @@ -329,6 +446,11 @@ private String cacheKey(CompressionCodecName codecName) { return level == null ? 
codecClass : codecClass + ":" + level; } + private String cacheKey(CompressionCodecName codecName, int level) { + String codecClass = codecName.getHadoopCompressionCodecClassName(); + return (codecClass == null ? codecName.name() : codecClass) + ":" + level; + } + @Override public void release() { for (BytesCompressor compressor : compressors.values()) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index c78ee09ecc..3d4b7db880 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -18,12 +18,17 @@ package org.apache.parquet.hadoop; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.BROTLI; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4_RAW; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZO; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Random; import java.util.Set; @@ -285,4 +290,98 @@ public void cachingKeysZstd() { Assert.assertEquals(codec_2_1, codec_2_2); Assert.assertNotEquals(codec_2_1, codec_5_1); } + + @Test + public void levelAwareCompressor_sameLevel_returnsCachedInstance() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor c1 = factory.getCompressor(ZSTD, 3); + BytesInputCompressor c2 = 
factory.getCompressor(ZSTD, 3); + Assert.assertSame("Same codec+level should return the cached instance", c1, c2); + factory.release(); + } + + @Test + public void levelAwareCompressor_differentLevels_returnsDifferentInstances() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor c1 = factory.getCompressor(ZSTD, 1); + BytesInputCompressor c3 = factory.getCompressor(ZSTD, 3); + Assert.assertNotSame("Different levels should return different compressor instances", c1, c3); + factory.release(); + } + + @Test + public void levelAwareCompressor_levelCacheIsolatedFromNoLevelCache() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor noLevel = factory.getCompressor(ZSTD); + BytesInputCompressor withLevel = factory.getCompressor(ZSTD, 3); + Assert.assertNotSame( + "Level-aware and no-level compressors should use separate cache entries", noLevel, withLevel); + factory.release(); + } + + @Test + public void levelAwareCompressor_uncompressed_returnsNoOpCompressor() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor comp = factory.getCompressor(UNCOMPRESSED, 5); + Assert.assertSame(CodecFactory.NO_OP_COMPRESSOR, comp); + factory.release(); + } + + @Test + public void levelAwareCompressor_snappy_ignoresLevel() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor comp = factory.getCompressor(SNAPPY, 99); + Assert.assertNotNull(comp); + Assert.assertEquals(SNAPPY, comp.getCodecName()); + factory.release(); + } + + @Test + public void levelAwareCompressor_gzip_invalidLevel_throwsBadConfigurationException() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + try { + BadConfigurationException ex = Assert.assertThrows(BadConfigurationException.class, + () -> factory.getCompressor(GZIP, 99)); + Assert.assertTrue(ex.getMessage().contains("99")); + } finally { + 
factory.release(); + } + } + + @Test + public void levelAwareCompressor_gzip_validBoundaryLevels_noException() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + for (int level : new int[] {-1, 0, 1, 9}) { + BytesInputCompressor comp = factory.getCompressor(GZIP, level); + Assert.assertNotNull("Compressor should not be null for GZIP level " + level, comp); + Assert.assertEquals("Codec name should be GZIP for level " + level, GZIP, comp.getCodecName()); + } + factory.release(); + } + + @Test + public void levelAwareCompressor_zstd_roundTrip() throws IOException { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + byte[] original = "hello parquet per-column compression".getBytes(StandardCharsets.UTF_8); + BytesInputDecompressor decompressor = factory.getDecompressor(ZSTD); + for (int level : new int[] {1, 3, 10, 22}) { + BytesInput compressed = factory.getCompressor(ZSTD, level).compress(BytesInput.from(original)); + byte[] result = decompressor.decompress(compressed, original.length).toByteArray(); + Assert.assertArrayEquals("Round-trip failed at ZSTD level " + level, original, result); + } + factory.release(); + } + + @Test + public void levelAwareCompressor_gzip_roundTrip() throws IOException { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + byte[] original = "hello parquet per-column compression".getBytes(StandardCharsets.UTF_8); + BytesInputDecompressor decompressor = factory.getDecompressor(GZIP); + for (int level : new int[] {1, 5, 9}) { + BytesInput compressed = factory.getCompressor(GZIP, level).compress(BytesInput.from(original)); + byte[] result = decompressor.decompress(compressed, original.length).toByteArray(); + Assert.assertArrayEquals("Round-trip failed at GZIP level " + level, original, result); + } + factory.release(); + } } From 1c1053325c739a53f7f4db327ffae39f4d876909 Mon Sep 17 00:00:00 2001 From: mengnalin Date: Mon, 23 Mar 2026 13:57:58 -0700 Subject: [PATCH 3/9] 
Add per-column compression codec and level to ParquetProperties --- .../parquet/column/ParquetProperties.java | 63 +++++++- .../parquet/column/TestParquetProperties.java | 145 ++++++++++++++++++ 2 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..b87697ee66 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -39,6 +39,7 @@ import org.apache.parquet.column.values.factory.ValuesWriterFactory; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; /** @@ -135,6 +136,8 @@ public static WriterVersion fromString(String name) { private final Map extraMetaData; private final ColumnProperty statistics; private final ColumnProperty sizeStatistics; + private final ColumnProperty columnCodecs; + private final ColumnProperty columnCompressionLevels; private ParquetProperties(Builder builder) { this.pageSizeThreshold = builder.pageSize; @@ -167,6 +170,8 @@ private ParquetProperties(Builder builder) { this.extraMetaData = builder.extraMetaData; this.statistics = builder.statistics.build(); this.sizeStatistics = builder.sizeStatistics.build(); + this.columnCodecs = builder.columnCodecs.build(); + this.columnCompressionLevels = builder.columnCompressionLevels.build(); } public static Builder builder() { @@ -370,6 +375,28 @@ public boolean getSizeStatisticsEnabled(ColumnDescriptor column) { return sizeStatisticsEnabled; } + /** + * Returns the compression codec 
configured for the given column, or {@code null} if no + * column-specific codec has been set (the caller should fall back to the job-level codec). + * + * @param column the column descriptor + * @return the per-column codec, or {@code null} if not set + */ + public CompressionCodecName getColumnCodec(ColumnDescriptor column) { + return columnCodecs.getValue(column); + } + + /** + * Returns the compression level configured for the given column, or {@code null} if no + * column-specific level has been set. + * + * @param column the column descriptor + * @return the per-column compression level, or {@code null} if not set + */ + public Integer getColumnCompressionLevel(ColumnDescriptor column) { + return columnCompressionLevels.getValue(column); + } + @Override public String toString() { return "Parquet page size to " + getPageSizeThreshold() + '\n' @@ -388,7 +415,9 @@ public String toString() { + "Page row count limit to " + getPageRowCountLimit() + '\n' + "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? 
"on" : "off") + '\n' + "Statistics enabled: " + statisticsEnabled + '\n' - + "Size statistics enabled: " + sizeStatisticsEnabled; + + "Size statistics enabled: " + sizeStatisticsEnabled + '\n' + + "Per-column codecs: " + columnCodecs + '\n' + + "Per-column compression levels: " + columnCompressionLevels; } public static class Builder { @@ -419,6 +448,8 @@ public static class Builder { private Map extraMetaData = new HashMap<>(); private final ColumnProperty.Builder statistics; private final ColumnProperty.Builder sizeStatistics; + private final ColumnProperty.Builder columnCodecs; + private final ColumnProperty.Builder columnCompressionLevels; private Builder() { enableDict = ColumnProperty.builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED); @@ -436,6 +467,8 @@ private Builder() { ColumnProperty.builder().withDefaultValue(DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER); statistics = ColumnProperty.builder().withDefaultValue(DEFAULT_STATISTICS_ENABLED); sizeStatistics = ColumnProperty.builder().withDefaultValue(DEFAULT_SIZE_STATISTICS_ENABLED); + columnCodecs = ColumnProperty.builder().withDefaultValue(null); + columnCompressionLevels = ColumnProperty.builder().withDefaultValue(null); } private Builder(ParquetProperties toCopy) { @@ -460,6 +493,8 @@ private Builder(ParquetProperties toCopy) { this.extraMetaData = toCopy.extraMetaData; this.statistics = ColumnProperty.builder(toCopy.statistics); this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics); + this.columnCodecs = ColumnProperty.builder(toCopy.columnCodecs); + this.columnCompressionLevels = ColumnProperty.builder(toCopy.columnCompressionLevels); } /** @@ -756,6 +791,32 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) { return this; } + /** + * Set the compression codec for the specified column. 
+ * + * @param columnPath the path of the column (dot-string) + * @param codec the compression codec to use for this column + * @return this builder for method chaining + */ + public Builder withCompressionCodec(String columnPath, CompressionCodecName codec) { + this.columnCodecs.withValue(columnPath, Objects.requireNonNull(codec, "codec cannot be null")); + return this; + } + + /** + * Set the compression level for the specified column. + * The valid range is codec-specific (e.g. ZSTD: 1–22 (default 3), GZIP: 0–9 or -1 (default 6), BROTLI: 0–11 (default 1)). + * Pass {@code null} to unset a previously configured level for that column. + * + * @param columnPath the path of the column (dot-string) + * @param level the compression level, or {@code null} to unset + * @return this builder for method chaining + */ + public Builder withCompressionLevel(String columnPath, Integer level) { + this.columnCompressionLevels.withValue(columnPath, level); + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(this); // we pass a constructed but uninitialized factory to ParquetProperties above as currently diff --git a/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java b/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java new file mode 100644 index 0000000000..be6b518559 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column; + +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Before; +import org.junit.Test; + +public class TestParquetProperties { + + private static final MessageType SCHEMA = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; required double col_c; }"); + + private ColumnDescriptor colA; + private ColumnDescriptor colB; + private ColumnDescriptor colC; + + @Before + public void setUp() { + colA = SCHEMA.getColumns().get(0); + colB = SCHEMA.getColumns().get(1); + colC = SCHEMA.getColumns().get(2); + } + + @Test + public void columnCodec_notSet_returnsNull() { + ParquetProperties props = ParquetProperties.builder().build(); + assertNull(props.getColumnCodec(colA)); + } + + @Test + public void columnLevel_notSet_returnsNull() { + ParquetProperties props = ParquetProperties.builder().build(); + assertNull(props.getColumnCompressionLevel(colA)); + } + + @Test + public void columnCodec_setForColumn_returnsConfiguredCodec() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .build(); + 
assertEquals(ZSTD, props.getColumnCodec(colA)); + } + + @Test + public void columnCodec_setMultipleTimes_lastValueWins() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionCodec("col_a", SNAPPY) + .build(); + assertEquals(SNAPPY, props.getColumnCodec(colA)); + } + + @Test + public void columnCodec_otherColumnsUnaffected() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .build(); + assertNull(props.getColumnCodec(colB)); + assertNull(props.getColumnCodec(colC)); + } + + @Test + public void columnLevel_setForColumn_returnsConfiguredLevel() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionLevel("col_a", 10) + .build(); + assertEquals(Integer.valueOf(10), props.getColumnCompressionLevel(colA)); + } + + @Test + public void columnLevel_otherColumnsUnaffected() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionLevel("col_a", 10) + .build(); + assertNull(props.getColumnCompressionLevel(colB)); + } + + @Test + public void columnCodecAndLevel_multipleColumns_eachGetsOwn() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 10) + .withCompressionCodec("col_b", SNAPPY) + .withCompressionCodec("col_c", GZIP) + .withCompressionLevel("col_c", 5) + .build(); + + assertEquals(ZSTD, props.getColumnCodec(colA)); + assertEquals(Integer.valueOf(10), props.getColumnCompressionLevel(colA)); + + assertEquals(SNAPPY, props.getColumnCodec(colB)); + assertNull(props.getColumnCompressionLevel(colB)); + + assertEquals(GZIP, props.getColumnCodec(colC)); + assertEquals(Integer.valueOf(5), props.getColumnCompressionLevel(colC)); + } + + @Test + public void withCompressionCodec_nullCodec_throwsNullPointerException() { + assertThrows(NullPointerException.class, + () -> ParquetProperties.builder().withCompressionCodec("col_a", null)); + } + + @Test + public 
void copyBuilder_preservesColumnCodecAndLevel() { + ParquetProperties original = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 7) + .withCompressionCodec("col_b", SNAPPY) + .build(); + + ParquetProperties copy = ParquetProperties.copy(original).build(); + + assertEquals(ZSTD, copy.getColumnCodec(colA)); + assertEquals(Integer.valueOf(7), copy.getColumnCompressionLevel(colA)); + assertEquals(SNAPPY, copy.getColumnCodec(colB)); + assertNull(copy.getColumnCompressionLevel(colB)); + assertNull(copy.getColumnCodec(colC)); + } +} From 1a2ccd9a85968610cfefb44dde429846e0bc4d16 Mon Sep 17 00:00:00 2001 From: mengnalin Date: Mon, 23 Mar 2026 21:14:24 -0700 Subject: [PATCH 4/9] Add per-column codec resolution to ColumnChunkPageWriteStore --- .../hadoop/ColumnChunkPageWriteStore.java | 87 ++++++++--- .../hadoop/TestColumnChunkPageWriteStore.java | 138 ++++++++++++++++++ 2 files changed, 208 insertions(+), 17 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index d9e6ea0990..f46efc1f4d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.function.Function; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -44,6 +45,7 @@ import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; +import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; import 
org.apache.parquet.crypto.AesCipher; import org.apache.parquet.crypto.InternalColumnEncryptionSetup; @@ -53,6 +55,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; import org.apache.parquet.io.ParquetEncodingException; @@ -577,21 +580,7 @@ public ColumnChunkPageWriteStore( int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) { this.schema = schema; - for (ColumnDescriptor path : schema.getColumns()) { - writers.put( - path, - new ColumnChunkPageWriter( - path, - compressor, - allocator, - columnIndexTruncateLength, - pageWriteChecksumEnabled, - null, - null, - null, - -1, - -1)); - } + initWriters(col -> compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, null, -1); } @Deprecated @@ -622,13 +611,77 @@ public ColumnChunkPageWriteStore( InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) { this.schema = schema; + initWriters(col -> compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, fileEncryptor, rowGroupOrdinal); + } + + public ColumnChunkPageWriteStore( + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, + ParquetProperties parquetProperties, + MessageType schema, + ByteBufferAllocator allocator, + int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled) { + this.schema = schema; + initWriters( + col -> resolveCompressor(col, codecFactory, defaultCodec, parquetProperties), + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + null, + -1); + } + + public ColumnChunkPageWriteStore( + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, + ParquetProperties parquetProperties, + 
MessageType schema, + ByteBufferAllocator allocator, + int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled, + InternalFileEncryptor fileEncryptor, + int rowGroupOrdinal) { + this.schema = schema; + initWriters( + col -> resolveCompressor(col, codecFactory, defaultCodec, parquetProperties), + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + fileEncryptor, + rowGroupOrdinal); + } + + private static BytesInputCompressor resolveCompressor( + ColumnDescriptor column, + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, + ParquetProperties parquetProperties) { + CompressionCodecName columnCodec = parquetProperties.getColumnCodec(column); + CompressionCodecName codec = columnCodec != null ? columnCodec : defaultCodec; + Integer level = parquetProperties.getColumnCompressionLevel(column); + if (level != null && columnCodec == null) { + LOG.warn("Column '{}': compression level {} set without a per-column codec; " + + "applying level to the default codec ({}).", + ColumnPath.get(column.getPath()), level, defaultCodec); + } + return level != null ? 
codecFactory.getCompressor(codec, level) : codecFactory.getCompressor(codec); + } + + private void initWriters( + Function compressorFn, + ByteBufferAllocator allocator, + int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled, + InternalFileEncryptor fileEncryptor, + int rowGroupOrdinal) { if (null == fileEncryptor) { for (ColumnDescriptor path : schema.getColumns()) { writers.put( path, new ColumnChunkPageWriter( path, - compressor, + compressorFn.apply(path), allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, @@ -660,7 +713,7 @@ public ColumnChunkPageWriteStore( path, new ColumnChunkPageWriter( path, - compressor, + compressorFn.apply(path), allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index a17cf678f5..f43b0ff11e 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -22,7 +22,9 @@ import static org.apache.parquet.column.Encoding.RLE; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import static org.apache.parquet.schema.OriginalType.UTF8; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; @@ -32,6 +34,8 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -42,6 +46,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -63,6 +68,7 @@ import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; import org.apache.parquet.hadoop.ParquetFileWriter.Mode; +import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -319,4 +325,136 @@ public void testColumnOrderV1() throws IOException { private BytesInputCompressor compressor(CompressionCodecName codec) { return new CodecFactory(conf, pageSize).getCompressor(codec); } + + @Test + public void perColumnCodec_defaultUsedWhenNotSet() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder().build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(SNAPPY, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnCodec_overridesDefaultForOneColumn() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, 
props); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnCodec_allColumnsOverridden() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionCodec("col_b", GZIP) + .build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(GZIP, codecs.get("col_b")); + } + + @Test + public void perColumnLevel_withCodec_roundTrip() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 10) + .build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnLevel_invalidZstdLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 23) + .build(); + + BadConfigurationException ex = assertThrows(BadConfigurationException.class, + () -> writeAndReadCodecs(schema, SNAPPY, props)); + assertTrue(ex.getMessage().contains("23")); + } + + @Test + public void perColumnLevel_invalidGzipLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType( + "message test { required binary col_a; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", GZIP) + .withCompressionLevel("col_a", 10) 
+ .build(); + + BadConfigurationException ex = assertThrows(BadConfigurationException.class, + () -> writeAndReadCodecs(schema, SNAPPY, props)); + assertTrue(ex.getMessage().contains("10")); + } + + private Map writeAndReadCodecs( + MessageType schema, CompressionCodecName defaultCodec, ParquetProperties props) throws Exception { + Path file = new Path("target/test/TestColumnChunkPageWriteStore/perColumnCodec.parquet"); + FileSystem fs = file.getFileSystem(conf); + fs.delete(file, false); + fs.mkdirs(file.getParent()); + + allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()); + CodecFactory codecFactory = new CodecFactory(conf, pageSize); + + OutputFileForTesting outputFile = new OutputFileForTesting(file, conf); + ParquetFileWriter writer = new ParquetFileWriter( + outputFile, + schema, + Mode.CREATE, + ParquetWriter.DEFAULT_BLOCK_SIZE, + ParquetWriter.MAX_PADDING_SIZE_DEFAULT, + null, + ParquetProperties.builder().withAllocator(allocator).build()); + writer.start(); + writer.startBlock(1); + + try (ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore( + codecFactory, + defaultCodec, + props, + schema, + allocator, + Integer.MAX_VALUE, + ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED)) { + for (ColumnDescriptor col : schema.getColumns()) { + Statistics stats = Statistics.getBuilderForReading(col.getPrimitiveType()).build(); + store.getPageWriter(col).writePage(BytesInput.fromInt(42), 1, 1, stats, RLE, RLE, PLAIN); + } + store.flushToFileWriter(writer); + } + + writer.endBlock(); + writer.end(new HashMap<>()); + + ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER); + Map result = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + result.put(col.getPath().toDotString(), col.getCodec()); + } + return result; + } } From 71da18937f9114e9254ada18de70668edcdf266e Mon Sep 17 00:00:00 2001 From: mengnalin Date: Mon, 23 Mar 2026 22:40:49 -0700 Subject: [PATCH 5/9] 
Thread CompressionCodecFactory through record writer stack --- .../hadoop/InternalParquetRecordWriter.java | 19 +- .../parquet/hadoop/ParquetRecordWriter.java | 33 ++- .../apache/parquet/hadoop/ParquetWriter.java | 29 ++- .../parquet/hadoop/TestParquetWriter.java | 213 ++++++++++++++++++ 4 files changed, 283 insertions(+), 11 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index dd51d1ef09..8aae03d522 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -28,10 +28,11 @@ import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; -import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; +import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.crypto.InternalFileEncryptor; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.ColumnIOFactory; import org.apache.parquet.io.MessageColumnIO; @@ -51,7 +52,8 @@ class InternalParquetRecordWriter { private long rowGroupSizeThreshold; private final int rowGroupRecordCountThreshold; private long nextRowGroupSize; - private final BytesInputCompressor compressor; + private final CompressionCodecFactory codecFactory; + private final CompressionCodecName defaultCodec; private final boolean validating; private final ParquetProperties props; @@ -76,7 +78,8 @@ class InternalParquetRecordWriter { * @param schema the schema of the records * @param 
extraMetaData extra meta data to write in the footer of the file * @param rowGroupSize the size of a block in the file (this will be approximate) - * @param compressor the codec used to compress + * @param codecFactory factory used to create per-column compressors + * @param defaultCodec the default codec to use when no per-column codec is configured */ public InternalParquetRecordWriter( ParquetFileWriter parquetFileWriter, @@ -84,7 +87,8 @@ public InternalParquetRecordWriter( MessageType schema, Map extraMetaData, long rowGroupSize, - BytesInputCompressor compressor, + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, boolean validating, ParquetProperties props) { this.parquetFileWriter = parquetFileWriter; @@ -94,7 +98,8 @@ public InternalParquetRecordWriter( this.rowGroupSizeThreshold = rowGroupSize; this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit(); this.nextRowGroupSize = rowGroupSizeThreshold; - this.compressor = compressor; + this.codecFactory = codecFactory; + this.defaultCodec = defaultCodec; this.validating = validating; this.props = props; this.fileEncryptor = parquetFileWriter.getEncryptor(); @@ -109,7 +114,9 @@ public ParquetMetadata getFooter() { private void initStore() { ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore( - compressor, + codecFactory, + defaultCodec, + props, schema, props.getAllocator(), props.getColumnIndexTruncateLength(), diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java index 51528b10be..acf6ca3161 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java @@ -26,7 +26,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ParquetProperties; import 
org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -104,7 +106,8 @@ public ParquetRecordWriter( .withWriterVersion(writerVersion) .build(); internalWriter = new InternalParquetRecordWriter( - w, writeSupport, schema, extraMetaData, blockSize, compressor, validating, props); + w, writeSupport, schema, extraMetaData, blockSize, + singleCompressorFactory(compressor), compressor.getCodecName(), validating, props); this.memoryManager = null; this.codecFactory = null; } @@ -173,7 +176,8 @@ public ParquetRecordWriter( .withWriterVersion(writerVersion) .build(); internalWriter = new InternalParquetRecordWriter( - w, writeSupport, schema, extraMetaData, blockSize, compressor, validating, props); + w, writeSupport, schema, extraMetaData, blockSize, + singleCompressorFactory(compressor), compressor.getCodecName(), validating, props); this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null"); memoryManager.addWriter(internalWriter, blockSize); this.codecFactory = null; @@ -207,13 +211,36 @@ public ParquetRecordWriter( schema, extraMetaData, blockSize, - codecFactory.getCompressor(codec), + codecFactory, + codec, validating, props); this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null"); memoryManager.addWriter(internalWriter, blockSize); } + private static CompressionCodecFactory singleCompressorFactory(BytesInputCompressor compressor) { + return new CompressionCodecFactory() { + @Override + public BytesInputCompressor getCompressor(CompressionCodecName codecName) { + if (codecName != 
compressor.getCodecName()) { + throw new IllegalArgumentException( + "Per-column codec overrides are not supported by this writer. " + + "Requested: " + codecName + ", configured: " + compressor.getCodecName()); + } + return compressor; + } + + @Override + public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) { + throw new UnsupportedOperationException("Decompression is not supported by this factory"); + } + + @Override + public void release() {} + }; + } + /** * {@inheritDoc} */ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 8eb5f7f17b..b791a26bf2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -395,7 +395,6 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport fileWriter.start(); this.codecFactory = codecFactory; - CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName); final Map extraMetadata; if (encodingProps.getExtraMetaData() == null @@ -418,7 +417,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport } this.writer = new InternalParquetRecordWriter( - fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps); + fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, codecFactory, compressionCodecName, validating, encodingProps); } public void write(T object) throws IOException { @@ -569,6 +568,32 @@ public SELF withCompressionCodec(CompressionCodecName codecName) { return self(); } + /** + * Override the compression codec for a specific column. + * + * @param columnPath dot-string path of the column (e.g. {@code "my_col"}) + * @param codecName the codec to use for that column + * @return this builder for method chaining. 
+ */ + public SELF withCompressionCodec(String columnPath, CompressionCodecName codecName) { + encodingPropsBuilder.withCompressionCodec(columnPath, codecName); + return self(); + } + + /** + * Set the compression level for a specific column. + * The valid range is codec-specific (e.g. ZSTD: 1–22 (default 3), GZIP: 0–9 or -1 (default 6), BROTLI: 0–11 (default 1)). + * Pass {@code null} to unset a previously configured level for that column. + * + * @param columnPath dot-string path of the column (e.g. {@code "my_col"}) + * @param level codec-specific compression level, or {@code null} to unset + * @return this builder for method chaining. + */ + public SELF withCompressionLevel(String columnPath, Integer level) { + encodingPropsBuilder.withCompressionLevel(columnPath, level); + return self(); + } + /** * Set the {@link CompressionCodecFactory codec factory} used by the * constructed writer. diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index a7888b58d8..58492ca76c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -18,6 +18,9 @@ */ package org.apache.parquet.hadoop; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import static org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY; import static org.apache.parquet.column.Encoding.PLAIN; import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY; @@ -81,6 +84,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; +import 
org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.hadoop.util.HadoopOutputFile; @@ -821,6 +825,215 @@ public void testParquetWriterBuilderCanNotConfigurePathAndFile() throws IOExcept ExampleParquetWriter.builder(path).withFile(outputFile).build()); } + @Test + public void perColumnCodec_overridesDefaultForOneColumn() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(SNAPPY) + .withCompressionCodec("col_a", ZSTD) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 1)); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + Map codecs = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + codecs.put(col.getPath().toDotString(), col.getCodec()); + } + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnCodec_defaultUsedWhenNoOverride() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(GZIP) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", 
"hello").append("col_b", 1)); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + assertEquals( + "Column " + col.getPath().toDotString() + " should use default codec", + GZIP, + col.getCodec()); + } + } + + @Test + public void perColumnLevel_dataRoundTrips() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(SNAPPY) + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 1) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 42)); + } + + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + Group group = reader.read(); + assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8()); + assertEquals(42, group.getInteger("col_b", 0)); + assertNull(reader.read()); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + Map codecs = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + codecs.put(col.getPath().toDotString(), col.getCodec()); + } + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnLevel_invalidZstdLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + // ZSTD only supports levels 1-22; level 23 is 
invalid + Assert.assertThrows(BadConfigurationException.class, () -> { + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 23) + .build()) { + // exception expected before first write + } + }); + } + + @Test + public void perColumnLevel_invalidLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + // GZIP only supports levels -1 (default) through 9; level 10 is invalid + Assert.assertThrows(BadConfigurationException.class, () -> { + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec("col_a", GZIP) + .withCompressionLevel("col_a", 10) + .build()) { + // exception expected before first write + } + }); + } + + @Test + public void perColumnLevel_differentLevelsPerColumn_dataRoundTrips() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(BINARY).as(stringType()).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + // Both columns use ZSTD (from default), but at different levels. + // This exercises the level-only override path in resolveCompressor(). 
+ try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(ZSTD) + .withCompressionLevel("col_a", 1) + .withCompressionLevel("col_b", 10) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "fast").append("col_b", "best")); + } + + // Both columns must report ZSTD in the footer + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + assertEquals( + "Column " + col.getPath().toDotString() + " should use ZSTD", + ZSTD, + col.getCodec()); + } + + // Data must survive the round-trip at both levels + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + Group group = reader.read(); + assertEquals("fast", group.getBinary("col_a", 0).toStringUsingUTF8()); + assertEquals("best", group.getBinary("col_b", 0).toStringUsingUTF8()); + assertNull(reader.read()); + } + } + + @Test + public void perColumnCodec_allColumnsOverridden() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(SNAPPY) + .withCompressionCodec("col_a", ZSTD) + .withCompressionCodec("col_b", GZIP) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 1)); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + Map codecs = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + codecs.put(col.getPath().toDotString(), col.getCodec()); + } + 
assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(GZIP, codecs.get("col_b")); + } + @Test public void testNoFlushAfterException() throws Exception { final File testDir = temp.newFile(); From f3e18ed143dd6434174e159ebc8e8c7502a8db4d Mon Sep 17 00:00:00 2001 From: mengnalin Date: Tue, 24 Mar 2026 11:22:42 -0700 Subject: [PATCH 6/9] Wire per-column compression into ParquetOutputFormat --- .../parquet/hadoop/ParquetOutputFormat.java | 17 +++ .../parquet/hadoop/TestParquetWriter.java | 138 ++++++++++++++++++ 2 files changed, 155 insertions(+) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 868ae634c1..4db288f455 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -163,6 +163,7 @@ public static enum JobSummaryLevel { public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled"; public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled"; + public static final String COLUMN_COMPRESSION_LEVEL_PREFIX = "parquet.compression.level"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -222,6 +223,14 @@ public static void setCompression(Job job, CompressionCodecName compression) { getConfiguration(job).set(COMPRESSION, compression.name()); } + public static void setColumnCompression(Job job, String columnPath, CompressionCodecName codec) { + getConfiguration(job).set(COMPRESSION + '#' + columnPath, codec.name()); + } + + public static void setColumnCompressionLevel(Job job, String columnPath, int level) { + getConfiguration(job).setInt(COLUMN_COMPRESSION_LEVEL_PREFIX + '#' + columnPath, level); + } + 
public static void setEnableDictionary(Job job, boolean enableDictionary) { getConfiguration(job).setBoolean(ENABLE_DICTIONARY, enableDictionary); } @@ -546,6 +555,14 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp STATISTICS_ENABLED, key -> conf.getBoolean(key, ParquetProperties.DEFAULT_STATISTICS_ENABLED), propsBuilder::withStatisticsEnabled) + .withColumnConfig( + COMPRESSION, + key -> CompressionCodecName.fromConf(conf.get(key)), + propsBuilder::withCompressionCodec) + .withColumnConfig( + COLUMN_COMPRESSION_LEVEL_PREFIX, + key -> conf.getInt(key, -1), + propsBuilder::withCompressionLevel) .parseConfig(conf); ParquetProperties props = propsBuilder.build(); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index 58492ca76c..78edac8a94 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -53,6 +53,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordWriter; import net.openhft.hashing.LongHashFunction; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; @@ -1071,4 +1073,140 @@ public void testNoFlushAfterException() throws Exception { FileSystem fs = file.getFileSystem(conf); assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0); } + + + @Test + public void outputFormat_setColumnCompression_overridesDefaultForOneColumn() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + Job job = Job.getInstance(); + 
ParquetOutputFormat.setColumnCompression(job, "col_a", ZSTD); + + Map codecs = writeAndReadCodecsViaOutputFormat(schema, SNAPPY, job, path); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void outputFormat_setColumnCompression_allColumnsOverridden() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + Job job = Job.getInstance(); + ParquetOutputFormat.setColumnCompression(job, "col_a", ZSTD); + ParquetOutputFormat.setColumnCompression(job, "col_b", GZIP); + + Map codecs = writeAndReadCodecsViaOutputFormat(schema, SNAPPY, job, path); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(GZIP, codecs.get("col_b")); + } + + @Test + public void outputFormat_setColumnCompression_defaultUsedWhenNotSet() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + Job job = Job.getInstance(); + + Map codecs = writeAndReadCodecsViaOutputFormat(schema, GZIP, job, path); + + assertEquals(GZIP, codecs.get("col_a")); + assertEquals(GZIP, codecs.get("col_b")); + } + + @Test + public void outputFormat_setColumnCompressionLevel_withCodec_dataRoundTrips() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(INT32).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + Job job = Job.getInstance(); + ParquetOutputFormat.setColumnCompression(job, "col_a", ZSTD); + ParquetOutputFormat.setColumnCompressionLevel(job, "col_a", 1); + + Map codecs = 
writeAndReadCodecsViaOutputFormat(schema, SNAPPY, job, path); + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + Group group = reader.read(); + assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8()); + assertEquals(42, group.getInteger("col_b", 0)); + assertNull(reader.read()); + } + } + + @Test + public void outputFormat_setColumnCompressionLevel_differentLevelsPerColumn() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY).as(stringType()).named("col_a") + .required(BINARY).as(stringType()).named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + Job job = Job.getInstance(); + ParquetOutputFormat.setColumnCompressionLevel(job, "col_a", 1); + ParquetOutputFormat.setColumnCompressionLevel(job, "col_b", 10); + + Map codecs = writeAndReadCodecsViaOutputFormat(schema, ZSTD, job, path); + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(ZSTD, codecs.get("col_b")); + + try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + Group group = reader.read(); + assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8()); + assertEquals("fast", group.getBinary("col_b", 0).toStringUsingUTF8()); + assertNull(reader.read()); + } + } + + @SuppressWarnings("unchecked") + private Map writeAndReadCodecsViaOutputFormat( + MessageType schema, CompressionCodecName defaultCodec, Job job, Path file) + throws Exception { + Configuration conf = job.getConfiguration(); + GroupWriteSupport.setSchema(schema, conf); + SimpleGroupFactory f = new SimpleGroupFactory(schema); + + Group group = f.newGroup(); + for (int i = 0; i < schema.getFieldCount(); i++) { + String name = schema.getFieldName(i); + switch (schema.getType(i).asPrimitiveType().getPrimitiveTypeName()) { + case BINARY: 
group.append(name, name.equals("col_a") ? "hello" : "fast"); break; + case INT32: group.append(name, 42); break; + default: break; + } + } + + ParquetOutputFormat outputFormat = new ParquetOutputFormat<>(new GroupWriteSupport()); + RecordWriter writer = outputFormat.getRecordWriter(conf, file, defaultCodec); + writer.write(null, group); + writer.close(null); + + ParquetMetadata footer = readFooter(conf, file, NO_FILTER); + Map result = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + result.put(col.getPath().toDotString(), col.getCodec()); + } + return result; + } } From 5587e3323da4e6b6a932fff9aa48ee8899392817 Mon Sep 17 00:00:00 2001 From: mengnalin Date: Wed, 1 Jul 2026 11:35:14 -0700 Subject: [PATCH 7/9] Add ColumnChunkPageWriteStore builder and fix ParquetProperties.copy() Thread a per-column compressor-provider function through the writer stack, replace the duplicate store constructors with a validating builder, and fix the copy constructor to preserve previously-dropped fields. 
--- .../parquet/column/ParquetProperties.java | 6 ++ .../hadoop/ColumnChunkPageWriteStore.java | 99 ++++++++++++++----- .../hadoop/InternalParquetRecordWriter.java | 35 +++---- .../parquet/hadoop/ParquetRecordWriter.java | 31 +----- .../apache/parquet/hadoop/ParquetWriter.java | 9 +- .../hadoop/TestColumnChunkPageWriteStore.java | 14 ++- 6 files changed, 114 insertions(+), 80 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index b87697ee66..fd5c568ac9 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -481,7 +481,11 @@ private Builder(ParquetProperties toCopy) { this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck; this.valuesWriterFactory = toCopy.valuesWriterFactory; this.allocator = toCopy.allocator; + this.pageValueCountThreshold = toCopy.pageValueCountThreshold; this.pageRowCountLimit = toCopy.pageRowCountLimit; + this.rowGroupRowCountLimit = toCopy.rowGroupRowCountLimit; + this.columnIndexTruncateLength = toCopy.columnIndexTruncateLength; + this.statisticsTruncateLength = toCopy.statisticsTruncateLength; this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled; this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs); this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs); @@ -492,7 +496,9 @@ private Builder(ParquetProperties toCopy) { this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled); this.extraMetaData = toCopy.extraMetaData; this.statistics = ColumnProperty.builder(toCopy.statistics); + this.statisticsEnabled = toCopy.statisticsEnabled; this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics); + this.sizeStatisticsEnabled = toCopy.sizeStatisticsEnabled; this.columnCodecs = 
ColumnProperty.builder(toCopy.columnCodecs); this.columnCompressionLevels = ColumnProperty.builder(toCopy.columnCompressionLevels); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index c6dedbf47d..cf02b78a16 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.zip.CRC32; @@ -620,28 +621,8 @@ public ColumnChunkPageWriteStore( initWriters(col -> compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, fileEncryptor, rowGroupOrdinal); } - public ColumnChunkPageWriteStore( - CompressionCodecFactory codecFactory, - CompressionCodecName defaultCodec, - ParquetProperties parquetProperties, - MessageType schema, - ByteBufferAllocator allocator, - int columnIndexTruncateLength, - boolean pageWriteChecksumEnabled) { - this.schema = schema; - initWriters( - col -> resolveCompressor(col, codecFactory, defaultCodec, parquetProperties), - allocator, - columnIndexTruncateLength, - pageWriteChecksumEnabled, - null, - -1); - } - - public ColumnChunkPageWriteStore( - CompressionCodecFactory codecFactory, - CompressionCodecName defaultCodec, - ParquetProperties parquetProperties, + private ColumnChunkPageWriteStore( + Function compressorProvider, MessageType schema, ByteBufferAllocator allocator, int columnIndexTruncateLength, @@ -650,7 +631,7 @@ public ColumnChunkPageWriteStore( int rowGroupOrdinal) { this.schema = schema; initWriters( - col -> resolveCompressor(col, codecFactory, defaultCodec, parquetProperties), + compressorProvider, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, @@ -658,6 
+639,13 @@ public ColumnChunkPageWriteStore( rowGroupOrdinal); } + static Function compressorProvider( + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, + ParquetProperties parquetProperties) { + return column -> resolveCompressor(column, codecFactory, defaultCodec, parquetProperties); + } + private static BytesInputCompressor resolveCompressor( ColumnDescriptor column, CompressionCodecFactory codecFactory, @@ -674,6 +662,71 @@ private static BytesInputCompressor resolveCompressor( return level != null ? codecFactory.getCompressor(codec, level) : codecFactory.getCompressor(codec); } + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Function compressorProvider; + private MessageType schema; + private ByteBufferAllocator allocator; + private int columnIndexTruncateLength = ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH; + private boolean pageWriteChecksumEnabled = ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; + private InternalFileEncryptor fileEncryptor = null; + private int rowGroupOrdinal = -1; + + private Builder() {} + + public Builder withCompressorProvider(Function compressorProvider) { + this.compressorProvider = compressorProvider; + return this; + } + + public Builder withSchema(MessageType schema) { + this.schema = schema; + return this; + } + + public Builder withAllocator(ByteBufferAllocator allocator) { + this.allocator = allocator; + return this; + } + + public Builder withColumnIndexTruncateLength(int columnIndexTruncateLength) { + this.columnIndexTruncateLength = columnIndexTruncateLength; + return this; + } + + public Builder withPageWriteChecksumEnabled(boolean pageWriteChecksumEnabled) { + this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; + return this; + } + + public Builder withFileEncryptor(InternalFileEncryptor fileEncryptor) { + this.fileEncryptor = fileEncryptor; + return this; + } + + public Builder withRowGroupOrdinal(int 
rowGroupOrdinal) { + this.rowGroupOrdinal = rowGroupOrdinal; + return this; + } + + public ColumnChunkPageWriteStore build() { + Objects.requireNonNull(compressorProvider, "compressorProvider cannot be null"); + Objects.requireNonNull(schema, "schema cannot be null"); + Objects.requireNonNull(allocator, "allocator cannot be null"); + return new ColumnChunkPageWriteStore( + compressorProvider, + schema, + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + fileEncryptor, + rowGroupOrdinal); + } + } + private void initWriters( Function compressorFn, ByteBufferAllocator allocator, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index 8aae03d522..d503cbb4eb 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -25,14 +25,15 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.function.Function; +import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnWriteStore; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; -import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; import org.apache.parquet.crypto.InternalFileEncryptor; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.ColumnIOFactory; import org.apache.parquet.io.MessageColumnIO; @@ -52,8 +53,7 @@ class InternalParquetRecordWriter { private long 
rowGroupSizeThreshold; private final int rowGroupRecordCountThreshold; private long nextRowGroupSize; - private final CompressionCodecFactory codecFactory; - private final CompressionCodecName defaultCodec; + private final Function compressorProvider; private final boolean validating; private final ParquetProperties props; @@ -78,8 +78,6 @@ class InternalParquetRecordWriter { * @param schema the schema of the records * @param extraMetaData extra meta data to write in the footer of the file * @param rowGroupSize the size of a block in the file (this will be approximate) - * @param codecFactory factory used to create per-column compressors - * @param defaultCodec the default codec to use when no per-column codec is configured */ public InternalParquetRecordWriter( ParquetFileWriter parquetFileWriter, @@ -87,8 +85,7 @@ public InternalParquetRecordWriter( MessageType schema, Map extraMetaData, long rowGroupSize, - CompressionCodecFactory codecFactory, - CompressionCodecName defaultCodec, + Function compressorProvider, boolean validating, ParquetProperties props) { this.parquetFileWriter = parquetFileWriter; @@ -98,8 +95,7 @@ public InternalParquetRecordWriter( this.rowGroupSizeThreshold = rowGroupSize; this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit(); this.nextRowGroupSize = rowGroupSizeThreshold; - this.codecFactory = codecFactory; - this.defaultCodec = defaultCodec; + this.compressorProvider = compressorProvider; this.validating = validating; this.props = props; this.fileEncryptor = parquetFileWriter.getEncryptor(); @@ -113,16 +109,15 @@ public ParquetMetadata getFooter() { } private void initStore() { - ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore( - codecFactory, - defaultCodec, - props, - schema, - props.getAllocator(), - props.getColumnIndexTruncateLength(), - props.getPageWriteChecksumEnabled(), - fileEncryptor, - rowGroupOrdinal); + ColumnChunkPageWriteStore columnChunkPageWriteStore = 
ColumnChunkPageWriteStore.builder() + .withCompressorProvider(compressorProvider) + .withSchema(schema) + .withAllocator(props.getAllocator()) + .withColumnIndexTruncateLength(props.getColumnIndexTruncateLength()) + .withPageWriteChecksumEnabled(props.getPageWriteChecksumEnabled()) + .withFileEncryptor(fileEncryptor) + .withRowGroupOrdinal(rowGroupOrdinal) + .build(); pageStore = columnChunkPageWriteStore; bloomFilterWriteStore = columnChunkPageWriteStore; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java index acf6ca3161..78abdd2a7a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java @@ -26,9 +26,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; -import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; -import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -107,7 +105,7 @@ public ParquetRecordWriter( .build(); internalWriter = new InternalParquetRecordWriter( w, writeSupport, schema, extraMetaData, blockSize, - singleCompressorFactory(compressor), compressor.getCodecName(), validating, props); + col -> compressor, validating, props); this.memoryManager = null; this.codecFactory = null; } @@ -177,7 +175,7 @@ public ParquetRecordWriter( .build(); internalWriter = new InternalParquetRecordWriter( w, writeSupport, schema, extraMetaData, blockSize, - singleCompressorFactory(compressor), 
compressor.getCodecName(), validating, props); + col -> compressor, validating, props); this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null"); memoryManager.addWriter(internalWriter, blockSize); this.codecFactory = null; @@ -211,36 +209,13 @@ public ParquetRecordWriter( schema, extraMetaData, blockSize, - codecFactory, - codec, + ColumnChunkPageWriteStore.compressorProvider(codecFactory, codec, props), validating, props); this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null"); memoryManager.addWriter(internalWriter, blockSize); } - private static CompressionCodecFactory singleCompressorFactory(BytesInputCompressor compressor) { - return new CompressionCodecFactory() { - @Override - public BytesInputCompressor getCompressor(CompressionCodecName codecName) { - if (codecName != compressor.getCodecName()) { - throw new IllegalArgumentException( - "Per-column codec overrides are not supported by this writer. " - + "Requested: " + codecName + ", configured: " + compressor.getCodecName()); - } - return compressor; - } - - @Override - public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) { - throw new UnsupportedOperationException("Decompression is not supported by this factory"); - } - - @Override - public void release() {} - }; - } - /** * {@inheritDoc} */ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index b791a26bf2..41d9d1264a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -417,7 +417,14 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport } this.writer = new InternalParquetRecordWriter( - fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, codecFactory, compressionCodecName, validating, 
encodingProps); + fileWriter, + writeSupport, + schema, + extraMetadata, + rowGroupSize, + ColumnChunkPageWriteStore.compressorProvider(codecFactory, compressionCodecName, encodingProps), + validating, + encodingProps); } public void write(T object) throws IOException { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index f43b0ff11e..abca3a5811 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -432,14 +432,12 @@ private Map writeAndReadCodecs( writer.start(); writer.startBlock(1); - try (ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore( - codecFactory, - defaultCodec, - props, - schema, - allocator, - Integer.MAX_VALUE, - ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED)) { + try (ColumnChunkPageWriteStore store = ColumnChunkPageWriteStore.builder() + .withCompressorProvider(ColumnChunkPageWriteStore.compressorProvider(codecFactory, defaultCodec, props)) + .withSchema(schema) + .withAllocator(allocator) + .withColumnIndexTruncateLength(Integer.MAX_VALUE) + .build()) { for (ColumnDescriptor col : schema.getColumns()) { Statistics stats = Statistics.getBuilderForReading(col.getPrimitiveType()).build(); store.getPageWriter(col).writePage(BytesInput.fromInt(42), 1, 1, stats, RLE, RLE, PLAIN); From c59c14e354ccb6988ca47318881233559f1e7a34 Mon Sep 17 00:00:00 2001 From: mengnalin Date: Thu, 2 Jul 2026 14:15:20 -0700 Subject: [PATCH 8/9] Fix DirectCodecFactory per-column compression level path and fix import order Route the level-aware getCompressor(codec, level) path through an overridable createCompressorAtLevel so DirectCodecFactory returns its direct SNAPPY/ZSTD compressors (with the level honored for ZSTD) instead of falling back to the heap/Hadoop 
path. Add a direct-factory test for this. Also fix Spotless import ordering flagged in review. --- .../parquet/column/TestParquetProperties.java | 24 ++-- .../apache/parquet/hadoop/CodecFactory.java | 46 ++++--- .../hadoop/ColumnChunkPageWriteStore.java | 19 ++- .../parquet/hadoop/DirectCodecFactory.java | 21 +++ .../parquet/hadoop/ParquetRecordWriter.java | 6 +- .../hadoop/TestColumnChunkPageWriteStore.java | 39 +++--- .../hadoop/TestDirectCodecFactory.java | 51 ++++++- .../parquet/hadoop/TestParquetWriter.java | 126 ++++++++++++------ 8 files changed, 225 insertions(+), 107 deletions(-) diff --git a/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java b/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java index be6b518559..8456e42f4f 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java @@ -60,9 +60,8 @@ public void columnLevel_notSet_returnsNull() { @Test public void columnCodec_setForColumn_returnsConfiguredCodec() { - ParquetProperties props = ParquetProperties.builder() - .withCompressionCodec("col_a", ZSTD) - .build(); + ParquetProperties props = + ParquetProperties.builder().withCompressionCodec("col_a", ZSTD).build(); assertEquals(ZSTD, props.getColumnCodec(colA)); } @@ -77,26 +76,23 @@ public void columnCodec_setMultipleTimes_lastValueWins() { @Test public void columnCodec_otherColumnsUnaffected() { - ParquetProperties props = ParquetProperties.builder() - .withCompressionCodec("col_a", ZSTD) - .build(); + ParquetProperties props = + ParquetProperties.builder().withCompressionCodec("col_a", ZSTD).build(); assertNull(props.getColumnCodec(colB)); assertNull(props.getColumnCodec(colC)); } @Test public void columnLevel_setForColumn_returnsConfiguredLevel() { - ParquetProperties props = ParquetProperties.builder() - .withCompressionLevel("col_a", 10) - .build(); + 
ParquetProperties props = + ParquetProperties.builder().withCompressionLevel("col_a", 10).build(); assertEquals(Integer.valueOf(10), props.getColumnCompressionLevel(colA)); } @Test public void columnLevel_otherColumnsUnaffected() { - ParquetProperties props = ParquetProperties.builder() - .withCompressionLevel("col_a", 10) - .build(); + ParquetProperties props = + ParquetProperties.builder().withCompressionLevel("col_a", 10).build(); assertNull(props.getColumnCompressionLevel(colB)); } @@ -122,8 +118,8 @@ public void columnCodecAndLevel_multipleColumns_eachGetsOwn() { @Test public void withCompressionCodec_nullCodec_throwsNullPointerException() { - assertThrows(NullPointerException.class, - () -> ParquetProperties.builder().withCompressionCodec("col_a", null)); + assertThrows( + NullPointerException.class, () -> ParquetProperties.builder().withCompressionCodec("col_a", null)); } @Test diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 178e8a4df3..c676d2b500 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -29,8 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; @@ -46,6 +44,8 @@ import org.apache.parquet.hadoop.codec.ZstandardCodec; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.ConfigurationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CodecFactory implements CompressionCodecFactory { @@ -299,7 +299,7 @@ protected 
BytesDecompressor createDecompressor(CompressionCodecName codecName) { return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); } - private BytesCompressor createCompressorAtLevel(CompressionCodecName codecName, int level) { + protected BytesCompressor createCompressorAtLevel(CompressionCodecName codecName, int level) { return compressorForCodec(codecName, getCodecAtLevel(codecName, level)); } @@ -307,7 +307,7 @@ private BytesCompressor compressorForCodec(CompressionCodecName codecName, Compr return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); } - private static void validateZstdLevel(int level) { + static void validateZstdLevel(int level) { if (level < 1 || level > 22) { throw new BadConfigurationException("Unsupported ZSTD compression level: " + level + ". Valid range is 1 (fastest) to 22 (best compression)."); @@ -330,19 +330,31 @@ private static void validateGzipLevel(int level) { private static ZlibCompressor.CompressionLevel zlibCompressionLevel(int level) { switch (level) { - case -1: return ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION; - case 0: return ZlibCompressor.CompressionLevel.NO_COMPRESSION; - case 1: return ZlibCompressor.CompressionLevel.BEST_SPEED; - case 2: return ZlibCompressor.CompressionLevel.TWO; - case 3: return ZlibCompressor.CompressionLevel.THREE; - case 4: return ZlibCompressor.CompressionLevel.FOUR; - case 5: return ZlibCompressor.CompressionLevel.FIVE; - case 6: return ZlibCompressor.CompressionLevel.SIX; - case 7: return ZlibCompressor.CompressionLevel.SEVEN; - case 8: return ZlibCompressor.CompressionLevel.EIGHT; - case 9: return ZlibCompressor.CompressionLevel.BEST_COMPRESSION; - default: throw new BadConfigurationException("Unsupported GZIP compression level: " + level - + ". 
Valid range is 0 (no compression) to 9 (best compression), or -1 for default."); + case -1: + return ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION; + case 0: + return ZlibCompressor.CompressionLevel.NO_COMPRESSION; + case 1: + return ZlibCompressor.CompressionLevel.BEST_SPEED; + case 2: + return ZlibCompressor.CompressionLevel.TWO; + case 3: + return ZlibCompressor.CompressionLevel.THREE; + case 4: + return ZlibCompressor.CompressionLevel.FOUR; + case 5: + return ZlibCompressor.CompressionLevel.FIVE; + case 6: + return ZlibCompressor.CompressionLevel.SIX; + case 7: + return ZlibCompressor.CompressionLevel.SEVEN; + case 8: + return ZlibCompressor.CompressionLevel.EIGHT; + case 9: + return ZlibCompressor.CompressionLevel.BEST_COMPRESSION; + default: + throw new BadConfigurationException("Unsupported GZIP compression level: " + level + + ". Valid range is 0 (no compression) to 9 (best compression), or -1 for default."); } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index cf02b78a16..69c28b2cdf 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.function.Function; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -29,6 +28,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.zip.CRC32; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferReleaser; @@ -618,7 +618,13 @@ public ColumnChunkPageWriteStore( InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) { this.schema = schema; - initWriters(col -> 
compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, fileEncryptor, rowGroupOrdinal); + initWriters( + col -> compressor, + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + fileEncryptor, + rowGroupOrdinal); } private ColumnChunkPageWriteStore( @@ -655,9 +661,12 @@ private static BytesInputCompressor resolveCompressor( CompressionCodecName codec = columnCodec != null ? columnCodec : defaultCodec; Integer level = parquetProperties.getColumnCompressionLevel(column); if (level != null && columnCodec == null) { - LOG.warn("Column '{}': compression level {} set without a per-column codec; " - + "applying level to the default codec ({}).", - ColumnPath.get(column.getPath()), level, defaultCodec); + LOG.warn( + "Column '{}': compression level {} set without a per-column codec; " + + "applying level to the default codec ({}).", + ColumnPath.get(column.getPath()), + level, + defaultCodec); } return level != null ? codecFactory.getCompressor(codec, level) : codecFactory.getCompressor(codec); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index b2b5233eeb..cc7a9e6a1d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -110,6 +110,19 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName) } } + @Override + protected BytesCompressor createCompressorAtLevel(final CompressionCodecName codecName, final int level) { + switch (codecName) { + case SNAPPY: + // Snappy has no compression level; keep using the direct compressor. 
+ return new SnappyCompressor(); + case ZSTD: + return new ZstdCompressor(level); + default: + return super.createCompressorAtLevel(codecName, level); + } + } + @Override protected BytesDecompressor createDecompressor(final CompressionCodecName codecName) { switch (codecName) { @@ -416,6 +429,14 @@ private class ZstdCompressor extends BaseCompressor { ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS)); } + ZstdCompressor(int level) { + validateZstdLevel(level); + context = new ZstdCompressCtx(); + context.setLevel(level); + context.setWorkers(conf.getInt( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS)); + } + @Override public CompressionCodecName getCodecName() { return CompressionCodecName.ZSTD; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java index 78abdd2a7a..80a6c69d11 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java @@ -104,8 +104,7 @@ public ParquetRecordWriter( .withWriterVersion(writerVersion) .build(); internalWriter = new InternalParquetRecordWriter( - w, writeSupport, schema, extraMetaData, blockSize, - col -> compressor, validating, props); + w, writeSupport, schema, extraMetaData, blockSize, col -> compressor, validating, props); this.memoryManager = null; this.codecFactory = null; } @@ -174,8 +173,7 @@ public ParquetRecordWriter( .withWriterVersion(writerVersion) .build(); internalWriter = new InternalParquetRecordWriter( - w, writeSupport, schema, extraMetaData, blockSize, - col -> compressor, validating, props); + w, writeSupport, schema, extraMetaData, blockSize, col -> compressor, validating, props); this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null"); 
memoryManager.addWriter(internalWriter, blockSize); this.codecFactory = null; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index abca3a5811..3b6a82e76d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -68,7 +68,6 @@ import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; import org.apache.parquet.hadoop.ParquetFileWriter.Mode; -import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -328,8 +327,8 @@ private BytesInputCompressor compressor(CompressionCodecName codec) { @Test public void perColumnCodec_defaultUsedWhenNotSet() throws Exception { - MessageType schema = MessageTypeParser.parseMessageType( - "message test { required binary col_a; required int32 col_b; }"); + MessageType schema = + MessageTypeParser.parseMessageType("message test { required binary col_a; required int32 col_b; }"); ParquetProperties props = ParquetProperties.builder().build(); Map codecs = writeAndReadCodecs(schema, SNAPPY, props); @@ -340,11 +339,10 @@ public void perColumnCodec_defaultUsedWhenNotSet() throws Exception { @Test public void perColumnCodec_overridesDefaultForOneColumn() throws Exception { - MessageType schema = MessageTypeParser.parseMessageType( - "message test { required binary col_a; required int32 col_b; }"); - ParquetProperties props = ParquetProperties.builder() - .withCompressionCodec("col_a", ZSTD) - .build(); + MessageType schema = + MessageTypeParser.parseMessageType("message test { required 
binary col_a; required int32 col_b; }"); + ParquetProperties props = + ParquetProperties.builder().withCompressionCodec("col_a", ZSTD).build(); Map codecs = writeAndReadCodecs(schema, SNAPPY, props); @@ -354,8 +352,8 @@ public void perColumnCodec_overridesDefaultForOneColumn() throws Exception { @Test public void perColumnCodec_allColumnsOverridden() throws Exception { - MessageType schema = MessageTypeParser.parseMessageType( - "message test { required binary col_a; required int32 col_b; }"); + MessageType schema = + MessageTypeParser.parseMessageType("message test { required binary col_a; required int32 col_b; }"); ParquetProperties props = ParquetProperties.builder() .withCompressionCodec("col_a", ZSTD) .withCompressionCodec("col_b", GZIP) @@ -369,8 +367,8 @@ public void perColumnCodec_allColumnsOverridden() throws Exception { @Test public void perColumnLevel_withCodec_roundTrip() throws Exception { - MessageType schema = MessageTypeParser.parseMessageType( - "message test { required binary col_a; required int32 col_b; }"); + MessageType schema = + MessageTypeParser.parseMessageType("message test { required binary col_a; required int32 col_b; }"); ParquetProperties props = ParquetProperties.builder() .withCompressionCodec("col_a", ZSTD) .withCompressionLevel("col_a", 10) @@ -384,29 +382,27 @@ public void perColumnLevel_withCodec_roundTrip() throws Exception { @Test public void perColumnLevel_invalidZstdLevel_throwsBadConfigurationException() throws Exception { - MessageType schema = MessageTypeParser.parseMessageType( - "message test { required binary col_a; }"); + MessageType schema = MessageTypeParser.parseMessageType("message test { required binary col_a; }"); ParquetProperties props = ParquetProperties.builder() .withCompressionCodec("col_a", ZSTD) .withCompressionLevel("col_a", 23) .build(); - BadConfigurationException ex = assertThrows(BadConfigurationException.class, - () -> writeAndReadCodecs(schema, SNAPPY, props)); + BadConfigurationException ex = + 
assertThrows(BadConfigurationException.class, () -> writeAndReadCodecs(schema, SNAPPY, props)); assertTrue(ex.getMessage().contains("23")); } @Test public void perColumnLevel_invalidGzipLevel_throwsBadConfigurationException() throws Exception { - MessageType schema = MessageTypeParser.parseMessageType( - "message test { required binary col_a; }"); + MessageType schema = MessageTypeParser.parseMessageType("message test { required binary col_a; }"); ParquetProperties props = ParquetProperties.builder() .withCompressionCodec("col_a", GZIP) .withCompressionLevel("col_a", 10) .build(); - BadConfigurationException ex = assertThrows(BadConfigurationException.class, - () -> writeAndReadCodecs(schema, SNAPPY, props)); + BadConfigurationException ex = + assertThrows(BadConfigurationException.class, () -> writeAndReadCodecs(schema, SNAPPY, props)); assertTrue(ex.getMessage().contains("10")); } @@ -439,7 +435,8 @@ private Map writeAndReadCodecs( .withColumnIndexTruncateLength(Integer.MAX_VALUE) .build()) { for (ColumnDescriptor col : schema.getColumns()) { - Statistics stats = Statistics.getBuilderForReading(col.getPrimitiveType()).build(); + Statistics stats = + Statistics.getBuilderForReading(col.getPrimitiveType()).build(); store.getPageWriter(col).writePage(BytesInput.fromInt(42), 1, 1, stats, RLE, RLE, PLAIN); } store.flushToFileWriter(writer); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index 3d4b7db880..b307467ae8 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -340,8 +340,8 @@ public void levelAwareCompressor_snappy_ignoresLevel() { public void levelAwareCompressor_gzip_invalidLevel_throwsBadConfigurationException() { CodecFactory factory = new CodecFactory(new Configuration(), pageSize); try { - 
BadConfigurationException ex = Assert.assertThrows(BadConfigurationException.class, - () -> factory.getCompressor(GZIP, 99)); + BadConfigurationException ex = + Assert.assertThrows(BadConfigurationException.class, () -> factory.getCompressor(GZIP, 99)); Assert.assertTrue(ex.getMessage().contains("99")); } finally { factory.release(); @@ -384,4 +384,51 @@ public void levelAwareCompressor_gzip_roundTrip() throws IOException { } factory.release(); } + + @Test + public void directFactory_levelAwareCompressor_usesDirectCompressorNotHeap() { + CodecFactory heap = new CodecFactory(new Configuration(), pageSize); + CodecFactory direct = + CodecFactory.createDirectCodecFactory(new Configuration(), new DirectByteBufferAllocator(), pageSize); + try { + // Sanity: the heap factory's leveled path yields a HeapBytesCompressor. + Assert.assertTrue( + "heap factory should produce a HeapBytesCompressor", + heap.getCompressor(ZSTD, 3) instanceof CodecFactory.HeapBytesCompressor); + + // The direct factory must not fall back to the heap/Hadoop path for leveled ZSTD/SNAPPY. 
+ BytesInputCompressor directZstd = direct.getCompressor(ZSTD, 3); + Assert.assertFalse( + "direct factory ZSTD(level) should not fall back to HeapBytesCompressor", + directZstd instanceof CodecFactory.HeapBytesCompressor); + Assert.assertEquals(ZSTD, directZstd.getCodecName()); + Assert.assertEquals( + "leveled ZSTD should use the same direct compressor type as the no-level path", + direct.getCompressor(ZSTD).getClass(), + directZstd.getClass()); + + BytesInputCompressor directSnappy = direct.getCompressor(SNAPPY, 5); + Assert.assertFalse( + "direct factory SNAPPY(level) should not fall back to HeapBytesCompressor", + directSnappy instanceof CodecFactory.HeapBytesCompressor); + Assert.assertEquals( + "leveled SNAPPY should use the same direct compressor type as the no-level path", + direct.getCompressor(SNAPPY).getClass(), + directSnappy.getClass()); + } finally { + heap.release(); + direct.release(); + } + } + + @Test + public void directFactory_levelAwareCompressor_invalidZstdLevel_throwsBadConfigurationException() { + CodecFactory direct = + CodecFactory.createDirectCodecFactory(new Configuration(), new DirectByteBufferAllocator(), pageSize); + try { + Assert.assertThrows(BadConfigurationException.class, () -> direct.getCompressor(ZSTD, 23)); + } finally { + direct.release(); + } + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index 88cd234e3e..a8b4783fe2 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -18,9 +18,6 @@ */ package org.apache.parquet.hadoop; -import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; -import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; -import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import static 
org.apache.parquet.column.Encoding.DELTA_BYTE_ARRAY; import static org.apache.parquet.column.Encoding.PLAIN; import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY; @@ -30,7 +27,10 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; @@ -53,13 +53,13 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordWriter; import net.openhft.hashing.LongHashFunction; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.bytes.TrackingByteBufferAllocator; @@ -835,8 +835,11 @@ public void testParquetWriterBuilderCanNotConfigurePathAndFile() throws IOExcept @Test public void perColumnCodec_overridesDefaultForOneColumn() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") - .required(INT32).named("col_b") + .required(BINARY) + .as(stringType()) + .named("col_a") 
+ .required(INT32) + .named("col_b") .named("test"); File file = temp.newFile(); file.delete(); @@ -864,8 +867,11 @@ public void perColumnCodec_overridesDefaultForOneColumn() throws Exception { @Test public void perColumnCodec_defaultUsedWhenNoOverride() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") - .required(INT32).named("col_b") + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") .named("test"); File file = temp.newFile(); file.delete(); @@ -882,18 +888,18 @@ public void perColumnCodec_defaultUsedWhenNoOverride() throws Exception { ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { - assertEquals( - "Column " + col.getPath().toDotString() + " should use default codec", - GZIP, - col.getCodec()); + assertEquals("Column " + col.getPath().toDotString() + " should use default codec", GZIP, col.getCodec()); } } @Test public void perColumnLevel_dataRoundTrips() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") - .required(INT32).named("col_b") + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") .named("test"); File file = temp.newFile(); file.delete(); @@ -910,7 +916,8 @@ public void perColumnLevel_dataRoundTrips() throws Exception { writer.write(f.newGroup().append("col_a", "hello").append("col_b", 42)); } - try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), path).build()) { Group group = reader.read(); assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8()); assertEquals(42, group.getInteger("col_b", 0)); @@ -929,7 +936,9 @@ public void perColumnLevel_dataRoundTrips() throws Exception { @Test public void 
perColumnLevel_invalidZstdLevel_throwsBadConfigurationException() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") + .required(BINARY) + .as(stringType()) + .named("col_a") .named("test"); File file = temp.newFile(); file.delete(); @@ -951,7 +960,9 @@ public void perColumnLevel_invalidZstdLevel_throwsBadConfigurationException() th @Test public void perColumnLevel_invalidLevel_throwsBadConfigurationException() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") + .required(BINARY) + .as(stringType()) + .named("col_a") .named("test"); File file = temp.newFile(); file.delete(); @@ -973,8 +984,12 @@ public void perColumnLevel_invalidLevel_throwsBadConfigurationException() throws @Test public void perColumnLevel_differentLevelsPerColumn_dataRoundTrips() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") - .required(BINARY).as(stringType()).named("col_b") + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(BINARY) + .as(stringType()) + .named("col_b") .named("test"); File file = temp.newFile(); file.delete(); @@ -996,14 +1011,12 @@ public void perColumnLevel_differentLevelsPerColumn_dataRoundTrips() throws Exce // Both columns must report ZSTD in the footer ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { - assertEquals( - "Column " + col.getPath().toDotString() + " should use ZSTD", - ZSTD, - col.getCodec()); + assertEquals("Column " + col.getPath().toDotString() + " should use ZSTD", ZSTD, col.getCodec()); } // Data must survive the round-trip at both levels - try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), path).build()) { Group group = reader.read(); 
assertEquals("fast", group.getBinary("col_a", 0).toStringUsingUTF8()); assertEquals("best", group.getBinary("col_b", 0).toStringUsingUTF8()); @@ -1014,8 +1027,11 @@ public void perColumnLevel_differentLevelsPerColumn_dataRoundTrips() throws Exce @Test public void perColumnCodec_allColumnsOverridden() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") - .required(INT32).named("col_b") + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") .named("test"); File file = temp.newFile(); file.delete(); @@ -1082,8 +1098,11 @@ public void testNoFlushAfterException() throws Exception { @Test public void outputFormat_setColumnCompression_overridesDefaultForOneColumn() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") - .required(INT32).named("col_b") + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") .named("test"); File file = temp.newFile(); file.delete(); @@ -1100,8 +1119,11 @@ public void outputFormat_setColumnCompression_overridesDefaultForOneColumn() thr @Test public void outputFormat_setColumnCompression_allColumnsOverridden() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") - .required(INT32).named("col_b") + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") .named("test"); File file = temp.newFile(); file.delete(); @@ -1119,8 +1141,11 @@ public void outputFormat_setColumnCompression_allColumnsOverridden() throws Exce @Test public void outputFormat_setColumnCompression_defaultUsedWhenNotSet() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") - .required(INT32).named("col_b") + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") .named("test"); File file = 
temp.newFile(); file.delete(); @@ -1136,8 +1161,11 @@ public void outputFormat_setColumnCompression_defaultUsedWhenNotSet() throws Exc @Test public void outputFormat_setColumnCompressionLevel_withCodec_dataRoundTrips() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") - .required(INT32).named("col_b") + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") .named("test"); File file = temp.newFile(); file.delete(); @@ -1150,7 +1178,8 @@ public void outputFormat_setColumnCompressionLevel_withCodec_dataRoundTrips() th assertEquals(ZSTD, codecs.get("col_a")); assertEquals(SNAPPY, codecs.get("col_b")); - try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), path).build()) { Group group = reader.read(); assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8()); assertEquals(42, group.getInteger("col_b", 0)); @@ -1161,8 +1190,12 @@ public void outputFormat_setColumnCompressionLevel_withCodec_dataRoundTrips() th @Test public void outputFormat_setColumnCompressionLevel_differentLevelsPerColumn() throws Exception { MessageType schema = Types.buildMessage() - .required(BINARY).as(stringType()).named("col_a") - .required(BINARY).as(stringType()).named("col_b") + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(BINARY) + .as(stringType()) + .named("col_b") .named("test"); File file = temp.newFile(); file.delete(); @@ -1175,7 +1208,8 @@ public void outputFormat_setColumnCompressionLevel_differentLevelsPerColumn() th assertEquals(ZSTD, codecs.get("col_a")); assertEquals(ZSTD, codecs.get("col_b")); - try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), path).build()) { + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), path).build()) { Group group = reader.read(); assertEquals("hello", 
group.getBinary("col_a", 0).toStringUsingUTF8()); assertEquals("fast", group.getBinary("col_b", 0).toStringUsingUTF8()); @@ -1185,8 +1219,7 @@ public void outputFormat_setColumnCompressionLevel_differentLevelsPerColumn() th @SuppressWarnings("unchecked") private Map writeAndReadCodecsViaOutputFormat( - MessageType schema, CompressionCodecName defaultCodec, Job job, Path file) - throws Exception { + MessageType schema, CompressionCodecName defaultCodec, Job job, Path file) throws Exception { Configuration conf = job.getConfiguration(); GroupWriteSupport.setSchema(schema, conf); SimpleGroupFactory f = new SimpleGroupFactory(schema); @@ -1195,9 +1228,14 @@ private Map writeAndReadCodecsViaOutputFormat( for (int i = 0; i < schema.getFieldCount(); i++) { String name = schema.getFieldName(i); switch (schema.getType(i).asPrimitiveType().getPrimitiveTypeName()) { - case BINARY: group.append(name, name.equals("col_a") ? "hello" : "fast"); break; - case INT32: group.append(name, 42); break; - default: break; + case BINARY: + group.append(name, name.equals("col_a") ? 
"hello" : "fast"); + break; + case INT32: + group.append(name, 42); + break; + default: + break; } } From 6081f6151a70e8c41a177f5f9eaca2d4d30ba1ac Mon Sep 17 00:00:00 2001 From: mengnalin Date: Thu, 2 Jul 2026 15:15:26 -0700 Subject: [PATCH 9/9] Warn on ignored SNAPPY level and add direct-factory ZSTD round-trip coverage test --- .../parquet/hadoop/DirectCodecFactory.java | 17 ++++++++++------- .../parquet/hadoop/TestDirectCodecFactory.java | 15 ++++++++++++++- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index cc7a9e6a1d..312ccedf8f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -115,6 +115,7 @@ protected BytesCompressor createCompressorAtLevel(final CompressionCodecName cod switch (codecName) { case SNAPPY: // Snappy has no compression level; keep using the direct compressor. 
+ LOG.warn("Compression level {} is not supported for codec {} and will be ignored.", level, codecName); return new SnappyCompressor(); case ZSTD: return new ZstdCompressor(level); @@ -422,19 +423,21 @@ private class ZstdCompressor extends BaseCompressor { private final ZstdCompressCtx context; ZstdCompressor() { - context = new ZstdCompressCtx(); - context.setLevel(conf.getInt( + context = newContext(conf.getInt( ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL)); - context.setWorkers(conf.getInt( - ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS)); } ZstdCompressor(int level) { validateZstdLevel(level); - context = new ZstdCompressCtx(); - context.setLevel(level); - context.setWorkers(conf.getInt( + context = newContext(level); + } + + private ZstdCompressCtx newContext(int level) { + ZstdCompressCtx ctx = new ZstdCompressCtx(); + ctx.setLevel(level); + ctx.setWorkers(conf.getInt( ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS)); + return ctx; } @Override diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index b307467ae8..a0cdc93427 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -386,7 +386,7 @@ public void levelAwareCompressor_gzip_roundTrip() throws IOException { } @Test - public void directFactory_levelAwareCompressor_usesDirectCompressorNotHeap() { + public void directFactory_levelAwareCompressor_usesDirectCompressorAndRoundTrips() throws IOException { CodecFactory heap = new CodecFactory(new Configuration(), pageSize); CodecFactory direct = CodecFactory.createDirectCodecFactory(new Configuration(), new DirectByteBufferAllocator(), pageSize); @@ 
-415,6 +415,19 @@ public void directFactory_levelAwareCompressor_usesDirectCompressorNotHeap() { "leveled SNAPPY should use the same direct compressor type as the no-level path", direct.getCompressor(SNAPPY).getClass(), directSnappy.getClass()); + + // The direct ZSTD level path must compress/decompress correctly at each level. + byte[] original = "hello parquet per-column zstd direct compression".getBytes(StandardCharsets.UTF_8); + BytesInputDecompressor decompressor = heap.getDecompressor(ZSTD); + for (int level : new int[] {1, 3, 22}) { + byte[] compressed = direct.getCompressor(ZSTD, level) + .compress(BytesInput.from(original)) + .toByteArray(); + byte[] result = decompressor + .decompress(BytesInput.from(compressed), original.length) + .toByteArray(); + Assert.assertArrayEquals("Direct ZSTD round-trip failed at level " + level, original, result); + } } finally { heap.release(); direct.release();