- 直接访问链接:https://t.zsxq.com/14F2uGap7
- 微信扫码下图:
一、pom文件增加引入
需要引入spring-boot-starter-amqp包,为我们提供RabbitMQ相关消息处理的jar包,具体如下。
<!-- Spring Boot AMQP starter: supplies the RabbitMQ messaging libraries used by the classes below -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
修改完毕后,弹出maven引入依赖提示,点击Import Changes。
二、配置
修改application.yaml,增加RabbitMQ相关配置。注意:实际使用时请去掉yaml中的中文注释——我本地启动时发现配置文件里带中文会导致启动失败;这里保留注释只是为了方便大家理解各配置项的含义。
# Embedded web server: the service is reachable at http://localhost:8082/hello-world-new
server:
  port: 8082
  servlet:
    context-path: /hello-world-new
spring:
  cache:
    type: ehcache
    # NOTE(review): nesting was reconstructed from a flattened listing. Spring Boot 2.x
    # documents this setting as spring.cache.ehcache.config - confirm where "config" belongs.
    config: classpath:ehcache.xml
  # MySQL configuration (disabled)
  # datasource:
  #   url:
  #   username:
  #   password:
  #   driver-class-name: com.mysql.cj.jdbc.Driver
  datasource:
    driver-class-name: org.sqlite.JDBC
    url: jdbc:sqlite:/Users/XuesongBu/Documents/git_code/hello-world/hello-world.db
    username:
    password:
  # Hibernate dialect required for SQLite; MySQL does not need this setting
  jpa:
    database-platform: com.enigmabridge.hibernate.dialect.SQLiteDialect
  # RabbitMQ server settings: address, account, password, virtual host.
  # The four values below are placeholders and must be replaced with real values.
  rabbitmq:
    addresses: 修改为自己RabbitMQ服务器地址
    username: 修改为自己的RabbitMQ账号
    password: 修改为自己的RabbitMQ密码
    virtual-host: 修改为自己的RabbitMQ的virtual-host
三、java实现Consumer消费者部分
新增RabbitMQConsumerConfig.java,用于配置消费者连接,并初始化queue、exchange以及二者之间基于routingKey的绑定关系:如果RabbitMQ服务器上还没有对应的queue、exchange或绑定,则自动创建。
package com.example.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RabbitMQConsumerConfig {
/**
* 注入RabbitMQClinet的brokerContainerFactory
* 如果RabbitMQ服务器没有对应的Queue,exchange,routing key,则初始化新建Queue,exchange,routing key
* @param configurer
* @param connectionFactory
* @return
*/
@Bean(name = "brokerContainerFactory")
public SimpleRabbitListenerContainerFactory brokerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
// 手工ACK
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 最小的消费者数
factory.setConcurrentConsumers(3);
// 消费者最大数
factory.setMaxConcurrentConsumers(5);
// 预拉取条数
factory.setPrefetchCount(5);
// 初始化并新建
try (Connection connection = connectionFactory.createConnection();
Channel channel = connection.createChannel(false)) {
channel.queueDeclare("QUEUE_NAME", true, false, false, null);
channel.exchangeDeclare("EXCHANGE_NAME", BuiltinExchangeType.DIRECT);
channel.queueBind("QUEUE_NAME", "EXCHANGE_NAME", "ROUTING_KEY");
} catch (Exception e) {
log.info("Declare and bind queue error!", e);
}
return factory;
}
}
新增MessageDto.java,接收消息实体。
package com.example.demo;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
import java.io.Serializable;
/**
 * Message payload exchanged through RabbitMQ as JSON.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}.
 */
@Data
public class MessageDto implements Serializable {

    /** Explicit version id so the serialized form does not depend on compiler-generated values. */
    private static final long serialVersionUID = 1L;

