Skip to content

Commit

Permalink
refactor: merge parquet files interface
Browse files Browse the repository at this point in the history
  • Loading branch information
hengfeiyang committed May 20, 2024
1 parent 08ab4e8 commit 6e8bb32
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 181 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
run: rustc --print cfg

- name: Setup cargo config
run: cargo --config net.git-fetch-with-cli=true clippy -- -A clippy::question-mark -D warnings
run: cargo --config net.git-fetch-with-cli=true clippy -- -D warnings

- name: Run unit tests
run: ./coverage.sh
Expand Down
43 changes: 32 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ rayon.workspace = true
regex.workspace = true
regex-syntax.workspace = true
reqwest.workspace = true
rust-embed-for-web = "11.1"
rust-embed-for-web = "11.2.1"
segment.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion deploy/build/buildspec-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ phases:
- touch web/dist/index.html

# run tests
- cargo --config net.git-fetch-with-cli=true clippy -- -A clippy::question-mark -D warnings
- cargo --config net.git-fetch-with-cli=true clippy -- -D warnings
- ./coverage.sh
24 changes: 23 additions & 1 deletion src/config/src/utils/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 Zinc Labs Inc.
// Copyright 2024 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
Expand Down Expand Up @@ -111,6 +111,28 @@ pub fn new_parquet_writer<'a>(
AsyncArrowWriter::try_new(buf, schema.clone(), Some(writer_props)).unwrap()
}

/// Encodes `record_batches` into a single in-memory parquet file.
///
/// A parquet writer is obtained from [`new_parquet_writer`] over a fresh,
/// empty byte buffer, using the given schema, bloom filter fields, full text
/// search fields and file metadata. Every batch is then written in slice
/// order, the writer is closed, and the filled buffer is returned.
///
/// # Errors
///
/// Returns the first error produced while writing a batch or while closing
/// the writer, converted into an `anyhow::Error`.
///
/// # Panics
///
/// NOTE(review): `new_parquet_writer` ends in an `unwrap()` on the writer
/// construction, so a failure there presumably panics instead of surfacing
/// as an `Err` here -- confirm against that function.
pub async fn write_recordbatch_to_parquet(
    schema: Arc<Schema>,
    record_batches: &[RecordBatch],
    bloom_filter_fields: &[String],
    full_text_search_fields: &[String],
    metadata: &FileMeta,
) -> Result<Vec<u8>, anyhow::Error> {
    // Destination for the encoded parquet bytes; the writer borrows it mutably.
    let mut parquet_bytes = Vec::new();
    let mut parquet_writer = new_parquet_writer(
        &mut parquet_bytes,
        &schema,
        bloom_filter_fields,
        full_text_search_fields,
        metadata,
    );

    for record_batch in record_batches {
        parquet_writer.write(record_batch).await?;
    }

    // Close the writer before handing the buffer back to the caller.
    parquet_writer.close().await?;

    Ok(parquet_bytes)
}

// parse file key to get stream_key, date_key, file_name
pub fn parse_file_key_columns(key: &str) -> Result<(String, String, String), anyhow::Error> {
// eg: files/default/logs/olympics/2022/10/03/10/6982652937134804993_1.parquet
Expand Down

0 comments on commit 6e8bb32

Please sign in to comment.