Skip to content

Commit

Permalink
[CP] trigger empty shell task thread when tablet create tx aborted
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddenbomb authored and ob-robot committed Apr 23, 2024
1 parent aa14b5a commit c129d71
Show file tree
Hide file tree
Showing 15 changed files with 385 additions and 54 deletions.
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ ob_set_subtarget(ob_storage multi_data_source
multi_data_source/ob_start_transfer_in_mds_ctx.cpp
multi_data_source/ob_finish_transfer_in_mds_ctx.cpp
multi_data_source/ob_abort_transfer_in_mds_ctx.cpp
multi_data_source/ob_tablet_create_mds_ctx.cpp
multi_data_source/test/example_user_helper_define.cpp
)

Expand Down
3 changes: 2 additions & 1 deletion src/storage/multi_data_source/compile_utility/mds_register.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "src/storage/tablet/ob_tablet_finish_transfer_mds_helper.h"
#include "src/share/balance/ob_balance_task_table_operator.h"
#include "src/storage/tablet/ob_tablet_abort_transfer_mds_helper.h"
#include "src/storage/multi_data_source/ob_tablet_create_mds_ctx.h"
#include "src/storage/multi_data_source/ob_start_transfer_in_mds_ctx.h"
#include "src/storage/multi_data_source/ob_finish_transfer_in_mds_ctx.h"
#include "src/storage/multi_data_source/ob_abort_transfer_in_mds_ctx.h"
Expand Down Expand Up @@ -73,7 +74,7 @@ _GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION_(HELPER_CLASS, BUFFER_CTX_TYPE, ID, ENU
16,\
TEST3)
GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION(::oceanbase::storage::ObTabletCreateMdsHelper,\
::oceanbase::storage::mds::MdsCtx,\
::oceanbase::storage::mds::ObTabletCreateMdsCtx,\
3,\
CREATE_TABLET_NEW_MDS)
GENERATE_MDS_FRAME_CODE_FOR_TRANSACTION(::oceanbase::storage::ObTabletDeleteMdsHelper,\
Expand Down
8 changes: 0 additions & 8 deletions src/storage/multi_data_source/mds_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@
#include "meta_programming/ob_type_traits.h"
namespace oceanbase
{
namespace share
{
class ObLSID;
}
namespace common
{
class ObTabletID;
}
namespace storage
{
namespace mds
Expand Down
80 changes: 59 additions & 21 deletions src/storage/multi_data_source/ob_start_transfer_in_mds_ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ ERRSIM_POINT_DEF(EN_START_TRANSFER_IN_ON_PREPARE);

ObStartTransferInMdsCtx::ObStartTransferInMdsCtx()
: MdsCtx(),
version_(ObStartTransferInMdsCtxVersion::CURRENT_CTX_VERSION)
version_(ObStartTransferInMdsCtxVersion::CURRENT_CTX_VERSION),
ls_id_()
{
}

ObStartTransferInMdsCtx::ObStartTransferInMdsCtx(const MdsWriter &writer)
: MdsCtx(writer),
version_(ObStartTransferInMdsCtxVersion::CURRENT_CTX_VERSION)
version_(ObStartTransferInMdsCtxVersion::CURRENT_CTX_VERSION),
ls_id_()
{
}

Expand All @@ -61,6 +63,31 @@ void ObStartTransferInMdsCtx::on_prepare(const share::SCN &prepare_version)
MdsCtx::on_prepare(prepare_version);
}

void ObStartTransferInMdsCtx::on_abort(const share::SCN &abort_scn)
{
mds::MdsCtx::on_abort(abort_scn);

int ret = OB_SUCCESS;
ObLSService *ls_service = MTL(ObLSService*);
ObLSHandle ls_handle;
ObLS *ls = nullptr;

if (OB_UNLIKELY(!ls_id_.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("ls id is invalid", K(ret), K_(ls_id));
} else if (OB_FAIL(ls_service->get_ls(ls_id_, ls_handle, ObLSGetMod::MDS_TABLE_MOD))) {
LOG_WARN("fail to get ls", K(ret), K_(ls_id));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls is null", K(ret), K_(ls_id), KP(ls));
} else {
checkpoint::ObTabletEmptyShellHandler *handler = ls->get_tablet_empty_shell_handler();
handler->set_empty_shell_trigger(true/*is_trigger*/);

LOG_INFO("start transfer in tx aborted", K(ret), K_(ls_id), K(abort_scn));
}
}

int ObStartTransferInMdsCtx::serialize(char *buf, const int64_t len, int64_t &pos) const
{
int ret = OB_SUCCESS;
Expand All @@ -80,15 +107,19 @@ int ObStartTransferInMdsCtx::serialize(char *buf, const int64_t len, int64_t &po
LOG_WARN("buffer's length is not enough", K(ret), K(length), K(len - pos));
} else if (ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V1 == version_) {
if (OB_FAIL(MdsCtx::serialize(buf, len, pos))) {
LOG_WARN("failed to serialize mds ctx serialize", K(ret), K(len), K(pos));
LOG_WARN("failed to serialize mds ctx", K(ret), K(len), K(pos));
}
} else {
} else if (ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V2 == version_
|| ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V3 == version_) {
if (OB_FAIL(serialization::encode(buf, len, pos, static_cast<int64_t>(version_)))) {
LOG_WARN("failed to serialize tablet meta's version", K(ret), K(len), K(pos), K_(version));
LOG_WARN("failed to serialize start transfer in mds ctx version", K(ret), K(len), K(pos), K_(version));
} else if (OB_FAIL(serialization::encode_i32(buf, len, pos, length))) {
LOG_WARN("failed to serialize tablet meta's length", K(ret), K(len), K(pos), K(length));
LOG_WARN("failed to serialize start transfer in mds ctx length", K(ret), K(len), K(pos), K(length));
} else if (ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V3 == version_
&& OB_FAIL(ls_id_.serialize(buf, len, pos))) {
LOG_WARN("failed to serialize ls id", K(ret), K(len), K(pos), K_(ls_id));
} else if (OB_FAIL(MdsCtx::serialize(buf, len, pos))) {
LOG_WARN("failed to serialize mds ctx serialize", K(ret), K(len), K(pos));
LOG_WARN("failed to serialize mds ctx", K(ret), K(len), K(pos));
}
}

Expand Down Expand Up @@ -121,23 +152,25 @@ int ObStartTransferInMdsCtx::deserialize(const char *buf, const int64_t len, int
if (OB_FAIL(MdsCtx::deserialize(buf, len, pos))) {
LOG_WARN("failed to deserialize mds ctx", K(ret), K(len), K(pos));
}
} else {
} else if (ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V2 == version_
|| ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V3 == version_) {
if (OB_FAIL(serialization::decode_i32(buf, len, pos, &length))) {
LOG_WARN("failed to deserialize start transfer in mds ctx's length", K(ret), K(len), K(pos));
} else if (ObStartTransferInMdsCtxVersion::CURRENT_CTX_VERSION != version_) {
} else if (OB_UNLIKELY(length > len - saved_pos)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid version", K(ret), K_(version));
} else {
if (OB_UNLIKELY(length > len - saved_pos)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("buffer's length is not enough", K(ret), K(length), K(len - pos));
} else if (OB_FAIL(MdsCtx::deserialize(buf, len, pos))) {
LOG_WARN("failed to deserialize mds ctx", K(ret), K(len), K(pos));
} else if (OB_UNLIKELY(length != pos - saved_pos)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("start transfer in ctx length doesn't match standard length", K(ret), K(saved_pos), K(pos), K(length), K(len));
}
LOG_WARN("buffer's length is not enough", K(ret), K(length), K(len - pos));
} else if (ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V3 == version_
&& OB_FAIL(ls_id_.deserialize(buf, len, pos))) {
LOG_WARN("fail to deserialize ls id", K(ret), K(len), K(pos));
} else if (OB_FAIL(MdsCtx::deserialize(buf, len, pos))) {
LOG_WARN("failed to deserialize mds ctx", K(ret), K(len), K(pos));
} else if (OB_UNLIKELY(length != pos - saved_pos)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("start transfer in ctx length doesn't match standard length", K(ret), K(saved_pos), K(pos), K(length), K(len));
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid version", K(ret), K_(version));
}
return ret;
}
Expand All @@ -148,10 +181,15 @@ int64_t ObStartTransferInMdsCtx::get_serialize_size(void) const
const int32_t placeholder_length = 0;
if (ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V1 == version_) {
size += MdsCtx::get_serialize_size();
} else {
} else if (ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V2 == version_
|| ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V3 == version_) {
const int64_t version = static_cast<int64_t>(version_);
size += serialization::encoded_length(version);
size += serialization::encoded_length_i32(placeholder_length);
if (ObStartTransferInMdsCtxVersion::START_TRANSFER_IN_MDS_CTX_VERSION_V3 == version_) {
size += ls_id_.get_serialize_size();
}

size += MdsCtx::get_serialize_size();
}
return size;
Expand Down
28 changes: 14 additions & 14 deletions src/storage/multi_data_source/ob_start_transfer_in_mds_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,12 @@

#include "mds_ctx.h"
#include "lib/container/ob_array.h"
#include "storage/multi_data_source/runtime_utility/mds_tenant_service.h"
#include "lib/container/ob_array_serialization.h"
#include "share/ob_ls_id.h"
#include "storage/multi_data_source/runtime_utility/mds_tenant_service.h"

namespace oceanbase
{
namespace share
{
class ObLSID;
}
namespace common
{
class ObTabletID;
}
namespace storage
{
namespace mds
Expand All @@ -40,9 +33,10 @@ struct ObStartTransferInMdsCtxVersion
enum VERSION {
START_TRANSFER_IN_MDS_CTX_VERSION_V1 = 1,
START_TRANSFER_IN_MDS_CTX_VERSION_V2 = 2,
START_TRANSFER_IN_MDS_CTX_VERSION_V3 = 3,
MAX
};
static const VERSION CURRENT_CTX_VERSION = START_TRANSFER_IN_MDS_CTX_VERSION_V2;
static const VERSION CURRENT_CTX_VERSION = START_TRANSFER_IN_MDS_CTX_VERSION_V3;
static bool is_valid(const ObStartTransferInMdsCtxVersion::VERSION &version) {
return version >= START_TRANSFER_IN_MDS_CTX_VERSION_V1
&& version < MAX;
Expand All @@ -53,14 +47,20 @@ class ObStartTransferInMdsCtx : public MdsCtx
{
public:
ObStartTransferInMdsCtx();
ObStartTransferInMdsCtx(const MdsWriter &writer);
explicit ObStartTransferInMdsCtx(const MdsWriter &writer);
virtual ~ObStartTransferInMdsCtx();
public:
virtual void on_prepare(const share::SCN &prepare_version) override;
virtual int serialize(char *buf, const int64_t len, int64_t &pos) const;
virtual int deserialize(const char *buf, const int64_t len, int64_t &pos);
virtual int64_t get_serialize_size(void) const;
virtual void on_abort(const share::SCN &abort_scn) override;

virtual int serialize(char *buf, const int64_t len, int64_t &pos) const override;
virtual int deserialize(const char *buf, const int64_t len, int64_t &pos) override;
virtual int64_t get_serialize_size(void) const override;
public:
void set_ls_id(const share::ObLSID &ls_id) { ls_id_ = ls_id; }
private:
ObStartTransferInMdsCtxVersion::VERSION version_;
share::ObLSID ls_id_;
DISALLOW_COPY_AND_ASSIGN(ObStartTransferInMdsCtx);
};

Expand Down
160 changes: 160 additions & 0 deletions src/storage/multi_data_source/ob_tablet_create_mds_ctx.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/

#include "storage/multi_data_source/ob_tablet_create_mds_ctx.h"
#include "lib/utility/serialization.h"
#include "storage/ls/ob_ls_get_mod.h"
#include "storage/tx_storage/ob_empty_shell_task.h"
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/tx_storage/ob_ls_handle.h"

#define USING_LOG_PREFIX MDS

namespace oceanbase
{
namespace storage
{
namespace mds
{
ObTabletCreateMdsCtx::ObTabletCreateMdsCtx()
: MdsCtx(),
magic_(MAGIC),
version_(VERSION),
ls_id_()
{
}

ObTabletCreateMdsCtx::ObTabletCreateMdsCtx(const MdsWriter &writer)
: MdsCtx(writer),
magic_(MAGIC),
version_(VERSION),
ls_id_()
{
}

void ObTabletCreateMdsCtx::on_abort(const share::SCN &abort_scn)
{
mds::MdsCtx::on_abort(abort_scn);

int ret = OB_SUCCESS;
ObLSService *ls_service = MTL(ObLSService*);
ObLSHandle ls_handle;
ObLS *ls = nullptr;

if (OB_UNLIKELY(!ls_id_.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("ls id is invalid", K(ret), K_(ls_id));
} else if (OB_FAIL(ls_service->get_ls(ls_id_, ls_handle, ObLSGetMod::MDS_TABLE_MOD))) {
LOG_WARN("fail to get ls", K(ret), K_(ls_id));
} else if (OB_ISNULL(ls = ls_handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls is null", K(ret), K_(ls_id), KP(ls));
} else {
checkpoint::ObTabletEmptyShellHandler *handler = ls->get_tablet_empty_shell_handler();
handler->set_empty_shell_trigger(true/*is_trigger*/);

LOG_INFO("tablet create tx aborted", K(ret), K_(ls_id), K(abort_scn));
}
}

int ObTabletCreateMdsCtx::serialize(char *buf, const int64_t buf_len, int64_t &pos) const
{
int ret = OB_SUCCESS;
const int64_t serialize_size = get_serialize_size();
int64_t tmp_pos = pos;

if (OB_ISNULL(buf)
|| OB_UNLIKELY(buf_len <= 0)
|| OB_UNLIKELY(pos < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), K(buf), K(buf_len), K(pos));
} else if (OB_UNLIKELY(buf_len - pos < serialize_size)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("buffer len is not enough to serialize", K(ret), K(buf_len), K(pos), K(serialize_size));
} else if (VERSION == version_) {
if (OB_FAIL(MdsCtx::serialize(buf, buf_len, tmp_pos))) {
LOG_WARN("failed to serialize mds ctx", K(ret), K(buf_len), K(tmp_pos));
} else if (OB_FAIL(serialization::encode(buf, buf_len, tmp_pos, magic_))) {
LOG_WARN("fail to serialize magic", K(ret), K(buf_len), K(tmp_pos), K_(magic));
} else if (OB_FAIL(serialization::encode(buf, buf_len, tmp_pos, version_))) {
LOG_WARN("fail to serialize version", K(ret), K(buf_len), K(tmp_pos), K_(version));
} else if (OB_FAIL(serialization::encode_i64(buf, buf_len, tmp_pos, serialize_size))) {
LOG_WARN("fail to serialize length", K(ret), K(buf_len), K(tmp_pos), K(serialize_size));
} else if (OB_FAIL(ls_id_.serialize(buf, buf_len, tmp_pos))) {
LOG_WARN("fail to serialize ls id", K(ret), K(buf_len), K(tmp_pos), K_(ls_id));
} else {
pos = tmp_pos;
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected version", K(ret), K_(version));
}

return ret;
}

int ObTabletCreateMdsCtx::deserialize(const char *buf, const int64_t buf_len, int64_t &pos)
{
int ret = OB_SUCCESS;
int64_t tmp_pos = pos;
int32_t magic = -1;
int32_t version = -1;
int64_t serialize_size = 0;

if (OB_ISNULL(buf)
|| OB_UNLIKELY(buf_len <= 0)
|| OB_UNLIKELY(pos < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", K(ret), K(buf), K(buf_len), K(pos));
} else if (OB_FAIL(MdsCtx::deserialize(buf, buf_len, tmp_pos))) {
LOG_WARN("fail to deserialize mds ctx", K(ret), K(buf_len), K(tmp_pos));
} else if (OB_FAIL(serialization::decode(buf, buf_len, tmp_pos, magic))) {
LOG_WARN("failed to deserialize magic", K(ret), K(buf_len), K(tmp_pos));
} else if (OB_UNLIKELY(magic != MAGIC)) {
FLOG_INFO("magic does not match, maybe this is old version data", K(ret), K(magic), LITERAL_K(MAGIC));
version_ = VERSION;
ls_id_ = ObLSID::INVALID_LS_ID;
pos = tmp_pos;
} else if (OB_FAIL(serialization::decode(buf, buf_len, tmp_pos, version))) {
LOG_WARN("failed to deserialize version", K(ret), K(buf_len), K(tmp_pos));
} else if (OB_UNLIKELY(VERSION != version)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("version does not match", K(ret), K(version));
} else if (OB_FAIL(serialization::decode_i64(buf, buf_len, tmp_pos, &serialize_size))) {
LOG_WARN("failed to deserialize serialize size", K(ret), K(buf_len), K(tmp_pos));
} else if (tmp_pos - pos < serialize_size && OB_FAIL(ls_id_.deserialize(buf, buf_len, tmp_pos))) {
LOG_WARN("failed to deserialize ls id", K(ret), K(buf_len), K(tmp_pos));
} else if (OB_UNLIKELY(tmp_pos - pos != serialize_size)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("deserialize length does not match", K(ret), K(buf_len), K(pos), K(tmp_pos), K(serialize_size));
} else {
version_ = version;
pos = tmp_pos;
}

return ret;
}

int64_t ObTabletCreateMdsCtx::get_serialize_size() const
{
int64_t size = 0;
int64_t serialize_size = 0; // dummy
size += MdsCtx::get_serialize_size();
size += serialization::encoded_length(magic_);
size += serialization::encoded_length(version_);
size += serialization::encoded_length_i64(serialize_size);
size += ls_id_.get_serialize_size();
return size;
}
} // namespace mds
} // namespace storage
} // namespace oceanbase

0 comments on commit c129d71

Please sign in to comment.