Skip to content

Commit

Permalink
avoid timeout upon long processing (#152)
Browse files Browse the repository at this point in the history
* enable using an external system to send a continue message during long processing.
refactor the current external-system functionality setup.

* compile error, modified interface

* add condition before the action

* adding severity to exceptions

* add a timer for the continue-response: once X time has elapsed, the continue-response function is called

* fix of the JSON error flow

---------

Signed-off-by: Gal Salomon <gal.salomon@gmail.com>
  • Loading branch information
galsalomon66 committed Mar 25, 2024
1 parent 0541c35 commit 9a28798
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 49 deletions.
2 changes: 1 addition & 1 deletion example/s3select_example.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file)
{
try
{
status = parquet_processor.run_s3select_on_object(result,fp_s3select_result_format,fp_s3select_header_format);
status = parquet_processor.run_s3select_on_object(result);
}
catch (base_s3select_exception &e)
{
Expand Down
126 changes: 89 additions & 37 deletions include/s3select.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <boost/bind.hpp>
#include <functional>
#include <unordered_set>
#include <chrono>

#define _DEBUG_TERM {string token(a,b);std::cout << __FUNCTION__ << token << std::endl;}

Expand Down Expand Up @@ -2168,6 +2169,37 @@ struct s3select_csv_definitions //TODO

};

// Interval timer on the monotonic (steady) clock.
// hasElapsed() reports whether at least the configured interval has passed
// since construction or since the last time it returned true.
class ProcessTimer
{
  using clock_type = std::chrono::steady_clock;

  clock_type::time_point m_last_call_time; // reference point for the next check
  std::chrono::seconds m_interval;         // minimum time between two "true" results

public:
  // Default: 30-second interval, measured from construction.
  ProcessTimer() : ProcessTimer(std::chrono::seconds(30)) {}

  // Timer with a caller-supplied interval, measured from construction.
  ProcessTimer(std::chrono::seconds interval)
    : m_last_call_time(clock_type::now()), m_interval(interval) {}

  // Replace the interval; the reference point is left as it is.
  void set(std::chrono::seconds interval) { m_interval = interval; }

  // Returns true (and restarts the measurement from now) once the interval
  // has elapsed; returns false otherwise without touching the reference point.
  bool hasElapsed(void)
  {
    const auto now = clock_type::now();
    const auto waited = std::chrono::duration_cast<std::chrono::seconds>(now - m_last_call_time);
    if (waited < m_interval)
    {
      return false;
    }
    m_last_call_time = now;
    return true;
  }
};

/////// handling different object types
class base_s3object
Expand All @@ -2188,8 +2220,12 @@ class base_s3object
unsigned long m_limit;
unsigned long m_processed_rows;
size_t m_returned_bytes_size;
std::function<void(const char*)> fp_ext_debug_mesg;//dispache debug message into external system
std::function<void(const char*)> m_fp_ext_debug_mesg=nullptr;//dispache debug message into external system
std::vector<std::string> m_projection_keys{};
std::function<int(std::string&)> m_fp_s3select_continue=nullptr;
std::function<int(std::string&)> m_fp_s3select_result_format=nullptr;
std::function<int(std::string&)> m_fp_s3select_header_format=nullptr;
ProcessTimer m_timer;

public:
s3select_csv_definitions m_csv_defintion;//TODO add method for modify
Expand Down Expand Up @@ -2312,8 +2348,25 @@ class base_s3object
m_processed_rows = 0;
}

// Set the interval used by m_timer, the timer that gates calls to the
// continue-message callback during row processing.
void set_continue_mesg_interval(std::chrono::seconds interval)
{
m_timer.set(interval);
}

// Default constructor: null m_sa / m_where_clause / m_s3_select, aggregation flag off,
// error count and returned-bytes counter zeroed, processing status set to INITIAL_STAT.
base_s3object():m_sa(nullptr),m_is_to_aggregate(false),m_where_clause(nullptr),m_s3_select(nullptr),m_error_count(0),m_returned_bytes_size(0),m_sql_processing_status(Status::INITIAL_STAT){}

// Register the callbacks through which this object talks to the external system.
//   continue_message_fp - called when the continue-message timer has elapsed
//   result_message_fp   - called to format/send accumulated result data
//   header_message_fp   - called to format/send the response header
//   ext_debug_mesg_fp   - called to dispatch debug messages
// Each callback is copied into the object. The parameters are const references
// so callers may pass temporaries or lambdas as well as named std::function objects.
void set_external_system_functions(const std::function<int(std::string&)>& continue_message_fp,
const std::function<int(std::string&)>& result_message_fp,
const std::function<int(std::string&)>& header_message_fp,
const std::function<void(const char*)>& ext_debug_mesg_fp)
{
m_fp_s3select_continue = continue_message_fp;
m_fp_s3select_result_format = result_message_fp;
m_fp_s3select_header_format = header_message_fp;
m_fp_ext_debug_mesg = ext_debug_mesg_fp;
}


