Skip to content

Commit

Permalink
fix alloc write row concurrently core dump for drop column.
Browse files Browse the repository at this point in the history
  • Loading branch information
obdev authored and junye committed Feb 3, 2024
1 parent cfe7141 commit 73d0496
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
14 changes: 10 additions & 4 deletions src/storage/ddl/ob_complement_data_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,14 +707,20 @@ int ObComplementPrepareTask::process()
}

ObComplementWriteTask::ObComplementWriteTask()
: ObITask(TASK_TYPE_COMPLEMENT_WRITE), is_inited_(false), task_id_(0), param_(nullptr),
: ObITask(TASK_TYPE_COMPLEMENT_WRITE), allocator_("WriteTaskAlloc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
is_inited_(false), task_id_(0), param_(nullptr),
context_(nullptr), write_row_(),
col_ids_(), org_col_ids_(), output_projector_()
{
}

ObComplementWriteTask::~ObComplementWriteTask()
{
col_ids_.reset();
org_col_ids_.reset();
output_projector_.reset();
write_row_.reset();
allocator_.reset();
}

int ObComplementWriteTask::init(const int64_t task_id, ObComplementDataParam &param,
Expand All @@ -739,7 +745,7 @@ int ObComplementWriteTask::init(const int64_t task_id, ObComplementDataParam &pa
ret = OB_TABLE_NOT_EXIST;
LOG_WARN("hidden table schema not exist", K(ret), K(param));
} else if (OB_FAIL(write_row_.init(
param.allocator_, hidden_table_schema->get_column_count() + storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
allocator_, hidden_table_schema->get_column_count() + storage::ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt()))) {
LOG_WARN("Fail to init write row", K(ret));
} else {
write_row_.row_flag_.set_flag(ObDmlFlag::DF_INSERT);
Expand Down Expand Up @@ -1474,7 +1480,7 @@ int ObComplementMergeTask::add_build_hidden_table_sstable()
ObLocalScan::ObLocalScan() : is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), table_id_(OB_INVALID_ID),
dest_table_id_(OB_INVALID_ID), schema_version_(0), extended_gc_(), snapshot_version_(common::OB_INVALID_VERSION),
txs_(nullptr), default_row_(), tmp_row_(), row_iter_(nullptr), scan_merge_(nullptr), ctx_(), access_param_(),
access_ctx_(), get_table_param_(), allocator_("ObLocalScan"), calc_buf_(ObModIds::OB_SQL_EXPR_CALC),
access_ctx_(), get_table_param_(), allocator_("ObLocalScan", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), calc_buf_(ObModIds::OB_SQL_EXPR_CALC),
col_params_(), read_info_(), exist_column_mapping_(allocator_), checksum_calculator_()
{}

Expand Down Expand Up @@ -1933,7 +1939,7 @@ ObRemoteScan::ObRemoteScan()
row_with_reshape_(),
res_(),
result_(nullptr),
allocator_("DDLRemoteScan"),
allocator_("DDLRemoteScan", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
org_col_ids_(),
column_names_(),
checksum_calculator_()
Expand Down
17 changes: 11 additions & 6 deletions src/storage/ddl/ob_complement_data_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ struct ObComplementDataParam final
is_inited_(false), orig_tenant_id_(common::OB_INVALID_TENANT_ID), dest_tenant_id_(common::OB_INVALID_TENANT_ID),
orig_ls_id_(share::ObLSID::INVALID_LS_ID), dest_ls_id_(share::ObLSID::INVALID_LS_ID), orig_table_id_(common::OB_INVALID_ID),
dest_table_id_(common::OB_INVALID_ID), orig_tablet_id_(ObTabletID::INVALID_TABLET_ID), dest_tablet_id_(ObTabletID::INVALID_TABLET_ID),
allocator_("CompleteDataPar"), row_store_type_(common::ENCODING_ROW_STORE), orig_schema_version_(0), dest_schema_version_(0),
snapshot_version_(0), concurrent_cnt_(0), task_id_(0), execution_id_(-1), tablet_task_id_(0), compat_mode_(lib::Worker::CompatMode::INVALID), data_format_version_(0)
row_store_type_(common::ENCODING_ROW_STORE), orig_schema_version_(0), dest_schema_version_(0),
snapshot_version_(0), concurrent_cnt_(0), task_id_(0), execution_id_(-1), tablet_task_id_(0), compat_mode_(lib::Worker::CompatMode::INVALID), data_format_version_(0),
allocator_("CompleteDataPar", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
{}
~ObComplementDataParam() { destroy(); }
int init(const ObDDLBuildSingleReplicaRequestArg &arg);
Expand Down Expand Up @@ -99,7 +100,6 @@ struct ObComplementDataParam final
uint64_t dest_table_id_;
ObTabletID orig_tablet_id_;
ObTabletID dest_tablet_id_;
common::ObArenaAllocator allocator_;
common::ObRowStoreType row_store_type_;
int64_t orig_schema_version_;
int64_t dest_schema_version_;
Expand All @@ -111,15 +111,18 @@ struct ObComplementDataParam final
lib::Worker::CompatMode compat_mode_;
int64_t data_format_version_;
ObSEArray<common::ObStoreRange, 32> ranges_;
private:
common::ObArenaAllocator allocator_;
};

struct ObComplementDataContext final
{
public:
ObComplementDataContext():
is_inited_(false), is_major_sstable_exist_(false), complement_data_ret_(common::OB_SUCCESS),
allocator_("CompleteDataCtx"), lock_(ObLatchIds::COMPLEMENT_DATA_CONTEXT_LOCK), concurrent_cnt_(0),
data_sstable_redo_writer_(), index_builder_(nullptr), ddl_kv_mgr_handle_(), row_scanned_(0), row_inserted_(0)
lock_(ObLatchIds::COMPLEMENT_DATA_CONTEXT_LOCK), concurrent_cnt_(0),
data_sstable_redo_writer_(), index_builder_(nullptr), ddl_kv_mgr_handle_(), row_scanned_(0), row_inserted_(0),
allocator_("CompleteDataCtx", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
{}
~ObComplementDataContext() { destroy(); }
int init(const ObComplementDataParam &param, const ObDataStoreDesc &desc);
Expand All @@ -132,7 +135,6 @@ struct ObComplementDataContext final
bool is_inited_;
bool is_major_sstable_exist_;
int complement_data_ret_;
common::ObArenaAllocator allocator_;
ObSpinLock lock_;
int64_t concurrent_cnt_;
ObDDLSSTableRedoWriter data_sstable_redo_writer_;
Expand All @@ -142,6 +144,8 @@ struct ObComplementDataContext final
int64_t row_inserted_;
ObArray<int64_t> report_col_checksums_;
ObArray<int64_t> report_col_ids_;
private:
common::ObArenaAllocator allocator_;
};

class ObComplementPrepareTask;
Expand Down Expand Up @@ -215,6 +219,7 @@ class ObComplementWriteTask final : public share::ObITask

private:
static const int64_t RETRY_INTERVAL = 100 * 1000; // 100ms
common::ObArenaAllocator allocator_;
bool is_inited_;
int64_t task_id_;
ObComplementDataParam *param_;
Expand Down

0 comments on commit 73d0496

Please sign in to comment.