123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- import { InjectRedis } from '@liaoliaots/nestjs-redis';
- import {
- ConnectedSocket,
- MessageBody,
- OnGatewayConnection,
- OnGatewayDisconnect,
- OnGatewayInit,
- SubscribeMessage,
- WebSocketGateway,
- WebSocketServer,
- } from '@nestjs/websockets';
- import { Server, Socket } from 'socket.io';
- import Redis from 'ioredis';
- import { instrument } from '@socket.io/admin-ui';
- import * as bcrypt from 'bcrypt';
- import { RoomService } from '../room/room.service';
- import { Logger } from '@nestjs/common';
- import { DelayService } from 'src/room/delay/delay.service';
- console.log('SOCKET_NAME', process.env.SOCKET_NAME);
- console.log('SOCKET_PATH', process.env.SOCKET_PATH);
- console.log('SOCKET_USE_MSPACK', process.env.SOCKET_USE_MSPACK);
- @WebSocketGateway(Number(process.env.SOCKET_PORT), {
- transports: ['websocket'],
- cors: '*',
- // namespace: "ws",
- // path: '/ws-sync',
- path: process.env.SOCKET_PATH,
- parser:
- Number(process.env.SOCKET_USE_MSPACK) === 1
- ? require('socket.io-msgpack-parser')
- : '',
- })
- export class SocketGateway
- implements OnGatewayInit, OnGatewayDisconnect, OnGatewayConnection
- {
- constructor(
- @InjectRedis() private readonly redis: Redis,
- private readonly roomService: RoomService,
- private readonly delayService: DelayService,
- ) {}
- public readonly logger = new Logger('socketGateway');
- @WebSocketServer() server: Server;
- // public _loginLimit = new Map<string, string>();
- async handleConnection(@ConnectedSocket() socket: Socket) {
- const { isBenmark } = socket.handshake.query;
- if (Number(isBenmark) === 1) {
- socket.data.isBenmark = 1;
- }
- // this.delayService
- }
- async handleDisconnect(@ConnectedSocket() socket: Socket) {
- const deviceId = socket.data.deviceId;
- const isBenmark = socket.data.isBenmark;
- if (deviceId) {
- const redisFlag = `kankan:socket:deviceId`;
- // const did = this._loginLimit.get(deviceId);
- const did = await this.redis.hget(redisFlag, deviceId);
- if (did === socket.id) {
- await this.redis.hdel(redisFlag, deviceId);
- // this._loginLimit.delete(deviceId);
- }
- if (!!isBenmark) {
- // console.log('isBenmark', isBenmark);
- await this.redis.hdel(redisFlag, deviceId);
- }
- }
- await this.roomService.handleUserOffline(socket);
- socket.removeAllListeners();
- socket.disconnect();
- // console.log('handleDisconnect', client);
- }
- afterInit(server: any) {
- instrument(server, {
- auth: {
- type: 'basic',
- username: process.env.WATCH_USER,
- password: bcrypt.hashSync(process.env.WATCH_PASSWORD, 10),
- },
- namespaceName: '/watch',
- });
- this.delayService.init();
- }
- @SubscribeMessage('join')
- async handleMessage(
- @MessageBody() message: UserInfoParams,
- @ConnectedSocket() socket: Socket,
- ): Promise<void> {
- message.id = socket.id;
- // (socket as any).userId = message.userId;
- // (socket as any).roomId = message.roomId;
- socket.data.user = message;
- const isRepeat = await this.handleRepeatJoin(socket, message);
- // console.log('isRepeat', isRepeat);
- if (!isRepeat) {
- this.logger.warn(
- `当前加入房间:roomId: ${message.roomId} userId: ${message.userId}`,
- 'join-user',
- );
- socket.join(message.roomId);
- if (!message.isClient) {
- await this.roomService.handleUserJoin(socket, message);
- }
- }
- }
- async handleRepeatJoin(
- socket: Socket,
- message: UserInfoParams,
- ): Promise<boolean> {
- const from = !!message.isClient ? 1 : 0;
- const flag = `${message.userId}-${from}`;
- socket.data.deviceId = flag;
- const redisFlag = `kankan:socket:deviceId`;
- const isExist = await this.redis.hexists(redisFlag, flag);
- if (!isExist) {
- await this.redis.hset(redisFlag, flag, socket.id);
- // this._loginLimit.set(flag, socket.id);
- return Promise.resolve(false);
- } else {
- socket.emit('manager-error', {
- type: 'repeat-login',
- code: 306,
- });
- this.logger.warn(`306:${message.userId}`, 'repeat-login');
- return Promise.resolve(true);
- }
- }
- // 订阅action通道
- @SubscribeMessage('action')
- async handleActionMessage(
- @ConnectedSocket() socket: Socket,
- @MessageBody() message: any,
- ): Promise<void> {
- await this.roomService.handleUserAction(socket, message);
- }
- // 订阅sync通道
- @SubscribeMessage('sync')
- async handleSyncMessage(
- @ConnectedSocket() socket: Socket,
- @MessageBody() message: any,
- ): Promise<void> {
- await this.roomService.handleSyncAction(socket, message);
- }
- // 订阅paint通道
- @SubscribeMessage('paint')
- async handlePaintessage(
- @ConnectedSocket() socket: Socket,
- @MessageBody() message: any,
- ): Promise<void> {
- await this.roomService.handlePaintAction(socket, message);
- }
- // 订阅主动退出exit通道
- @SubscribeMessage('exit')
- async handleExitessage(
- @ConnectedSocket() socket: Socket,
- @MessageBody() message: any,
- ): Promise<void> {
- await this.roomService.handleExitAction(socket, message);
- }
- @SubscribeMessage('active-status')
- async handleActive(@ConnectedSocket() socket: Socket) {
- const sockets = await this.server.fetchSockets();
- socket.emit('active-status', { number: sockets.length });
- }
- }
|