PushCallback.java 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package com.wsm.admin.device.mqtt;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.wsm.admin.constant.MsgCode;
  5. import com.wsm.admin.model.Device;
  6. import com.wsm.admin.model.DeviceEvent;
  7. import com.wsm.admin.service.IDeviceEventService;
  8. import com.wsm.admin.service.IDeviceService;
  9. import com.wsm.admin.service.impl.DeviceEventServiceImpl;
  10. import com.wsm.admin.service.impl.DeviceServiceImpl;
  11. import com.wsm.common.util.DateTimeUtils;
  12. import com.wsm.common.util.RedisUtil;
  13. import com.wsm.common.util.SpringContext;
  14. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  15. import org.eclipse.paho.client.mqttv3.MqttCallback;
  16. import org.eclipse.paho.client.mqttv3.MqttMessage;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import org.springframework.messaging.simp.SimpMessagingTemplate;
  20. import java.util.Date;
  21. public class PushCallback implements MqttCallback {
  22. private static final Logger log = LoggerFactory.getLogger(PushCallback.class);
  23. private IDeviceService deviceService;
  24. private IDeviceEventService deviceEventService;
  25. private SimpMessagingTemplate messagingTemplate;
  26. private RedisUtil redisUtil;
  27. /**
  28. * 火灾报警
  29. * alarm_type : 1
  30. */
  31. private static String TYPE_FIRE = "1";
  32. /**
  33. * 故障
  34. * alarm_type : 2
  35. */
  36. private static String TYPE_FAULT = "2";
  37. public PushCallback(){
  38. if (deviceService == null){
  39. deviceService = SpringContext.getBean(DeviceServiceImpl.class);
  40. }
  41. if (deviceEventService == null){
  42. deviceEventService = SpringContext.getBean(DeviceEventServiceImpl.class);
  43. }
  44. if (messagingTemplate == null){
  45. messagingTemplate = SpringContext.getBean(SimpMessagingTemplate.class);
  46. }
  47. if (redisUtil == null){
  48. redisUtil = SpringContext.getBean(RedisUtil.class);
  49. }
  50. }
  51. @Override
  52. public void connectionLost(Throwable cause) {
  53. // 连接丢失后,一般在这里面进行重连
  54. log.info("连接断开,需要从连重连");
  55. ClientMQTT clientMqtt = new ClientMQTT();
  56. clientMqtt.start();
  57. log.info("重新启动MQTT成功");
  58. }
  59. @Override
  60. public void deliveryComplete(IMqttDeliveryToken token) {
  61. log.warn("deliveryComplete: {}", token.isComplete());
  62. }
  63. @Override
  64. public void messageArrived(String topic, MqttMessage message) throws Exception {
  65. // subscribe后得到的消息会执行到这里面
  66. log.warn("接收消息主题 : {}", topic);
  67. log.warn("接收消息Qos : {}", message.getQos());
  68. corventJson(new String(message.getPayload()), topic);
  69. }
  70. /**
  71. * 解析接收到的内容
  72. */
  73. private void corventJson(String str, String topic){
  74. // String str = "{\"devid\":\"3jyun-866971030771930\",\"pid\":\"H388N\",\"pname\":\"广东潮庭集团-珠海移动白蕉项目1\",\"cid\":34,\"aid\":1,\"a_name\":\"\",\"bid\":2,\"b_name\":\"\",\"lid\":5,\"l_name\":\"北京天安门\",\"time\":\"2019-12-05 14:19:54\",\"alarm_type\":1,\"alarm_type_name\":\"烟感传感器火警\",\"event_id\":32,\"event_count\":1,\"device_type\":1,\"comm_type\":2,\"first_alarm_time\":\"2019-12-05 14:19:54\",\"last_alarm_time\":\"2019-12-05 14:19:54\"}";
  75. JSONObject orgin = JSON.parseObject(str);
  76. log.info("orgin data: {}", orgin);
  77. // 火灾报警、设备故障
  78. String devid = orgin.getString("devid");
  79. String time = orgin.getString("time");
  80. Date postTime = DateTimeUtils.parse(time);
  81. log.warn("上报时间: {}", time);
  82. // 用来心跳检测
  83. if (TopicCode.TOPIC_DEV_MSG.equals(topic)) {
  84. redisUtil.set(devid,"heartbeat", Long.parseLong("25"));
  85. return;
  86. }
  87. // 火警报警
  88. String alarmTypeName = orgin.getString("alarm_type_name");
  89. String alarmType = orgin.getString("alarm_type");
  90. log.info("devid : {}", devid);
  91. log.info("time : {}", time);
  92. log.info("alarmType : {}", alarmType);
  93. log.info("alarmTypeName : {}", alarmTypeName);
  94. Device device = deviceService.findByDeviceId(devid);
  95. if (device != null && alarmType.equals(TYPE_FIRE)) {
  96. // 报警
  97. device.setStatus((byte)1);
  98. device.setUpdateTime(new Date());
  99. deviceService.update(device);
  100. // 删除缓存
  101. redisUtil.delete(MsgCode.REDIS_CYCLE_DEVICE_LIST);
  102. // 记录报警详情
  103. DeviceEvent event = new DeviceEvent();
  104. // DeviceEvent event = deviceEventService.findByDeviceIdTop(device.getId());
  105. // if (event == null) {
  106. event = new DeviceEvent();
  107. event.setDevice(device);
  108. event.setCreateTime(new Date());
  109. // }
  110. event.setHandleStatus((byte)0);
  111. event.setContent(alarmTypeName);
  112. event.setUpdateTime(new Date());
  113. event.setPostTime(postTime);
  114. deviceEventService.save(event);
  115. log.warn(device + ": " + alarmTypeName);
  116. JSONObject result = new JSONObject();
  117. result.put("id", device.getDeviceId());
  118. result.put("deviceType", device.getDeviceType());
  119. result.put("status", alarmTypeName);
  120. //推送信息到前端
  121. messagingTemplate.convertAndSend("/topic/device", result.toJSONString());
  122. }
  123. }
  124. }