Skip to content

对rocketmq进行封装,兼容阿里云的ons,支持spring-boot,通过在方法上加上注解声明式消费消息。

License

Notifications You must be signed in to change notification settings

Kestrong/rocketmq

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

11 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Overview

Introduce

Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.

Quick Start

Add the dependency below to your project:

<dependency>
    <groupId>com.xjbg</groupId>
    <artifactId>rocketmq-sdk</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

You can use either RocketMQ or Aliyun ONS. Simple examples:

    /**
     * Builds the producer settings used by the plain RocketMQ examples: the group
     * named by {@code MqConstants.DEFAULT_PRODUCER_GROUP} and a name server at
     * {@code localhost:9876}.
     *
     * @return the populated producer properties
     */
    private MqProducerProperties mqProducerProperties() {
        MqProducerProperties producerProps = new MqProducerProperties();
        producerProps.setProducerGroup(MqConstants.DEFAULT_PRODUCER_GROUP);
        producerProps.setNamesrvAddr("localhost:9876");
        return producerProps;
    }

    /**
     * Builds the producer settings used by the Aliyun ONS examples: the group named
     * by {@code MqConstants.DEFAULT_PRODUCER_GROUP} plus the ONS connection settings
     * returned by {@link #onsProperties()}.
     *
     * @return the populated ONS producer properties
     */
    private OnsProducerProperties onsProducerProperties() {
        OnsProducerProperties producerProps = new OnsProducerProperties();
        producerProps.setProducerGroup(MqConstants.DEFAULT_PRODUCER_GROUP);
        producerProps.setOnsProperties(onsProperties());
        return producerProps;
    }

    /**
     * Builds the Aliyun ONS connection settings shared by the ONS examples.
     * The access key and secret key here are placeholders; replace them with
     * real credentials before running against ONS.
     *
     * @return the populated ONS properties
     */
    private OnsProperties onsProperties() {
        OnsProperties ons = new OnsProperties();
        ons.setAccessKey("accessKey");
        ons.setSecretKey("secretKey");
        ons.setOnsAddr("http://onsaddr-internet.aliyun.com:8080/rocketmq/nsaddr4client-internal");
        return ons;
    }

    /**
     * Builds the consumer settings used by the plain RocketMQ examples: a name
     * server at {@code localhost:9876} and the group named by
     * {@code MqConstants.DEFAULT_CONSUMER_GROUP}.
     *
     * @return the populated consumer properties
     */
    private MqConsumerProperties mqConsumerProperties() {
        MqConsumerProperties consumerProps = new MqConsumerProperties();
        consumerProps.setNamesrvAddr("localhost:9876");
        consumerProps.setConsumerGroup(MqConstants.DEFAULT_CONSUMER_GROUP);
        return consumerProps;
    }

    /**
     * Builds the consumer settings used by the Aliyun ONS examples: the group named
     * by {@code MqConstants.DEFAULT_CONSUMER_GROUP} plus the ONS connection settings
     * returned by {@link #onsProperties()}.
     *
     * @return the populated ONS consumer properties
     */
    private OnsConsumerProperties onsConsumerProperties() {
        OnsConsumerProperties consumerProps = new OnsConsumerProperties();
        consumerProps.setConsumerGroup(MqConstants.DEFAULT_CONSUMER_GROUP);
        consumerProps.setOnsProperties(onsProperties());
        return consumerProps;
    }

    /**
     * Starts a RocketMQ producer built from {@link #mqProducerProperties()}, sends a
     * single message with body {@code "1"} to {@code testTopic}, and shuts the
     * producer down again.
     *
     * <p>The producer is shut down in a {@code finally} block so it is released
     * even when {@code send} throws.
     */
    @Test
    public void testMqProducer() throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = MqFactory.createProducer(mqProducerProperties());
        producer.start();
        try {
            // Optional, for a test or demo only: create the topic before sending.
//            if (autoCreateTopic) {//for test or demo
//                producer.createTopic(producer.getCreateTopicKey(), testTopic, 4);
//            }
            // NOTE(review): the second argument is presumably the send timeout in milliseconds — confirm against the RocketMQ API.
            producer.send(MessageBuilder.newBuilder().topic(testTopic).body("1").getRocketMQMessage(), 15000);
        } finally {
            // Always release the started producer, even if send() fails.
            producer.shutdown();
        }
    }

    /**
     * Starts a RocketMQ push consumer built from {@link #mqConsumerProperties()},
     * subscribes it to every tag ({@code "*"}) of {@code testTopic}, prints each
     * received message body, and shuts the consumer down after 20 seconds.
     *
     * <p>The consumer is shut down in a {@code finally} block so it is released
     * even when the waiting thread is interrupted.
     */
    @Test
    public void testMqConsumer() throws MQClientException, InterruptedException {
        DefaultMQPushConsumer pushConsumer = MqFactory.createPushConsumer(mqConsumerProperties(), (MessageListenerConcurrently) (msgs, context) -> {
            // NOTE(review): decodes with the platform default charset — confirm it matches the encoding used by the sender.
            msgs.forEach(x -> System.out.println(new String(x.getBody())));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        pushConsumer.subscribe(testTopic, "*");
        pushConsumer.start();
        try {
            // Keep the test alive long enough for messages to arrive.
            Thread.sleep(20_000);
        } finally {
            // Always release the started consumer, even if the sleep is interrupted.
            pushConsumer.shutdown();
        }
    }

    /**
     * Exercises the {@code OnsManagerApi} helpers with the sample ONS settings.
     *
     * <p>The author had no ONS instance available, so this example has not been
     * verified against a real service. Please run it yourself and report any
     * bugs you find.
     */
    @Test
    public void testManagerApi() {
        OnsProperties ons = onsProperties();
        IAcsClient acsClient = OnsManagerApi.acsClient(ons);
        // The next two results are not used further; the calls only demonstrate the API.
        IClientProfile clientProfile = OnsManagerApi.getProfile(ons);
        String regionId = OnsManagerApi.getONSRegionId(ons);
        OnsManagerApi.checkConsumer(acsClient, "", "");
    }

    /**
     * Starts an ONS producer built from {@link #onsProducerProperties()}, sends a
     * single message with body {@code "1"} to {@code testTopic}, and shuts the
     * producer down again.
     *
     * <p>The producer is shut down in a {@code finally} block so it is released
     * even when {@code send} throws.
     */
    @Test
    public void testOnsProducer() {
        Producer producer = ONSFactory.createProducer(onsProducerProperties().properties());
        producer.start();
        try {
            producer.send(MessageBuilder.newBuilder().topic(testTopic).body("1").getOnsMessage());
        } finally {
            // Always release the started producer, even if send() fails.
            producer.shutdown();
        }
    }

    /**
     * Starts an ONS consumer built from {@link #onsConsumerProperties()}, subscribes
     * it to every tag ({@code "*"}) of {@code testTopic}, prints each received
     * message body, and shuts the consumer down after 20 seconds.
     *
     * <p>The consumer is shut down in a {@code finally} block so it is released
     * even when the waiting thread is interrupted.
     */
    @Test
    public void testOnsConsumer() throws InterruptedException {
        Consumer consumer = ONSFactory.createConsumer(onsConsumerProperties().properties());
        consumer.subscribe(testTopic, "*", (message, context) -> {
            // NOTE(review): decodes with the platform default charset — confirm it matches the encoding used by the sender.
            System.out.println(new String(message.getBody()));
            return Action.CommitMessage;
        });
        consumer.start();
        try {
            // Keep the test alive long enough for messages to arrive.
            Thread.sleep(20_000);
        } finally {
            // Always release the started consumer, even if the sleep is interrupted.
            consumer.shutdown();
        }
    }

SpringBoot

If you are using Spring Boot, the setup is even easier.

dependency

<dependency>
    <groupId>com.xjbg</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

application.yml

rocketmq:
  profile: local
  product: rocketmq
  producer:
    namesrvAddr: localhost:9876
  consumer:
    namesrvAddr: localhost:9876
  ons:
    accessKey: accessKey
    secretKey: secretKey
    onsAddr: http://onsaddr-internet.aliyun.com:8080/rocketmq/nsaddr4client-internal

spring:
  profiles:
    active: local

Annotate a method of a Spring bean with @Consumer; a consumer will then be created automatically, and each received message will be processed by that method.

    // Topic used by both the annotated consumer and the producers in this example.
    private static final String testTopic = "message-center-topic";
   
    // RocketMQ producer; injection is optional (required = false), so this may be null.
    @Autowired(required = false)
    private DefaultMQProducer mqProducer;
    
    // ONS producer bean; injection is optional (required = false), so this may be null.
    @Autowired(required = false)
    private ProducerBean producerBean;  
    
    @Consumer(consumerGroup = "test-group", topic = testTopic, consumerType = ConsumerType.PUSH)
    public void test(String a) {
        System.out.println(a);
    
    
    /**
     * Sends one message with body {@code 1} to {@code testTopic} through whichever
     * producers were injected, then waits 30 seconds.
     *
     * @throws Exception if sending fails or the wait is interrupted
     */
    @Test
    public void run() throws Exception {
        // RocketMQ producer, if one was injected.
        if (null != mqProducer) {
            mqProducer.send(
                    MessageBuilder.newProfileBuilder()
                            .topic(testTopic)
                            .body(1)
                            .getRocketMQMessage());
        }
        // ONS producer, if one was injected.
        if (null != producerBean) {
            producerBean.send(
                    MessageBuilder.newProfileBuilder()
                            .topic(testTopic)
                            .body(1)
                            .getOnsMessage());
        }
        Thread.sleep(30_000);
    }

Official Examples

Learn More

About

对rocketmq进行封装,兼容阿里云的ons,支持spring-boot,通过在方法上加上注解声明式消费消息。

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages