123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- package com.wsm.admin.device.mqtt;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.wsm.admin.constant.MsgCode;
- import com.wsm.admin.model.Device;
- import com.wsm.admin.model.DeviceEvent;
- import com.wsm.admin.service.IDeviceEventService;
- import com.wsm.admin.service.IDeviceService;
- import com.wsm.admin.service.impl.DeviceEventServiceImpl;
- import com.wsm.admin.service.impl.DeviceServiceImpl;
- import com.wsm.common.util.DateTimeUtils;
- import com.wsm.common.util.RedisUtil;
- import com.wsm.common.util.SpringContext;
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.messaging.simp.SimpMessagingTemplate;
- import java.util.Date;
- public class PushCallback implements MqttCallback {
- private static final Logger log = LoggerFactory.getLogger(PushCallback.class);
- private IDeviceService deviceService;
- private IDeviceEventService deviceEventService;
- private SimpMessagingTemplate messagingTemplate;
- private RedisUtil redisUtil;
- /**
- * 火灾报警
- * alarm_type : 1
- */
- private static String TYPE_FIRE = "1";
- /**
- * 故障
- * alarm_type : 2
- */
- private static String TYPE_FAULT = "2";
- public PushCallback(){
- if (deviceService == null){
- deviceService = SpringContext.getBean(DeviceServiceImpl.class);
- }
- if (deviceEventService == null){
- deviceEventService = SpringContext.getBean(DeviceEventServiceImpl.class);
- }
- if (messagingTemplate == null){
- messagingTemplate = SpringContext.getBean(SimpMessagingTemplate.class);
- }
- if (redisUtil == null){
- redisUtil = SpringContext.getBean(RedisUtil.class);
- }
- }
- @Override
- public void connectionLost(Throwable cause) {
- // 连接丢失后,一般在这里面进行重连
- log.info("连接断开,需要从连重连");
- ClientMQTT clientMqtt = new ClientMQTT();
- clientMqtt.start();
- log.info("重新启动MQTT成功");
- }
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- log.warn("deliveryComplete: {}", token.isComplete());
- }
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- // subscribe后得到的消息会执行到这里面
- log.warn("接收消息主题 : {}", topic);
- log.warn("接收消息Qos : {}", message.getQos());
- corventJson(new String(message.getPayload()), topic);
- }
- /**
- * 解析接收到的内容
- */
- private void corventJson(String str, String topic){
- // String str = "{\"devid\":\"3jyun-866971030771930\",\"pid\":\"H388N\",\"pname\":\"广东潮庭集团-珠海移动白蕉项目1\",\"cid\":34,\"aid\":1,\"a_name\":\"\",\"bid\":2,\"b_name\":\"\",\"lid\":5,\"l_name\":\"北京天安门\",\"time\":\"2019-12-05 14:19:54\",\"alarm_type\":1,\"alarm_type_name\":\"烟感传感器火警\",\"event_id\":32,\"event_count\":1,\"device_type\":1,\"comm_type\":2,\"first_alarm_time\":\"2019-12-05 14:19:54\",\"last_alarm_time\":\"2019-12-05 14:19:54\"}";
- JSONObject orgin = JSON.parseObject(str);
- log.info("orgin data: {}", orgin);
- // 火灾报警、设备故障
- String devid = orgin.getString("devid");
- String time = orgin.getString("time");
- Date postTime = DateTimeUtils.parse(time);
- log.warn("上报时间: {}", time);
- // 用来心跳检测
- if (TopicCode.TOPIC_DEV_MSG.equals(topic)) {
- redisUtil.set(devid,"heartbeat", Long.parseLong("25"));
- return;
- }
- // 火警报警
- String alarmTypeName = orgin.getString("alarm_type_name");
- String alarmType = orgin.getString("alarm_type");
- log.info("devid : {}", devid);
- log.info("time : {}", time);
- log.info("alarmType : {}", alarmType);
- log.info("alarmTypeName : {}", alarmTypeName);
- Device device = deviceService.findByDeviceId(devid);
- if (device != null && alarmType.equals(TYPE_FIRE)) {
- // 报警
- device.setStatus((byte)1);
- device.setUpdateTime(new Date());
- deviceService.update(device);
- // 删除缓存
- redisUtil.delete(MsgCode.REDIS_CYCLE_DEVICE_LIST);
- // 记录报警详情
- DeviceEvent event = new DeviceEvent();
- // DeviceEvent event = deviceEventService.findByDeviceIdTop(device.getId());
- // if (event == null) {
- event = new DeviceEvent();
- event.setDevice(device);
- event.setCreateTime(new Date());
- // }
- event.setHandleStatus((byte)0);
- event.setContent(alarmTypeName);
- event.setUpdateTime(new Date());
- event.setPostTime(postTime);
- deviceEventService.save(event);
- log.warn(device + ": " + alarmTypeName);
- JSONObject result = new JSONObject();
- result.put("id", device.getDeviceId());
- result.put("deviceType", device.getDeviceType());
- result.put("status", alarmTypeName);
- //推送信息到前端
- messagingTemplate.convertAndSend("/topic/device", result.toJSONString());
- }
- }
- }
|