Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster entering a recovery loop may caused by Kafka sink QueueFull (Local: Queue full) #16640

Open
ly9chee opened this issue May 8, 2024 · 4 comments
Assignees
Labels
type/bug Something isn't working user-feedback
Milestone

Comments

@ly9chee
Copy link

ly9chee commented May 8, 2024

Describe the bug

Description

The cluster entering a recovery loop when creating kafka sink from a mv (about 4 million records); when the problematic sink was dropped, the system went back to normal.

create sink sink_1 from mv1
WITH (
    connector='kafka',
    primary_key='***',
    topic='***',
    properties.bootstrap.server='***',
    properties.compression.codec='zstd'
) FORMAT UPSERT ENCODE JSON

20 minutes later, the sink had been successfully recreated.
image

Other observed phenomena

Kafka should work fine, because within the recovery period, the topic had 140 million records written into it, but the upstream mv only had 4 million records.
image

Not sure increasing properties.retry.max can solve this issue.

Error message/log

2024-05-08T02:50:36.677896483Z  WARN actor{otel.name="Actor 4487" actor_id=4487 prev_epoch=6416051083608064 curr_epoch=6416068298735616}:executor{otel.name="Sink 118700000001"}: risingwave_connector::sink::kafka: producing message (key Some([*, *, *])) to topic *** failed error=Message production error: QueueFull (Local: Queue full)
2024-05-08T02:50:36.677906392Z  WARN actor{otel.name="Actor 4487" actor_id=4487 prev_epoch=6416051083608064 curr_epoch=6416068298735616}:executor{otel.name="Sink 118700000001"}: risingwave_connector::sink::kafka: Producer queue full. Delivery future buffer size=100029. Await and retry #1
2024-05-08T02:50:36.677909398Z  WARN actor{otel.name="Actor 4487" actor_id=4487 prev_epoch=6416051083608064 curr_epoch=6416068298735616}:executor{otel.name="Sink 118700000001"}: risingwave_connector::sink::kafka: producing message (key Some([*, *, *])) to topic *** error=Message production error: QueueFull (Local: Queue full)
2024-05-08T02:50:36.677920839Z  WARN actor{otel.name="Actor 4487" actor_id=4487 prev_epoch=6416051083608064 curr_epoch=6416068298735616}:executor{otel.name="Sink 118700000001"}: risingwave_connector::sink::kafka: Producer queue full. Delivery future buffer size=100028. Await and retry #2
2024-05-08T02:50:37.377364973Z ERROR risingwave_stream::task::stream_manager: actor exit with error actor_id=4490 error=Executor error: Sink error: Kafka error: Message production error: QueueFull (Local: Queue full)

