
Hive's CombineTextInputFormat #21842

Open
sjdurfey opened this issue May 7, 2024 · 4 comments

sjdurfey commented May 7, 2024

I use the Hive metastore pretty heavily, and we currently have hundreds of tables whose storage descriptors use CombineTextInputFormat. I was recently trying to upgrade from Trino 409 to Trino 445 and found #15921, which mentions removing Hive dependencies from the code base. I spun up 445, tried querying against my metastore, and got errors like this when querying text-based tables:

io.trino.spi.TrinoException: Unsupported storage format: mydatabase.mytable:<UNPARTITIONED> StorageFormat{serde=org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, inputFormat=org.apache.hadoop.mapred.lib.CombineTextInputFormat, outputFormat=org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat}

This error tracks with the code changes in HiveStorageFormat and HiveClassNames. Our Hive tables are queried with Trino, but we also run Spark jobs against them, so using CombineTextInputFormat is ideal for dealing with the small-file problem in Spark. However, it means those tables can't be queried via Trino.
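
For illustration, here is a rough sketch of one way to list the affected tables via the standard Hive metastore Thrift client; the metastore URI, database name, and class name below are placeholders, not our actual setup:

// Sketch only: list tables whose storage descriptor uses CombineTextInputFormat.
// The metastore URI and database name are placeholders.
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class FindCombineTextTables
{
    private static final String COMBINE_TEXT_INPUT_FORMAT =
            "org.apache.hadoop.mapred.lib.CombineTextInputFormat";

    public static void main(String[] args) throws Exception
    {
        HiveConf conf = new HiveConf();
        conf.set("hive.metastore.uris", "thrift://metastore-host:9083"); // placeholder

        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
        try {
            for (String name : client.getTables("mydatabase", "*")) {
                Table table = client.getTable("mydatabase", name);
                String inputFormat = table.getSd().getInputFormat();
                // These are the tables Trino 445 rejects with "Unsupported storage format"
                if (COMBINE_TEXT_INPUT_FORMAT.equals(inputFormat)) {
                    System.out.println(name + " -> " + inputFormat);
                }
            }
        }
        finally {
            client.close();
        }
    }
}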

A few questions:

  • This comment mentions that the Trino devs are open to supporting popular and maintained input formats. Is CombineTextInputFormat a format Trino would consider supporting?
  • If my Hive tables have a small-file problem, would converting them to TextInputFormat degrade performance in Trino when querying these tables?
  • Are there any suggestions for working around this that don't require migrating the tables' storage formats to work with Trino?
raunaqmorarka (Member) commented

cc: @dain @electrum @findinpath

electrum (Member) commented May 8, 2024

This should be easy to fix. Try adding the following lines to the HIVE_STORAGE_FORMATS map in HiveStorageFormat:

.put(new SerdeAndInputFormat(LAZY_SIMPLE_SERDE_CLASS, "org.apache.hadoop.mapred.lib.CombineTextInputFormat"), TEXTFILE)
.put(new SerdeAndInputFormat(LAZY_SIMPLE_SERDE_CLASS, "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat"), TEXTFILE)
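
For context, a rough sketch of how those entries would sit in the map builder; the surrounding entry and the TEXT_INPUT_FORMAT_CLASS constant shown here are assumptions, not copied from the actual source:

// Sketch only: approximate shape of the HIVE_STORAGE_FORMATS map in
// io.trino.plugin.hive.HiveStorageFormat with the two new entries added.
private static final Map<SerdeAndInputFormat, HiveStorageFormat> HIVE_STORAGE_FORMATS =
        ImmutableMap.<SerdeAndInputFormat, HiveStorageFormat>builder()
                // ... existing entries, e.g. the plain text mapping ...
                .put(new SerdeAndInputFormat(LAZY_SIMPLE_SERDE_CLASS, TEXT_INPUT_FORMAT_CLASS), TEXTFILE)
                // new: map both Hadoop CombineTextInputFormat class names to TEXTFILE
                .put(new SerdeAndInputFormat(LAZY_SIMPLE_SERDE_CLASS, "org.apache.hadoop.mapred.lib.CombineTextInputFormat"), TEXTFILE)
                .put(new SerdeAndInputFormat(LAZY_SIMPLE_SERDE_CLASS, "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat"), TEXTFILE)
                .buildOrThrow();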

If that works, then please send a PR.

Trino has never supported combined splits, so there should be no performance difference.

sjdurfey (Author) commented May 8, 2024

I added that and I do get data back; however, the Trino logs are flooded with exceptions. They are at DEBUG level, so perhaps that is fine and normal; they don't show up with INFO logging. If these are fine, then I'll send out a PR for the change. Thanks!

2024-05-08T19:08:29.870-0400	DEBUG	task-notification-0	io.trino.execution.TaskStateMachine	Task 20240508_230827_00000_ihhvj.1.0.0 is CANCELED
2024-05-08T19:08:29.870-0400	DEBUG	SplitRunner-25	io.trino.execution.executor.dedicated.SplitProcessor	Driver was interrupted
io.trino.spi.TrinoException: Driver was interrupted
	at io.trino.operator.Driver.lambda$process$8(Driver.java:327)
	at io.trino.operator.Driver.tryWithLock(Driver.java:709)
	at io.trino.operator.Driver.process(Driver.java:298)
	at io.trino.operator.Driver.processForDuration(Driver.java:269)
	at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:890)
	at io.trino.execution.executor.dedicated.SplitProcessor.run(SplitProcessor.java:76)
	at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.lambda$run$0(TaskEntry.java:191)
	at io.trino.$gen.Trino_dev____20240508_230700_2.run(Unknown Source)
	at io.trino.execution.executor.dedicated.TaskEntry$VersionEmbedderBridge.run(TaskEntry.java:192)
	at io.trino.execution.executor.scheduler.FairScheduler.runTask(FairScheduler.java:174)
	at io.trino.execution.executor.scheduler.FairScheduler.lambda$submit$0(FairScheduler.java:161)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
	Suppressed: io.trino.spi.TrinoException: Error opening Hive split s3://mys3bucket/<some part path>/part-00000-451f2b6b-005a-44f9-aaa1-4d4e979cf644.c000.gz (offset=0, length=1114348): Read of file s3://mys3bucket/<same part path>/part-00000-451f2b6b-005a-44f9-aaa1-4d4e979cf644.c000.gz failed: null
		at io.trino.plugin.hive.line.LinePageSourceFactory.createPageSource(LinePageSourceFactory.java:156)
		at io.trino.plugin.hive.HivePageSourceProvider.createHivePageSource(HivePageSourceProvider.java:200)
		at io.trino.plugin.hive.HivePageSourceProvider.createPageSource(HivePageSourceProvider.java:136)
		at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorPageSourceProvider.createPageSource(ClassLoaderSafeConnectorPageSourceProvider.java:48)
		at io.trino.split.PageSourceManager.createPageSource(PageSourceManager.java:61)
		at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:264)
		at io.trino.operator.Driver.processInternal(Driver.java:403)
		at io.trino.operator.Driver.lambda$process$8(Driver.java:306)
		... 17 more
	Caused by: java.io.IOException: Read of file s3://mys3bucket/<some part path>/part-00000-451f2b6b-005a-44f9-aaa1-4d4e979cf644.c000.gz failed: null
		at io.trino.filesystem.hdfs.HdfsTrinoInputStream.read(HdfsTrinoInputStream.java:108)
		at java.base/java.io.InputStream.readNBytes(InputStream.java:412)
		at java.base/java.io.InputStream.readAllBytes(InputStream.java:349)
		at io.trino.plugin.hive.line.LinePageSourceFactory.createPageSource(LinePageSourceFactory.java:140)
		... 24 more
	Caused by: java.io.InterruptedIOException
		at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.propagate(TrinoS3FileSystem.java:1641)
		at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.openStream(TrinoS3FileSystem.java:1600)
		at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.openStream(TrinoS3FileSystem.java:1553)
		at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.seekStream(TrinoS3FileSystem.java:1546)
		at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.lambda$read$1(TrinoS3FileSystem.java:1490)
		at io.trino.hdfs.s3.RetryDriver.run(RetryDriver.java:125)
		at io.trino.hdfs.s3.TrinoS3FileSystem$TrinoS3InputStream.read(TrinoS3FileSystem.java:1489)
		at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:345)
		at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420)
		at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:405)
		at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
		at java.base/java.io.DataInputStream.read(DataInputStream.java:158)
		at io.trino.filesystem.hdfs.HdfsTrinoInputStream.read(HdfsTrinoInputStream.java:102)
		... 27 more
Caused by: java.lang.Exception: Interrupted By
	at java.base/java.lang.Thread.getStackTrace(Thread.java:2450)
	at io.trino.operator.Driver$DriverLock.interruptCurrentOwner(Driver.java:858)
	at io.trino.operator.Driver.close(Driver.java:178)
	at io.trino.execution.SqlTaskExecution$DriverSplitRunner.close(SqlTaskExecution.java:909)
	at io.trino.execution.executor.dedicated.TaskEntry.destroy(TaskEntry.java:88)
	at io.trino.execution.executor.dedicated.ThreadPerDriverTaskExecutor.removeTask(ThreadPerDriverTaskExecutor.java:143)
	at io.trino.execution.SqlTaskExecution.lambda$createTaskHandle$3(SqlTaskExecution.java:232)
	at io.trino.execution.StateMachine.fireStateChangedListener(StateMachine.java:240)
	at io.trino.execution.StateMachine.lambda$fireStateChanged$0(StateMachine.java:232)
	... 3 more

electrum (Member) commented May 8, 2024

These logs are normal. The "Driver was interrupted" message for SplitRunner occurs when a task is cancelled, which usually happens due to a LIMIT query or manually terminating the query in the CLI.
