Home Reference Source

js/ReadableTransaction.js

import Transaction from './Transaction.js'

export default class ReadableTransaction extends Transaction {
  /**
   * 트렌젝션을 만듭니다.
   * @param {RTCSocket} socket 데이터 전송에 사용할 RTCSocket
   * @param {object} [metadata] 상대에게 전송할 메타데이터. 트렌젝션이 만들어진 후 `metadata` 속성으로 읽을 수 있습니다. Progress Tracking을 사용하려면 `size` 속성이 필요합니다. 그 이외의 속성은 임의로 추가할 수 있습니다.
   * @param {number} [metadata.size] 바이트로 나타낸 트렌젝션의 크기.
   */
  constructor (socket, metadata = {}) {
    super(socket, metadata)

    this.bufferFullInformed = false
    this.aborted = false
    this.canceled = false
    this.cancelReason = null

    this.stream = new ReadableStream({
      start: controller => {
        const isDone = () => this.processed.get() === metadata.size

        socket.on('data', data => {
          if (!(data instanceof ArrayBuffer)) return

          controller.enqueue(data)
          this.processed.set(this.processed.get() + data.byteLength)

          if (controller.desiredSize < 0 && !this.bufferFullInformed) {
            this.bufferFullInformed = true
            socket.writeEvent('buffer-full')
            this.logger.debug('ReadableStream 버퍼가 가득 찼습니다. 전송 일시중지를 요청했습니다.')
          }

          if (isDone()) {
            socket.close()
            controller.close()
            this.done.set(true)
            this.logger.log('⚡ 전송이 완료되었습니다.')
          }
        })

        socket.once('close', () => {
          this.stopReport()
          if (this.aborted || isDone()) return

          if (this.canceled) {
            if (this.cancelReason instanceof Error) {
              controller.error(this.cancelReason)
            } else {
              controller.error('트렌젝션이 Cancel되었습니다. 이유: ' + this.cancelReason)
            }
          }

          controller.error(new Error('예상치 못하게 소켓이 닫혔습니다.'))
        })

        socket.once('abort', errMsg => {
          this.aborted = true
          socket.close()
          controller.error(new Error('상대방이 트렌젝션의 WritableStream을 Abort했습니다.' + errMsg))
        })

        // writer측에서 일시정지/재개됬을때
        socket.on('pause', () => {
          this.logger.debug('상대방이 트렌젝션 일시중지를 요청했습니다.')
          super.pause()
        })
        socket.on('resume', () => {
          this.logger.debug('상대방이 트렌젝션 재시작을 요청했습니다.')
          super.resume()
        })
      },
      pull: () => {
        if (!this.bufferFullInformed) return

        socket.writeEvent('pull')
        this.bufferFullInformed = false
        this.logger.debug('ReadableStream 버퍼가 공간이 확보되어서 전송 재개를 요청했습니다.')
      },
      cancel: reason => {
        this.cancel(reason)
      }
    }, new ByteLengthQueuingStrategy({ highWaterMark: 10 * 1024 * 1024 /* 10 MB */ }))
  }

  cancel (reason = '') {
    this.canceled = true
    this.cancelReason = reason

    if (reason instanceof Error) {
      this.socket.writeEvent('cancel', reason.message)
    } else {
      this.socket.writeEvent('cancel', reason)
    }

    this.logger.warn('트렌젝션의 ReadableStream이 Cancel되었습니다. 이유: ', reason)
  }

  stop () {
    this.cancel('받는 쪽에서 stop() 메소드를 호출했습니다.')
  }

  pause () {
    super.pause()
    this.socket.writeEvent('pause')
  }

  resume () {
    super.resume()
    this.socket.writeEvent('resume')
  }
}