Backtrace:
   0: <thiserror_ext::backtrace::MaybeBacktrace as thiserror_ext::backtrace::WithBacktrace>::capture
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/thiserror-ext-0.0.11/src/backtrace.rs:30:18
   1: thiserror_ext::ptr::ErrorBox<T,B>::new
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/thiserror-ext-0.0.11/src/ptr.rs:40:33
   2: <risingwave_stream::executor::error::StreamExecutorError as core::convert::From<E>>::from
             at ./risingwave/src/stream/src/executor/error.rs:35:35
   3: <core::result::Result<T,F> as core::ops::try_trait::FromResidual<core::result::Result<core::convert::Infallible,E>>>::from_residual
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/result.rs:1959:27
   4: risingwave_stream::executor::sink::SinkExecutor<F>::execute_consume_log::{{closure}}
             at ./risingwave/src/stream/src/executor/sink.rs:430:13
   5: <futures_util::stream::once::Once<Fut> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/once.rs:46:33
                                                                                                      51440,1       17%
   5: <futures_util::stream::once::Once<Fut> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/once.rs:46:33
   6: <futures_util::future::future::IntoStream<F> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/lib.rs:102:13
   7: futures_util::stream::select_with_strategy::poll_side
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/select_with_strategy.rs:219:27
   8: futures_util::stream::select_with_strategy::poll_inner
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/select_with_strategy.rs:234:28
   9: <futures_util::stream::select_with_strategy::SelectWithStrategy<St1,St2,Clos,State> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/select_with_strategy.rs:270:17
  10: <futures_util::stream::select::Select<St1,St2> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/select.rs:115:9
  11: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  12: <futures_util::stream::stream::flatten::Flatten<St,<St as futures_core::stream::Stream>::Item> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/flatten.rs:50:44
  13: <futures_util::stream::stream::Flatten<St> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/lib.rs:102:13
  14: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  15: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  16: futures_util::stream::stream::StreamExt::poll_next_unpin
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/mod.rs:1638:9
  17: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/next.rs:32:9
  18: <await_tree::future::Instrumented<F,_> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/await-tree-0.1.1/src/future.rs:124:23
  19: risingwave_stream::executor::wrapper::trace::instrument_await_tree::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/trace.rs:115:10
                                                                                                      51457,4       17%
  19: risingwave_stream::executor::wrapper::trace::instrument_await_tree::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/trace.rs:115:10
  20: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  21: risingwave_stream::executor::wrapper::schema_check::schema_check::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/schema_check.rs:24:1
  22: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  23: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  24: futures_util::stream::stream::StreamExt::poll_next_unpin
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/mod.rs:1638:9
  25: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/next.rs:32:9
  26: risingwave_stream::executor::wrapper::epoch_check::epoch_check::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/epoch_check.rs:31:44
  27: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  28: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  29: <S as futures_core::stream::TryStream>::try_poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:196:9
  30: futures_util::stream::try_stream::TryStreamExt::try_poll_next_unpin
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/try_stream/mod.rs:1131:9
  31: <futures_util::stream::try_stream::try_next::TryNext<St> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/try_stream/try_next.rs:32:9
  32: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:347:31
  33: tokio::task::task_local::LocalKey<T>::scope_inner
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:217:19
  34: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:343:19
  35: risingwave_common::util::epoch::task_local::scope::{{closure}}
             at ./risingwave/src/common/src/util/epoch.rs:244:47
  36: risingwave_stream::executor::wrapper::epoch_provide::epoch_provide::{{closure}}
                                                                                                      51485,3       17%
             at ./risingwave/src/common/src/util/epoch.rs:244:47
  36: risingwave_stream::executor::wrapper::epoch_provide::epoch_provide::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/epoch_provide.rs:31:59
  37: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  38: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  39: futures_util::stream::stream::StreamExt::poll_next_unpin
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/mod.rs:1638:9
  40: <futures_util::stream::stream::next::Next<St> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/next.rs:32:9
  41: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tracing-0.1.40/src/instrument.rs:321:9
  42: risingwave_stream::executor::wrapper::trace::trace::{{closure}}
             at ./risingwave/src/stream/src/executor/wrapper/trace.rs:48:69
  43: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  44: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  45: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  46: <risingwave_stream::executor::dispatch::DispatchExecutor as risingwave_stream::executor::StreamConsumer>::execute::{{closure}}
             at ./risingwave/src/stream/src/executor/dispatch.rs:382:9
  47: <futures_async_stream::try_stream::GenTryStream<G> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-async-stream-0.2.9/src/lib.rs:506:33
  48: <core::pin::Pin<P> as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:120:9
  49: <&mut S as futures_core::stream::Stream>::poll_next
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-core-0.3.30/src/stream.rs:104:9
  50: <tokio_stream::stream_ext::next::Next<St> as core::future::future::Future>::poll
             at ./root/.cargo/git/checkouts/tokio-968c02b7a1a41bea/fe39bb8/tokio-stream/src/stream_ext/next.rs:42:9
  51: <tokio_stream::stream_ext::try_next::TryNext<St> as core::future::future::Future>::poll
             at ./root/.cargo/git/checkouts/tokio-968c02b7a1a41bea/fe39bb8/tokio-stream/src/stream_ext/try_next.rs:43:9
  52: <tracing::instrument::Instrumented<T> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tracing-0.1.40/src/instrument.rs:321:9
  53: <await_tree::future::Instrumented<F,_> as core::future::future::Future>::poll
                                                                                                      51518,14      17%
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tracing-0.1.40/src/instrument.rs:321:9
  53: <await_tree::future::Instrumented<F,_> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/await-tree-0.1.1/src/future.rs:124:23
  54: risingwave_stream::executor::actor::Actor<C>::run_consumer::{{closure}}
             at ./risingwave/src/stream/src/executor/actor.rs:206:18
  55: <tokio::future::maybe_done::MaybeDone<Fut> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/future/maybe_done.rs:68:48
  56: risingwave_stream::executor::actor::Actor<C>::run::{{closure}}::{{closure}}::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/macros/join.rs:126:24
  57: <tokio::future::poll_fn::PollFn<F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/future/poll_fn.rs:58:9
  58: risingwave_stream::executor::actor::Actor<C>::run::{{closure}}::{{closure}}
             at ./risingwave/src/stream/src/executor/actor.rs:162:17
  59: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:347:31
  60: tokio::task::task_local::LocalKey<T>::scope_inner
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:217:19
  61: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:343:19
  62: risingwave_expr::expr_context::expr_context_scope::{{closure}}
             at ./risingwave/src/expr/core/src/expr_context.rs:35:65
  63: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:347:31
  64: tokio::task::task_local::LocalKey<T>::scope_inner
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:217:19
  65: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:343:19
  66: risingwave_stream::executor::actor::Actor<C>::run::{{closure}}
             at ./risingwave/src/stream/src/executor/actor.rs:170:10
  67: <futures_util::future::future::map::Map<Fut,F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/future/map.rs:55:37
  68: <futures_util::future::future::Map<Fut,F> as core::future::future::Future>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/lib.rs:91:13
  69: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:347:31
  70: tokio::task::task_local::LocalKey<T>::scope_inner
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/task/task_local.rs:217:19
  71: <tokio::task::task_local::TaskLocalFuture<T,F> as core::future::future::Future>::poll
                                                                                                      51552,14      17%
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/task/harness.rs:208:27
 114: tokio::runtime::task::harness::Harness<T,S>::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/task/harness.rs:153:15
 115: tokio::runtime::task::raw::RawTask::poll
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/task/raw.rs:200:18
 116: tokio::runtime::task::UnownedTask<S>::run
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/task/mod.rs:437:9
 117: tokio::runtime::blocking::pool::Task::run
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/blocking/pool.rs:159:9
 118: tokio::runtime::blocking::pool::Inner::run
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/blocking/pool.rs:513:17
 119: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
             at ./root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.32.0/src/runtime/blocking/pool.rs:471:13
 120: std::sys_common::backtrace::__rust_begin_short_backtrace
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/sys_common/backtrace.rs:155:18
 121: std::thread::Builder::spawn_unchecked_::{{closure}}::{{closure}}
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/thread/mod.rs:529:17
 122: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/panic/unwind_safe.rs:272:9
 123: std::panicking::try::do_call
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/panicking.rs:552:40
 124: std::panicking::try
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/panicking.rs:516:19
 125: std::panic::catch_unwind
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/panic.rs:142:14
 126: std::thread::Builder::spawn_unchecked_::{{closure}}
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/thread/mod.rs:528:30
 127: core::ops::function::FnOnce::call_once{{vtable.shim}}
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/core/src/ops/function.rs:250:5
 128: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/alloc/src/boxed.rs:2015:9
 129: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/alloc/src/boxed.rs:2015:9
 130: std::sys::unix::thread::Thread::new::thread_start
                                                                                                      51674,14      17%
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/alloc/src/boxed.rs:2015:9
 130: std::sys::unix::thread::Thread::new::thread_start
             at ./rustc/e4c626dd9a17a23270bf8e7158e59cf2b9c04840/library/std/src/sys/unix/thread.rs:108:17
 131: start_thread
             at ./nptl/pthread_create.c:442:8
 132: __GI___clone
             at ./misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:100

