diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..fd5c568ac9 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -39,6 +39,7 @@ import org.apache.parquet.column.values.factory.ValuesWriterFactory; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; /** @@ -135,6 +136,8 @@ public static WriterVersion fromString(String name) { private final Map extraMetaData; private final ColumnProperty statistics; private final ColumnProperty sizeStatistics; + private final ColumnProperty columnCodecs; + private final ColumnProperty columnCompressionLevels; private ParquetProperties(Builder builder) { this.pageSizeThreshold = builder.pageSize; @@ -167,6 +170,8 @@ private ParquetProperties(Builder builder) { this.extraMetaData = builder.extraMetaData; this.statistics = builder.statistics.build(); this.sizeStatistics = builder.sizeStatistics.build(); + this.columnCodecs = builder.columnCodecs.build(); + this.columnCompressionLevels = builder.columnCompressionLevels.build(); } public static Builder builder() { @@ -370,6 +375,28 @@ public boolean getSizeStatisticsEnabled(ColumnDescriptor column) { return sizeStatisticsEnabled; } + /** + * Returns the compression codec configured for the given column, or {@code null} if no + * column-specific codec has been set (the caller should fall back to the job-level codec). 
+ * + * @param column the column descriptor + * @return the per-column codec, or {@code null} if not set + */ + public CompressionCodecName getColumnCodec(ColumnDescriptor column) { + return columnCodecs.getValue(column); + } + + /** + * Returns the compression level configured for the given column, or {@code null} if no + * column-specific level has been set. + * + * @param column the column descriptor + * @return the per-column compression level, or {@code null} if not set + */ + public Integer getColumnCompressionLevel(ColumnDescriptor column) { + return columnCompressionLevels.getValue(column); + } + @Override public String toString() { return "Parquet page size to " + getPageSizeThreshold() + '\n' @@ -388,7 +415,9 @@ public String toString() { + "Page row count limit to " + getPageRowCountLimit() + '\n' + "Writing page checksums is: " + (getPageWriteChecksumEnabled() ? "on" : "off") + '\n' + "Statistics enabled: " + statisticsEnabled + '\n' - + "Size statistics enabled: " + sizeStatisticsEnabled; + + "Size statistics enabled: " + sizeStatisticsEnabled + '\n' + + "Per-column codecs: " + columnCodecs + '\n' + + "Per-column compression levels: " + columnCompressionLevels; } public static class Builder { @@ -419,6 +448,8 @@ public static class Builder { private Map extraMetaData = new HashMap<>(); private final ColumnProperty.Builder statistics; private final ColumnProperty.Builder sizeStatistics; + private final ColumnProperty.Builder columnCodecs; + private final ColumnProperty.Builder columnCompressionLevels; private Builder() { enableDict = ColumnProperty.builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED); @@ -436,6 +467,8 @@ private Builder() { ColumnProperty.builder().withDefaultValue(DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER); statistics = ColumnProperty.builder().withDefaultValue(DEFAULT_STATISTICS_ENABLED); sizeStatistics = ColumnProperty.builder().withDefaultValue(DEFAULT_SIZE_STATISTICS_ENABLED); + columnCodecs = 
ColumnProperty.builder().withDefaultValue(null); + columnCompressionLevels = ColumnProperty.builder().withDefaultValue(null); } private Builder(ParquetProperties toCopy) { @@ -448,7 +481,11 @@ private Builder(ParquetProperties toCopy) { this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck; this.valuesWriterFactory = toCopy.valuesWriterFactory; this.allocator = toCopy.allocator; + this.pageValueCountThreshold = toCopy.pageValueCountThreshold; this.pageRowCountLimit = toCopy.pageRowCountLimit; + this.rowGroupRowCountLimit = toCopy.rowGroupRowCountLimit; + this.columnIndexTruncateLength = toCopy.columnIndexTruncateLength; + this.statisticsTruncateLength = toCopy.statisticsTruncateLength; this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled; this.bloomFilterNDVs = ColumnProperty.builder(toCopy.bloomFilterNDVs); this.bloomFilterFPPs = ColumnProperty.builder(toCopy.bloomFilterFPPs); @@ -459,7 +496,11 @@ private Builder(ParquetProperties toCopy) { this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled); this.extraMetaData = toCopy.extraMetaData; this.statistics = ColumnProperty.builder(toCopy.statistics); + this.statisticsEnabled = toCopy.statisticsEnabled; this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics); + this.sizeStatisticsEnabled = toCopy.sizeStatisticsEnabled; + this.columnCodecs = ColumnProperty.builder(toCopy.columnCodecs); + this.columnCompressionLevels = ColumnProperty.builder(toCopy.columnCompressionLevels); } /** @@ -756,6 +797,32 @@ public Builder withSizeStatisticsEnabled(String columnPath, boolean enabled) { return this; } + /** + * Set the compression codec for the specified column. 
+ * + * @param columnPath the path of the column (dot-string) + * @param codec the compression codec to use for this column + * @return this builder for method chaining + */ + public Builder withCompressionCodec(String columnPath, CompressionCodecName codec) { + this.columnCodecs.withValue(columnPath, Objects.requireNonNull(codec, "codec cannot be null")); + return this; + } + + /** + * Set the compression level for the specified column. + * The valid range is codec-specific (e.g. ZSTD: 1–22 (default 3), GZIP: 0–9 or -1 (default 6), BROTLI: 0–11 (default 1)). + * Pass {@code null} to unset a previously configured level for that column. + * + * @param columnPath the path of the column (dot-string) + * @param level the compression level, or {@code null} to unset + * @return this builder for method chaining + */ + public Builder withCompressionLevel(String columnPath, Integer level) { + this.columnCompressionLevels.withValue(columnPath, level); + return this; + } + public ParquetProperties build() { ParquetProperties properties = new ParquetProperties(this); // we pass a constructed but uninitialized factory to ParquetProperties above as currently diff --git a/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java b/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java new file mode 100644 index 0000000000..8456e42f4f --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/TestParquetProperties.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column; + +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; + +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Before; +import org.junit.Test; + +public class TestParquetProperties { + + private static final MessageType SCHEMA = MessageTypeParser.parseMessageType( + "message test { required binary col_a; required int32 col_b; required double col_c; }"); + + private ColumnDescriptor colA; + private ColumnDescriptor colB; + private ColumnDescriptor colC; + + @Before + public void setUp() { + colA = SCHEMA.getColumns().get(0); + colB = SCHEMA.getColumns().get(1); + colC = SCHEMA.getColumns().get(2); + } + + @Test + public void columnCodec_notSet_returnsNull() { + ParquetProperties props = ParquetProperties.builder().build(); + assertNull(props.getColumnCodec(colA)); + } + + @Test + public void columnLevel_notSet_returnsNull() { + ParquetProperties props = ParquetProperties.builder().build(); + assertNull(props.getColumnCompressionLevel(colA)); + } + + @Test + public void columnCodec_setForColumn_returnsConfiguredCodec() { + ParquetProperties props = + ParquetProperties.builder().withCompressionCodec("col_a", ZSTD).build(); + 
assertEquals(ZSTD, props.getColumnCodec(colA)); + } + + @Test + public void columnCodec_setMultipleTimes_lastValueWins() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionCodec("col_a", SNAPPY) + .build(); + assertEquals(SNAPPY, props.getColumnCodec(colA)); + } + + @Test + public void columnCodec_otherColumnsUnaffected() { + ParquetProperties props = + ParquetProperties.builder().withCompressionCodec("col_a", ZSTD).build(); + assertNull(props.getColumnCodec(colB)); + assertNull(props.getColumnCodec(colC)); + } + + @Test + public void columnLevel_setForColumn_returnsConfiguredLevel() { + ParquetProperties props = + ParquetProperties.builder().withCompressionLevel("col_a", 10).build(); + assertEquals(Integer.valueOf(10), props.getColumnCompressionLevel(colA)); + } + + @Test + public void columnLevel_otherColumnsUnaffected() { + ParquetProperties props = + ParquetProperties.builder().withCompressionLevel("col_a", 10).build(); + assertNull(props.getColumnCompressionLevel(colB)); + } + + @Test + public void columnCodecAndLevel_multipleColumns_eachGetsOwn() { + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 10) + .withCompressionCodec("col_b", SNAPPY) + .withCompressionCodec("col_c", GZIP) + .withCompressionLevel("col_c", 5) + .build(); + + assertEquals(ZSTD, props.getColumnCodec(colA)); + assertEquals(Integer.valueOf(10), props.getColumnCompressionLevel(colA)); + + assertEquals(SNAPPY, props.getColumnCodec(colB)); + assertNull(props.getColumnCompressionLevel(colB)); + + assertEquals(GZIP, props.getColumnCodec(colC)); + assertEquals(Integer.valueOf(5), props.getColumnCompressionLevel(colC)); + } + + @Test + public void withCompressionCodec_nullCodec_throwsNullPointerException() { + assertThrows( + NullPointerException.class, () -> ParquetProperties.builder().withCompressionCodec("col_a", null)); + } + + @Test + public void 
copyBuilder_preservesColumnCodecAndLevel() { + ParquetProperties original = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 7) + .withCompressionCodec("col_b", SNAPPY) + .build(); + + ParquetProperties copy = ParquetProperties.copy(original).build(); + + assertEquals(ZSTD, copy.getColumnCodec(colA)); + assertEquals(Integer.valueOf(7), copy.getColumnCompressionLevel(colA)); + assertEquals(SNAPPY, copy.getColumnCodec(colB)); + assertNull(copy.getColumnCompressionLevel(colB)); + assertNull(copy.getColumnCodec(colC)); + } +} diff --git a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java index 561dcb899c..187c74a0ed 100644 --- a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java +++ b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java @@ -46,6 +46,30 @@ public interface CompressionCodecFactory { */ BytesInputCompressor getCompressor(CompressionCodecName codecName); + /** + * Returns a {@link BytesInputCompressor} instance for the specified codec name and compression level. + *

+ * The compression level controls the trade-off between compression speed and ratio. The valid range + * and meaning of the level is codec-specific: + *
<ul>
+ * <li>ZSTD: 1 (fastest) to 22 (best compression), default 3</li>
+ * <li>GZIP: 0 (no compression) to 9 (best compression), or -1 for the codec default; default 6</li>
+ * <li>BROTLI: 0 (fastest) to 11 (best compression), default 1</li>
+ * </ul>
+ * Implementations that do not support compression levels should ignore the {@code level} parameter + * and delegate to {@link #getCompressor(CompressionCodecName)}. + *

+ * The compressor is not thread-safe, so one instance for each working thread is required. + * + * @param codecName the codec name which the compressor instance is to be returned + * @param level the compression level; codec-specific, ignored if the codec does not support levels + * @return the compressor instance for the specified codec name and level + * @see BytesInputCompressor#release() + */ + default BytesInputCompressor getCompressor(CompressionCodecName codecName, int level) { + return getCompressor(codecName); + } + /** * Returns a {@link BytesInputDecompressor} instance for the specified codec name to be used for decompressing page * data. diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 98b49835a6..c676d2b500 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -32,6 +32,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.zlib.ZlibCompressor; import org.apache.hadoop.util.ReflectionUtils; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferAllocator; @@ -43,13 +44,20 @@ import org.apache.parquet.hadoop.codec.ZstandardCodec; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.util.ConfigurationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CodecFactory implements CompressionCodecFactory { + private static final Logger LOG = LoggerFactory.getLogger(CodecFactory.class); + protected static final Map CODEC_BY_NAME = Collections.synchronizedMap(new HashMap()); - private final Map compressors = new HashMap<>(); + static final String GZIP_COMPRESS_LEVEL = "zlib.compress.level"; 
+ static final String BROTLI_COMPRESS_QUALITY = "compression.brotli.quality"; + + private final Map compressors = new HashMap<>(); private final Map decompressors = new HashMap<>(); protected final ParquetConfiguration conf; @@ -252,10 +260,22 @@ public CompressionCodecName getCodecName() { @Override public BytesCompressor getCompressor(CompressionCodecName codecName) { - BytesCompressor comp = compressors.get(codecName); + String key = cacheKey(codecName); + BytesCompressor comp = compressors.get(key); if (comp == null) { comp = createCompressor(codecName); - compressors.put(codecName, comp); + compressors.put(key, comp); + } + return comp; + } + + @Override + public BytesCompressor getCompressor(CompressionCodecName codecName, int level) { + String key = cacheKey(codecName, level); + BytesCompressor comp = compressors.get(key); + if (comp == null) { + comp = createCompressorAtLevel(codecName, level); + compressors.put(key, comp); } return comp; } @@ -271,8 +291,7 @@ public BytesDecompressor getDecompressor(CompressionCodecName codecName) { } protected BytesCompressor createCompressor(CompressionCodecName codecName) { - CompressionCodec codec = getCodec(codecName); - return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + return compressorForCodec(codecName, getCodec(codecName)); } protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { @@ -280,6 +299,116 @@ protected BytesDecompressor createDecompressor(CompressionCodecName codecName) { return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec); } + protected BytesCompressor createCompressorAtLevel(CompressionCodecName codecName, int level) { + return compressorForCodec(codecName, getCodecAtLevel(codecName, level)); + } + + private BytesCompressor compressorForCodec(CompressionCodecName codecName, CompressionCodec codec) { + return codec == null ? 
NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec); + } + + static void validateZstdLevel(int level) { + if (level < 1 || level > 22) { + throw new BadConfigurationException("Unsupported ZSTD compression level: " + level + + ". Valid range is 1 (fastest) to 22 (best compression)."); + } + } + + private static void validateBrotliLevel(int level) { + if (level < 0 || level > 11) { + throw new BadConfigurationException("Unsupported Brotli compression level: " + level + + ". Valid range is 0 (fastest) to 11 (best compression)."); + } + } + + private static void validateGzipLevel(int level) { + if (level != -1 && (level < 0 || level > 9)) { + throw new BadConfigurationException("Unsupported GZIP compression level: " + level + + ". Valid range is 0 (no compression) to 9 (best compression), or -1 for default."); + } + } + + private static ZlibCompressor.CompressionLevel zlibCompressionLevel(int level) { + switch (level) { + case -1: + return ZlibCompressor.CompressionLevel.DEFAULT_COMPRESSION; + case 0: + return ZlibCompressor.CompressionLevel.NO_COMPRESSION; + case 1: + return ZlibCompressor.CompressionLevel.BEST_SPEED; + case 2: + return ZlibCompressor.CompressionLevel.TWO; + case 3: + return ZlibCompressor.CompressionLevel.THREE; + case 4: + return ZlibCompressor.CompressionLevel.FOUR; + case 5: + return ZlibCompressor.CompressionLevel.FIVE; + case 6: + return ZlibCompressor.CompressionLevel.SIX; + case 7: + return ZlibCompressor.CompressionLevel.SEVEN; + case 8: + return ZlibCompressor.CompressionLevel.EIGHT; + case 9: + return ZlibCompressor.CompressionLevel.BEST_COMPRESSION; + default: + throw new BadConfigurationException("Unsupported GZIP compression level: " + level + + ". Valid range is 0 (no compression) to 9 (best compression), or -1 for default."); + } + } + + /** + * Returns a {@link CompressionCodec} instance configured at the specified compression level. 
+ * A level-specific {@link Configuration} snapshot is built so that the codec is initialized + * with the requested level rather than the global configuration value. + * For codecs that do not support levels the method falls back to {@link #getCodec(CompressionCodecName)}. + */ + private CompressionCodec getCodecAtLevel(CompressionCodecName codecName, int level) { + String codecClassName = codecName.getHadoopCompressionCodecClassName(); + if (codecClassName == null) { + return null; + } + String key = cacheKey(codecName, level); + CompressionCodec codec = CODEC_BY_NAME.get(key); + if (codec != null) { + return codec; + } + // Build a Configuration snapshot with the level explicitly set for this codec. + Configuration levelConf = new Configuration(ConfigurationUtil.createHadoopConfiguration(conf)); + switch (codecName) { + case ZSTD: + validateZstdLevel(level); + levelConf.setInt(ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, level); + break; + case GZIP: + validateGzipLevel(level); + levelConf.setEnum(GZIP_COMPRESS_LEVEL, zlibCompressionLevel(level)); + break; + case BROTLI: + validateBrotliLevel(level); + levelConf.setInt(BROTLI_COMPRESS_QUALITY, level); + break; + default: + // Codec does not support levels; fall back to the default codec instance. + LOG.warn("Compression level {} is not supported for codec {} and will be ignored.", level, codecName); + return getCodec(codecName); + } + try { + Class codecClass; + try { + codecClass = Class.forName(codecClassName); + } catch (ClassNotFoundException e) { + codecClass = new Configuration(false).getClassLoader().loadClass(codecClassName); + } + codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, levelConf); + CODEC_BY_NAME.put(key, codec); + return codec; + } catch (ClassNotFoundException e) { + throw new BadConfigurationException("Class " + codecClassName + " was not found", e); + } + } + /** * @param codecName the requested codec * @return the corresponding hadoop codec. 
null if UNCOMPRESSED @@ -316,10 +445,10 @@ private String cacheKey(CompressionCodecName codecName) { String level = null; switch (codecName) { case GZIP: - level = conf.get("zlib.compress.level"); + level = conf.get(GZIP_COMPRESS_LEVEL); break; case BROTLI: - level = conf.get("compression.brotli.quality"); + level = conf.get(BROTLI_COMPRESS_QUALITY); break; case ZSTD: level = conf.get("parquet.compression.codec.zstd.level"); @@ -331,6 +460,11 @@ private String cacheKey(CompressionCodecName codecName) { return level == null ? codecClass : codecClass + ":" + level; } + private String cacheKey(CompressionCodecName codecName, int level) { + String codecClass = codecName.getHadoopCompressionCodecClassName(); + return (codecClass == null ? codecName.name() : codecClass) + ":" + level; + } + @Override public void release() { for (BytesCompressor compressor : compressors.values()) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index c0cf216cc9..69c28b2cdf 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -25,8 +25,10 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.zip.CRC32; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferReleaser; @@ -44,6 +46,7 @@ import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; +import org.apache.parquet.compression.CompressionCodecFactory; import 
org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; import org.apache.parquet.crypto.AesCipher; import org.apache.parquet.crypto.InternalColumnEncryptionSetup; @@ -53,6 +56,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.CodecFactory.BytesCompressor; import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; import org.apache.parquet.io.ParquetEncodingException; @@ -583,21 +587,7 @@ public ColumnChunkPageWriteStore( int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) { this.schema = schema; - for (ColumnDescriptor path : schema.getColumns()) { - writers.put( - path, - new ColumnChunkPageWriter( - path, - compressor, - allocator, - columnIndexTruncateLength, - pageWriteChecksumEnabled, - null, - null, - null, - -1, - -1)); - } + initWriters(col -> compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, null, -1); } @Deprecated @@ -628,13 +618,138 @@ public ColumnChunkPageWriteStore( InternalFileEncryptor fileEncryptor, int rowGroupOrdinal) { this.schema = schema; + initWriters( + col -> compressor, + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + fileEncryptor, + rowGroupOrdinal); + } + + private ColumnChunkPageWriteStore( + Function compressorProvider, + MessageType schema, + ByteBufferAllocator allocator, + int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled, + InternalFileEncryptor fileEncryptor, + int rowGroupOrdinal) { + this.schema = schema; + initWriters( + compressorProvider, + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + fileEncryptor, + rowGroupOrdinal); + } + + static Function compressorProvider( + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, + 
ParquetProperties parquetProperties) { + return column -> resolveCompressor(column, codecFactory, defaultCodec, parquetProperties); + } + + private static BytesInputCompressor resolveCompressor( + ColumnDescriptor column, + CompressionCodecFactory codecFactory, + CompressionCodecName defaultCodec, + ParquetProperties parquetProperties) { + CompressionCodecName columnCodec = parquetProperties.getColumnCodec(column); + CompressionCodecName codec = columnCodec != null ? columnCodec : defaultCodec; + Integer level = parquetProperties.getColumnCompressionLevel(column); + if (level != null && columnCodec == null) { + LOG.warn( + "Column '{}': compression level {} set without a per-column codec; " + + "applying level to the default codec ({}).", + ColumnPath.get(column.getPath()), + level, + defaultCodec); + } + return level != null ? codecFactory.getCompressor(codec, level) : codecFactory.getCompressor(codec); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private Function compressorProvider; + private MessageType schema; + private ByteBufferAllocator allocator; + private int columnIndexTruncateLength = ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH; + private boolean pageWriteChecksumEnabled = ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; + private InternalFileEncryptor fileEncryptor = null; + private int rowGroupOrdinal = -1; + + private Builder() {} + + public Builder withCompressorProvider(Function compressorProvider) { + this.compressorProvider = compressorProvider; + return this; + } + + public Builder withSchema(MessageType schema) { + this.schema = schema; + return this; + } + + public Builder withAllocator(ByteBufferAllocator allocator) { + this.allocator = allocator; + return this; + } + + public Builder withColumnIndexTruncateLength(int columnIndexTruncateLength) { + this.columnIndexTruncateLength = columnIndexTruncateLength; + return this; + } + + public Builder 
withPageWriteChecksumEnabled(boolean pageWriteChecksumEnabled) { + this.pageWriteChecksumEnabled = pageWriteChecksumEnabled; + return this; + } + + public Builder withFileEncryptor(InternalFileEncryptor fileEncryptor) { + this.fileEncryptor = fileEncryptor; + return this; + } + + public Builder withRowGroupOrdinal(int rowGroupOrdinal) { + this.rowGroupOrdinal = rowGroupOrdinal; + return this; + } + + public ColumnChunkPageWriteStore build() { + Objects.requireNonNull(compressorProvider, "compressorProvider cannot be null"); + Objects.requireNonNull(schema, "schema cannot be null"); + Objects.requireNonNull(allocator, "allocator cannot be null"); + return new ColumnChunkPageWriteStore( + compressorProvider, + schema, + allocator, + columnIndexTruncateLength, + pageWriteChecksumEnabled, + fileEncryptor, + rowGroupOrdinal); + } + } + + private void initWriters( + Function compressorFn, + ByteBufferAllocator allocator, + int columnIndexTruncateLength, + boolean pageWriteChecksumEnabled, + InternalFileEncryptor fileEncryptor, + int rowGroupOrdinal) { if (null == fileEncryptor) { for (ColumnDescriptor path : schema.getColumns()) { writers.put( path, new ColumnChunkPageWriter( path, - compressor, + compressorFn.apply(path), allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, @@ -666,7 +781,7 @@ public ColumnChunkPageWriteStore( path, new ColumnChunkPageWriter( path, - compressor, + compressorFn.apply(path), allocator, columnIndexTruncateLength, pageWriteChecksumEnabled, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index b2b5233eeb..312ccedf8f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -110,6 +110,20 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName) } } + @Override + 
protected BytesCompressor createCompressorAtLevel(final CompressionCodecName codecName, final int level) { + switch (codecName) { + case SNAPPY: + // Snappy has no compression level; keep using the direct compressor. + LOG.warn("Compression level {} is not supported for codec {} and will be ignored.", level, codecName); + return new SnappyCompressor(); + case ZSTD: + return new ZstdCompressor(level); + default: + return super.createCompressorAtLevel(codecName, level); + } + } + @Override protected BytesDecompressor createDecompressor(final CompressionCodecName codecName) { switch (codecName) { @@ -409,11 +423,21 @@ private class ZstdCompressor extends BaseCompressor { private final ZstdCompressCtx context; ZstdCompressor() { - context = new ZstdCompressCtx(); - context.setLevel(conf.getInt( + context = newContext(conf.getInt( ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL)); - context.setWorkers(conf.getInt( + } + + ZstdCompressor(int level) { + validateZstdLevel(level); + context = newContext(level); + } + + private ZstdCompressCtx newContext(int level) { + ZstdCompressCtx ctx = new ZstdCompressCtx(); + ctx.setLevel(level); + ctx.setWorkers(conf.getInt( ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS)); + return ctx; } @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index dd51d1ef09..d503cbb4eb 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -25,6 +25,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.function.Function; +import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnWriteStore; 
import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; @@ -51,7 +53,7 @@ class InternalParquetRecordWriter { private long rowGroupSizeThreshold; private final int rowGroupRecordCountThreshold; private long nextRowGroupSize; - private final BytesInputCompressor compressor; + private final Function compressorProvider; private final boolean validating; private final ParquetProperties props; @@ -76,7 +78,6 @@ class InternalParquetRecordWriter { * @param schema the schema of the records * @param extraMetaData extra meta data to write in the footer of the file * @param rowGroupSize the size of a block in the file (this will be approximate) - * @param compressor the codec used to compress */ public InternalParquetRecordWriter( ParquetFileWriter parquetFileWriter, @@ -84,7 +85,7 @@ public InternalParquetRecordWriter( MessageType schema, Map extraMetaData, long rowGroupSize, - BytesInputCompressor compressor, + Function compressorProvider, boolean validating, ParquetProperties props) { this.parquetFileWriter = parquetFileWriter; @@ -94,7 +95,7 @@ public InternalParquetRecordWriter( this.rowGroupSizeThreshold = rowGroupSize; this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit(); this.nextRowGroupSize = rowGroupSizeThreshold; - this.compressor = compressor; + this.compressorProvider = compressorProvider; this.validating = validating; this.props = props; this.fileEncryptor = parquetFileWriter.getEncryptor(); @@ -108,14 +109,15 @@ public ParquetMetadata getFooter() { } private void initStore() { - ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore( - compressor, - schema, - props.getAllocator(), - props.getColumnIndexTruncateLength(), - props.getPageWriteChecksumEnabled(), - fileEncryptor, - rowGroupOrdinal); + ColumnChunkPageWriteStore columnChunkPageWriteStore = ColumnChunkPageWriteStore.builder() + .withCompressorProvider(compressorProvider) + 
.withSchema(schema) + .withAllocator(props.getAllocator()) + .withColumnIndexTruncateLength(props.getColumnIndexTruncateLength()) + .withPageWriteChecksumEnabled(props.getPageWriteChecksumEnabled()) + .withFileEncryptor(fileEncryptor) + .withRowGroupOrdinal(rowGroupOrdinal) + .build(); pageStore = columnChunkPageWriteStore; bloomFilterWriteStore = columnChunkPageWriteStore; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 868ae634c1..4db288f455 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -163,6 +163,7 @@ public static enum JobSummaryLevel { public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; public static final String STATISTICS_ENABLED = "parquet.column.statistics.enabled"; public static final String SIZE_STATISTICS_ENABLED = "parquet.size.statistics.enabled"; + public static final String COLUMN_COMPRESSION_LEVEL_PREFIX = "parquet.compression.level"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -222,6 +223,14 @@ public static void setCompression(Job job, CompressionCodecName compression) { getConfiguration(job).set(COMPRESSION, compression.name()); } + public static void setColumnCompression(Job job, String columnPath, CompressionCodecName codec) { + getConfiguration(job).set(COMPRESSION + '#' + columnPath, codec.name()); + } + + public static void setColumnCompressionLevel(Job job, String columnPath, int level) { + getConfiguration(job).setInt(COLUMN_COMPRESSION_LEVEL_PREFIX + '#' + columnPath, level); + } + public static void setEnableDictionary(Job job, boolean enableDictionary) { getConfiguration(job).setBoolean(ENABLE_DICTIONARY, enableDictionary); } @@ -546,6 +555,14 @@ public 
RecordWriter getRecordWriter(Configuration conf, Path file, Comp STATISTICS_ENABLED, key -> conf.getBoolean(key, ParquetProperties.DEFAULT_STATISTICS_ENABLED), propsBuilder::withStatisticsEnabled) + .withColumnConfig( + COMPRESSION, + key -> CompressionCodecName.fromConf(conf.get(key)), + propsBuilder::withCompressionCodec) + .withColumnConfig( + COLUMN_COMPRESSION_LEVEL_PREFIX, + key -> conf.getInt(key, -1), + propsBuilder::withCompressionLevel) .parseConfig(conf); ParquetProperties props = propsBuilder.build(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java index 51528b10be..80a6c69d11 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java @@ -104,7 +104,7 @@ public ParquetRecordWriter( .withWriterVersion(writerVersion) .build(); internalWriter = new InternalParquetRecordWriter( - w, writeSupport, schema, extraMetaData, blockSize, compressor, validating, props); + w, writeSupport, schema, extraMetaData, blockSize, col -> compressor, validating, props); this.memoryManager = null; this.codecFactory = null; } @@ -173,7 +173,7 @@ public ParquetRecordWriter( .withWriterVersion(writerVersion) .build(); internalWriter = new InternalParquetRecordWriter( - w, writeSupport, schema, extraMetaData, blockSize, compressor, validating, props); + w, writeSupport, schema, extraMetaData, blockSize, col -> compressor, validating, props); this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null"); memoryManager.addWriter(internalWriter, blockSize); this.codecFactory = null; @@ -207,7 +207,7 @@ public ParquetRecordWriter( schema, extraMetaData, blockSize, - codecFactory.getCompressor(codec), + ColumnChunkPageWriteStore.compressorProvider(codecFactory, codec, props), validating, props); this.memoryManager 
= Objects.requireNonNull(memoryManager, "memoryManager cannot be null"); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 8eb5f7f17b..41d9d1264a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -395,7 +395,6 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport fileWriter.start(); this.codecFactory = codecFactory; - CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName); final Map extraMetadata; if (encodingProps.getExtraMetaData() == null @@ -418,7 +417,14 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport } this.writer = new InternalParquetRecordWriter( - fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps); + fileWriter, + writeSupport, + schema, + extraMetadata, + rowGroupSize, + ColumnChunkPageWriteStore.compressorProvider(codecFactory, compressionCodecName, encodingProps), + validating, + encodingProps); } public void write(T object) throws IOException { @@ -569,6 +575,32 @@ public SELF withCompressionCodec(CompressionCodecName codecName) { return self(); } + /** + * Override the compression codec for a specific column. + * + * @param columnPath dot-string path of the column (e.g. {@code "my_col"}) + * @param codecName the codec to use for that column + * @return this builder for method chaining. + */ + public SELF withCompressionCodec(String columnPath, CompressionCodecName codecName) { + encodingPropsBuilder.withCompressionCodec(columnPath, codecName); + return self(); + } + + /** + * Set the compression level for a specific column. + * The valid range is codec-specific (e.g. ZSTD: 1–22 (default 3), GZIP: 0–9 or -1 (default 6), BROTLI: 0–11 (default 1)). 
+ * Pass {@code null} to unset a previously configured level for that column. + * + * @param columnPath dot-string path of the column (e.g. {@code "my_col"}) + * @param level codec-specific compression level, or {@code null} to unset + * @return this builder for method chaining. + */ + public SELF withCompressionLevel(String columnPath, Integer level) { + encodingPropsBuilder.withCompressionLevel(columnPath, level); + return self(); + } + /** * Set the {@link CompressionCodecFactory codec factory} used by the * constructed writer. diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java index a17cf678f5..3b6a82e76d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java @@ -22,7 +22,9 @@ import static org.apache.parquet.column.Encoding.RLE; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import static org.apache.parquet.schema.OriginalType.UTF8; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; @@ -32,6 +34,8 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; 
import static org.mockito.ArgumentMatchers.isNull; @@ -42,6 +46,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -319,4 +324,132 @@ public void testColumnOrderV1() throws IOException { private BytesInputCompressor compressor(CompressionCodecName codec) { return new CodecFactory(conf, pageSize).getCompressor(codec); } + + @Test + public void perColumnCodec_defaultUsedWhenNotSet() throws Exception { + MessageType schema = + MessageTypeParser.parseMessageType("message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder().build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(SNAPPY, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnCodec_overridesDefaultForOneColumn() throws Exception { + MessageType schema = + MessageTypeParser.parseMessageType("message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = + ParquetProperties.builder().withCompressionCodec("col_a", ZSTD).build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnCodec_allColumnsOverridden() throws Exception { + MessageType schema = + MessageTypeParser.parseMessageType("message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionCodec("col_b", GZIP) + .build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(GZIP, codecs.get("col_b")); + } + + @Test + public void perColumnLevel_withCodec_roundTrip() throws Exception { + 
MessageType schema = + MessageTypeParser.parseMessageType("message test { required binary col_a; required int32 col_b; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 10) + .build(); + + Map codecs = writeAndReadCodecs(schema, SNAPPY, props); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnLevel_invalidZstdLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType("message test { required binary col_a; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 23) + .build(); + + BadConfigurationException ex = + assertThrows(BadConfigurationException.class, () -> writeAndReadCodecs(schema, SNAPPY, props)); + assertTrue(ex.getMessage().contains("23")); + } + + @Test + public void perColumnLevel_invalidGzipLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType("message test { required binary col_a; }"); + ParquetProperties props = ParquetProperties.builder() + .withCompressionCodec("col_a", GZIP) + .withCompressionLevel("col_a", 10) + .build(); + + BadConfigurationException ex = + assertThrows(BadConfigurationException.class, () -> writeAndReadCodecs(schema, SNAPPY, props)); + assertTrue(ex.getMessage().contains("10")); + } + + private Map writeAndReadCodecs( + MessageType schema, CompressionCodecName defaultCodec, ParquetProperties props) throws Exception { + Path file = new Path("target/test/TestColumnChunkPageWriteStore/perColumnCodec.parquet"); + FileSystem fs = file.getFileSystem(conf); + fs.delete(file, false); + fs.mkdirs(file.getParent()); + + allocator = TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator()); + CodecFactory codecFactory = new CodecFactory(conf, pageSize); + + OutputFileForTesting 
outputFile = new OutputFileForTesting(file, conf); + ParquetFileWriter writer = new ParquetFileWriter( + outputFile, + schema, + Mode.CREATE, + ParquetWriter.DEFAULT_BLOCK_SIZE, + ParquetWriter.MAX_PADDING_SIZE_DEFAULT, + null, + ParquetProperties.builder().withAllocator(allocator).build()); + writer.start(); + writer.startBlock(1); + + try (ColumnChunkPageWriteStore store = ColumnChunkPageWriteStore.builder() + .withCompressorProvider(ColumnChunkPageWriteStore.compressorProvider(codecFactory, defaultCodec, props)) + .withSchema(schema) + .withAllocator(allocator) + .withColumnIndexTruncateLength(Integer.MAX_VALUE) + .build()) { + for (ColumnDescriptor col : schema.getColumns()) { + Statistics stats = + Statistics.getBuilderForReading(col.getPrimitiveType()).build(); + store.getPageWriter(col).writePage(BytesInput.fromInt(42), 1, 1, stats, RLE, RLE, PLAIN); + } + store.flushToFileWriter(writer); + } + + writer.endBlock(); + writer.end(new HashMap<>()); + + ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER); + Map result = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + result.put(col.getPath().toDotString(), col.getCodec()); + } + return result; + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index c78ee09ecc..a0cdc93427 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -18,12 +18,17 @@ package org.apache.parquet.hadoop; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.BROTLI; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4; import static 
org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4_RAW; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZO; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.HashSet; import java.util.Random; import java.util.Set; @@ -285,4 +290,158 @@ public void cachingKeysZstd() { Assert.assertEquals(codec_2_1, codec_2_2); Assert.assertNotEquals(codec_2_1, codec_5_1); } + + @Test + public void levelAwareCompressor_sameLevel_returnsCachedInstance() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor c1 = factory.getCompressor(ZSTD, 3); + BytesInputCompressor c2 = factory.getCompressor(ZSTD, 3); + Assert.assertSame("Same codec+level should return the cached instance", c1, c2); + factory.release(); + } + + @Test + public void levelAwareCompressor_differentLevels_returnsDifferentInstances() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor c1 = factory.getCompressor(ZSTD, 1); + BytesInputCompressor c3 = factory.getCompressor(ZSTD, 3); + Assert.assertNotSame("Different levels should return different compressor instances", c1, c3); + factory.release(); + } + + @Test + public void levelAwareCompressor_levelCacheIsolatedFromNoLevelCache() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor noLevel = factory.getCompressor(ZSTD); + BytesInputCompressor withLevel = factory.getCompressor(ZSTD, 3); + Assert.assertNotSame( + "Level-aware and no-level compressors should use separate cache entries", noLevel, withLevel); + factory.release(); + } + + @Test + public void 
levelAwareCompressor_uncompressed_returnsNoOpCompressor() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor comp = factory.getCompressor(UNCOMPRESSED, 5); + Assert.assertSame(CodecFactory.NO_OP_COMPRESSOR, comp); + factory.release(); + } + + @Test + public void levelAwareCompressor_snappy_ignoresLevel() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + BytesInputCompressor comp = factory.getCompressor(SNAPPY, 99); + Assert.assertNotNull(comp); + Assert.assertEquals(SNAPPY, comp.getCodecName()); + factory.release(); + } + + @Test + public void levelAwareCompressor_gzip_invalidLevel_throwsBadConfigurationException() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + try { + BadConfigurationException ex = + Assert.assertThrows(BadConfigurationException.class, () -> factory.getCompressor(GZIP, 99)); + Assert.assertTrue(ex.getMessage().contains("99")); + } finally { + factory.release(); + } + } + + @Test + public void levelAwareCompressor_gzip_validBoundaryLevels_noException() { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + for (int level : new int[] {-1, 0, 1, 9}) { + BytesInputCompressor comp = factory.getCompressor(GZIP, level); + Assert.assertNotNull("Compressor should not be null for GZIP level " + level, comp); + Assert.assertEquals("Codec name should be GZIP for level " + level, GZIP, comp.getCodecName()); + } + factory.release(); + } + + @Test + public void levelAwareCompressor_zstd_roundTrip() throws IOException { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + byte[] original = "hello parquet per-column compression".getBytes(StandardCharsets.UTF_8); + BytesInputDecompressor decompressor = factory.getDecompressor(ZSTD); + for (int level : new int[] {1, 3, 10, 22}) { + BytesInput compressed = factory.getCompressor(ZSTD, level).compress(BytesInput.from(original)); + byte[] result = 
decompressor.decompress(compressed, original.length).toByteArray(); + Assert.assertArrayEquals("Round-trip failed at ZSTD level " + level, original, result); + } + factory.release(); + } + + @Test + public void levelAwareCompressor_gzip_roundTrip() throws IOException { + CodecFactory factory = new CodecFactory(new Configuration(), pageSize); + byte[] original = "hello parquet per-column compression".getBytes(StandardCharsets.UTF_8); + BytesInputDecompressor decompressor = factory.getDecompressor(GZIP); + for (int level : new int[] {1, 5, 9}) { + BytesInput compressed = factory.getCompressor(GZIP, level).compress(BytesInput.from(original)); + byte[] result = decompressor.decompress(compressed, original.length).toByteArray(); + Assert.assertArrayEquals("Round-trip failed at GZIP level " + level, original, result); + } + factory.release(); + } + + @Test + public void directFactory_levelAwareCompressor_usesDirectCompressorAndRoundTrips() throws IOException { + CodecFactory heap = new CodecFactory(new Configuration(), pageSize); + CodecFactory direct = + CodecFactory.createDirectCodecFactory(new Configuration(), new DirectByteBufferAllocator(), pageSize); + try { + // Sanity: the heap factory's leveled path yields a HeapBytesCompressor. + Assert.assertTrue( + "heap factory should produce a HeapBytesCompressor", + heap.getCompressor(ZSTD, 3) instanceof CodecFactory.HeapBytesCompressor); + + // The direct factory must not fall back to the heap/Hadoop path for leveled ZSTD/SNAPPY. 
+ BytesInputCompressor directZstd = direct.getCompressor(ZSTD, 3); + Assert.assertFalse( + "direct factory ZSTD(level) should not fall back to HeapBytesCompressor", + directZstd instanceof CodecFactory.HeapBytesCompressor); + Assert.assertEquals(ZSTD, directZstd.getCodecName()); + Assert.assertEquals( + "leveled ZSTD should use the same direct compressor type as the no-level path", + direct.getCompressor(ZSTD).getClass(), + directZstd.getClass()); + + BytesInputCompressor directSnappy = direct.getCompressor(SNAPPY, 5); + Assert.assertFalse( + "direct factory SNAPPY(level) should not fall back to HeapBytesCompressor", + directSnappy instanceof CodecFactory.HeapBytesCompressor); + Assert.assertEquals( + "leveled SNAPPY should use the same direct compressor type as the no-level path", + direct.getCompressor(SNAPPY).getClass(), + directSnappy.getClass()); + + // The direct ZSTD level path must compress/decompress correctly at each level. + byte[] original = "hello parquet per-column zstd direct compression".getBytes(StandardCharsets.UTF_8); + BytesInputDecompressor decompressor = heap.getDecompressor(ZSTD); + for (int level : new int[] {1, 3, 22}) { + byte[] compressed = direct.getCompressor(ZSTD, level) + .compress(BytesInput.from(original)) + .toByteArray(); + byte[] result = decompressor + .decompress(BytesInput.from(compressed), original.length) + .toByteArray(); + Assert.assertArrayEquals("Direct ZSTD round-trip failed at level " + level, original, result); + } + } finally { + heap.release(); + direct.release(); + } + } + + @Test + public void directFactory_levelAwareCompressor_invalidZstdLevel_throwsBadConfigurationException() { + CodecFactory direct = + CodecFactory.createDirectCodecFactory(new Configuration(), new DirectByteBufferAllocator(), pageSize); + try { + Assert.assertThrows(BadConfigurationException.class, () -> direct.getCompressor(ZSTD, 23)); + } finally { + direct.release(); + } + } } diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index 6b3b0aae25..a8b4783fe2 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -27,7 +27,10 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.ZSTD; import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; @@ -55,6 +58,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.bytes.TrackingByteBufferAllocator; @@ -86,6 +91,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.hadoop.util.HadoopOutputFile; @@ -826,6 +832,231 @@ 
public void testParquetWriterBuilderCanNotConfigurePathAndFile() throws IOExcept ExampleParquetWriter.builder(path).withFile(outputFile).build()); } + @Test + public void perColumnCodec_overridesDefaultForOneColumn() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(SNAPPY) + .withCompressionCodec("col_a", ZSTD) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 1)); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + Map codecs = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + codecs.put(col.getPath().toDotString(), col.getCodec()); + } + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnCodec_defaultUsedWhenNoOverride() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(GZIP) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 1)); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + assertEquals("Column " + 
col.getPath().toDotString() + " should use default codec", GZIP, col.getCodec()); + } + } + + @Test + public void perColumnLevel_dataRoundTrips() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(SNAPPY) + .withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 1) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 42)); + } + + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), path).build()) { + Group group = reader.read(); + assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8()); + assertEquals(42, group.getInteger("col_b", 0)); + assertNull(reader.read()); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + Map codecs = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + codecs.put(col.getPath().toDotString(), col.getCodec()); + } + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void perColumnLevel_invalidZstdLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + // ZSTD only supports levels 1-22; level 23 is invalid + Assert.assertThrows(BadConfigurationException.class, () -> { + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + 
.withCompressionCodec("col_a", ZSTD) + .withCompressionLevel("col_a", 23) + .build()) { + // exception expected before first write + } + }); + } + + @Test + public void perColumnLevel_invalidLevel_throwsBadConfigurationException() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + // GZIP only supports levels -1 (default) through 9; level 10 is invalid + Assert.assertThrows(BadConfigurationException.class, () -> { + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec("col_a", GZIP) + .withCompressionLevel("col_a", 10) + .build()) { + // exception expected before first write + } + }); + } + + @Test + public void perColumnLevel_differentLevelsPerColumn_dataRoundTrips() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(BINARY) + .as(stringType()) + .named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + // Both columns use ZSTD (from default), but at different levels. + // This exercises the level-only override path in resolveCompressor(). 
+ try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(ZSTD) + .withCompressionLevel("col_a", 1) + .withCompressionLevel("col_b", 10) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "fast").append("col_b", "best")); + } + + // Both columns must report ZSTD in the footer + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + assertEquals("Column " + col.getPath().toDotString() + " should use ZSTD", ZSTD, col.getCodec()); + } + + // Data must survive the round-trip at both levels + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), path).build()) { + Group group = reader.read(); + assertEquals("fast", group.getBinary("col_a", 0).toStringUsingUTF8()); + assertEquals("best", group.getBinary("col_b", 0).toStringUsingUTF8()); + assertNull(reader.read()); + } + } + + @Test + public void perColumnCodec_allColumnsOverridden() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withType(schema) + .withCompressionCodec(SNAPPY) + .withCompressionCodec("col_a", ZSTD) + .withCompressionCodec("col_b", GZIP) + .build()) { + SimpleGroupFactory f = new SimpleGroupFactory(schema); + writer.write(f.newGroup().append("col_a", "hello").append("col_b", 1)); + } + + ParquetMetadata footer = readFooter(new Configuration(), path, NO_FILTER); + Map codecs = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + codecs.put(col.getPath().toDotString(), col.getCodec()); + } + 
assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(GZIP, codecs.get("col_b")); + } + @Test public void testNoFlushAfterException() throws Exception { final File testDir = temp.newFile(); @@ -864,6 +1095,163 @@ public void testNoFlushAfterException() throws Exception { assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0); } + @Test + public void outputFormat_setColumnCompression_overridesDefaultForOneColumn() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + Job job = Job.getInstance(); + ParquetOutputFormat.setColumnCompression(job, "col_a", ZSTD); + + Map codecs = writeAndReadCodecsViaOutputFormat(schema, SNAPPY, job, path); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + } + + @Test + public void outputFormat_setColumnCompression_allColumnsOverridden() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + Job job = Job.getInstance(); + ParquetOutputFormat.setColumnCompression(job, "col_a", ZSTD); + ParquetOutputFormat.setColumnCompression(job, "col_b", GZIP); + + Map codecs = writeAndReadCodecsViaOutputFormat(schema, SNAPPY, job, path); + + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(GZIP, codecs.get("col_b")); + } + + @Test + public void outputFormat_setColumnCompression_defaultUsedWhenNotSet() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new 
Path(file.getAbsolutePath()); + Job job = Job.getInstance(); + + Map codecs = writeAndReadCodecsViaOutputFormat(schema, GZIP, job, path); + + assertEquals(GZIP, codecs.get("col_a")); + assertEquals(GZIP, codecs.get("col_b")); + } + + @Test + public void outputFormat_setColumnCompressionLevel_withCodec_dataRoundTrips() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(INT32) + .named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + Job job = Job.getInstance(); + ParquetOutputFormat.setColumnCompression(job, "col_a", ZSTD); + ParquetOutputFormat.setColumnCompressionLevel(job, "col_a", 1); + + Map codecs = writeAndReadCodecsViaOutputFormat(schema, SNAPPY, job, path); + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(SNAPPY, codecs.get("col_b")); + + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), path).build()) { + Group group = reader.read(); + assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8()); + assertEquals(42, group.getInteger("col_b", 0)); + assertNull(reader.read()); + } + } + + @Test + public void outputFormat_setColumnCompressionLevel_differentLevelsPerColumn() throws Exception { + MessageType schema = Types.buildMessage() + .required(BINARY) + .as(stringType()) + .named("col_a") + .required(BINARY) + .as(stringType()) + .named("col_b") + .named("test"); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + Job job = Job.getInstance(); + ParquetOutputFormat.setColumnCompressionLevel(job, "col_a", 1); + ParquetOutputFormat.setColumnCompressionLevel(job, "col_b", 10); + + Map codecs = writeAndReadCodecsViaOutputFormat(schema, ZSTD, job, path); + assertEquals(ZSTD, codecs.get("col_a")); + assertEquals(ZSTD, codecs.get("col_b")); + + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), 
path).build()) { + Group group = reader.read(); + assertEquals("hello", group.getBinary("col_a", 0).toStringUsingUTF8()); + assertEquals("fast", group.getBinary("col_b", 0).toStringUsingUTF8()); + assertNull(reader.read()); + } + } + + @SuppressWarnings("unchecked") + private Map writeAndReadCodecsViaOutputFormat( + MessageType schema, CompressionCodecName defaultCodec, Job job, Path file) throws Exception { + Configuration conf = job.getConfiguration(); + GroupWriteSupport.setSchema(schema, conf); + SimpleGroupFactory f = new SimpleGroupFactory(schema); + + Group group = f.newGroup(); + for (int i = 0; i < schema.getFieldCount(); i++) { + String name = schema.getFieldName(i); + switch (schema.getType(i).asPrimitiveType().getPrimitiveTypeName()) { + case BINARY: + group.append(name, name.equals("col_a") ? "hello" : "fast"); + break; + case INT32: + group.append(name, 42); + break; + default: + break; + } + } + + ParquetOutputFormat outputFormat = new ParquetOutputFormat<>(new GroupWriteSupport()); + RecordWriter writer = outputFormat.getRecordWriter(conf, file, defaultCodec); + writer.write(null, group); + writer.close(null); + + ParquetMetadata footer = readFooter(conf, file, NO_FILTER); + Map result = new HashMap<>(); + for (ColumnChunkMetaData col : footer.getBlocks().get(0).getColumns()) { + result.put(col.getPath().toDotString(), col.getCodec()); + } + return result; + } + @Test public void testV2PageNullCountWithStatisticsDisabled() throws Exception { // Regression test: when using PARQUET_2_0 with statistics disabled on a nullable column,