explicit base_s3object(s3select* m):base_s3object()
{
if(m)
Expand All @@ -2330,11 +2383,6 @@ class base_s3object
// for the case where the rows are not fetched, but "pushed" by the data-source parser (JSON)
// This base implementation returns true; being virtual, it can be overridden.
virtual bool multiple_row_processing(){return true;}

// Install the callback used to dispatch debug messages to an external system.
// The callback is stored in fp_ext_debug_mesg.
void set_external_debug_system(std::function<void(const char*)> fp_external)
{
fp_ext_debug_mesg = fp_external;
}

size_t get_return_result_size()
{
return m_returned_bytes_size;
Expand Down Expand Up @@ -2397,8 +2445,8 @@ class base_s3object
}


if(fp_ext_debug_mesg)
fp_ext_debug_mesg(column_result.data());
if(m_fp_ext_debug_mesg)
m_fp_ext_debug_mesg(column_result.data());

if (m_csv_defintion.quote_fields_always) {
std::ostringstream quoted_result;
Expand Down Expand Up @@ -2438,7 +2486,7 @@ class base_s3object
{
return m_sql_processing_status = Status::LIMIT_REACHED;
}

if (m_aggr_flow == true)
{
do
Expand All @@ -2461,6 +2509,12 @@ class base_s3object
}

m_processed_rows++;
if(m_timer.hasElapsed())
{
if(m_fp_s3select_continue)
m_fp_s3select_continue(result);
}

if ((*m_projections.begin())->is_set_last_call())
{
//should validate while query execution , no update upon nodes are marked with set_last_call
Expand Down Expand Up @@ -2489,7 +2543,7 @@ class base_s3object
i->set_last_call();
i->set_skip_non_aggregate(false);//projection column is set to be runnable
projections_resuls.push_value( &(i->eval()) );
}
}
result_values_to_string(projections_resuls,result);
return is_processing_time_error() ? (m_sql_processing_status = Status::SQL_ERROR) : (m_sql_processing_status = Status::LIMIT_REACHED);
}
Expand All @@ -2510,6 +2564,11 @@ class base_s3object
}

m_processed_rows++;
if(m_timer.hasElapsed())
{
if(m_fp_s3select_continue)
m_fp_s3select_continue(result);
}
row_update_data();
for (auto& a : *m_s3_select->get_aliases()->get())
{
Expand Down Expand Up @@ -2543,7 +2602,7 @@ class base_s3object
for (auto& i : m_projections)
{
projections_resuls.push_value( &(i->eval()) );
}
}
result_values_to_string(projections_resuls,result);
if(m_sql_processing_status == Status::SQL_ERROR)
{
Expand Down Expand Up @@ -2627,15 +2686,6 @@ class csv_object : public base_s3object
int64_t m_number_of_tokens;
size_t m_skip_x_first_bytes=0;

std::function<int(std::string&)> fp_s3select_result_format=nullptr;
std::function<int(std::string&)> fp_s3select_header_format=nullptr;
public:
// Register the result-format and header-format callbacks; both are copied
// into fp_s3select_result_format / fp_s3select_header_format.
void set_result_formatters( std::function<int(std::string&)>& result_format,
std::function<int(std::string&)>& header_format)
{
fp_s3select_result_format = result_format;
fp_s3select_header_format = header_format;
}
private:
int getNextRow()
{
Expand Down Expand Up @@ -2786,10 +2836,10 @@ class csv_object : public base_s3object

if(*p_obj_chunk != m_csv_defintion.row_delimiter)
{// previous row can not be completed with current chunk
if(fp_ext_debug_mesg)
if(m_fp_ext_debug_mesg)
{
std::string err_mesg = "** the stream chunk is too small for processing(saved for later) **";
fp_ext_debug_mesg(err_mesg.c_str());
m_fp_ext_debug_mesg(err_mesg.c_str());
}
//copy the part to be processed later
tmp_buff.assign((char*)csv_stream, (char*)csv_stream + (p_obj_chunk - csv_stream));
Expand Down Expand Up @@ -2886,12 +2936,12 @@ class csv_object : public base_s3object
}
}

if(fp_s3select_result_format && fp_s3select_header_format)
if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{
if (result.size() > CSV_INPUT_TYPE_RESPONSE_SIZE_LIMIT)
{//there are systems that might resject the response due to its size.
fp_s3select_result_format(result);
fp_s3select_header_format(result);
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}
}

Expand All @@ -2910,11 +2960,11 @@ class csv_object : public base_s3object

} while (true);

if(fp_s3select_result_format && fp_s3select_header_format)
if(m_fp_s3select_result_format && m_fp_s3select_header_format)
{ //note: it may produce empty response(more the once)
//upon empty result, it should return *only* upon last call.
fp_s3select_result_format(result);
fp_s3select_header_format(result);
m_fp_s3select_result_format(result);
m_fp_s3select_header_format(result);
}

return 0;
Expand Down Expand Up @@ -3002,11 +3052,10 @@ class parquet_object : public base_s3object
}


