Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10105][Manager] Fix sort standalone get kafka config error #10106

Merged
merged 1 commit into from
May 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.Map;

/**
* Kafka data node operator
*/
Expand All @@ -48,6 +50,9 @@ public class KafkaDataNodeOperator extends AbstractDataNodeOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDataNodeOperator.class);

private static final String bootstrapServers = "bootstrap.servers";
private static final String clientId = "client.id";

@Autowired
private ObjectMapper objectMapper;

Expand Down Expand Up @@ -79,6 +84,15 @@ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
return kafkaDataNodeInfo;
}

@Override
public Map<String, String> parse2SinkParams(DataNodeInfo info) {
Map<String, String> params = super.parse2SinkParams(info);
KafkaDataNodeInfo kafkaDataNodeInfo = (KafkaDataNodeInfo) info;
params.put(bootstrapServers, kafkaDataNodeInfo.getBootstrapServers());
params.put(clientId, kafkaDataNodeInfo.getClientId());
return params;
}

@Override
protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
KafkaDataNodeRequest nodeRequest = (KafkaDataNodeRequest) request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkField;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
Expand All @@ -30,13 +31,15 @@
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

/**
* Kafka sink operator
Expand All @@ -46,6 +49,8 @@ public class KafkaSinkOperator extends AbstractSinkOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSinkOperator.class);

private static final String topic = "topic";

@Autowired
private ObjectMapper objectMapper;

Expand Down Expand Up @@ -75,6 +80,23 @@ protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntit
}
}

@Override
public Map<String, String> parse2IdParams(StreamSinkEntity streamSink, List<String> fields,
DataNodeInfo dataNodeInfo) {

Map<String, String> params = super.parse2IdParams(streamSink, fields, dataNodeInfo);

KafkaSinkDTO kafkaSinkDTO;
try {
kafkaSinkDTO = objectMapper.readValue(streamSink.getExtParams(), KafkaSinkDTO.class);
} catch (JsonProcessingException e) {
LOGGER.error("parse kafka sink dto error", e);
return params;
}
params.put(topic, kafkaSinkDTO.getTopicName());
return params;
}

@Override
public StreamSink getFromEntity(StreamSinkEntity entity) {
KafkaSink sink = new KafkaSink();
Expand Down