12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package com.fdkankan.manage.service.impl;
- import java.util.Calendar;
- import java.util.Date;
- import cn.hutool.core.collection.CollUtil;
- import com.alibaba.fastjson.JSON;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.fdkankan.common.constant.CommonStatus;
- import com.fdkankan.common.constant.ErrorCode;
- import com.fdkankan.common.constant.RecStatus;
- import com.fdkankan.common.exception.BusinessException;
- import com.fdkankan.manage.common.ResultData;
- import com.fdkankan.manage.entity.MqBackup;
- import com.fdkankan.manage.mapper.IMqBackupMapper;
- import com.fdkankan.manage.service.IMqBackupService;
- import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
- import com.fdkankan.rabbitmq.util.RabbitMqProducer;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.GetResponse;
- import org.springframework.amqp.rabbit.core.ChannelCallback;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import java.io.UnsupportedEncodingException;
- import java.nio.charset.StandardCharsets;
- import java.util.List;
- import java.util.Objects;
- /**
- * <p>
- * 服务实现类
- * </p>
- *
- * @author
- * @since 2023-01-05
- */
- @Service
- public class MqBackupServiceImpl extends ServiceImpl<IMqBackupMapper, MqBackup> implements IMqBackupService {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Autowired
- private RabbitMqProducer rabbitMqProducer;
- @Override
- public ResultData getRabbitMqMsg(String queue, int needRequeue) throws UnsupportedEncodingException {
- if(needRequeue != CommonStatus.YES.code() && needRequeue != CommonStatus.NO.code()){
- throw new BusinessException(ErrorCode.FAILURE_CODE_5012);
- }
- boolean exit = false;
- do {
- GetResponse getResponse = rabbitTemplate.execute(channel -> channel.basicGet(queue, true));
- if(Objects.nonNull(getResponse)){
- byte[] body = getResponse.getBody();
- MqBackup mqBackup = new MqBackup();
- mqBackup.setQueue(queue);
- mqBackup.setMsg(new String(body, StandardCharsets.UTF_8.name()));
- mqBackup.setNeedRequeue(needRequeue);
- mqBackup.setCreateTime(Calendar.getInstance().getTime());
- this.save(mqBackup);
- }else{
- exit = true;
- }
- }while (!exit);
- return ResultData.ok();
- }
- @Override
- public ResultData rabbitmqRequeue(String queue) throws UnsupportedEncodingException {
- int index = 0;
- int size = 500;
- boolean exit = false;
- do {
- List<MqBackup> list = this.list(new LambdaQueryWrapper<MqBackup>()
- .eq(MqBackup::getQueue, queue)
- .eq(MqBackup::getNeedRequeue, CommonStatus.YES.code().intValue())
- .eq(MqBackup::getRequeue, CommonStatus.NO.code().intValue()).last("limit " + index + "," + size));
- if(CollUtil.isNotEmpty(list)){
- list.stream().forEach(mqBackup -> {
- rabbitMqProducer.sendByWorkQueue(mqBackup.getQueue(), JSON.parseObject(mqBackup.getMsg()));
- mqBackup.setRequeue(CommonStatus.YES.code().intValue());
- mqBackup.setRecStatus(RecStatus.DISABLE.code());
- this.updateById(mqBackup);
- this.removeById(mqBackup);
- });
- index += size;
- }else{
- exit = true;
- }
- }while (!exit);
- return ResultData.ok();
- }
- }
|