To Reproduce

No response

Expected behavior

No response

How did you deploy RisingWave?

No response

The version of RisingWave

v1.8.2

Additional context

No response

@ly9chee ly9chee added the type/bug Something isn't working label May 8, 2024
@github-actions github-actions bot added this to the release-1.10 milestone May 8, 2024
@xiangjinwu
Copy link
Contributor

@tabVersion Any recommendations / thoughts on this?

@tabVersion
Copy link
Contributor

Seeing the log here

ERROR risingwave_stream::task::stream_manager: actor exit with error actor_id=4490 error=Executor error: Sink error: Kafka error: Message production error: QueueFull (Local: Queue full)

It indicates the batching is too small and it triggers a small failover to ingest the data within the same epoch over and over again.
I'd suggest increasing properties.queue.buffering.max.ms and `` to allow larger batching cache and reduce the overhead for each batch. Related issue confluentinc/librdkafka#2247

@ly9chee
Copy link
Author

ly9chee commented May 10, 2024

@tabVersion Thanks for the suggestions, but I have a concern about how to set those properties well, because in practice, the upstream thruput can change frequently, it may work pretty well when the upstream thruput is 1k/s, but enters a recovery loop when the upstream thruput gets high(100k/s) or Kafka is experiencing high load. And when we encounter this error, the only thing we can do is drop this sink to prevent cluster from continuing to crash.

It seems that in KafkaPayloadWriter implementation, when a Queue Full error is encountered, we only await one delivery to be sent and immediately create a new delivery future.

match e {
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull) => {
tracing::warn!(
"Producer queue full. Delivery future buffer size={}. Await and retry #{}",
self.add_future.future_count(),
i
);
self.add_future.await_one_delivery().await?;
continue;
}
_ => return Err(e.into()),
}

In this case, I think we might await all inflight deliveries being drained or wait a sufficient time before doing a retry, otherwise the producer queue may keep reaching full.

@ly9chee
Copy link
Author

ly9chee commented May 10, 2024

It seems this issue is not urgent, because I can't reproduce it.😅

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug Something isn't working user-feedback
Projects
None yet
Development

No branches or pull requests

4 participants