Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add rocketmq flusher #850

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open

add rocketmq flusher #850

wants to merge 2 commits into from

Conversation

wlazjr
Copy link

@wlazjr wlazjr commented May 16, 2023

add rocketmq flusher

@CLAassistant
Copy link

CLAassistant commented May 16, 2023

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.


伴农 seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

@messixukejia
Copy link
Collaborator

很高效👍
贵公司如果已经在使用ilogtail,欢迎在这里登记下,多谢。#693

@messixukejia
Copy link
Collaborator

CI的问题需要处理下

}

type FlusherRocketmq struct {
context pipeline.Context
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FlusherRocketmq这个结构体属性建议把面向用户配置和内部逻辑使用的分开,不要混合在一起,大写和小写命名的可以参考其它flusher 分割下

}
for index, log := range logs.([][]byte) {
valueMap := values[index]
topic, err := fmtstr.FormatTopic(valueMap, r.Topic)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里参考当前flusher pulsar的逻辑优化下,大部分场景不会触发动态topic,FormatTopic有一定的性能损耗,如果静态topic就不要调用FormatTopic

@shalousun
Copy link
Collaborator

shalousun commented May 16, 2023

"context"
"errors"
"fmt"
"github.com/alibaba/ilogtail/pkg/fmtstr"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import这块按照golang的规范格式化下

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all right, thanks bro, first time writing golang, I will fix above issues later.

Copy link
Collaborator

@henryzhx8 henryzhx8 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要加一下e2e测试,详情见doc

|----------------------------|--------|------|-------------------------------------------------------------|
| NameServers | String | 是 | name server address |
| Topic | String | 是 | rocketmq Topic,支持动态topic, 例如: `test_%{content.appname}` |
| Sync | bool | 否 | 默认为异常发送 |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

异常发送?异步?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

异步


| 参数 | 类型 | 是否必选 | 说明 |
|----------------------------|--------|------|-------------------------------------------------------------|
| NameServers | String | 是 | name server address |
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

名字和其它插件统一一下吧

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你是说要改成Brokers么,kafka配置里写的是brokers的地址,rocketmq是需要填namesever地址,然后通过nameserver找到broker

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个保留和中间件本身的术语应该OK

r.context = context
if r.NameServers == nil || len(r.NameServers) == 0 {
var err = errors.New("name server is nil")
logger.Error(r.context.GetRuntimeContext(), "FLUSHER_INIT_ALARM", "init rocketmq flusher fail, eerror", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo,以及可以在外层打印错误日志

return nil
}

type FlusherFunc func(projectName string, logstoreName string, configName string, logGroupList []*protocol.LogGroup) error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个除了这种flush模式外,还有其它模式可能吗?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

还有单条的,这次没加,等后续再加

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不是让添加rocketmq的license文件,可以看下LICENSE_OF_ILOGTAIL_DEPENDENCIES.md这个文件里配置,把本次pr新添加的依赖库的license加上

for index, log := range logs.([][]byte) {
valueMap := values[index]
topic := r.Topic
if len(r.topicKeys) > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topicKeys这个定义了并未看到实际有用的逻辑?是不是可以参靠flusher pulsar和kafka v2实现下

@messixukejia
Copy link
Collaborator

image
麻烦处理下这几个问题

@wlazjr wlazjr closed this Jun 5, 2023
@wlazjr wlazjr force-pushed the main branch 2 times, most recently from 86f76a6 to 00d6e7e Compare June 5, 2023 07:03
@wlazjr wlazjr reopened this Jun 5, 2023
logger.Debug(r.context.GetRuntimeContext(), "[LogGroup] topic", logGroup.Topic, "logstore", logGroup.Category, "logcount", len(logGroup.Logs), "tags", logGroup.LogTags)
logs, values, err := r.converter.ToByteStreamWithSelectedFields(logGroup, r.topicKeys)
if err != nil {
logger.Error(r.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush kafka convert log fail, error", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flush kafka 这个错误提示message需要改下

logger.Error(r.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "flush rocketmq error, error", err)
return err
}
logger.Debug(r.context.GetRuntimeContext(), "success flush 2 rocketmq with [LogGroup] projectName", projectName, "logstoreName", logstoreName, "logcount", len(msgs), "res", res.String())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to use 'to' instead of 2

@shalousun
Copy link
Collaborator

If "conversation" has been resolved, please mark it as resolved

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEATURE]: iLogtail support rocketmq Flusher
5 participants