Skip to content

Commit

Permalink
Merge pull request #61720 from Avogar/try-fix-arrow-abort
Browse files Browse the repository at this point in the history
Try to fix abort in arrow
  • Loading branch information
Avogar committed Apr 10, 2024
2 parents 027c8a8 + 5fadac4 commit d70b622
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 15 deletions.
2 changes: 1 addition & 1 deletion contrib/arrow
74 changes: 60 additions & 14 deletions src/Processors/Formats/Impl/ArrowBufferedStreams.cpp
Expand Up @@ -4,6 +4,7 @@

#if USE_ARROW || USE_ORC || USE_PARQUET
#include <Common/assert_cast.h>
#include <Common/logger_useful.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
Expand Down Expand Up @@ -41,9 +42,18 @@ arrow::Result<int64_t> ArrowBufferedOutputStream::Tell() const

/// Writes `length` bytes starting at `data` into the underlying buffer `out`
/// and adds `length` to `total_length`.
///
/// Returns arrow::Status::OK() on success. Any exception thrown while writing is
/// caught, logged and returned as arrow::Status::IOError carrying the exception
/// message, so no exception leaves this method.
arrow::Status ArrowBufferedOutputStream::Write(const void * data, int64_t length)
{
    try
    {
        /// The write must happen only inside this guarded region: an unguarded write
        /// ahead of the `try` would let the exception escape to the caller and would
        /// make the handler below unreachable.
        out.write(reinterpret_cast<const char *>(data), length);
        total_length += length;
        return arrow::Status::OK();
    }
    catch (...)
    {
        /// Report the failure through the status-based interface instead of rethrowing.
        /// NOTE(review): presumably exceptions propagating into Arrow code are what led
        /// to the abort this change targets - confirm against the Arrow call sites.
        auto message = getCurrentExceptionMessage(false);
        LOG_ERROR(getLogger("ArrowBufferedOutputStream"), "Error while writing to arrow stream: {}", message);
        return arrow::Status::IOError(message);
    }
}

RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(ReadBuffer & in_, std::optional<off_t> file_size_, bool avoid_buffering_)
Expand Down Expand Up @@ -74,9 +84,18 @@ arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Tell() const

/// Reads up to `nbytes` bytes from `in` into `out` and returns the value reported
/// by `readBig`.
///
/// When `avoid_buffering` is set, the read range is first limited to
/// [current position, current position + nbytes) via `setReadUntilPosition`.
/// Any exception thrown while reading is caught, logged and returned as
/// arrow::Status::IOError carrying the exception message.
arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes, void * out)
{
    try
    {
        /// Both the range limit and the read itself can throw, so both stay inside the
        /// guarded region; nothing is executed before the `try`.
        if (avoid_buffering)
            in.setReadUntilPosition(seekable_in.getPosition() + nbytes);
        return in.readBig(reinterpret_cast<char *>(out), nbytes);
    }
    catch (...)
    {
        auto message = getCurrentExceptionMessage(false);
        /// Log under this class's own name so the message is attributed to the right component.
        LOG_ERROR(getLogger("RandomAccessFileFromSeekableReadBuffer"), "Error while reading from arrow stream: {}", message);
        return arrow::Status::IOError(message);
    }
}

arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes)
Expand All @@ -98,14 +117,23 @@ arrow::Future<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBu

/// Moves the read position of `seekable_in` to the absolute offset `position`.
///
/// Returns arrow::Status::OK() on success. Any exception thrown while seeking is
/// caught, logged and returned as arrow::Status::IOError carrying the exception
/// message.
arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position)
{
    try
    {
        if (avoid_buffering)
        {
            // Seeking to a position above a previous setReadUntilPosition() confuses some of the
            // ReadBuffer implementations.
            in.setReadUntilEnd();
        }

        /// The seek is performed exactly once and only inside the guarded region, so a
        /// failure is always turned into a status rather than an escaping exception.
        seekable_in.seek(position, SEEK_SET);
        return arrow::Status::OK();
    }
    catch (...)
    {
        auto message = getCurrentExceptionMessage(false);
        /// Log under this class's own name so the message is attributed to the right component.
        LOG_ERROR(getLogger("RandomAccessFileFromSeekableReadBuffer"), "Error while seeking arrow file: {}", message);
        return arrow::Status::IOError(message);
    }
}


Expand All @@ -115,7 +143,16 @@ ArrowInputStreamFromReadBuffer::ArrowInputStreamFromReadBuffer(ReadBuffer & in_)

/// Reads up to `nbytes` bytes from `in` into `out` and returns the value reported
/// by `readBig`.
///
/// Any exception thrown while reading is caught, logged and returned as
/// arrow::Status::IOError carrying the exception message.
arrow::Result<int64_t> ArrowInputStreamFromReadBuffer::Read(int64_t nbytes, void * out)
{
    try
    {
        /// The read happens only inside the guarded region; an unguarded `return`
        /// ahead of the `try` would bypass the handler entirely.
        return in.readBig(reinterpret_cast<char *>(out), nbytes);
    }
    catch (...)
    {
        auto message = getCurrentExceptionMessage(false);
        /// Log under this class's own name so the message is attributed to the right component.
        LOG_ERROR(getLogger("ArrowInputStreamFromReadBuffer"), "Error while reading from arrow stream: {}", message);
        return arrow::Status::IOError(message);
    }
}

arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowInputStreamFromReadBuffer::Read(int64_t nbytes)
Expand Down Expand Up @@ -154,7 +191,16 @@ arrow::Result<int64_t> RandomAccessFileFromRandomAccessReadBuffer::GetSize()

/// Reads up to `nbytes` bytes at absolute offset `position` from `in` into `out`
/// and returns the value reported by `readBigAt`. The last argument of `readBigAt`
/// is passed as nullptr.
///
/// Any exception thrown while reading is caught, logged and returned as
/// arrow::Status::IOError carrying the exception message.
arrow::Result<int64_t> RandomAccessFileFromRandomAccessReadBuffer::ReadAt(int64_t position, int64_t nbytes, void * out)
{
    try
    {
        /// The read happens only inside the guarded region; an unguarded `return`
        /// ahead of the `try` would bypass the handler entirely.
        return in.readBigAt(reinterpret_cast<char *>(out), nbytes, position, nullptr);
    }
    catch (...)
    {
        auto message = getCurrentExceptionMessage(false);
        /// Log under this class's own name so the message is attributed to the right component.
        LOG_ERROR(getLogger("RandomAccessFileFromRandomAccessReadBuffer"), "Error while reading from arrow stream: {}", message);
        return arrow::Status::IOError(message);
    }
}

arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromRandomAccessReadBuffer::ReadAt(int64_t position, int64_t nbytes)
Expand Down
Empty file.
4 changes: 4 additions & 0 deletions tests/queries/0_stateless/02834_apache_arrow_abort.sql
@@ -0,0 +1,4 @@
-- Tags: no-fasttest
-- This test depends on internet access, but it does not matter, because it only has to check that there is no abort due to a bug in the Apache Arrow library.

INSERT INTO TABLE FUNCTION url('https://clickhouse-public-datasets.s3.amazonaws.com/hits_compatible/hits.parquet') SELECT * FROM url('https://clickhouse-public-datasets.s3.amazonaws.com/hits_compatible/hits.parquet'); -- { serverError CANNOT_WRITE_TO_OSTREAM, RECEIVED_ERROR_FROM_REMOTE_IO_SERVER, POCO_EXCEPTION }

0 comments on commit d70b622

Please sign in to comment.