Home Reference Source

js/Transaction.js

import Mitt from './util/Mitt.js'
import { ObservableEntry, wait } from './util/ObservableEntry.js'
import progressTracker from './util/eta.js'
import prettyBytes from './util/prettyBytes.js'
import createLogger from './util/createLogger.js'

// /** @typedef {import('./RTCSocket.js').default} RTCSocket */

/**
 * 단방향 데이터 전송을 위한 인터페이스. 한 피어에서 다른 피어로 파일과 같은 데이터를 전송할 때 사용됩니다.
 * `stream` 속성을 통해서 읽거나 쓸 수 있는 스트림을 이용할 수 있습니다.
 * 또, 메타데이터 전송 / 전송 컨트롤(일시정지, 재개, 중단) / 전송 속도 및 진행률 추적등의 기능을 제공합니다.
 * 이 클래스는 보내는 쪽과 받는 쪽에서 공통적으로 사용되는 기능을 구현한 베이스로 실제 파일 전송에 관련된 코드는 ReadableTransaction.js와 WritableTransaction.js에 있습니다.
 */
export default class Transaction extends Mitt {
  /**
   * 트렌젝션을 만듭니다.
   * @param {RTCSocket} socket 데이터 전송에 사용할 RTCSocket
   * @param {object} [metadata] 상대에게 전송할 메타데이터. 트렌젝션이 만들어진 후 `metadata` 속성으로 읽을 수 있습니다. Progress Tracking을 사용하려면 `size` 속성이 필요합니다. 그 이외의 속성은 임의로 추가할 수 있습니다.
   * @param {number} [metadata.size] 바이트로 나타낸 트렌젝션의 크기.
   */
  constructor (socket, metadata = {}) {
    super()

    /** @type {RTCSocket} */
    this.socket = socket
    this.metadata = metadata
    this.label = this.socket.label
    this.paused = new ObservableEntry(false)
    this.done = new ObservableEntry(false)

    // 전송 상태 트레킹
    this.lastPausedTimestamp = 0
    this.pausedMilliSeconds = 0
    this.processed = new ObservableEntry(0) // byte or length

    this.timeout = NaN
    this.logger = createLogger(`Transaction:${this.label}`)

    this.initProgressTracking()
  }

  async initProgressTracking () {
    // size가 설정되어 있지 않다면 progress tracking을 사용하지 않음
    if (this.metadata === undefined) {
      this.logger.warn('메타데이터가 undefined입니다. Progress Tracking 기능이 동작하지 않습니다.')
      return
    }
    if (typeof this.metadata.size !== 'number') {
      this.logger.warn('메타데이터의 size 필드가 숫자가 아니거나 정의되지 않았습니다. Progress Tracking 기능이 동작하지 않습니다.')
      return
    }

    await wait(this.processed).toBeChanged()

    // transaction writer 쪽에선 처음 시작부터 속도 측정시
    // 데이터 채널의 버퍼가 다 차기 전이라 속도가 비정상적으로 빠르게 측정되므로 1초 후 시작
    await new Promise(resolve => setTimeout(resolve, 1000))

    if (this.done.get()) return

    const processed = this.processed.get()
    this.progressTracker = progressTracker({
      min: processed,
      max: this.metadata.size + processed,
      historyTimeConstant: 10
    })

    this.timeout = setInterval(() => {
      if (this.paused.get()) return

      const timestamp = Date.now() - this.pausedMilliSeconds
      this.progressTracker.report(this.processed.get(), timestamp)

      const report = {
        processed: this.processed.get(),
        progress: this.progress,
        eta: this.eta,
        speed: this.speed
      }

      this.logger.debug('통계 데이터를 생성했습니다.', report)
      this.emit('report', report)
    }, 500)
  }

  get eta () {
    if (!this.progressTracker) {
      return NaN
    }

    if (this.paused.get()) {
      return Math.round(this.progressTracker.estimate(this.lastPausedTimestamp))
    }

    if (this.processed.get() === this.metadata.size) {
      return 0
    }

    return Math.round(this.progressTracker.estimate(Date.now() - this.pausedMilliSeconds)) // 결과는 초
  }

  get progress () {
    return this.processed.get() / this.metadata.size
  }

  get speed () {
    if (!this.progressTracker) {
      return 'NaNB/s'
    }

    if (this.paused.get()) {
      return '0B/s'
    }

    return prettyBytes(this.progressTracker.rate()) + '/s'
  }

  pause () {
    this.logger.debug('트렌젝션이 일시중지되었습니다.')
    this.paused.set(true)
    this.lastPausedTimestamp = Date.now()
  }

  resume () {
    this.logger.debug('트렌젝션이 재시작되었습니다.')
    this.paused.set(false)
    this.pausedMilliSeconds += (Date.now() - this.lastPausedTimestamp)
  }

  stopReport () {
    this.logger.debug('통계 데이터 생성이 중단되었습니다.')
    if (isNaN(this.timeout)) return
    clearInterval(this.timeout)
  }
}