package com.fdkankan.manage.service.impl;
import java.util.Calendar;
import java.util.Date;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fdkankan.common.constant.CommonStatus;
import com.fdkankan.common.constant.ErrorCode;
import com.fdkankan.common.constant.RecStatus;
import com.fdkankan.common.exception.BusinessException;
import com.fdkankan.manage.common.ResultData;
import com.fdkankan.manage.entity.MqBackup;
import com.fdkankan.manage.mapper.IMqBackupMapper;
import com.fdkankan.manage.service.IMqBackupService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.fdkankan.rabbitmq.util.RabbitMqProducer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
/**
*
* 服务实现类
*
*
* @author
* @since 2023-01-05
*/
@Service
public class MqBackupServiceImpl extends ServiceImpl implements IMqBackupService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitMqProducer rabbitMqProducer;
@Override
public ResultData getRabbitMqMsg(String queue, int needRequeue) throws UnsupportedEncodingException {
if(needRequeue != CommonStatus.YES.code() && needRequeue != CommonStatus.NO.code()){
throw new BusinessException(ErrorCode.FAILURE_CODE_5012);
}
boolean exit = false;
do {
GetResponse getResponse = rabbitTemplate.execute(channel -> channel.basicGet(queue, true));
if(Objects.nonNull(getResponse)){
byte[] body = getResponse.getBody();
MqBackup mqBackup = new MqBackup();
mqBackup.setQueue(queue);
mqBackup.setMsg(new String(body, StandardCharsets.UTF_8.name()));
mqBackup.setNeedRequeue(needRequeue);
mqBackup.setCreateTime(Calendar.getInstance().getTime());
this.save(mqBackup);
}else{
exit = true;
}
}while (!exit);
return ResultData.ok();
}
@Override
public ResultData rabbitmqRequeue(String queue) throws UnsupportedEncodingException {
int index = 0;
int size = 500;
boolean exit = false;
do {
List list = this.list(new LambdaQueryWrapper()
.eq(MqBackup::getQueue, queue)
.eq(MqBackup::getNeedRequeue, CommonStatus.YES.code().intValue())
.eq(MqBackup::getRequeue, CommonStatus.NO.code().intValue()).last("limit " + index + "," + size));
if(CollUtil.isNotEmpty(list)){
list.stream().forEach(mqBackup -> {
rabbitMqProducer.sendByWorkQueue(mqBackup.getQueue(), JSON.parseObject(mqBackup.getMsg()));
mqBackup.setRequeue(CommonStatus.YES.code().intValue());
mqBackup.setRecStatus(RecStatus.DISABLE.code());
this.updateById(mqBackup);
this.removeById(mqBackup);
});
index += size;
}else{
exit = true;
}
}while (!exit);
return ResultData.ok();
}
}