js/Transaction.js
- import Mitt from './util/Mitt.js'
- import { ObservableEntry, wait } from './util/ObservableEntry.js'
- import progressTracker from './util/eta.js'
- import prettyBytes from './util/prettyBytes.js'
- import createLogger from './util/createLogger.js'
-
- // /** @typedef {import('./RTCSocket.js').default} RTCSocket */
-
- /**
- * 단방향 데이터 전송을 위한 인터페이스. 한 피어에서 다른 피어로 파일과 같은 데이터를 전송할 때 사용됩니다.
- * `stream` 속성을 통해서 읽거나 쓸 수 있는 스트림을 이용할 수 있습니다.
- * 또, 메타데이터 전송 / 전송 컨트롤(일시정지, 재개, 중단) / 전송 속도 및 진행률 추적등의 기능을 제공합니다.
- * 이 클래스는 보내는 쪽과 받는 쪽에서 공통적으로 사용되는 기능을 구현한 베이스로 실제 파일 전송에 관련된 코드는 ReadableTransaction.js와 WritableTransaction.js에 있습니다.
- */
- export default class Transaction extends Mitt {
- /**
- * 트렌젝션을 만듭니다.
- * @param {RTCSocket} socket 데이터 전송에 사용할 RTCSocket
- * @param {object} [metadata] 상대에게 전송할 메타데이터. 트렌젝션이 만들어진 후 `metadata` 속성으로 읽을 수 있습니다. Progress Tracking을 사용하려면 `size` 속성이 필요합니다. 그 이외의 속성은 임의로 추가할 수 있습니다.
- * @param {number} [metadata.size] 바이트로 나타낸 트렌젝션의 크기.
- */
- constructor (socket, metadata = {}) {
- super()
-
- /** @type {RTCSocket} */
- this.socket = socket
- this.metadata = metadata
- this.label = this.socket.label
- this.paused = new ObservableEntry(false)
- this.done = new ObservableEntry(false)
-
- // 전송 상태 트레킹
- this.lastPausedTimestamp = 0
- this.pausedMilliSeconds = 0
- this.processed = new ObservableEntry(0) // byte or length
-
- this.timeout = NaN
- this.logger = createLogger(`Transaction:${this.label}`)
-
- this.initProgressTracking()
- }
-
- async initProgressTracking () {
- // size가 설정되어 있지 않다면 progress tracking을 사용하지 않음
- if (this.metadata === undefined) {
- this.logger.warn('메타데이터가 undefined입니다. Progress Tracking 기능이 동작하지 않습니다.')
- return
- }
- if (typeof this.metadata.size !== 'number') {
- this.logger.warn('메타데이터의 size 필드가 숫자가 아니거나 정의되지 않았습니다. Progress Tracking 기능이 동작하지 않습니다.')
- return
- }
-
- await wait(this.processed).toBeChanged()
-
- // transaction writer 쪽에선 처음 시작부터 속도 측정시
- // 데이터 채널의 버퍼가 다 차기 전이라 속도가 비정상적으로 빠르게 측정되므로 1초 후 시작
- await new Promise(resolve => setTimeout(resolve, 1000))
-
- if (this.done.get()) return
-
- const processed = this.processed.get()
- this.progressTracker = progressTracker({
- min: processed,
- max: this.metadata.size + processed,
- historyTimeConstant: 10
- })
-
- this.timeout = setInterval(() => {
- if (this.paused.get()) return
-
- const timestamp = Date.now() - this.pausedMilliSeconds
- this.progressTracker.report(this.processed.get(), timestamp)
-
- const report = {
- processed: this.processed.get(),
- progress: this.progress,
- eta: this.eta,
- speed: this.speed
- }
-
- this.logger.debug('통계 데이터를 생성했습니다.', report)
- this.emit('report', report)
- }, 500)
- }
-
- get eta () {
- if (!this.progressTracker) {
- return NaN
- }
-
- if (this.paused.get()) {
- return Math.round(this.progressTracker.estimate(this.lastPausedTimestamp))
- }
-
- if (this.processed.get() === this.metadata.size) {
- return 0
- }
-
- return Math.round(this.progressTracker.estimate(Date.now() - this.pausedMilliSeconds)) // 결과는 초
- }
-
- get progress () {
- return this.processed.get() / this.metadata.size
- }
-
- get speed () {
- if (!this.progressTracker) {
- return 'NaNB/s'
- }
-
- if (this.paused.get()) {
- return '0B/s'
- }
-
- return prettyBytes(this.progressTracker.rate()) + '/s'
- }
-
- pause () {
- this.logger.debug('트렌젝션이 일시중지되었습니다.')
- this.paused.set(true)
- this.lastPausedTimestamp = Date.now()
- }
-
- resume () {
- this.logger.debug('트렌젝션이 재시작되었습니다.')
- this.paused.set(false)
- this.pausedMilliSeconds += (Date.now() - this.lastPausedTimestamp)
- }
-
- stopReport () {
- this.logger.debug('통계 데이터 생성이 중단되었습니다.')
- if (isNaN(this.timeout)) return
- clearInterval(this.timeout)
- }
- }