diff --git a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java index 55e23fe4..a1b3028a 100644 --- a/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java +++ b/ice-rest-catalog/src/main/java/com/altinity/ice/rest/catalog/internal/maintenance/DataCompaction.java @@ -170,6 +170,7 @@ private void merge( Parquet.WriteBuilder writeBuilder = Parquet.write(outputFile) + .setAll(table.properties()) .overwrite(false) .createWriterFunc(GenericParquetWriter::buildWriter) .metricsConfig(metricsConfig) diff --git a/ice/src/main/java/com/altinity/ice/cli/Main.java b/ice/src/main/java/com/altinity/ice/cli/Main.java index a10e1da0..76564df7 100644 --- a/ice/src/main/java/com/altinity/ice/cli/Main.java +++ b/ice/src/main/java/com/altinity/ice/cli/Main.java @@ -414,6 +414,11 @@ void insert( description = "Number of threads to use for inserting data", defaultValue = "-1") int threadCount, + @CommandLine.Option( + names = {"--compression"}, + description = + "Parquet compression codec: gzip (default), zstd, snappy, lz4, brotli, uncompressed, or as-source") + String compression, @CommandLine.Option( names = {"--watch"}, description = "Event queue. Supported: AWS SQS") @@ -506,6 +511,7 @@ void insert( .sortOrderList(sortOrders) .threadCount( threadCount < 1 ? 
Runtime.getRuntime().availableProcessors() : threadCount) + .compression(compression) .build(); if (!watchMode) { diff --git a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java index 1e510680..31b1a7aa 100644 --- a/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java +++ b/ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java @@ -28,8 +28,10 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -76,6 +78,7 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.rest.RESTCatalog; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; import org.slf4j.Logger; @@ -108,6 +111,19 @@ public static Result run( return new Result(0, 0); } + if (options.compression() != null) { + Set<String> valid = + Arrays.stream(CompressionCodecName.values()) + .map(c -> c.name().toLowerCase(Locale.ENGLISH)) + .collect(Collectors.toCollection(HashSet::new)); + valid.add("as-source"); + if (!valid.contains(options.compression().toLowerCase(Locale.ENGLISH))) { + String accepted = String.join(", ", new TreeSet<>(valid)); + throw new IllegalArgumentException( + "Unknown --compression value: " + options.compression() + ". 
Accepted: " + accepted); } } + Table table = catalog.loadTable(nsTable); // Create transaction and pass it to updatePartitionAndSortOrderMetadata @@ -501,66 +517,92 @@ private static List<DataFile> processFile( .build()); dataFileSizeInBytes = inputFile.getLength(); dataFile = dstDataFile; - } else if (partitionSpec.isPartitioned() && partitionKey == null) { - return copyPartitionedAndSorted( - file, - tableSchema, - partitionSpec, - sortOrder, - metricsConfig, - tableIO, - inputFile, - dstDataFileSource); - } else if (sortOrder.isSorted() && !sorted) { - return Collections.singletonList( - copySorted( - file, - dstDataFileSource.get(file), - tableSchema, - partitionSpec, - sortOrder, - metricsConfig, - tableIO, - inputFile, - dataFileNamingStrategy, - partitionKey)); } else { - // Table isn't partitioned or sorted. Copy as is. - String dstDataFile = dstDataFileSource.get(file); - if (checkNotExists.apply(dstDataFile)) { - return Collections.emptyList(); + // Copy path: compute compression override from CLI or as-source + String compressionCodecOverride = null; + if (options.compression() != null) { + if ("as-source".equalsIgnoreCase(options.compression())) { + var blocks = metadata.getBlocks(); + if (!blocks.isEmpty()) { + compressionCodecOverride = + blocks.get(0).getColumns().get(0).getCodec().name().toLowerCase(Locale.ENGLISH); + } + } else { + compressionCodecOverride = options.compression().toLowerCase(Locale.ENGLISH); + } } - OutputFile outputFile = - tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://")); - // TODO: support transferTo below (note that compression, etc. 
might be different) - // try (var d = outputFile.create()) { - // try (var s = inputFile.newStream()) { s.transferTo(d); } - // } - Parquet.ReadBuilder readBuilder = - Parquet.read(inputFile) - .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) - .project(tableSchema) - .reuseContainers(); - Parquet.WriteBuilder writeBuilder = - Parquet.write(outputFile) - .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL) - .createWriterFunc(GenericParquetWriter::buildWriter) - .metricsConfig(metricsConfig) - .schema(tableSchema); + if (partitionSpec.isPartitioned() && partitionKey == null) { + return copyPartitionedAndSorted( + file, + tableSchema, + partitionSpec, + sortOrder, + metricsConfig, + tableIO, + inputFile, + dstDataFileSource, + table.properties(), + compressionCodecOverride); + } else if (sortOrder.isSorted() && !sorted) { + return Collections.singletonList( + copySorted( + file, + dstDataFileSource.get(file), + tableSchema, + partitionSpec, + sortOrder, + metricsConfig, + tableIO, + inputFile, + dataFileNamingStrategy, + partitionKey, + table.properties(), + compressionCodecOverride)); + } else { + // Table isn't partitioned or sorted. Copy as is. + String dstDataFile = dstDataFileSource.get(file); + if (checkNotExists.apply(dstDataFile)) { + return Collections.emptyList(); + } + OutputFile outputFile = + tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://")); + // TODO: support transferTo below (note that compression, etc. 
might be different) + // try (var d = outputFile.create()) { + // try (var s = inputFile.newStream()) { s.transferTo(d); } + // } + Parquet.ReadBuilder readBuilder = + Parquet.read(inputFile) + .createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s)) + .project(tableSchema) + .reuseContainers(); + + Parquet.WriteBuilder writeBuilder = + Parquet.write(outputFile) + .setAll(table.properties()) + .overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL) + .createWriterFunc(GenericParquetWriter::buildWriter) + .metricsConfig(metricsConfig) + .schema(tableSchema); + if (compressionCodecOverride != null) { + + writeBuilder = + writeBuilder.set(TableProperties.PARQUET_COMPRESSION, compressionCodecOverride); + } - logger.info("{}: copying to {}", file, dstDataFile); + logger.info("{}: copying to {}", file, dstDataFile); - try (CloseableIterable<Record> parquetReader = readBuilder.build()) { - try (FileAppender<Record> writer = writeBuilder.build()) { - writer.addAll(parquetReader); - writer.close(); // for write.length() - dataFileSizeInBytes = writer.length(); - metrics = writer.metrics(); + try (CloseableIterable<Record> parquetReader = readBuilder.build()) { + try (FileAppender<Record> writer = writeBuilder.build()) { + writer.addAll(parquetReader); + writer.close(); // for write.length() + dataFileSizeInBytes = writer.length(); + metrics = writer.metrics(); + } } - } - dataFile = dstDataFile; + dataFile = dstDataFile; + } } logger.info( "{}: adding data file (copy took {}s)", file, (System.currentTimeMillis() - start) / 1000); @@ -588,7 +630,9 @@ private static List<DataFile> copyPartitionedAndSorted( MetricsConfig metricsConfig, FileIO tableIO, InputFile inputFile, - DataFileNamingStrategy dstDataFileSource) + DataFileNamingStrategy dstDataFileSource, + Map<String, String> tableProperties, + @Nullable String compressionCodecOverride) throws IOException { logger.info("{}: partitioning{}", file, sortOrder.isSorted() ? 
"+sorting" : ""); @@ -622,10 +666,15 @@ private static List<DataFile> copyPartitionedAndSorted( Parquet.WriteBuilder writeBuilder = Parquet.write(outputFile) + .setAll(tableProperties) .overwrite(true) // FIXME .createWriterFunc(GenericParquetWriter::buildWriter) .metricsConfig(metricsConfig) .schema(tableSchema); + if (compressionCodecOverride != null) { + writeBuilder = + writeBuilder.set(TableProperties.PARQUET_COMPRESSION, compressionCodecOverride); + } try (FileAppender<Record> writer = writeBuilder.build()) { for (Record record : records) { @@ -668,7 +717,9 @@ private static DataFile copySorted( FileIO tableIO, InputFile inputFile, DataFileNamingStrategy.Name dataFileNamingStrategy, - PartitionKey partitionKey) + PartitionKey partitionKey, + Map<String, String> tableProperties, + @Nullable String compressionCodecOverride) throws IOException { logger.info("{}: copying (sorted) to {}", file, dstDataFile); @@ -698,11 +749,16 @@ private static DataFile copySorted( // Write sorted records to outputFile Parquet.WriteBuilder writeBuilder = Parquet.write(outputFile) + .setAll(tableProperties) .overwrite( dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL) // FIXME .createWriterFunc(GenericParquetWriter::buildWriter) .metricsConfig(metricsConfig) .schema(tableSchema); + if (compressionCodecOverride != null) { + writeBuilder = + writeBuilder.set(TableProperties.PARQUET_COMPRESSION, compressionCodecOverride); + } long fileSizeInBytes; Metrics metrics; @@ -793,7 +849,8 @@ public record Options( @Nullable String retryListFile, @Nullable List partitionList, @Nullable List sortOrderList, - int threadCount) { + int threadCount, + @Nullable String compression) { public static Builder builder() { return new Builder(); @@ -816,6 +873,7 @@ public static final class Builder { private List partitionList = List.of(); private List sortOrderList = List.of(); private int threadCount = Runtime.getRuntime().availableProcessors(); + private String compression; private Builder() {} @@ -899,6 +957,11 
@@ public Builder threadCount(int threadCount) { return this; } + public Builder compression(String compression) { + this.compression = compression; + return this; + } + public Options build() { return new Options( dataFileNamingStrategy, @@ -916,7 +979,8 @@ public Options build() { retryListFile, partitionList, sortOrderList, - threadCount); + threadCount, + compression); } } }