package com.wsm.admin.device.mqtt; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.wsm.admin.constant.MsgCode; import com.wsm.admin.model.Device; import com.wsm.admin.model.DeviceEvent; import com.wsm.admin.service.IDeviceEventService; import com.wsm.admin.service.IDeviceService; import com.wsm.admin.service.impl.DeviceEventServiceImpl; import com.wsm.admin.service.impl.DeviceServiceImpl; import com.wsm.common.util.DateTimeUtils; import com.wsm.common.util.RedisUtil; import com.wsm.common.util.SpringContext; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.simp.SimpMessagingTemplate; import java.util.Date; public class PushCallback implements MqttCallback { private static final Logger log = LoggerFactory.getLogger(PushCallback.class); private IDeviceService deviceService; private IDeviceEventService deviceEventService; private SimpMessagingTemplate messagingTemplate; private RedisUtil redisUtil; /** * 火灾报警 * alarm_type : 1 */ private static String TYPE_FIRE = "1"; /** * 故障 * alarm_type : 2 */ private static String TYPE_FAULT = "2"; public PushCallback(){ if (deviceService == null){ deviceService = SpringContext.getBean(DeviceServiceImpl.class); } if (deviceEventService == null){ deviceEventService = SpringContext.getBean(DeviceEventServiceImpl.class); } if (messagingTemplate == null){ messagingTemplate = SpringContext.getBean(SimpMessagingTemplate.class); } if (redisUtil == null){ redisUtil = SpringContext.getBean(RedisUtil.class); } } @Override public void connectionLost(Throwable cause) { // 连接丢失后,一般在这里面进行重连 log.info("连接断开,需要从连重连"); ClientMQTT clientMqtt = new ClientMQTT(); clientMqtt.start(); log.info("重新启动MQTT成功"); } @Override public void deliveryComplete(IMqttDeliveryToken token) { log.warn("deliveryComplete: {}", token.isComplete()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe后得到的消息会执行到这里面 log.warn("接收消息主题 : {}", topic); log.warn("接收消息Qos : {}", message.getQos()); corventJson(new String(message.getPayload()), topic); } /** * 解析接收到的内容 */ private void corventJson(String str, String topic){ // String str = "{\"devid\":\"3jyun-866971030771930\",\"pid\":\"H388N\",\"pname\":\"广东潮庭集团-珠海移动白蕉项目1\",\"cid\":34,\"aid\":1,\"a_name\":\"\",\"bid\":2,\"b_name\":\"\",\"lid\":5,\"l_name\":\"北京天安门\",\"time\":\"2019-12-05 14:19:54\",\"alarm_type\":1,\"alarm_type_name\":\"烟感传感器火警\",\"event_id\":32,\"event_count\":1,\"device_type\":1,\"comm_type\":2,\"first_alarm_time\":\"2019-12-05 14:19:54\",\"last_alarm_time\":\"2019-12-05 14:19:54\"}"; JSONObject orgin = JSON.parseObject(str); log.info("orgin data: {}", orgin); // 火灾报警、设备故障 String devid = orgin.getString("devid"); String time = orgin.getString("time"); Date postTime = DateTimeUtils.parse(time); log.warn("上报时间: {}", time); // 用来心跳检测 if (TopicCode.TOPIC_DEV_MSG.equals(topic)) { redisUtil.set(devid,"heartbeat", Long.parseLong("25")); return; } // 火警报警 String alarmTypeName = orgin.getString("alarm_type_name"); String alarmType = orgin.getString("alarm_type"); log.info("devid : {}", devid); log.info("time : {}", time); log.info("alarmType : {}", alarmType); log.info("alarmTypeName : {}", alarmTypeName); Device device = deviceService.findByDeviceId(devid); if (device != null && alarmType.equals(TYPE_FIRE)) { // 报警 device.setStatus((byte)1); device.setUpdateTime(new Date()); deviceService.update(device); // 删除缓存 redisUtil.delete(MsgCode.REDIS_CYCLE_DEVICE_LIST); // 记录报警详情 DeviceEvent event = new DeviceEvent(); // DeviceEvent event = deviceEventService.findByDeviceIdTop(device.getId()); // if (event == null) { event = new DeviceEvent(); event.setDevice(device); event.setCreateTime(new Date()); // } event.setHandleStatus((byte)0); event.setContent(alarmTypeName); event.setUpdateTime(new Date()); event.setPostTime(postTime); deviceEventService.save(event); log.warn(device + ": " + alarmTypeName); JSONObject result = new JSONObject(); result.put("id", device.getDeviceId()); result.put("deviceType", device.getDeviceType()); result.put("status", alarmTypeName); //推送信息到前端 messagingTemplate.convertAndSend("/topic/device", result.toJSONString()); } } }