import { InjectRedis } from '@liaoliaots/nestjs-redis';
import {
  ConnectedSocket,
  MessageBody,
  OnGatewayConnection,
  OnGatewayDisconnect,
  OnGatewayInit,
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import Redis from 'ioredis';
import { instrument } from '@socket.io/admin-ui';
import * as bcrypt from 'bcrypt';
import { RoomService } from '../room/room.service';
import { Logger } from '@nestjs/common';
import { DelayService } from 'src/room/delay/delay.service';

// Start-up diagnostics: print the socket-related environment configuration.
console.log('SOCKET_NAME', process.env.SOCKET_NAME);
console.log('SOCKET_PATH', process.env.SOCKET_PATH);
console.log('SOCKET_USE_MSPACK', process.env.SOCKET_USE_MSPACK);

/**
 * Redis hash that maps a device flag (`${userId}-${from}`) to the id of the
 * socket currently holding that login slot.
 */
const DEVICE_REGISTRY_KEY = 'kankan:socket:deviceId';

/**
 * Socket.IO gateway for room traffic.
 *
 * It keeps at most one registered socket per device flag in a Redis hash and
 * forwards the `join`, `action`, `sync`, `paint` and `exit` events to
 * {@link RoomService}.
 */
@WebSocketGateway(Number(process.env.SOCKET_PORT), {
  transports: ['websocket'],
  cors: '*',
  path: process.env.SOCKET_PATH,
  // Use the msgpack parser only when SOCKET_USE_MSPACK is set to 1.
  parser:
    Number(process.env.SOCKET_USE_MSPACK) === 1
      ? require('socket.io-msgpack-parser')
      : '',
})
export class SocketGateway
  implements OnGatewayInit, OnGatewayDisconnect, OnGatewayConnection
{
  constructor(
    @InjectRedis() private readonly redis: Redis,
    private readonly roomService: RoomService,
    private readonly delayService: DelayService,
  ) {}

  public readonly logger = new Logger('socketGateway');

  @WebSocketServer() server: Server;

  /**
   * Marks the socket as a benchmark client when the handshake query carries
   * `isBenmark=1`.
   *
   * @param socket The socket that has just connected.
   */
  async handleConnection(@ConnectedSocket() socket: Socket) {
    const { isBenmark } = socket.handshake.query;
    if (Number(isBenmark) === 1) {
      socket.data.isBenmark = 1;
    }
  }

  /**
   * Releases the socket's device slot in Redis, notifies the room service
   * that the user went offline, then removes the socket's listeners and
   * disconnects it.
   *
   * @param socket The socket that is disconnecting.
   */
  async handleDisconnect(@ConnectedSocket() socket: Socket) {
    const deviceId = socket.data.deviceId;
    const isBenmark = socket.data.isBenmark;

    if (deviceId) {
      const ownerSocketId = await this.redis.hget(
        DEVICE_REGISTRY_KEY,
        deviceId,
      );
      // A regular socket frees the slot only if it still owns it, so a
      // rejected duplicate login cannot evict the active session. Benchmark
      // sockets always free the slot.
      if (ownerSocketId === socket.id || isBenmark) {
        await this.redis.hdel(DEVICE_REGISTRY_KEY, deviceId);
      }
    }

    await this.roomService.handleUserOffline(socket);
    socket.removeAllListeners();
    socket.disconnect();
  }

  /**
   * Attaches the Socket.IO admin UI on the `/watch` namespace, protected by
   * basic auth built from `WATCH_USER` / `WATCH_PASSWORD`, then initialises
   * the delay service.
   *
   * @param server The server instance handed over by the gateway lifecycle.
   */
  afterInit(server: any) {
    instrument(server, {
      auth: {
        type: 'basic',
        username: process.env.WATCH_USER,
        password: bcrypt.hashSync(process.env.WATCH_PASSWORD, 10),
      },
      namespaceName: '/watch',
    });
    this.delayService.init();
  }

  /**
   * Handles the `join` event. The payload is stamped with the socket id and
   * stored on `socket.data.user`. Unless the login is a duplicate, the socket
   * joins the requested room, and non-client users are additionally reported
   * to the room service.
   *
   * @param message Join payload sent by the client.
   * @param socket  The socket that emitted the event.
   */
  @SubscribeMessage('join')
  async handleMessage(
    @MessageBody() message: UserInfoParams,
    @ConnectedSocket() socket: Socket,
  ): Promise<void> {
    message.id = socket.id;
    socket.data.user = message;

    const isRepeat = await this.handleRepeatJoin(socket, message);
    if (isRepeat) {
      return;
    }

    this.logger.warn(
      `当前加入房间:roomId: ${message.roomId} userId: ${message.userId}`,
      'join-user',
    );
    // `join` may return a promise with asynchronous adapters; await it so a
    // failure surfaces here and the room service runs after the join.
    await socket.join(message.roomId);
    if (!message.isClient) {
      await this.roomService.handleUserJoin(socket, message);
    }
  }

  /**
   * Tries to claim the login slot for `${userId}-${from}`, where `from` is 1
   * for client users and 0 otherwise. The flag is recorded on
   * `socket.data.deviceId` in both outcomes.
   *
   * The claim is a single `HSETNX`, so the existence check and the write are
   * atomic: two simultaneous joins for the same flag cannot both succeed.
   *
   * @param socket  The socket attempting to join.
   * @param message Join payload sent by the client.
   * @returns `true` when the slot is already taken (a `manager-error` event
   *          with code 306 has been emitted to the socket), `false` when the
   *          slot was claimed for this socket.
   */
  async handleRepeatJoin(
    socket: Socket,
    message: UserInfoParams,
  ): Promise<boolean> {
    const from = message.isClient ? 1 : 0;
    const flag = `${message.userId}-${from}`;
    socket.data.deviceId = flag;

    const claimed = await this.redis.hsetnx(
      DEVICE_REGISTRY_KEY,
      flag,
      socket.id,
    );
    if (claimed === 1) {
      return false;
    }

    socket.emit('manager-error', {
      type: 'repeat-login',
      code: 306,
    });
    this.logger.warn(`306:${message.userId}`, 'repeat-login');
    return true;
  }

  /**
   * Subscribes to the `action` channel and forwards the payload to the room
   * service.
   */
  @SubscribeMessage('action')
  async handleActionMessage(
    @ConnectedSocket() socket: Socket,
    @MessageBody() message: any,
  ): Promise<void> {
    await this.roomService.handleUserAction(socket, message);
  }

  /**
   * Subscribes to the `sync` channel and forwards the payload to the room
   * service.
   */
  @SubscribeMessage('sync')
  async handleSyncMessage(
    @ConnectedSocket() socket: Socket,
    @MessageBody() message: any,
  ): Promise<void> {
    await this.roomService.handleSyncAction(socket, message);
  }

  /**
   * Subscribes to the `paint` channel and forwards the payload to the room
   * service.
   */
  @SubscribeMessage('paint')
  async handlePaintessage(
    @ConnectedSocket() socket: Socket,
    @MessageBody() message: any,
  ): Promise<void> {
    await this.roomService.handlePaintAction(socket, message);
  }

  /**
   * Subscribes to the `exit` channel (user-initiated leave) and forwards the
   * payload to the room service.
   */
  @SubscribeMessage('exit')
  async handleExitessage(
    @ConnectedSocket() socket: Socket,
    @MessageBody() message: any,
  ): Promise<void> {
    await this.roomService.handleExitAction(socket, message);
  }

  /**
   * Replies on `active-status` with the number of sockets returned by
   * `server.fetchSockets()`.
   *
   * @param socket The socket that asked for the count.
   */
  @SubscribeMessage('active-status')
  async handleActive(@ConnectedSocket() socket: Socket) {
    const sockets = await this.server.fetchSockets();
    socket.emit('active-status', { number: sockets.length });
  }
}