diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java index c92527c5d0..dbfc4529f5 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ConvertCommand.java @@ -42,6 +42,7 @@ import org.apache.parquet.cli.util.Schemas; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.StandardOutputFile; import org.slf4j.Logger; @Parameters(commandDescription = "Create a Parquet file from a data file") @@ -56,8 +57,8 @@ public ConvertCommand(Logger console) { @Parameter( names = {"-o", "--output"}, - description = "Output file path", - required = true) + description = "Output file path, if not given writes to stdout", + required = false) String outputPath = null; @Parameter( @@ -112,18 +113,11 @@ public int run() throws IOException { } Schema projection = filterSchema(schema, columns); - Path outPath = qualifiedPath(outputPath); - FileSystem outFS = outPath.getFileSystem(getConf()); - if (overwrite && outFS.exists(outPath)) { - console.debug("Deleting output file {} (already exists)", outPath); - outFS.delete(outPath); - } - Iterable reader = openDataFile(source, projection); boolean threw = true; long count = 0; try { - try (ParquetWriter writer = AvroParquetWriter.builder(qualifiedPath(outputPath)) + try (ParquetWriter writer = createBuilder() .withWriterVersion(v2 ? 
PARQUET_2_0 : PARQUET_1_0) .withConf(getConf()) .withCompressionCodec(codec) @@ -151,6 +145,19 @@ public int run() throws IOException { return 0; } + private AvroParquetWriter.Builder createBuilder() throws IOException { + if (outputPath != null) { + Path outPath = qualifiedPath(outputPath); + FileSystem outFS = outPath.getFileSystem(getConf()); + if (overwrite && outFS.exists(outPath)) { + console.debug("Deleting output file {} (already exists)", outPath); + outFS.delete(outPath); + } + return AvroParquetWriter.builder(outPath); + } + return AvroParquetWriter.builder(new StandardOutputFile()); + } + @Override public List getExamples() { return Lists.newArrayList( diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java index b2365d53fd..037ebde0b8 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java @@ -31,10 +31,15 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.cli.BaseCommand; import org.apache.parquet.cli.util.Codecs; +import org.apache.parquet.conf.HadoopParquetConfiguration; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.rewrite.MaskMode; import org.apache.parquet.hadoop.rewrite.ParquetRewriter; import org.apache.parquet.hadoop.rewrite.RewriteOptions; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.StandardOutputFile; import org.slf4j.Logger; @Parameters(commandDescription = "Rewrite one or more Parquet files to a new Parquet file") @@ -48,8 +53,8 @@ public class RewriteCommand extends BaseCommand { @Parameter( names = {"-o", "--output"}, - description = "", + description = "Output file path, if not given writes to stdout", + required = false) String output; @Parameter(
@@ -87,18 +92,10 @@ public RewriteCommand(Logger console) { } private RewriteOptions buildOptionsOrFail() throws IOException { - Preconditions.checkArgument( - inputs != null && !inputs.isEmpty() && output != null, - "Both input and output parquet file paths are required."); - - List inputPaths = new ArrayList<>(); - for (String input : inputs) { - inputPaths.add(new Path(input)); - } - Path outputPath = new Path(output); + Preconditions.checkArgument(inputs != null && !inputs.isEmpty(), "Input parquet file paths are required."); // The builder below takes the job to validate all input parameters. - RewriteOptions.Builder builder = new RewriteOptions.Builder(getConf(), inputPaths, outputPath); + RewriteOptions.Builder builder = createBuilder(); // Mask columns if specified. if (maskMode != null && maskMode.equals("nullify") && maskColumns != null && !maskColumns.isEmpty()) { @@ -121,18 +118,38 @@ private RewriteOptions buildOptionsOrFail() throws IOException { RewriteOptions options = builder.build(); - // If RewriteOptions are successfully built and the overwrite option is specified, remove the output path - FileSystem outFS = outputPath.getFileSystem(getConf()); - if (overwrite && outFS.exists(outputPath)) { - console.debug("Deleting output file {} (already exists)", outputPath); - outFS.delete(outputPath); + if (output != null) { + Path outputPath = new Path(output); + // If RewriteOptions are successfully built and the overwrite option is specified, remove the output path + FileSystem outFS = outputPath.getFileSystem(getConf()); + if (overwrite && outFS.exists(outputPath)) { + console.debug("Deleting output file {} (already exists)", outputPath); + outFS.delete(outputPath); + } } return options; } + private RewriteOptions.Builder createBuilder() { + if (output != null) { + List inputPaths = new ArrayList<>(); + for (String input : inputs) { + inputPaths.add(new Path(input)); + } + Path outputPath = new Path(output); + return new 
RewriteOptions.Builder(getConf(), inputPaths, outputPath); + } + + List inputFiles = new ArrayList<>(); + for (String input : inputs) { + inputFiles.add(HadoopInputFile.fromPathUnchecked(new Path(input), getConf())); + } + OutputFile outputFile = new StandardOutputFile(); + return new RewriteOptions.Builder(new HadoopParquetConfiguration(getConf()), inputFiles, outputFile); + } + @Override - @SuppressWarnings("unchecked") public int run() throws IOException { RewriteOptions options = buildOptionsOrFail(); ParquetRewriter rewriter = new ParquetRewriter(options); diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCommandTest.java index 4870c48b49..3beab72a03 100644 --- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCommandTest.java +++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ConvertCommandTest.java @@ -18,8 +18,10 @@ */ package org.apache.parquet.cli.commands; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.PrintStream; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; @@ -37,4 +39,16 @@ public void testConvertCommand() throws IOException { Assert.assertEquals(0, command.run()); Assert.assertTrue(output.exists()); } + + @Test + public void testConvertCommandToStdOut() throws IOException { + File file = toAvro(parquetFile()); + ConvertCommand command = new ConvertCommand(createLogger()); + command.targets = Arrays.asList(file.getAbsolutePath()); + command.setConf(new Configuration()); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + Assert.assertEquals(0, command.run()); + Assert.assertTrue(baos.size() > 0); + } } diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java 
b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java index 10f8c6176b..3c24ccfcf6 100644 --- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java +++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/RewriteCommandTest.java @@ -18,8 +18,10 @@ */ package org.apache.parquet.cli.commands; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.PrintStream; import java.nio.file.Files; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; @@ -95,4 +97,18 @@ public void testRewriteCommandWithCompression_gzip() throws IOException { Assert.assertEquals(0, command.run()); Assert.assertTrue(output.exists()); } + + @Test + public void testRewriteCommandToStdOut() throws IOException { + File file = parquetFile(); + RewriteCommand command = new RewriteCommand(createLogger()); + command.inputs = Arrays.asList(file.getAbsolutePath()); + command.codec = "gzip"; + command.setConf(new Configuration()); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + Assert.assertEquals(0, command.run()); + Assert.assertTrue(baos.size() > 0); + } } diff --git a/parquet-common/src/main/java/org/apache/parquet/io/StandardOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/StandardOutputFile.java new file mode 100644 index 0000000000..f4b4d1f075 --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/io/StandardOutputFile.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.io; + +import java.io.IOException; + +/** + * An {@link OutputFile} implementation that allows Parquet to write to {@link System#out} + */ +public class StandardOutputFile implements OutputFile { + + public static class RawPositionOutputStream extends PositionOutputStream { + + private long pos = 0; + + @Override + public long getPos() throws IOException { + return this.pos; + } + + @Override + public void write(int b) throws IOException { + this.pos++; + System.out.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + this.pos += b.length; + System.out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + this.pos += len; + System.out.write(b, off, len); + } + + @Override + public void flush() throws IOException { + System.out.flush(); + } + } + + @Override + public PositionOutputStream create(long blockSizeHint) throws IOException { + return new RawPositionOutputStream(); + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException { + return new RawPositionOutputStream(); + } + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return -1; + } +} diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestStandardOutputFile.java b/parquet-common/src/test/java/org/apache/parquet/io/TestStandardOutputFile.java new file mode 100644 index 0000000000..43134d0aa5 --- /dev/null +++
b/parquet-common/src/test/java/org/apache/parquet/io/TestStandardOutputFile.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import org.junit.Test; + +public class TestStandardOutputFile { + + private static final String TEST = "this is a test"; + + @Test + public void outputFileWriteByteToStdOut() throws IOException, InterruptedException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + final PositionOutputStream stdOut = new StandardOutputFile().create(1); + final byte test = 7; + stdOut.write(test); + assertEquals(1, stdOut.getPos()); + assertEquals(1, baos.toByteArray().length); + assertEquals(7, baos.toByteArray()[0]); + } + + @Test + public void outputFileWriteArrayToStdOut() throws IOException, InterruptedException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + final PositionOutputStream stdOut = new StandardOutputFile().create(1); + final byte[] test = TEST.getBytes(); + 
stdOut.write(test); + assertEquals(test.length, stdOut.getPos()); + assertEquals(test.length, baos.toByteArray().length); + assertEquals(TEST, new String(baos.toByteArray())); + } + + @Test + public void outputFileWriteArrayOffsetToStdOut() throws IOException, InterruptedException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + final PositionOutputStream stdOut = new StandardOutputFile().create(1); + final byte[] test = TEST.getBytes(); + stdOut.write(test, 1, 4); + assertEquals(4, stdOut.getPos()); + assertEquals(4, baos.toByteArray().length); + assertEquals(TEST.substring(1, 5), new String(baos.toByteArray())); + } +}