Skip to content

Commit

Permalink
[INLONG-10111][DataProxy] Add auditVersion field processing (#10112)
Browse files Browse the repository at this point in the history
Co-authored-by: gosonzhang <gosonzhang@tencent.com>
  • Loading branch information
gosonzhang and gosonzhang committed Apr 30, 2024
1 parent 28bfa89 commit 364bdc5
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;

import java.util.Map;

import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;

/**
* Audit utils
*/
Expand Down Expand Up @@ -72,15 +75,17 @@ public static void add(int auditID, Event event) {
if (event.getHeaders().containsKey(ConfigConstants.MSG_COUNTER_KEY)) {
msgCount = Long.parseLong(event.getHeaders().get(ConfigConstants.MSG_COUNTER_KEY));
}
AuditOperator.getInstance().add(auditID, inlongGroupId,
inlongStreamId, logTime, msgCount, event.getBody().length);
long auditVersion = getAuditVersion(headers);
AuditOperator.getInstance().add(auditID, DEFAULT_AUDIT_TAG,
inlongGroupId, inlongStreamId, logTime, msgCount, event.getBody().length, auditVersion);
} else {
String groupId = headers.get(AttributeConstants.GROUP_ID);
String streamId = headers.get(AttributeConstants.STREAM_ID);
long dataTime = NumberUtils.toLong(headers.get(AttributeConstants.DATA_TIME));
long msgCount = NumberUtils.toLong(headers.get(ConfigConstants.MSG_COUNTER_KEY));
AuditOperator.getInstance().add(auditID, groupId,
streamId, dataTime, msgCount, event.getBody().length);
long auditVersion = getAuditVersion(headers);
AuditOperator.getInstance().add(auditID, DEFAULT_AUDIT_TAG,
groupId, streamId, dataTime, msgCount, event.getBody().length, auditVersion);
}
}

Expand Down Expand Up @@ -119,6 +124,25 @@ public static long getAuditFormatTime(long msgTime) {
return msgTime - msgTime % CommonConfigHolder.getInstance().getAuditFormatInvlMs();
}

/**
* Get Audit version
*
* @param headers the message headers
*
* @return audit version
*/
public static long getAuditVersion(Map<String, String> headers) {
String strAuditVersion = headers.get(AttributeConstants.AUDIT_VERSION);
if (StringUtils.isNotBlank(strAuditVersion)) {
try {
return Long.parseLong(strAuditVersion);
} catch (Throwable ex) {
//
}
}
return -1L;
}

/**
* Send audit data
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.HttpAttrConst;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.BaseSource;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.sdk.commons.protocol.EventConstants;
Expand Down Expand Up @@ -335,6 +336,8 @@ private void processMessage(ChannelHandlerContext ctx, Map<String, String> reqAt
// get message count
int intMsgCnt = NumberUtils.toInt(reqAttrs.get(HttpAttrConst.KEY_MESSAGE_COUNT), 1);
String strMsgCount = String.valueOf(intMsgCnt);
// get audit version
long auditVersion = AuditUtils.getAuditVersion(reqAttrs);
// build message attributes
InLongMsg inLongMsg = InLongMsg.newInLongMsg(source.isCompressed());
strBuff.append("groupId=").append(groupId)
Expand All @@ -345,6 +348,10 @@ private void processMessage(ChannelHandlerContext ctx, Map<String, String> reqAt
.append("&rt=").append(msgRcvTime)
.append(AttributeConstants.SEPARATOR).append(AttributeConstants.MSG_RPT_TIME)
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
if (auditVersion != -1L) {
strBuff.append(AttributeConstants.SEPARATOR).append(AttributeConstants.AUDIT_VERSION)
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(auditVersion);
}
inLongMsg.addMsg(strBuff.toString(), body.getBytes(HttpAttrConst.VAL_DEF_CHARSET));
byte[] inlongMsgData = inLongMsg.buildArray();
long pkgTime = inLongMsg.getCreatetime();
Expand All @@ -365,6 +372,7 @@ private void processMessage(ChannelHandlerContext ctx, Map<String, String> reqAt
MessageWrapType.INLONG_MSG_V0.getStrId());
eventHeaders.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
eventHeaders.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(pkgTime));
eventHeaders.put(AttributeConstants.AUDIT_VERSION, String.valueOf(auditVersion));
Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
try {
source.getCachedChProcessor().processEvent(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public abstract class AbsV0MsgCodec {
protected String msgProcType = "b2b";
protected boolean needResp = true;
protected long msgPkgTime;
protected long auditVersion = -1L;

public AbsV0MsgCodec(int totalDataLen, int msgTypeValue,
long msgRcvTime, String strRemoteIP) {
Expand Down Expand Up @@ -246,6 +247,7 @@ protected Map<String, String> buildEventHeaders(BaseSource source) {
headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
headers.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
headers.put(ConfigConstants.PKG_TIME_KEY, String.valueOf(msgPkgTime));
headers.put(AttributeConstants.AUDIT_VERSION, String.valueOf(auditVersion));
// add extra key-value information
if (!needResp) {
headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.BaseSource;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -157,6 +158,8 @@ public boolean validAndFillFields(BaseSource source, StringBuilder strBuff) {
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
attrMap.put(AttributeConstants.MSG_RPT_TIME, String.valueOf(msgRcvTime));
}
// get audit version
this.auditVersion = AuditUtils.getAuditVersion(this.attrMap);
// get trace requirement
if (this.needTraceMsg) {
if (strBuff.length() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.dataproxy.config.ConfigManager;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.BaseSource;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -188,6 +189,8 @@ public boolean validAndFillFields(BaseSource source, StringBuilder strBuff) {
attrMap.put(AttributeConstants.DATA_TIME, String.valueOf(this.dataTimeMs));
}
}
// get audit version
this.auditVersion = AuditUtils.getAuditVersion(this.attrMap);
// process sequence id
String sequenceId = attrMap.get(AttributeConstants.SEQUENCE_ID);
if (StringUtils.isNotBlank(sequenceId)) {
Expand Down

0 comments on commit 364bdc5

Please sign in to comment.