    /** Message text, mapped to the JSON property {@code content}. */
    @JsonProperty("content")
    private String content;
}
新增MessageConsumer.java,RabbitMQ消息消费监听类,负责消息的接收处理。
package com.example.demo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class MessageConsumer {
private static final ObjectMapper mapper = new ObjectMapper();
/**
* 监听消息队列,队列名称QUEUE_NAME
* 通过brokerContainerFactory获取到对应的queue
*/
@RabbitListener(queues = "QUEUE_NAME", containerFactory = "brokerContainerFactory")
public void onMessage(Message message, Channel channel) throws Exception {
try {
log.info("Consumed message: {}", message);
MessageDto dto = parse(new String(message.getBody()), MessageDto.class);
log.info("Consumed message content: {}", dto);
// 由于之前配置的手动ack,需要手动回调rabbitMQ服务器,通知已经完成消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch(Exception e) {
log.error("Consumed erorr,", e);
// 由于之前配置的手动ack,需要手动回调rabbitMQ服务器,通知出现问题
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
public static <T> T parse(String json, Class<T> clazz) {
try {
return mapper.readValue(json, clazz);
} catch (IOException e) {
log.error("IOException, json = {}, clazz = {}", json, clazz, e);
try {
return clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e1) {
log.error("InstantiationException or IllegalAccessException, clazz = {}", clazz, e1);
return null;
}
}
}
}
四、java实现Producer生产者部分
新增RabbitMQProducerConfig.java,生产者配置类,负责创建RabbitTemplate,提供消息发送的工具类。
package com.example.demo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Producer-side RabbitMQ configuration: provides the {@link RabbitTemplate} used to publish messages.
 */
@Slf4j
@Configuration
public class RabbitMQProducerConfig {

    /**
     * Creates the template registered as {@code pointRabbitTemplate}.
     *
     * <p>A producer only needs a default exchange and routing key; both are preset here,
     * and the template's mandatory flag is switched on.
     *
     * @param connectionFactory connection factory the template publishes through
     * @return the configured template
     */
    @Bean(name = "pointRabbitTemplate")
    public RabbitTemplate pointRabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        template.setRoutingKey("ROUTING_KEY");
        template.setExchange("EXCHANGE_NAME");
        return template;
    }
}
新建MessageProducer.java,调用RabbitTemplate的convertAndSend发送方法来发送消息,注意消息发送前需要先序列化再发送。
package com.example.demo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Publishes {@link MessageDto} payloads to RabbitMQ as JSON strings.
 */
@Slf4j
@Service
public class MessageProducer {

    /** Shared JSON mapper, created once for the class. */
    private static final ObjectMapper mapper = new ObjectMapper();

    /** Template used for publishing; injected by Spring. */
    @Autowired
    private RabbitTemplate pointRabbitTemplate;

    /**
     * Wraps the text in a {@link MessageDto}, serializes it to JSON and sends it through
     * the injected template.
     *
     * @param content the message text to send
     */
    public void sendMessageToMQ(String content) {
        MessageDto payload = new MessageDto();
        payload.setContent(content);
        String json = format(payload);
        pointRabbitTemplate.convertAndSend(json);
    }

    /**
     * Serializes an object to a JSON string.
     *
     * @param pojo the object to serialize
     * @return the JSON text, or {@code "{}"} when serialization fails (the failure is logged)
     */
    public static String format(Object pojo) {
        String json;
        try {
            json = mapper.writeValueAsString(pojo);
        } catch (JsonProcessingException e) {
            log.error("JsonProcessingException, pojo = {}", pojo, e);
            json = "{}";
        }
        return json;
    }
}
新建RabbitMQController.java,负责提供rest接口,以方便测试消息发送。
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint for manually testing message publishing.
 */
@RestController
public class RabbitMQController {

    /** Producer that performs the actual send; injected by Spring. */
    @Autowired
    private MessageProducer messageProducer;

    /**
     * Sends the given text to RabbitMQ through {@link MessageProducer}.
     *
     * @param content message text, read from the request parameter {@code content}
     * @return the literal {@code "ok"} once the send call has returned
     */
    @PostMapping("/sendMQMessage")
    // The parameter name is given explicitly so binding does not depend on the compiler
    // retaining parameter names (the -parameters flag).
    public String sendMQMessage(@RequestParam("content") String content) {
        messageProducer.sendMessageToMQ(content);
        return "ok";
    }
}
五、Postman验证
启动服务后,首先通过Postman以POST方式调用 http://localhost:8082/hello-world-new/sendMQMessage 接口(需携带请求参数content,即要发送的消息内容)来发送消息。
查看后台日志显示,消息Consumer消费者输出了日志,说明测试成功。
六、总结
以上就是咱们常用的SpringBoot项目集成RabbitMQ的方法,可以说不复杂,按照我上面的配置即可实现。
原文地址:https://zhuanlan.zhihu.com/p/583835016