一分钟在SpringBoot项目中使用EMQ
- 游戏开发
- 2025-08-05 03:24:02

先展示最终的结果:
生产者端:
@RestController @RequiredArgsConstructor public class TestController { private final MqttProducer mqttProducer; @GetMapping("/test") public String test() { User build = User.builder().age(100).sex(1).address("世界潍坊渤海之眼").build(); // 延时发布 mqttProducer.send("$delayed/10/cookie", 2, JSON.toJSONString(build)); return "ok"; } }消费者端
/** * @author : Cookie * date : 2024/1/30 */ @Component @Topic("cookie") public class TestTopicHandler implements MsgHandler { @Override public void process(String jsonMsg) { User user = JSON.parseObject(jsonMsg, User.class); System.out.println(user); } }控制台结果:
具体解释在之前的笔记中, 需要的话可以查看 EMQ的介绍及整合SpringBoot的使用-CSDN博客
OK, 下面我们就开始实现上面的逻辑, 你要做的就是把 1-9 复制到项目即可
1. 依赖导入 <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> 2. yml 配置 # 顶格 mqtt: client: username: admin password: public serverURI: tcp://192.168.200.128:1883 clientId: monitor.task.${random.int[10000,99999]} # 注意: emq的客户端id 不能重复 keepAliveInterval: 10 #连接保持检查周期 秒 connectionTimeout: 30 #连接超时时间 秒 producer: defaultQos: 2 defaultRetained: false defaultTopic: topic/test1 consumer: consumerTopics: $queue/cookie/#, $share/group1/yfs1024 #不带群组的共享订阅 多个主题逗号隔开 # $queue/cookie/# # 以$queue开头,不带群组的共享订阅 多个客户端只能有一个消费者消费 # $share/group1/yfs1024 # 以$share开头,群组的共享订阅 多个客户端订阅 # 如果在一个组 只能有一个消费者消费 # 如果不在一个组 都可以消费 3. 属性配置 @Data @Configuration @ConfigurationProperties(prefix = "mqtt.client") public class MqttConfigProperties { private int defaultProducerQos; private boolean defaultRetained; private String defaultTopic; private String username; private String password; private String serverURI; private String clientId; private int keepAliveInterval; private int connectionTimeout; } 4. 定义主题注解 @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) public @interface Topic { String value(); } 5.Mqtt配置类 @Data @Slf4j @Configuration @RequiredArgsConstructor public class MqttConfig { private final MqttConfigProperties configProperties; private final MqttCallback mqttCallback; @Bean public MqttClient mqttClient() { try { MqttClient client = new MqttClient(configProperties.getServerURI(), configProperties.getClientId(), mqttClientPersistence()); client.setManualAcks(true); //设置手动消息接收确认 mqttCallback.setMqttClient(client); client.setCallback(mqttCallback); client.connect(mqttConnectOptions()); return client; } catch (MqttException e) { log.error("emq connect error", e); return null; } } @Bean public MqttConnectOptions mqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(configProperties.getUsername()); options.setPassword(configProperties.getPassword().toCharArray()); options.setAutomaticReconnect(true);//是否自动重新连接 options.setCleanSession(true);//是否清除之前的连接信息 options.setConnectionTimeout(configProperties.getConnectionTimeout());//连接超时时间 options.setKeepAliveInterval(configProperties.getKeepAliveInterval());//心跳 options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);//设置mqtt版本 return options; } public MqttClientPersistence mqttClientPersistence() { return new MemoryPersistence(); } } 6. 定义消息处理接口 /** * 消息处理接口 */ public interface MsgHandler { void process(String jsonMsg) throws IOException; } 7.定义消息上下文 /** * 消息处理上下文, 通过主题拿到topic */ public interface MsgHandlerContext{ MsgHandler getMsgHandler(String topic); } 8. 定义回调类 @Component @Slf4j public class MqttCallback implements MqttCallbackExtended { // 需要订阅的topic配置 @Value("${mqtt.consumer.consumerTopics}") private List<String> consumerTopics; @Autowired private MsgHandlerContext msgHandlerContext; @Override public void connectionLost(Throwable throwable) { log.error("emq error.", throwable); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info("topic:" + topic + " message:" + new String(message.getPayload())); //处理消息 String msgContent = new String(message.getPayload()); log.info("接收到消息:" + msgContent); try { // 根据主题名称 获取 该主题对应的处理器对象 // 多态 父类的引用指向子类的对象 MsgHandler msgHandler = msgHandlerContext.getMsgHandler(topic); if (msgHandler == null) { return; } msgHandler.process(msgContent); //执行 } catch (IOException e) { log.error("process msg error,msg is: " + msgContent, e); } //处理成功后确认消息 mqttClient.messageArrivedComplete(message.getId(), message.getQos()); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete()); } @Override public void connectComplete(boolean b, String s) { log.info("连接成功"); //和EMQ连接成功后根据配置自动订阅topic if (consumerTopics != null && consumerTopics.size() > 0) { // 循环遍历当前项目中配置的所有的主题. consumerTopics.forEach(t -> { try { log.info(">>>>>>>>>>>>>>subscribe topic:" + t); // 订阅当前集群中所有的主题 消息服务质量 2 -> 至少收到一个 mqttClient.subscribe(t, 2); } catch (MqttException e) { log.error("emq connect error", e); } }); } } private MqttClient mqttClient; // 在配置类中调用传入连接 public void setMqttClient(MqttClient mqttClient) { this.mqttClient = mqttClient; } } 8. 消息处理类加载器作用: 将Topic跟处理类对应 通过 handlerMap
/** * 消息处理类加载器 * 作用: * 1. 因为实现了Spring 的 ApplicationContextAware 接口, 项目启动后就会运行实现的方法 * 2. 获取MsgHandler接口的所有的实现类 * 3. 将实现类上的Topic注解的值,作为handlerMap的键,实现类(处理器)作为对应的值 */ @Component public class MsgHandlerContextImp implements ApplicationContextAware, MsgHandlerContext { private final Map<String, MsgHandler> handlerMap = new HashMap<>(); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 从spring容器中获取 <所有> 实现了MsgHandler接口的对象 // key 默认类名首字母小写 value 当前对象 Map<String, MsgHandler> map = applicationContext.getBeansOfType(MsgHandler.class); map.values().forEach(obj -> { // 通过反射拿到注解中的值 即 当前类处理的 topic String topic = obj.getClass().getAnnotation(Topic.class).value(); // 将主题和当前主题的处理类建立映射 handlerMap.put(topic,obj); }); } @Override public MsgHandler getMsgHandler(String topic) { return handlerMap.get(topic); } } 9. 封装消息生产者 @Slf4j @Component public class MqttProducer { // @Value() 读取配置 当然也可以批量读取配置,这里就一个一个了 @Value("${mqtt.producer.defaultQos}") private int defaultProducerQos; @Value("${mqtt.producer.defaultRetained}") private boolean defaultRetained; @Value("${mqtt.producer.defaultTopic}") private String defaultTopic; @Autowired private MqttClient mqttClient; public void send(String payload) { this.send(defaultTopic, payload); } public void send(String topic, String payload) { this.send(topic, defaultProducerQos, payload); } public void send(String topic, int qos, String payload) { this.send(topic, qos, defaultRetained, payload); } public void send(String topic, int qos, boolean retained, String payload) { try { mqttClient.publish(topic, payload.getBytes(), qos, retained); } catch (MqttException e) { log.error("publish msg error.",e); } } public <T> void send(String topic, int qos, T msg) throws JsonProcessingException { String payload = JsonUtil.serialize(msg); this.send(topic,qos,payload); } } 最终的实现的结果 生产者端: 在需要发送消息的地方注入 MqttProducer 发送消息消费者端: 在需要处理对应主题的类上 实现 MsgHandler接口 代码示例 生产者端 @RestController @RequiredArgsConstructor public class TestController { private final MqttProducer mqttProducer; @GetMapping("/test") public String test() { User build = User.builder().age(100).sex(1).address("世界潍坊渤海之眼").build(); // 延时发布 mqttProducer.send("$delayed/10/cookie", 2, JSON.toJSONString(build)); return "ok"; } } 消费者端 @Component @Topic("cookie") public class TestTopicHandler implements MsgHandler { @Override public void process(String jsonMsg) { User user = JSON.parseObject(jsonMsg, User.class); System.out.println(user); } }控制台结果展示:
补充JsonUtil
public class JsonUtil { /** * 从json字符串中根据nodeName获取值 * @param nodeName * @param json * @return * @throws IOException */ public static String getValueByNodeName(String nodeName, String json) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); JsonNode jsonNode = objectMapper.readTree(json); JsonNode node = jsonNode.findPath(nodeName); if(node == null) return null; return node.asText(); } /** * 根据nodeName获取节点内容 * @param nodeName * @param json * @return * @throws IOException */ public static JsonNode getNodeByName(String nodeName, String json) throws IOException { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readTree(json).findPath(nodeName); } /** * 反序列化 * @param json * @param clazz * @param <T> * @return * @throws IOException */ public static <T> T getByJson(String json, Class<T> clazz) throws IOException { ObjectMapper mapper = new ObjectMapper(); // 在反序列化时忽略在 json 中存在但 Java 对象不存在的属性 mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // 在序列化时日期格式默认为 yyyy-MM-dd'T'HH:mm:ss.SSSZ mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); return mapper.readValue(json, clazz); } /** * 反序列化(驼峰转换) * @param json * @param clazz * @param <T> * @return * @throws IOException */ public static <T> T getByJsonSNAKE(String json, Class<T> clazz) throws IOException { ObjectMapper mapper = new ObjectMapper(); // 在反序列化时忽略在 json 中存在但 Java 对象不存在的属性 mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // 在序列化时日期格式默认为 yyyy-MM-dd'T'HH:mm:ss.SSSZ mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); // 设置驼峰和下划线之间的映射 mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); return mapper.readValue(json, clazz); } /** * 序列化 * @param object * @return * @throws JsonProcessingException */ public static String serialize(Object object) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); return mapper.writeValueAsString(object); } /** * 序列化(驼峰转换) * @param object * @return * @throws JsonProcessingException */ public static String serializeSNAKE(Object object) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); // 设置驼峰和下划线之间的映射 mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE); return mapper.writeValueAsString(object); } public static JsonNode getTreeNode(String json) throws JsonProcessingException { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.readTree(json); } /** * 将对象转map * @param obj * @return * @throws IOException */ public static Map<String,Object> convertToMap(Object obj) throws IOException { ObjectMapper mapper = new ObjectMapper(); return mapper.readValue(serialize(obj),Map.class); } }一分钟在SpringBoot项目中使用EMQ由讯客互联游戏开发栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“一分钟在SpringBoot项目中使用EMQ”