Skip to content

Commit

Permalink
Merge pull request #18075 from andrwng/v24.1.x-storage-log-reader-der…
Browse files Browse the repository at this point in the history
…ecurse

[v24.1.x] storage: coroutinize/de-recursify log_reader::do_load_slice
  • Loading branch information
piyushredpanda committed Apr 25, 2024
2 parents 2da5b2e + f6ce717 commit b5ade3f
Showing 1 changed file with 129 additions and 125 deletions.
254 changes: 129 additions & 125 deletions src/v/storage/log_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

#include <fmt/ostream.h>

#include <exception>

namespace {
model::record_batch make_ghost_batch(
model::offset start_offset, model::offset end_offset, model::term_id term) {
Expand Down Expand Up @@ -382,133 +384,135 @@ void log_reader::maybe_log_load_slice_depth_warning(
ss::future<log_reader::storage_t>
log_reader::do_load_slice(model::timeout_clock::time_point timeout) {
_load_slice_depth = 0;
return load_slice(timeout);
}
while (true) {
_load_slice_depth++;
if (is_done()) {
// must keep this function because, the segment might not be done
// but offsets might have exceeded the read
set_end_of_stream();
co_await _iterator.close();
co_return log_reader::storage_t{};
}
if (_last_base == _config.start_offset) {
set_end_of_stream();
co_await _iterator.close();
co_return log_reader::storage_t{};
}
/**
* We do not want to close the reader if we stopped because requested
* range was read. This way we make it possible to reset configuration
* and reuse underlying file input stream.
*/
if (
_config.start_offset > _config.max_offset
|| _config.bytes_consumed > _config.max_bytes
|| _config.over_budget) {
set_end_of_stream();
co_return log_reader::storage_t{};
}
maybe_log_load_slice_depth_warning("reading more");
_last_base = _config.start_offset;
ss::future<> fut = find_next_valid_iterator();
if (is_end_of_stream()) {
co_await std::move(fut);
co_return log_reader::storage_t{};
}
std::exception_ptr e;
try {
co_await std::move(fut);
auto recs = co_await _iterator.reader->read_some(timeout);
if (!recs) {
set_end_of_stream();

if (!_lease->range.empty()) {
// Readers do not know their ntp directly: discover
// it by checking the segments in our lease
auto seg_ptr = *(_lease->range.begin());
vlog(
stlog.info,
"stopped reading stream[{}]: {}",
seg_ptr->path().get_ntp(),
recs.error().message());
} else {
// Leases should always have a segment, but this is
// not a strict invariant at present, so handle the
// empty case.
vlog(
stlog.info,
"stopped reading stream: {}",
recs.error().message());
}

auto const batch_parse_err
= recs.error() == parser_errc::header_only_crc_missmatch
|| recs.error()
== parser_errc::input_stream_not_enough_bytes;

if (batch_parse_err) {
_probe.batch_parse_error();
}
co_await _iterator.close();
co_return log_reader::storage_t{};
}
if (recs.value().empty()) {
/*
* if no records are returned it may be the case that all of the
* batches in the segment were skipped (e.g. all control
* batches). thus, returning no records does not imply end of
* stream. instead, we continue which will advance the iterator
* and check end of stream.
*/
maybe_log_load_slice_depth_warning("load next slice");
continue;
}
// Update the probe without the ghost batches.
_probe.add_batches_read(recs.value().size());

auto& batches = recs.value();
if (_config.fill_gaps && _expected_next.has_value()) {
records_t batches_filled;
batches_filled.reserve(batches.size());
for (auto& b : batches) {
if (b.base_offset() > _expected_next) {
auto gb = make_ghost_batches(
_expected_next.value(),
model::prev_offset(b.base_offset()),
b.term());
std::move(
gb.begin(),
gb.end(),
std::back_inserter(batches_filled));
}
_expected_next = model::next_offset(b.last_offset());
batches_filled.emplace_back(std::move(b));
}
co_return std::move(batches_filled);
}
// To keep things consistent, our internal accounting is all done in
// untranslated offsets, even if we've been requested to return
// translated offsets.
_expected_next = model::next_offset(batches.back().last_offset());

if (_config.translate_offsets) {
vassert(
_translator, "Expected offset translactor to be initialized");
for (auto& b : batches) {
b.header().base_offset = _translator->from_log_offset(
b.base_offset());
}
}

ss::future<log_reader::storage_t>
log_reader::load_slice(model::timeout_clock::time_point timeout) {
_load_slice_depth++;
if (is_done()) {
// must keep this function because, the segment might not be done
// but offsets might have exceeded the read
set_end_of_stream();
return _iterator.close().then(
[] { return ss::make_ready_future<storage_t>(); });
}
if (_last_base == _config.start_offset) {
set_end_of_stream();
return _iterator.close().then(
[] { return ss::make_ready_future<storage_t>(); });
}
/**
* We do not want to close the reader if we stopped because requested range
* was read. This way we make it possible to reset configuration and reuse
* underlying file input stream.
*/
if (
_config.start_offset > _config.max_offset
|| _config.bytes_consumed > _config.max_bytes || _config.over_budget) {
set_end_of_stream();
return ss::make_ready_future<storage_t>();
}
maybe_log_load_slice_depth_warning("reading more");
_last_base = _config.start_offset;
ss::future<> fut = find_next_valid_iterator();
if (is_end_of_stream()) {
return fut.then([] { return ss::make_ready_future<storage_t>(); });
co_return std::move(batches);
} catch (...) {
e = std::current_exception();
set_end_of_stream();
_probe.batch_parse_error();
}
// Non-exceptional cases should have continued or early-returned above.
vassert(e, "Expected exception");
co_await _iterator.close();
std::rethrow_exception(e);
}
return fut
.then([this, timeout] { return _iterator.reader->read_some(timeout); })
.then([this, timeout](result<records_t> recs) -> ss::future<storage_t> {
if (!recs) {
set_end_of_stream();

if (!_lease->range.empty()) {
// Readers do not know their ntp directly: discover
// it by checking the segments in our lease
auto seg_ptr = *(_lease->range.begin());
vlog(
stlog.info,
"stopped reading stream[{}]: {}",
seg_ptr->path().get_ntp(),
recs.error().message());
} else {
// Leases should always have a segment, but this is
// not a strict invariant at present, so handle the
// empty case.
vlog(
stlog.info,
"stopped reading stream: {}",
recs.error().message());
}

auto const batch_parse_err
= recs.error() == parser_errc::header_only_crc_missmatch
|| recs.error() == parser_errc::input_stream_not_enough_bytes;

if (batch_parse_err) {
_probe.batch_parse_error();
}
return _iterator.close().then(
[] { return ss::make_ready_future<storage_t>(); });
}
if (recs.value().empty()) {
/*
* if no records are returned it may be the case that all of the
* batches in the segment were skipped (e.g. all control batches).
* thus, returning no records does not imply end of stream.
* instead, we recurse which will advance the iterator and check
* end of stream.
*/
maybe_log_load_slice_depth_warning("load next slice");
return load_slice(timeout);
}
// Update the probe without the ghost batches.
_probe.add_batches_read(recs.value().size());

auto& batches = recs.value();
if (_config.fill_gaps && _expected_next.has_value()) {
records_t batches_filled;
batches_filled.reserve(batches.size());
for (auto& b : batches) {
if (b.base_offset() > _expected_next) {
auto gb = make_ghost_batches(
_expected_next.value(),
model::prev_offset(b.base_offset()),
b.term());
std::move(
gb.begin(),
gb.end(),
std::back_inserter(batches_filled));
}
_expected_next = model::next_offset(b.last_offset());
batches_filled.emplace_back(std::move(b));
}
return ss::make_ready_future<storage_t>(
std::move(batches_filled));
}
// To keep things consistent, our internal accounting is all done in
// untranslated offsets, even if we've been requested to return
// translated offsets.
_expected_next = model::next_offset(batches.back().last_offset());

if (_config.translate_offsets) {
vassert(
_translator, "Expected offset translactor to be initialized");
for (auto& b : batches) {
b.header().base_offset = _translator->from_log_offset(
b.base_offset());
}
}

return ss::make_ready_future<storage_t>(std::move(batches));
})
.handle_exception([this](std::exception_ptr e) {
set_end_of_stream();
_probe.batch_parse_error();
return _iterator.close().then(
[e] { return ss::make_exception_future<storage_t>(e); });
});
}

static inline bool is_finished_offset(segment_set& s, model::offset o) {
Expand Down

0 comments on commit b5ade3f

Please sign in to comment.