SpringBoot集成RabbitMQ

《林老师带你学编程》知识星球是由多个工作10年以上的一线大厂开发人员联合创建,希望通过我们的分享,帮助大家少走弯路,可以在技术的领域不断突破和发展。

🔥 具体的加入方式:

一、pom文件增加引入

需要引入spring-boot-starter-amqp包,为我们提供RabbitMQ相关消息处理的jar包,具体如下。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

修改完毕后,弹出maven引入依赖提示,点击Import Changes。

二、配置

修改application.yaml配置,增加rabbitMQ配置,注意yaml中的中文注释去掉,我自己启动时候发现有中文无法启动,我写注释是为了大家能够理解配置的意思。

server:
  port: 8082
  servlet:
    context-path: /hello-world-new

spring:
  cache:
    type: ehcache
    config: classpath:ehcache.xml
#  MYSql配置
#  datasource:
#    url:
#    username:
#    password:
#    driver-class-name: com.mysql.cj.jdbc.Driver
  datasource:
    driver-class-name: org.sqlite.JDBC
    url: jdbc:sqlite:/Users/XuesongBu/Documents/git_code/hello-world/hello-world.db
    username:
    password:
  #针对SQLite配置方言,MySQL不需要该配置
  jpa:
    database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialect
  #RabbitMQ服务器配置,地址账号密码,virtualhost等配置
  rabbitmq:
    addresses: 修改为自己RabbitMQ服务器地址
    username: 修改为自己的RabbitMQ账号
    password: 修改为自己的RabbitMQ密码
    virtual-host: 修改为自己的RabbitMQ的virtual-host

三、java实现Consumer消费者部分

增加RabbitMQConsumerConfig.java配置消费者连接,以及初始化queue,exchange,routingKey,也就是如果RabbitMQ服务器没有queue,exchange,routingKey,自动创建。

package com.example.demo;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitMQConsumerConfig {

    /**
     * 注入RabbitMQClinet的brokerContainerFactory
     * 如果RabbitMQ服务器没有对应的Queue,exchange,routing key,则初始化新建Queue,exchange,routing key
     * @param configurer
     * @param connectionFactory
     * @return
     */
    @Bean(name = "brokerContainerFactory")
    public SimpleRabbitListenerContainerFactory brokerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                                                                       ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        // 手工ACK
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 最小的消费者数
        factory.setConcurrentConsumers(3);
        // 消费者最大数
        factory.setMaxConcurrentConsumers(5);
        // 预拉取条数
        factory.setPrefetchCount(5);
        // 初始化并新建
        try (Connection connection = connectionFactory.createConnection();
             Channel channel = connection.createChannel(false)) {
            channel.queueDeclare("QUEUE_NAME", true, false, false, null);
            channel.exchangeDeclare("EXCHANGE_NAME", BuiltinExchangeType.DIRECT);
            channel.queueBind("QUEUE_NAME", "EXCHANGE_NAME", "ROUTING_KEY");
        } catch (Exception e) {
            log.info("Declare and bind queue error!", e);
        }
        return factory;
    }
}

新增MessageDto.java,接收消息实体。

package com.example.demo;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.io.Serializable;

@Data
public class MessageDto implements Serializable {
    @JsonProperty("content")
    private String content;
}

新增MessageConsumer.java,RabbitMQ消息消费监听类,负责消息的接收处理。

package com.example.demo;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
public class MessageConsumer {

    private static final ObjectMapper mapper = new ObjectMapper();
    /**
     * 监听消息队列,队列名称QUEUE_NAME
     * 通过brokerContainerFactory获取到对应的queue
     */
    @RabbitListener(queues = "QUEUE_NAME", containerFactory = "brokerContainerFactory")
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            log.info("Consumed message: {}", message);
            MessageDto dto = parse(new String(message.getBody()), MessageDto.class);
            log.info("Consumed message content: {}", dto);
            // 由于之前配置的手动ack,需要手动回调rabbitMQ服务器,通知已经完成消费
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch(Exception e) {
            log.error("Consumed erorr,", e);
            // 由于之前配置的手动ack,需要手动回调rabbitMQ服务器,通知出现问题
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }

    public static <T> T parse(String json, Class<T> clazz) {
        try {
            return mapper.readValue(json, clazz);
        } catch (IOException e) {
            log.error("IOException, json = {}, clazz = {}", json, clazz, e);
            try {
                return clazz.newInstance();
            } catch (InstantiationException | IllegalAccessException e1) {
                log.error("InstantiationException or IllegalAccessException, clazz = {}", clazz, e1);
                return null;
            }
        }
    }
}

四、java实现Producer生产者部分

新增RabbitMQProducerConfig.java,生产者配置类,负责创建RabbitTemplate,提供消息发送的工具类。

package com.example.demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitMQProducerConfig {

    /**
     * 配置生产者rabbitTemplate
     * 生产者只需要配置exchange和routingKey
     */
    @Bean(name = "pointRabbitTemplate")
    public RabbitTemplate pointRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("EXCHANGE_NAME");
        rabbitTemplate.setRoutingKey("ROUTING_KEY");
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}

新建MessageProducer.java,调用RabbitTemplate的convertAndSend发送方法来发送消息,注意消息发送前需要先序列化再发送。

package com.example.demo;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MessageProducer {

    private static final ObjectMapper mapper = new ObjectMapper();
    @Autowired
    private RabbitTemplate pointRabbitTemplate;

    /**
     * 发送消息
     * @param content
     */
    public void sendMessageToMQ(String content) {
        MessageDto dto = new MessageDto();
        dto.setContent(content);
        pointRabbitTemplate.convertAndSend(this.format(dto));
    }

    public static String format(Object pojo) {
        try {
            return mapper.writeValueAsString(pojo);
        } catch (JsonProcessingException e) {
            log.error("JsonProcessingException, pojo = {}", pojo, e);
            return "{}";
        }
    }
}

新建RabbitMQController.java,负责提供rest接口,以方便测试消息发送。

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitMQController {
    @Autowired
    private MessageProducer messageProducer;

    @PostMapping("/sendMQMessage")
    public String sendMQMessage(@RequestParam String content) {
        messageProducer.sendMessageToMQ(content);
        return "ok";
    }
}

五、Postman验证

启动服务后,首先通过postman的post方式调用http://localhost:8082/hello-world-new/sendMQMessage接口,来发送消息

查看后台日志显示,消息Consumer消费者输出了日志,说明测试成功。

六、总结

以上就是咱们常用的SpringBoot项目集成RabbitMQ的方法,可以说不复杂,按照我上面的配置即可实现。

原文地址:https://zhuanlan.zhihu.com/p/583835016

滚动至顶部