MqBackupServiceImpl.java 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package com.fdkankan.manage.service.impl;
  2. import java.util.Calendar;
  3. import java.util.Date;
  4. import cn.hutool.core.collection.CollUtil;
  5. import com.alibaba.fastjson.JSON;
  6. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  7. import com.fdkankan.common.constant.CommonStatus;
  8. import com.fdkankan.common.constant.ErrorCode;
  9. import com.fdkankan.common.constant.RecStatus;
  10. import com.fdkankan.common.exception.BusinessException;
  11. import com.fdkankan.manage.common.ResultData;
  12. import com.fdkankan.manage.entity.MqBackup;
  13. import com.fdkankan.manage.mapper.IMqBackupMapper;
  14. import com.fdkankan.manage.service.IMqBackupService;
  15. import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
  16. import com.fdkankan.rabbitmq.util.RabbitMqProducer;
  17. import com.rabbitmq.client.Channel;
  18. import com.rabbitmq.client.GetResponse;
  19. import org.springframework.amqp.rabbit.core.ChannelCallback;
  20. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  21. import org.springframework.beans.factory.annotation.Autowired;
  22. import org.springframework.stereotype.Service;
  23. import java.io.UnsupportedEncodingException;
  24. import java.nio.charset.StandardCharsets;
  25. import java.util.List;
  26. import java.util.Objects;
  27. /**
  28. * <p>
  29. * 服务实现类
  30. * </p>
  31. *
  32. * @author
  33. * @since 2023-01-05
  34. */
  35. @Service
  36. public class MqBackupServiceImpl extends ServiceImpl<IMqBackupMapper, MqBackup> implements IMqBackupService {
  37. @Autowired
  38. private RabbitTemplate rabbitTemplate;
  39. @Autowired
  40. private RabbitMqProducer rabbitMqProducer;
  41. @Override
  42. public ResultData getRabbitMqMsg(String queue, int needRequeue) throws UnsupportedEncodingException {
  43. if(needRequeue != CommonStatus.YES.code() && needRequeue != CommonStatus.NO.code()){
  44. throw new BusinessException(ErrorCode.FAILURE_CODE_5012);
  45. }
  46. boolean exit = false;
  47. do {
  48. GetResponse getResponse = rabbitTemplate.execute(channel -> channel.basicGet(queue, true));
  49. if(Objects.nonNull(getResponse)){
  50. byte[] body = getResponse.getBody();
  51. MqBackup mqBackup = new MqBackup();
  52. mqBackup.setQueue(queue);
  53. mqBackup.setMsg(new String(body, StandardCharsets.UTF_8.name()));
  54. mqBackup.setNeedRequeue(needRequeue);
  55. mqBackup.setCreateTime(Calendar.getInstance().getTime());
  56. this.save(mqBackup);
  57. }else{
  58. exit = true;
  59. }
  60. }while (!exit);
  61. return ResultData.ok();
  62. }
  63. @Override
  64. public ResultData rabbitmqRequeue(String queue) throws UnsupportedEncodingException {
  65. int index = 0;
  66. int size = 500;
  67. boolean exit = false;
  68. do {
  69. List<MqBackup> list = this.list(new LambdaQueryWrapper<MqBackup>()
  70. .eq(MqBackup::getQueue, queue)
  71. .eq(MqBackup::getNeedRequeue, CommonStatus.YES.code().intValue())
  72. .eq(MqBackup::getRequeue, CommonStatus.NO.code().intValue()).last("limit " + index + "," + size));
  73. if(CollUtil.isNotEmpty(list)){
  74. list.stream().forEach(mqBackup -> {
  75. rabbitMqProducer.sendByWorkQueue(mqBackup.getQueue(), JSON.parseObject(mqBackup.getMsg()));
  76. mqBackup.setRequeue(CommonStatus.YES.code().intValue());
  77. mqBackup.setRecStatus(RecStatus.DISABLE.code());
  78. this.updateById(mqBackup);
  79. this.removeById(mqBackup);
  80. });
  81. index += size;
  82. }else{
  83. exit = true;
  84. }
  85. }while (!exit);
  86. return ResultData.ok();
  87. }
  88. }