xiewj 2 anni fa
parent
commit
a4dee0f7e2

+ 3 - 3
src/main/java/com/fdkankan/openApi/component/ValidateApiAOP.java

@@ -8,7 +8,7 @@ import com.fdkankan.openApi.constant.HttpStatus;
 import com.fdkankan.openApi.constant.RedisKey;
 import com.fdkankan.openApi.exception.ApiBusinessException;
 import com.fdkankan.openApi.mq.dto.CallApiDTO;
-import com.fdkankan.openApi.mq.sender.Sender;
+import com.fdkankan.openApi.mq.sender.CallApiSender;
 import com.fdkankan.openApi.service.system.IUserAuthService;
 import com.fdkankan.redis.util.RedisUtil;
 import com.fdkankan.web.response.ResultData;
@@ -32,7 +32,7 @@ public class ValidateApiAOP {
     @Autowired
     private RedisUtil redisUtil;
     @Autowired
-    private Sender sender;
+    private CallApiSender callApiSender;
     @Autowired
     private IUserAuthService userAuthService;
     @Autowired
@@ -94,7 +94,7 @@ public class ValidateApiAOP {
             //预减次数
             //进入队列,同步数据库
             log.info("进入队列,同步数据库");
-            sender.callApi(new CallApiDTO(authorization));
+            callApiSender.callApi(new CallApiDTO(authorization));
         } else {
             log.info("业务非正常返回,进行次数回滚+1:{}");
             long incrStock = redisUtil.incr(String.format(RedisKey.API_METHOD_COUNT, authorization, method), 1);

+ 21 - 0
src/main/java/com/fdkankan/openApi/mq/dto/CallApiDTO.java

@@ -0,0 +1,21 @@
+package com.fdkankan.openApi.mq.dto;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author Xiewj
+ * @date 2023/2/3
+ */
+@Data
+public class CallApiDTO implements Serializable {
+    private String appKey;
+
+    public CallApiDTO() {
+    }
+
+    public CallApiDTO(String appKey) {
+        this.appKey = appKey;
+    }
+}

+ 67 - 0
src/main/java/com/fdkankan/openApi/mq/listener/CallApiListener.java

@@ -0,0 +1,67 @@
+package com.fdkankan.openApi.mq.listener;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fdkankan.openApi.mq.dto.CallApiDTO;
+import com.fdkankan.openApi.service.system.IUserAuthService;
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.dao.DeadlockLoserDataAccessException;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * 消息监听器
+ *
+ * @author XiongNeng
+ * @version 1.0
+ * @since 2018/3/1
+ */
+@Component
+@Slf4j
+public class CallApiListener {
+    @Autowired
+    IUserAuthService userAuthService;
+
+    /**
+     * saveOrEdit
+     *
+     * @param channel
+     * @param message
+     * @throws Exception the io exception  这里异常需要处理
+     */
+    @RabbitListener(
+            queuesToDeclare = @Queue("${queue.openApi.call-api}"),
+            concurrency = "${maxThread.openApi.update-count}"
+
+    )
+    public void callApi(Channel channel, Message message) throws Exception {
+        if (ObjectUtils.isEmpty(message.getBody())) {
+            log.error("消息内容为空,退出构建,当前服务器id:{}");
+            return;
+        }
+        long deliveryTag = message.getMessageProperties().getDeliveryTag();
+        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
+        String messageId = message.getMessageProperties().getMessageId();
+
+        try {
+            log.info("callApi开始,id:{},deliveryTag:{},消息体:{}", messageId, deliveryTag, msg);
+            CallApiDTO param = JSONObject.parseObject(msg, CallApiDTO.class);
+
+            int i = userAuthService.updateCallCounts(param.getAppKey());
+            log.info("修改数据量为{}", i);
+            channel.basicAck(deliveryTag, false);
+
+        } catch (DeadlockLoserDataAccessException e) {
+            log.info("修改死锁重新入队");
+            channel.basicReject(deliveryTag, true);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 25 - 0
src/main/java/com/fdkankan/openApi/mq/sender/CallApiSender.java

@@ -0,0 +1,25 @@
+package com.fdkankan.openApi.mq.sender;
+
+import com.fdkankan.openApi.mq.dto.CallApiDTO;
+import com.fdkankan.rabbitmq.util.RabbitMqProducer;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author Xiewj
+ * @date 2023/2/17
+ */
+@Service
+@Slf4j
+public class CallApiSender {
+    @Value("${queue.openApi.call-api}")
+    private String callApi;
+    @Autowired
+    private RabbitMqProducer rabbitMqProducer;
+
+    public void callApi(CallApiDTO callApiDTO) {
+        rabbitMqProducer.sendByWorkQueue(callApi, callApiDTO);
+    }
+}