package com.fdkankan.manage.service.impl; import java.util.Calendar; import java.util.Date; import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.fdkankan.common.constant.CommonStatus; import com.fdkankan.common.constant.ErrorCode; import com.fdkankan.common.constant.RecStatus; import com.fdkankan.common.exception.BusinessException; import com.fdkankan.manage.common.ResultData; import com.fdkankan.manage.entity.MqBackup; import com.fdkankan.manage.mapper.IMqBackupMapper; import com.fdkankan.manage.service.IMqBackupService; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.fdkankan.rabbitmq.util.RabbitMqProducer; import com.rabbitmq.client.Channel; import com.rabbitmq.client.GetResponse; import org.springframework.amqp.rabbit.core.ChannelCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects; /** *

* 服务实现类 *

* * @author * @since 2023-01-05 */ @Service public class MqBackupServiceImpl extends ServiceImpl implements IMqBackupService { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitMqProducer rabbitMqProducer; @Override public ResultData getRabbitMqMsg(String queue, int needRequeue) throws UnsupportedEncodingException { if(needRequeue != CommonStatus.YES.code() && needRequeue != CommonStatus.NO.code()){ throw new BusinessException(ErrorCode.FAILURE_CODE_5012); } boolean exit = false; do { GetResponse getResponse = rabbitTemplate.execute(channel -> channel.basicGet(queue, true)); if(Objects.nonNull(getResponse)){ byte[] body = getResponse.getBody(); MqBackup mqBackup = new MqBackup(); mqBackup.setQueue(queue); mqBackup.setMsg(new String(body, StandardCharsets.UTF_8.name())); mqBackup.setNeedRequeue(needRequeue); mqBackup.setCreateTime(Calendar.getInstance().getTime()); this.save(mqBackup); }else{ exit = true; } }while (!exit); return ResultData.ok(); } @Override public ResultData rabbitmqRequeue(String queue) throws UnsupportedEncodingException { int index = 0; int size = 500; boolean exit = false; do { List list = this.list(new LambdaQueryWrapper() .eq(MqBackup::getQueue, queue) .eq(MqBackup::getNeedRequeue, CommonStatus.YES.code().intValue()) .eq(MqBackup::getRequeue, CommonStatus.NO.code().intValue()).last("limit " + index + "," + size)); if(CollUtil.isNotEmpty(list)){ list.stream().forEach(mqBackup -> { rabbitMqProducer.sendByWorkQueue(mqBackup.getQueue(), JSON.parseObject(mqBackup.getMsg())); mqBackup.setRequeue(CommonStatus.YES.code().intValue()); mqBackup.setRecStatus(RecStatus.DISABLE.code()); this.updateById(mqBackup); this.removeById(mqBackup); }); index += size; }else{ exit = true; } }while (!exit); return ResultData.ok(); } }