  // RabbitMqUtil.java
  package com.fdkk.sxz.util;

  import com.alibaba.fastjson.JSON;
  import com.alibaba.fastjson.JSONObject;
  import lombok.extern.slf4j.Slf4j;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.stereotype.Component;
  // NOTE(review): sun.misc.BASE64Encoder is a JDK-internal class, not part of the
  // supported Java API and unavailable on newer JDKs; java.util.Base64 is the
  // supported replacement. Kept so any remaining references still resolve.
  import sun.misc.BASE64Encoder;

  import java.io.BufferedReader;
  import java.io.IOException;
  import java.io.InputStreamReader;
  import java.net.HttpURLConnection;
  import java.net.URL;
  import java.nio.charset.StandardCharsets;
  import java.util.Base64;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.Objects;
  16. /**
  17. * @author Xiewj
  18. * @date 2022/3/14
  19. */
  20. @Slf4j
  21. @Component
  22. public class RabbitMqUtil {
  23. @Value("${spring.rabbitmq.host}")
  24. private String host;
  25. @Value("${spring.rabbitmq.apiport}")
  26. private String port;
  27. @Value("${spring.rabbitmq.username}")
  28. private String username;
  29. @Value("${spring.rabbitmq.password}")
  30. private String password;
  31. /**
  32. * 队列任务总数 * * @param queueName * @return
  33. */
  34. public int getMessageCount(String queueName) throws IOException {
  35. String apiMessage = getApiMessage(queueName);
  36. if (Objects.equals(apiMessage, "")) {
  37. RabbitMqUtil.log.error("请求RabbitMQ API时出错!!");
  38. return 0;
  39. }
  40. JSONObject jsonObject = JSON.parseObject(apiMessage);
  41. return Integer.parseInt(jsonObject.get("messages").toString());
  42. }
  43. /**
  44. * 队列ready任务数 * * @param queueName * @return
  45. */
  46. public int getMessageReadyCount(String queueName) throws IOException {
  47. String apiMessage = getApiMessage(queueName);
  48. if (Objects.equals(apiMessage, "")) {
  49. RabbitMqUtil.log.error("请求RabbitMQ API时出错!!");
  50. return 0;
  51. }
  52. JSONObject jsonObject = JSON.parseObject(apiMessage);
  53. return Integer.parseInt(jsonObject.get("messages_ready").toString());
  54. }
  55. /**
  56. * 队列unack数MQ * * @param queueName * @return
  57. */
  58. public int getMessagesUnacknowledgedCount(String queueName) throws IOException {
  59. String apiMessage = getApiMessage(queueName);
  60. if (Objects.equals(apiMessage, "")) {
  61. RabbitMqUtil.log.error("请求RabbitMQ API时出错!!");
  62. return 0;
  63. }
  64. JSONObject jsonObject = JSON.parseObject(apiMessage);
  65. return Integer.parseInt(jsonObject.get("messages_unacknowledged").toString());
  66. }
  67. /**
  68. * 获取队列消息总数、ready消息数、unack消息数 * * @param queueName * @return Map<String,Integer>
  69. */
  70. public Map<String, Integer> getMQCountMap(String queueName) throws IOException {
  71. String apiMessage = getApiMessage(queueName);
  72. JSONObject jsonObject = JSON.parseObject(apiMessage);
  73. Map<String, Integer> map = new HashMap<>();
  74. map.put("messages", Integer.parseInt(jsonObject.get("messages").toString()));
  75. map.put("messages_ready", Integer.parseInt(jsonObject.get("messages_ready").toString()));
  76. map.put("messages_unacknowledged", Integer.parseInt(jsonObject.get("messages_unacknowledged").toString()));
  77. return map;
  78. }
  79. public String getApiMessage(String queueName) throws IOException {
  80. //发送一个GET请求
  81. HttpURLConnection httpConn = null;
  82. BufferedReader in = null;
  83. String urlString = "http://" + host + ":" + port + "/api/queues/%2f/" + queueName;
  84. URL url = new URL(urlString);
  85. httpConn = (HttpURLConnection) url.openConnection();
  86. //设置用户名密码
  87. String auth = username + ":" + password;
  88. BASE64Encoder enc = new BASE64Encoder();
  89. String encoding = enc.encode(auth.getBytes());
  90. httpConn.setDoOutput(true);
  91. httpConn.setRequestProperty("Authorization", "Basic " + encoding);
  92. // 创建实际的链接
  93. httpConn.connect();
  94. //读取响应
  95. if (httpConn.getResponseCode() == HttpURLConnection.HTTP_OK) {
  96. StringBuilder content = new StringBuilder();
  97. String tempStr = "";
  98. in = new BufferedReader(new InputStreamReader(httpConn.getInputStream()));
  99. while ((tempStr = in.readLine()) != null) {
  100. content.append(tempStr);
  101. }
  102. in.close();
  103. httpConn.disconnect();
  104. return content.toString();
  105. } else {
  106. httpConn.disconnect();
  107. return "";
  108. }
  109. }
  110. }