import {
  SubscribeMessage,
  WebSocketGateway,
  OnGatewayInit,
  WebSocketServer,
  OnGatewayConnection,
  OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server } from 'ws';
import * as WebSocket from 'ws';
import { PeerConnection, initLogger, DataChannel } from 'node-datachannel';
import { Buffer } from 'buffer';
import { Logger } from '@nestjs/common';
import * as path from 'path';
import { createReadStream } from 'fs';

initLogger('Debug');

/** 32-bit value written big-endian as the whole 4-byte heartbeat packet. */
const HEARTBEAT_MAGIC = 2009889916;
/** Period of the heartbeat that is pushed over the data channel. */
const WEBRTC_HEARTBEAT_INTERVAL_MS = 1e3;
/** 32-bit value written big-endian at offset 0 of every outgoing frame chunk. */
const FRAME_MAGIC = 1437227610;
/** The header patch writes up to byte offset 31, so a chunk needs at least 32 bytes. */
const FRAME_PATCH_MIN_BYTES = 32;
/** Delay between two consecutive frame files. */
const FRAME_SEND_INTERVAL_MS = 10;
/** Frame files `1.h264` .. `29.h264` are sent; this bound is exclusive. */
const FRAME_INDEX_LIMIT = 30;

/**
 * Builds the signalling envelope shared by every JSON message of this gateway.
 * Key order is kept stable so the serialized form does not change.
 */
function buildEnvelope(id: string, data: unknown) {
  return {
    channel_id: '',
    client_os: '',
    data,
    fe_version: '',
    id,
    packet_id: '',
    room_id: '',
    session_id: '',
    trace_id: '',
    user_id: '',
  };
}

/** Builds the 4-byte heartbeat packet (`HEARTBEAT_MAGIC`, big-endian). */
function buildHeartbeatPacket(): Buffer {
  const packet = Buffer.alloc(4);
  packet.writeUInt32BE(HEARTBEAT_MAGIC, 0);
  return packet;
}

/**
 * WebSocket gateway on `/ws` that negotiates one WebRTC peer connection
 * (the gateway is the offerer) and pushes `.h264` files plus a periodic
 * heartbeat over the `game-input` data channel.
 *
 * The gateway keeps a single peer/channel pair in instance fields, so a new
 * `init_webrtc` replaces the previous session.
 */
