123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- package com.fdkk.sxz.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import sun.misc.BASE64Encoder;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
- /**
- * @author Xiewj
- * @date 2022/3/14
- */
- @Slf4j
- @Component
- public class RabbitMqUtil {
- @Value("${spring.rabbitmq.host}")
- private String host;
- @Value("${spring.rabbitmq.apiport}")
- private String port;
- @Value("${spring.rabbitmq.username}")
- private String username;
- @Value("${spring.rabbitmq.password}")
- private String password;
- /**
- * 队列任务总数 * * @param queueName * @return
- */
- public int getMessageCount(String queueName) throws IOException {
- String apiMessage = getApiMessage(queueName);
- if (Objects.equals(apiMessage, "")) {
- RabbitMqUtil.log.error("请求RabbitMQ API时出错!!");
- return 0;
- }
- JSONObject jsonObject = JSON.parseObject(apiMessage);
- return Integer.parseInt(jsonObject.get("messages").toString());
- }
- /**
- * 队列ready任务数 * * @param queueName * @return
- */
- public int getMessageReadyCount(String queueName) throws IOException {
- String apiMessage = getApiMessage(queueName);
- if (Objects.equals(apiMessage, "")) {
- RabbitMqUtil.log.error("请求RabbitMQ API时出错!!");
- return 0;
- }
- JSONObject jsonObject = JSON.parseObject(apiMessage);
- return Integer.parseInt(jsonObject.get("messages_ready").toString());
- }
- /**
- * 队列unack数MQ * * @param queueName * @return
- */
- public int getMessagesUnacknowledgedCount(String queueName) throws IOException {
- String apiMessage = getApiMessage(queueName);
- if (Objects.equals(apiMessage, "")) {
- RabbitMqUtil.log.error("请求RabbitMQ API时出错!!");
- return 0;
- }
- JSONObject jsonObject = JSON.parseObject(apiMessage);
- return Integer.parseInt(jsonObject.get("messages_unacknowledged").toString());
- }
- /**
- * 获取队列消息总数、ready消息数、unack消息数 * * @param queueName * @return Map<String,Integer>
- */
- public Map<String, Integer> getMQCountMap(String queueName) throws IOException {
- String apiMessage = getApiMessage(queueName);
- JSONObject jsonObject = JSON.parseObject(apiMessage);
- Map<String, Integer> map = new HashMap<>();
- map.put("messages", Integer.parseInt(jsonObject.get("messages").toString()));
- map.put("messages_ready", Integer.parseInt(jsonObject.get("messages_ready").toString()));
- map.put("messages_unacknowledged", Integer.parseInt(jsonObject.get("messages_unacknowledged").toString()));
- return map;
- }
- public String getApiMessage(String queueName) throws IOException {
- //发送一个GET请求
- HttpURLConnection httpConn = null;
- BufferedReader in = null;
- String urlString = "http://" + host + ":" + port + "/api/queues/%2f/" + queueName;
- URL url = new URL(urlString);
- httpConn = (HttpURLConnection) url.openConnection();
- //设置用户名密码
- String auth = username + ":" + password;
- BASE64Encoder enc = new BASE64Encoder();
- String encoding = enc.encode(auth.getBytes());
- httpConn.setDoOutput(true);
- httpConn.setRequestProperty("Authorization", "Basic " + encoding);
- // 创建实际的链接
- httpConn.connect();
- //读取响应
- if (httpConn.getResponseCode() == HttpURLConnection.HTTP_OK) {
- StringBuilder content = new StringBuilder();
- String tempStr = "";
- in = new BufferedReader(new InputStreamReader(httpConn.getInputStream()));
- while ((tempStr = in.readLine()) != null) {
- content.append(tempStr);
- }
- in.close();
- httpConn.disconnect();
- return content.toString();
- } else {
- httpConn.disconnect();
- return "";
- }
- }
- }
|