
Arrow CSV reader peak memory is very large #5766

Open
liujiayi771 opened this issue May 16, 2024 · 19 comments
Labels
bug Something isn't working triage

Comments

@liujiayi771
Contributor

Backend

VL (Velox)

Bug description

When reading large CSV files, for example when a single CSV file in a table is 300 MB, the peak memory usage of the Arrow memory pool during single-threaded reading can reach 500 MB. If the CSV is 2 GB, the peak memory usage grows to 1.7 GB. There does not appear to be a memory leak, but the peak memory usage is very high.

From the Arrow Dataset code, it appears we are using the streaming reader, so in theory memory consumption should not grow proportionally with the size of the CSV file.
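
For reference, a minimal sketch of how the Arrow C++ streaming CSV reader is normally driven (standard Arrow API, not Gluten code; assumes a recent Arrow version). With a streaming reader only roughly one decoded block plus read-ahead buffers should be resident at a time, so peak memory should track block_size rather than file size.

#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
#include <iostream>

arrow::Status ScanCsv(const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(path));

  auto read_options = arrow::csv::ReadOptions::Defaults();
  read_options.block_size = 8 << 20;  // decode the file in 8 MB blocks
  auto parse_options = arrow::csv::ParseOptions::Defaults();
  auto convert_options = arrow::csv::ConvertOptions::Defaults();

  ARROW_ASSIGN_OR_RAISE(
      auto reader,
      arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input,
                                        read_options, parse_options,
                                        convert_options));

  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
    if (batch == nullptr) break;  // end of stream
    // With real streaming, bytes_allocated() should stay near block_size
    // instead of growing toward the full file size.
    std::cout << "allocated=" << arrow::default_memory_pool()->bytes_allocated()
              << std::endl;
  }
  return arrow::Status::OK();
}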

I added some code to the release method of ArrowNativeMemoryPool to check the peak memory.

@Override
public void release() throws Exception {
  System.out.println("peak=" + listener.peak() +", current=" + listener.current());
  if (arrowPool.getBytesAllocated() != 0) {
    LOGGER.warn(
        String.format(
            "Arrow pool still reserved non-zero bytes, "
                + "which may cause memory leak, size: %s. ",
            Utils.bytesToString(arrowPool.getBytesAllocated())));
  }
  arrowPool.close();
}

I also added some logging in the Arrow code to check the peak memory.

Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
    const std::shared_ptr<ScanOptions>& scan_options,
    const std::shared_ptr<FileFragment>& file) const {
  auto this_ = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
  auto source = file->source();
  auto reader_fut =
      OpenReaderAsync(source, *this, scan_options, ::arrow::internal::GetCpuThreadPool());
  auto generator = GeneratorFromReader(std::move(reader_fut), scan_options->batch_size);
  WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(
      generator, "arrow::dataset::CsvFileFormat::ScanBatchesAsync::Next");
  std::cout << "memory=" << default_memory_pool()->bytes_allocated()
            << ", max=" << default_memory_pool()->max_memory() << std::endl;
  return generator;
}

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

No response

liujiayi771 added the bug and triage labels on May 16, 2024
@liujiayi771
Contributor Author

cc @jinchengchenghh @zhztheplayer, thanks.

@FelixYBW
Contributor

I remember Arrow caches all record batches before it streams them to Spark. In Gazelle we initially had the same issue and had to customize some logic to do real streaming. @zhztheplayer do you remember?

@zhztheplayer
Member

@zhztheplayer do you remember?

I can't recall that. But it doesn't make sense to buffer all data for a reader.

I suppose @jinchengchenghh is looking into this.

@jinchengchenghh
Contributor

I could not reproduce this issue. I tested TPC-H Q6 with 600 GB of data and printed the peak every time Arrow reserves memory.

  public void reserve(long size) {
    synchronized (this) {
      sharedUsage.inc(size);
    }
    System.out.println(sharedUsage.peak());
  }

This is the test result:

18350080
17825792:============================================>          (94 + 18) / 116]
18350080
17825792
18350080
17825792
18350080
17825792
18350080
17825792:=============================================>         (95 + 18) / 116]
18350080
17825792
18350080
17825792:=============================================>         (97 + 18) / 116]
18350080
17825792:==============================================>        (98 + 18) / 116]
18350080
17825792

After changing --master from local[18] to local[2], the peak memory was the same.

@liujiayi771
Contributor Author

@jinchengchenghh I will test the latest code.

@FelixYBW
Contributor

@jinchengchenghh can you print in the record batch constructor and destructor to confirm? There should be only 1 record batch alive, no more than 3.
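
One way to check this (illustrative instrumentation only, not existing Gluten code; OnBatchCreated/OnBatchDestroyed are hypothetical hooks to call wherever scan batches are created and released): count live record batches and log the high-water mark. With true streaming the count should stay around 1-3 regardless of file size.

#include <atomic>
#include <iostream>

static std::atomic<int> live_batches{0};
static std::atomic<int> peak_live_batches{0};

void OnBatchCreated() {
  int live = ++live_batches;
  // Keep the maximum observed number of simultaneously live batches.
  int peak = peak_live_batches.load();
  while (live > peak && !peak_live_batches.compare_exchange_weak(peak, live)) {
  }
}

void OnBatchDestroyed() {
  std::cout << "live record batches: " << --live_batches
            << " (peak " << peak_live_batches.load() << ")" << std::endl;
}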

@liujiayi771
Contributor Author

@jinchengchenghh Have you checked the size of a single CSV file?

@jinchengchenghh
Contributor

I assume you are using an intermediate commit of the CSV reader. There is a redundant colVector.retain() in ArrowUtil.loadBatch() in an intermediate version (not the merged version), which may cause the vector not to be released even when the column batch is closed. I removed colVector.retain() for another issue; I am not sure whether it is the root cause of this one. @liujiayi771

@jinchengchenghh
Contributor

The printed information is from each request to the Arrow memory pool, not per record batch.
On the Java side the batch consists of ArrowWritableColumnVectors; it uses ArrowArray to bridge to the C++ side, where it is converted to a Velox vector and released immediately. @FelixYBW
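
A rough sketch of the bridge step described above (assumed flow for illustration; ImportFromJava is a hypothetical helper, while ImportRecordBatch is the standard Arrow C data interface API): the Java-side batch is exported as ArrowArray/ArrowSchema and imported on the C++ side, after which it can be converted to a Velox vector and the Arrow batch dropped immediately.

#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>

arrow::Result<std::shared_ptr<arrow::RecordBatch>> ImportFromJava(
    struct ArrowArray* c_array, struct ArrowSchema* c_schema) {
  // ImportRecordBatch takes ownership of the exported structures; the
  // Java-side buffers are released once the returned RecordBatch is destroyed.
  return arrow::ImportRecordBatch(c_array, c_schema);
}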

@liujiayi771
Contributor Author

@jinchengchenghh I will test the latest code today.

@liujiayi771
Contributor Author

liujiayi771 commented May 28, 2024

@jinchengchenghh I tested the latest code, and the peak memory usage is still relatively high. I did not add logs in ArrowReservationListener.reserve; printing there produced no output in my case. Instead, I added two methods to ArrowReservationListener and printed peak and current in ArrowNativeMemoryPool.release.

public long peak() {
  return sharedUsage.peak();
}

public long current() {
  return sharedUsage.current();
}

@Override
public void release() throws Exception {
  System.out.println("peak=" + listener.peak() + ", current=" + listener.current());
  if (arrowPool.getBytesAllocated() != 0) {
    LOGGER.warn(
        String.format(
            "Arrow pool still reserved non-zero bytes, "
                + "which may cause memory leak, size: %s. ",
            Utils.bytesToString(arrowPool.getBytesAllocated())));
  }
  arrowPool.close();
}

I created a Parquet table and used spark-sql --local to read the data from a CSV table and insert overwrite it into the Parquet table. My dataset is 100 GB TPC-DS. I first tested the store_sales table, where each CSV file is 700 MB. The log output is as follows; the peak memory is about 920 MB:

peak=964689920, current=8388608
peak=964689920, current=8388608
peak=964689920, current=8388608
peak=964689920, current=8388608
peak=964689920, current=8388608
peak=956301312, current=8388608

I continued with the catalog_sales table, where each CSV file is 1.15 GB. The log output is as follows; the peak memory is about 1064 MB:

peak=1124073472, current=8388608
peak=1140850688, current=8388608
peak=1115684864, current=8388608
peak=1124073472, current=8388608
peak=1149239296, current=8388608
peak=1115684864, current=8388608

I also constructed a larger catalog_sales table with a single 30 GB CSV file. The log output is as follows; the peak memory is about 6 GB:

peak=6601834496, current=8388608

The peak memory I printed should be attributable only to the CSV reader. This issue is not urgent for me at the moment; after splitting the large CSV file into smaller files, everything works normally.

@jinchengchenghh
Contributor

I think it is because Arrow does not support passing a file start offset and length to split a file, so its peak memory is high for a very big CSV file.

@FelixYBW
Contributor

Do you mean the Arrow CSV reader doesn't support splits? Each partition must map to one or more whole CSV files rather than part of a large CSV file.

@jinchengchenghh
Contributor

Yes.

@jinchengchenghh
Contributor

It would be easy for Arrow to support a file offset and length; we just need to use RandomAccessFile to generate the InputStream.
The FileSource class constructor is:

  using CustomOpen = std::function<Result<std::shared_ptr<io::RandomAccessFile>>()>;

  FileSource(std::shared_ptr<io::RandomAccessFile> file, int64_t size,
             Compression::type compression = Compression::UNCOMPRESSED)
      : custom_open_([=] { return ToResult(file); }),
        custom_size_(size),
        compression_(compression) {}
  static Result<std::shared_ptr<InputStream>> GetStream(
      std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t nbytes);

https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/file_base.cc#L110

I can help implement it on demand.
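
As an illustration of that idea (just a sketch, not an existing Gluten or Arrow Dataset API; OpenCsvSplit is a hypothetical helper), a Spark split covering [file_offset, file_offset + nbytes) of a CSV file could be opened as an InputStream like this:

#include <arrow/io/api.h>
#include <arrow/result.h>

arrow::Result<std::shared_ptr<arrow::io::InputStream>> OpenCsvSplit(
    const std::string& path, int64_t file_offset, int64_t nbytes) {
  ARROW_ASSIGN_OR_RAISE(auto file, arrow::io::ReadableFile::Open(path));
  // GetStream returns a bounded stream over the requested byte range; the
  // caller would still need to align the range to row boundaries (as Spark
  // does for text formats) before handing it to the CSV reader.
  return arrow::io::RandomAccessFile::GetStream(std::move(file), file_offset,
                                                nbytes);
}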

@FelixYBW
Contributor

It would be easy for Arrow to support a file offset and length; we just need to use RandomAccessFile to generate the InputStream. [...] I can help implement it on demand.

Thank you, Chengcheng. Let's hold off until we get requests.

@liujiayi771
Contributor Author

@jinchengchenghh Spark will split a single CSV file into multiple partitions for reading, so we need to pass the split start and length to Arrow. I have worked around this for now with some hacks; otherwise the same CSV file would be read multiple times.

@FelixYBW
Contributor

FelixYBW commented May 31, 2024

@jinchengchenghh Spark will split a single CSV file into multiple partitions for reading, so we need to pass the split start and length to Arrow. I have worked around this for now with some hacks; otherwise the same CSV file would be read multiple times.

@jinchengchenghh Do we pass the CSV file to Arrow multiple times if it is split by Spark?
@zhztheplayer how did we solve this issue in Gazelle?

@jinchengchenghh
Contributor