@WebSocketGateway({
  transports: ['websocket'],
  cors: '*',
  // namespace: "ws",
  path: '/ws',
})
export class MetaGateway
  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
  private logger: Logger = new Logger('MetaGateway');
  private peer: PeerConnection | null = null;
  /** Interval that walks through the frame files. */
  private timer: NodeJS.Timeout | undefined;
  /** Interval that sends the data-channel heartbeat. */
  private _webrtcInterval: NodeJS.Timeout | undefined;
  /** Payload of the most recent `heartbeat` message. */
  private heartBeatFlag: number | undefined;
  private gameChanel: DataChannel | undefined;

  @WebSocketServer() server: Server;

  // @SubscribeMessage('message')
  // handleMessage(client: any, payload: any) {
  //   this.logger.log(`payload: ${JSON.stringify(payload)}`);
  // }

  /** Logs the payload of an `init` message. */
  @SubscribeMessage('init')
  handleInit(client: any, payload: any) {
    this.logger.log(`init: ${JSON.stringify(payload)}`);
  }

  /**
   * Answers a `heartbeat` message.
   *
   * Also forwards one heartbeat packet over the data channel when it exists
   * and is open; a heartbeat that arrives before `init_webrtc` is answered
   * without touching the (not yet created) channel.
   *
   * @returns the `heartbeat` envelope echoing `payload` in `data`.
   */
  @SubscribeMessage('heartbeat')
  handleHeartBeat(client: any, payload: any) {
    this.logger.log(`heartbeat: ${JSON.stringify(payload)}`);
    this.heartBeatFlag = payload;

    const channel = this.gameChanel;
    if (channel && channel.isOpen()) {
      channel.sendMessageBinary(buildHeartbeatPacket());
    }
    return buildEnvelope('heartbeat', payload);
  }

  /**
   * Creates the peer connection and the `game-input` data channel, and sends
   * the generated local description to `client` as a base64 `offer` message.
   * Any session left over from an earlier `init_webrtc` is released first.
   */
  @SubscribeMessage('init_webrtc')
  handleInitWebRtc(client: any, payload: any): void {
    console.log('handleInitWebRtc');

    // A repeated init_webrtc must not leak the previous peer or its timers.
    this.releasePeer();

    const peer = new PeerConnection('roomTest', {
      iceServers: ['stun:120.24.252.95:3478'],
    });
    this.peer = peer;

    peer.onLocalDescription((sdp, type) => {
      console.warn('peer SDP:', sdp, ' Type:', type);
      const offer = { sdp, type };
      const offerFormat = {
        id: 'offer',
        data: Buffer.from(JSON.stringify(offer)).toString('base64'),
      };
      console.log('send', offerFormat);
      client.send(JSON.stringify(offerFormat));
    });

    peer.onLocalCandidate((candidate) => {
      // Local candidates are only logged; forwarding them to the client is
      // disabled. If it is re-enabled, the message is an `ice_candidate`
      // envelope whose data is base64 JSON of
      // { candidate, sdpMid: mid, sdpMLineIndex: 0 }.
      console.warn('peer Candidate:', candidate);
    });

    peer.onStateChange((state) => {
      console.log('peer-State:', state);
    });

    peer.onGatheringStateChange((state) => {
      console.log('GatheringState:', state);
    });

    const channel = peer.createDataChannel('game-input');
    this.gameChanel = channel;

    peer.onDataChannel((dc) => {
      console.log('onDataChannel', dc);
    });

    channel.onOpen(() => {
      console.log('channel is open');
      const paths = path.join(__dirname, '../ws/video/v11');
      console.error('__dirname', __dirname);
      console.error('paths', paths);

      const open = channel.isOpen();
      if (open) {
        console.log('gameChanel', open);
        this.sendWertcHeartPack(channel);
      }
      this.startFrameStream(channel, paths);
    });

    channel.onClosed(() => {
      console.log('gameChanel close');
      this.stopChannelTimers(channel);
    });

    channel.onMessage((event) => {
      console.log('gameChanel onMessage', event);
    });

    channel.onError((err) => {
      console.log('gameChanel error', err);
      this.stopChannelTimers(channel);
    });
  }

  /**
   * Starts the periodic data-channel heartbeat, replacing any heartbeat
   * interval that is already running.
   *
   * @param channel channel the heartbeat packet is sent on while it is open.
   */
  sendWertcHeartPack(channel: DataChannel): void {
    this.stopSendWertcHeartPack();
    const heartPack = buildHeartbeatPacket();
    this._webrtcInterval = setInterval(() => {
      if (channel.isOpen()) {
        channel.sendMessageBinary(heartPack);
      }
    }, WEBRTC_HEARTBEAT_INTERVAL_MS);
  }

  /** Stops the periodic data-channel heartbeat, if one is running. */
  stopSendWertcHeartPack(): void {
    clearInterval(this._webrtcInterval);
    this._webrtcInterval = undefined;
  }

  /**
   * Adds a remote ICE candidate. `payload` is expected to be base64 JSON with
   * string fields `candidate` and `sdpMid`; anything else is logged and ignored.
   */
  @SubscribeMessage('ice_candidate')
  handlerIceCandidate(client: any, payload: any) {
    const decoded = this.decodeSignal('ice_candidate', payload);
    if (!decoded) {
      return;
    }
    const candidate = decoded.json;
    console.error('收到ice_candidate', candidate);

    if (!this.peer) {
      this.logger.warn('ice_candidate received before init_webrtc; ignored');
      return;
    }
    if (
      typeof candidate.candidate !== 'string' ||
      typeof candidate.sdpMid !== 'string'
    ) {
      this.logger.warn('ice_candidate without string candidate/sdpMid; ignored');
      return;
    }
    this.peer.addRemoteCandidate(candidate.candidate, candidate.sdpMid);
  }

  /**
   * Applies the client's answer as the remote description. `payload` is
   * expected to be base64 JSON with string fields `sdp` and `type`.
   */
  @SubscribeMessage('answer')
  handerAnswer(client: any, payload: any) {
    const decoded = this.decodeSignal('answer', payload);
    if (!decoded) {
      return;
    }
    console.log('answer', decoded.text);

    if (!this.peer) {
      this.logger.warn('answer received before init_webrtc; ignored');
      return;
    }
    const { sdp, type } = decoded.json;
    if (typeof sdp !== 'string' || typeof type !== 'string') {
      this.logger.warn('answer without string sdp/type; ignored');
      return;
    }
    // Only the remote description is set here: the local description (our
    // offer) already exists by the time an answer can arrive, and the remote
    // SDP text is not a valid argument for setLocalDescription.
    this.peer.setRemoteDescription(sdp, type as any);
  }

  /** Logs the payload of a `start` message. */
  @SubscribeMessage('start')
  handlerWebrtcStart(client: any, payload: any) {
    console.log('start', payload);
  }

  /** Gateway lifecycle hook; only logs. */
  afterInit(server: Server) {
    this.logger.log('Init');
  }

  /** Greets a newly connected client with an empty `init` envelope. */
  handleConnection(client: WebSocket, ...args: any[]) {
    this.logger.log(`Client connected: ${args}`);
    client.send(JSON.stringify(buildEnvelope('init', '')));
  }

  /** Releases the peer connection and every timer when a client disconnects. */
  handleDisconnect(client: WebSocket) {
    this.logger.log(`Client disconnected: ${client.id}`);
    this.releasePeer();
  }

  /**
   * Decodes a base64-encoded JSON object sent by the client.
   *
   * @returns the decoded text and parsed object, or `undefined` (after
   * logging a warning) when the payload is not a base64 JSON object.
   */
  private decodeSignal(
    kind: string,
    payload: unknown,
  ): { text: string; json: Record<string, unknown> } | undefined {
    if (typeof payload !== 'string') {
      this.logger.warn(`${kind}: payload is not a string; ignored`);
      return undefined;
    }
    const text = Buffer.from(payload, 'base64').toString('utf-8');
    try {
      const parsed: unknown = JSON.parse(text);
      if (typeof parsed === 'object' && parsed !== null) {
        return { text, json: parsed as Record<string, unknown> };
      }
      this.logger.warn(`${kind}: payload is not a JSON object; ignored`);
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      this.logger.warn(`${kind}: payload is not valid JSON (${reason}); ignored`);
    }
    return undefined;
  }

  /**
   * Sends frame files `1.h264` .. `29.h264` from `dir`, one per tick, and
   * stops the interval after the last file or once the channel is closed.
   */
  private startFrameStream(channel: DataChannel, dir: string): void {
    clearInterval(this.timer);
    let index = 1;
    const timer = setInterval(() => {
      if (index >= FRAME_INDEX_LIMIT || !channel.isOpen()) {
        clearInterval(timer);
        if (this.timer === timer) {
          this.timer = undefined;
        }
        return;
      }
      this.sendFrameFile(channel, `${dir}/${index}.h264`);
      index++;
    }, FRAME_SEND_INTERVAL_MS);
    this.timer = timer;
  }

  /**
   * Streams one file over `channel`, overwriting the header fields of every
   * chunk before it is sent.
   *
   * NOTE(review): every chunk of a file is patched, not only the first one —
   * presumably each file fits in a single read chunk; confirm against the
   * frame format.
   */
  private sendFrameFile(channel: DataChannel, file: string): void {
    const steam = createReadStream(file);

    // Without a listener a missing/unreadable file is an uncaught exception.
    steam.on('error', (err: Error) => {
      this.logger.warn(`cannot read ${file}: ${err.message}`);
    });

    steam.on('data', (data: Buffer) => {
      if (!channel.isOpen()) {
        steam.destroy();
        return;
      }
      if (data.byteLength < FRAME_PATCH_MIN_BYTES) {
        this.logger.warn(
          `${file}: chunk of ${data.byteLength} bytes is too short for the frame header; skipped`,
        );
        return;
      }
      // Write through the Buffer itself so byteOffset/byteLength are honoured
      // even when the chunk is a slice of a larger ArrayBuffer.
      data.writeUInt32BE(FRAME_MAGIC, 0);
      data.writeUInt16BE(36, 6);
      data.writeUInt16BE(0, 24);
      data.writeUInt16BE(0, 26);
      data.writeUInt32BE(0, 28);
      channel.sendMessageBinary(data);
    });
  }

  /**
   * Stops the heartbeat and frame timers, but only when `channel` is still
   * the active one, so a late event from a replaced channel cannot stop the
   * timers of its successor.
   */
  private stopChannelTimers(channel: DataChannel): void {
    if (this.gameChanel !== channel) {
      return;
    }
    this.stopSendWertcHeartPack();
    clearInterval(this.timer);
    this.timer = undefined;
  }

  /** Stops every timer, closes the current peer (if any) and drops the references. */
  private releasePeer(): void {
    this.stopSendWertcHeartPack();
    clearInterval(this.timer);
    this.timer = undefined;

    const peer = this.peer;
    this.peer = null;
    this.gameChanel = undefined;
    if (peer) {
      peer.close();
    }
  }
}