New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
OTel AccessLogger: support log_written/dropped for OTel partial rejected log records #34072
Conversation
Signed-off-by: Xuyang Tao <taoxuy@google.com>
could you take a look at the format error please @TAOXUY ERROR: ./source/extensions/access_loggers/common/grpc_access_logger.h:108: Don't use std::optional; use absl::optional instead |
Signed-off-by: Xuyang Tao <taoxuy@google.com>
Done. Thank! |
Signed-off-by: Xuyang Tao <taoxuy@google.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/wait-any
source/extensions/access_loggers/common/grpc_access_logger_clients.h
Outdated
Show resolved
Hide resolved
[this, &dispatcher]() { | ||
// It will be deleted in the callbacks. | ||
auto* callback = new OTelLogRequestCallbacks(dispatcher, this->stats_, | ||
this->batched_log_entries_); | ||
this->batched_log_entries_ = 0; | ||
return callback; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather then create a self-management request callbacks object, I prefer to manage all these pending requests in the unary client directly. And the unary client should provide a way to let the specific implementation to provide a callbacks.
Then things would be simpler and the lifetime of these objects would be more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @wbpcode , could you please explain which class owns what and how the deletion works in your mind? Thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will assume the unary client will maintains a set of pending requests in a flat_hash_map
. The request pointer (returned by the send()
) will be used as the key. And the value is a simple wrapper that like current OTelLogRequestCallbacks
. The wrapper will delegate the callback calling to the unary client to remove itself from the map and execute some logic that specified by the specific logger.
Then when the unary client self is destroyed, it will cancel all pending requests to ensure no any further logic will be executed. (This is the key difference)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion! DONE!
PTAL @wbpcode @yanavlasov
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your quick response and hard work. But I think the most important point still doesn't be resolved.
Then when the unary client self is destroyed, it will cancel all pending requests to ensure no any further logic will be executed. (This is the key difference)
See, the pending requests will still be pending when the client is destroyed and the pending requests may use dangling reference to the callbacks objects or dangling reference to the client stats.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
You are talking about the potential issues: the ongoing stream access the deleted callback/stats owned under GrpcAccessLoggerImpl
.
I think in theory it shouldn't happen because GrpcAccessLoggerImpl
as common::GrpcAccessLogger
owns the AsyncClientImpl
, holding the AsyncStream
. So when GrpcAccessLoggerImpl
is destructed, all the active streams should be destructed as well and there won't be callbacks in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did a quick check to the source code of grpc client. Yeah, all active streams will be destroyed automatically. But seems you still need to call the cancel() explicitly to reset underlying http request, needn't it?
(Will do a deeper check tomorrow in case I miss something)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When AsyncStreamImpl get destructed, it will reset the underlying http stream.
envoy/source/common/grpc/async_client_impl.cc
Lines 35 to 39 in 1ae1aa9
ASSERT(isThreadSafe()); | |
while (!active_streams_.empty()) { | |
active_streams_.front()->resetStream(); | |
} | |
} |
envoy/source/common/grpc/async_client_impl.cc
Lines 237 to 241 in 1ae1aa9
if (!http_reset_) { | |
http_reset_ = true; | |
stream_->reset(); | |
} | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the confirmation. It makes sense. Seems my previous comments are based on mine incorrect knowledge.
/wait-any |
Signed-off-by: Xuyang Tao <taoxuy@google.com>
Signed-off-by: Xuyang Tao <taoxuy@google.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/wait
source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.h
Outdated
Show resolved
Hide resolved
source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.h
Outdated
Show resolved
Hide resolved
: public Grpc::AsyncRequestCallbacks< | ||
opentelemetry::proto::collector::logs::v1::ExportLogsServiceResponse> { | ||
public: | ||
OTelLogRequestCallbacks(Envoy::Event::Dispatcher& dispatcher, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is dispatcher
still needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.h
Outdated
Show resolved
Hide resolved
source/extensions/access_loggers/open_telemetry/grpc_access_log_impl.cc
Outdated
Show resolved
Hide resolved
Signed-off-by: Xuyang Tao <taoxuy@google.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/wait
this->stats_, this->batched_log_entries_, | ||
[this](OTelLogRequestCallbacks* p) { this->callbacks_.erase(p); }); | ||
this->batched_log_entries_ = 0; | ||
this->callbacks_.emplace(ptr, ptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should crash in non optimized builds. ptr is not a valid iterator position in callbacks_
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Yan, I think emplace(ptr, ptr)
treats the first argument as the key and the second argument to construct a unique_ptr as value. I don't know why you think it won't work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh sorry. I did not realize you were using hash map. Please disregard.
return [this]() { | ||
OTelLogRequestCallbacks* ptr = new OTelLogRequestCallbacks( | ||
this->stats_, this->batched_log_entries_, | ||
[this](OTelLogRequestCallbacks* p) { this->callbacks_.erase(p); }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think you can do it this way. You need to first find p
in callbacks_ and then delete at the iterator if it was found.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you asking about:
- this API doesn't work?
- or we need to do a sanity check on the existence?
For 1), it should work. absl::flat_hash_map has the api to directly remove the key
For 2), I just added the sanity check in case it is called twice.
template <typename LogRequest, typename LogResponse> | ||
class UnaryGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogResponse> { | ||
public: | ||
typedef std::function<Grpc::AsyncRequestCallbacks<LogResponse>*()> AsyncRequestCallbacksFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think now you can make the factory return std::unique_ptr. This will indicate that there is an owner for the object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are letting OpenTelemetry::GrpcAccessLoggerImpl to owns the callback so we only need to return pointer/reference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I see how you did it. I would just return reference instead of pointer then to signify that the callback object is owned elsewhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DONE.
Signed-off-by: Xuyang Tao <taoxuy@google.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with a few minor comment.
template <typename LogRequest, typename LogResponse> | ||
class UnaryGrpcAccessLogClient : public GrpcAccessLogClient<LogRequest, LogResponse> { | ||
public: | ||
typedef std::function<Grpc::AsyncRequestCallbacks<LogResponse>*()> AsyncRequestCallbacksFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I see how you did it. I would just return reference instead of pointer then to signify that the callback object is owned elsewhere.
this->stats_, this->batched_log_entries_, | ||
[this](OTelLogRequestCallbacks* p) { this->callbacks_.erase(p); }); | ||
this->batched_log_entries_ = 0; | ||
this->callbacks_.emplace(ptr, ptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh sorry. I did not realize you were using hash map. Please disregard.
std::function<GrpcAccessLoggerImpl::OTelLogRequestCallbacks*()> | ||
GrpcAccessLoggerImpl::genOTelCallbacksFactory() { | ||
return [this]() { | ||
OTelLogRequestCallbacks* ptr = new OTelLogRequestCallbacks( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use std::make_unique here to make it more idiomatic C++.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DONE.
Signed-off-by: Xuyang Tao <taoxuy@google.com>
The change does not build: https://source.cloud.google.com/results/invocations/53d8338b-909c-4f57-b0d7-e4bfd267524c |
/wait |
Fixed. |
This PR makes the OTel access logger to support log_written/log_dropped tracking
Context:
log_written/log_dropped
tracked in GrpcAccessLogger doesn't make sense for unary gRPC client, which always increment log_written for each incoming log entry. It couldn't record the 2 cases mentioned above.What this PR contains:
log_written/log_dropped
from GrpcAccessLogger to OpenTelemetry::GrpcAccessLoggerImpl while no changes for streaming case.