Home Reference Source

js/RTCSocket.js

import Mitt from './util/Mitt.js'
import { ObservableEntry } from './util/ObservableEntry.js'
import once from './util/once.js'
import createLogger from './util/createLogger.js'

// RTCDataChannel은 실제 버퍼 사이즈를 보여주지 않으나 사이즈를 넘게 데이터가 들어가면 채널이 터져버림
// 따라서 10MB를 데이터 채널의 버퍼 사이즈로 생각
// 또 writable 스트림의 버퍼 사이즈로도 사용
const DATA_CHANNEL_BUFFER_SIZE = 10 * 1024 * 1024 // 10MB

export default class RTCSocket extends Mitt {
  // options - received boolean
  constructor (dataChannel, options = {}) {
    super()

    /** @type {RTCDataChannel} */
    this.dataChannel = dataChannel
    this.dataChannel.binaryType = 'arraybuffer'
    this.dataChannel.addEventListener('message', ({ data }) => this.recvData(data))
    this.dataChannel.addEventListener('close', () => {
      this.emit('close')

      if (this.closed) return

      // close() 호출 이외의 이유로 닫힌 경우
      this.ready.set(false)
      this.logger.log('🚫 상대방에 의해서 소켓이 닫혔습니다.')
    })
    this.label = this.dataChannel.label

    // 앞의 메시지가 버퍼 사이즈 문제로 인해 대기 중이라면 이게 true로 설정됨
    // 뒤의 메시지는 이 속성이 false가 되면 처리됨
    this.ready = new ObservableEntry(true)
    this.closed = false

    this.logger = createLogger(`Socket:${this.label}`)

    if (options.received) {
      this.writeEvent('__received')
    }
  }

  async writeEvent (eventName, payload) {
    this.logger.debug(`커스텀 이벤트 ${eventName} 전송을 요청했습니다. 페이로드:`, payload)
    return this.write({
      _channelEngineCustomEvent: true,
      event: eventName,
      payload
    })
  }

  recvData (msg) {
    let data
    if (msg instanceof ArrayBuffer) {
      data = msg
      this.logger.debug('이진 데이터를 전송받았습니다. 사이즈:', msg.byteLength)
    } else {
      data = JSON.parse(msg)
      this.logger.debug('데이터를 전송받았습니다.', data)
    }

    // 커스텀 이벤트 처리
    if (typeof data === 'object') {
      if ('_channelEngineCustomEvent' in data) {
        this.emit(data.event, data.payload)
        return
      }
    }

    this.emit('data', data)
  }

  /**
     *
     * @param {object|string|number|ArrayBuffer} data
     */
  async write (data) {
    let msg
    if (data instanceof ArrayBuffer) {
      // 데이터채널 버퍼 관리
      if (data.byteLength > DATA_CHANNEL_BUFFER_SIZE) {
        throw new Error('데이터 크기가 데이터 채널의 버퍼 사이즈보다 커서 전송할 수 없습니다.')
      }

      if (data.byteLength + this.dataChannel.bufferedAmount > DATA_CHANNEL_BUFFER_SIZE) {
        this.ready.set(false)
        this.dataChannel.bufferedAmountLowThreshold = DATA_CHANNEL_BUFFER_SIZE - data.byteLength
        this.logger.debug(`버퍼에 공간이 확보되기를 대기하는 중입니다. 메시지 사이즈: ${data.byteLength}, 버퍼 공간: ${DATA_CHANNEL_BUFFER_SIZE - this.dataChannel.bufferedAmount}`)
        await once(this.dataChannel, 'bufferedamountlow')
        this.logger.debug('버퍼에 공간이 확보되었습니다.')
        this.dataChannel.bufferedAmountLowThreshold = 0
      }

      msg = data
      this.logger.debug('이진 데이터를 전송했습니다. 사이즈:', msg.byteLength)
    } else {
      msg = JSON.stringify(data)
      this.logger.debug('데이터를 전송했습니다. ', data)
    }

    if (this.dataChannel.readyState !== 'open') {
      await once(this.dataChannel, 'open')
    }

    this.dataChannel.send(msg)
    this.ready.set(true)
  }

  close () {
    this.dataChannel.close()
    this.ready.set(false)
    this.closed = true
    this.logger.log('🚫 close() 메소드가 호출되서 소켓을 닫았습니다.')
  }
}