int run_s3select_on_object(std::string &result,
std::function<int(std::string&)> fp_s3select_result_format,
std::function<int(std::string&)> fp_s3select_header_format)
int run_s3select_on_object(std::string &result)
{
m_sql_processing_status = Status::INITIAL_STAT;

m_sql_processing_status = Status::INITIAL_STAT;
do
{
try
Expand Down Expand Up @@ -3035,18 +3084,21 @@ class parquet_object : public base_s3object
#define S3SELECT_RESPONSE_SIZE_LIMIT (4 * 1024 * 1024)
if (result.size() > S3SELECT_RESPONSE_SIZE_LIMIT)
{//AWS-cli limits response size the following callbacks send response upon some threshold
fp_s3select_result_format(result);
if(m_fp_s3select_result_format)
m_fp_s3select_result_format(result);

if (!is_end_of_stream() && (get_sql_processing_status() != Status::LIMIT_REACHED))
{
fp_s3select_header_format(result);
if(m_fp_s3select_header_format)
m_fp_s3select_header_format(result);
}
}
else
{
if (is_end_of_stream() || (get_sql_processing_status() == Status::LIMIT_REACHED))
{
fp_s3select_result_format(result);
if(m_fp_s3select_result_format)
m_fp_s3select_result_format(result);
}
}

Expand Down Expand Up @@ -3288,9 +3340,9 @@ class json_object : public base_s3object
throw base_s3select_exception(error_description,base_s3select_exception::s3select_exp_en_t::FATAL);
}

if(status<0)
if(status<0 || m_error_count>100)
{
std::string error_description = std::string("failure upon JSON processing");
std::string error_description = std::string("failure upon JSON processing:") + m_error_description;
throw base_s3select_exception(error_description,base_s3select_exception::s3select_exp_en_t::FATAL);
return -1;
}
Expand Down
2 changes: 1 addition & 1 deletion include/s3select_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ struct _fn_sum : public base_function
{
if (e.severity() == base_s3select_exception::s3select_exp_en_t::FATAL)
{
throw;
throw e;
}
}

Expand Down
18 changes: 9 additions & 9 deletions include/s3select_oper.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ struct binop_div
if( std::isnan(a)) {
return a;
} else {
throw base_s3select_exception("division by zero is not allowed");
throw base_s3select_exception("division by zero is not allowed",base_s3select_exception::s3select_exp_en_t::FATAL);
}
} else {
return a / b;
Expand All @@ -387,7 +387,7 @@ struct binop_modulo
{
if (b == 0)
{
throw base_s3select_exception("Mod zero is not allowed");
throw base_s3select_exception("Mod zero is not allowed",base_s3select_exception::s3select_exp_en_t::FATAL);
} else {
return a % b;
}
Expand All @@ -400,7 +400,7 @@ struct binop_float_modulo
{
if (b == 0)
{
throw base_s3select_exception("Mod zero is not allowed");
throw base_s3select_exception("Mod zero is not allowed",base_s3select_exception::s3select_exp_en_t::FATAL);
} else {
return fmod(a, b);
}
Expand Down Expand Up @@ -870,7 +870,7 @@ class value
return false;
}

throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
throw base_s3select_exception("operands not of the same type(numeric , string), while comparision",base_s3select_exception::s3select_exp_en_t::FATAL);
}

bool operator>(const value& v) //basic compare operator , most itensive runtime operation
Expand Down Expand Up @@ -919,7 +919,7 @@ class value
return false;
}

throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
throw base_s3select_exception("operands not of the same type(numeric , string), while comparision",base_s3select_exception::s3select_exp_en_t::FATAL);
}

friend bool operator==(const value& lhs, const value& rhs) //basic compare operator , most itensive runtime operation
Expand Down Expand Up @@ -986,7 +986,7 @@ class value
return false;
}

throw base_s3select_exception("operands not of the same type(numeric , string), while comparision");
throw base_s3select_exception("operands not of the same type(numeric , string), while comparision",base_s3select_exception::s3select_exp_en_t::FATAL);
}
bool operator<=(const value& v)
{
Expand Down Expand Up @@ -1022,11 +1022,11 @@ class value

if (l.is_string() || r.is_string())
{
throw base_s3select_exception("illegal binary operation with string");
throw base_s3select_exception("illegal binary operation with string",base_s3select_exception::s3select_exp_en_t::FATAL);
}
if (l.is_bool() || r.is_bool())
{
throw base_s3select_exception("illegal binary operation with bool type");
throw base_s3select_exception("illegal binary operation with bool type",base_s3select_exception::s3select_exp_en_t::FATAL);
}

if (l.is_number() && r.is_number())
Expand Down Expand Up @@ -1116,7 +1116,7 @@ class value
} else if(v.type == value_En_t::FLOAT || this->type == value_En_t::FLOAT) {
return compute<binop_float_modulo>(*this,v);
} else {
throw base_s3select_exception("wrong use of modulo operation!");
throw base_s3select_exception("wrong use of modulo operation!",base_s3select_exception::s3select_exp_en_t::FATAL);
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion test/s3select_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ int run_query_on_parquet_file(const char* input_query, const char* input_file, s
{
try
{
status = parquet_processor.run_s3select_on_object(result,fp_s3select_result_format,fp_s3select_header_format);
status = parquet_processor.run_s3select_on_object(result);
}
catch (base_s3select_exception &e)
{
Expand Down

0 comments on commit 9a28798

Please sign in to comment.