Home Reference Source

js/Transaction.js

  1. import Mitt from './util/Mitt.js'
  2. import { ObservableEntry, wait } from './util/ObservableEntry.js'
  3. import progressTracker from './util/eta.js'
  4. import prettyBytes from './util/prettyBytes.js'
  5. import createLogger from './util/createLogger.js'
  6.  
  7. // /** @typedef {import('./RTCSocket.js').default} RTCSocket */
  8.  
  9. /**
  10. * 단방향 데이터 전송을 위한 인터페이스. 한 피어에서 다른 피어로 파일과 같은 데이터를 전송할 때 사용됩니다.
  11. * `stream` 속성을 통해서 읽거나 쓸 수 있는 스트림을 이용할 수 있습니다.
  12. * 또, 메타데이터 전송 / 전송 컨트롤(일시정지, 재개, 중단) / 전송 속도 및 진행률 추적등의 기능을 제공합니다.
  13. * 이 클래스는 보내는 쪽과 받는 쪽에서 공통적으로 사용되는 기능을 구현한 베이스로 실제 파일 전송에 관련된 코드는 ReadableTransaction.js와 WritableTransaction.js에 있습니다.
  14. */
  15. export default class Transaction extends Mitt {
  16. /**
  17. * 트렌젝션을 만듭니다.
  18. * @param {RTCSocket} socket 데이터 전송에 사용할 RTCSocket
  19. * @param {object} [metadata] 상대에게 전송할 메타데이터. 트렌젝션이 만들어진 후 `metadata` 속성으로 읽을 수 있습니다. Progress Tracking을 사용하려면 `size` 속성이 필요합니다. 그 이외의 속성은 임의로 추가할 수 있습니다.
  20. * @param {number} [metadata.size] 바이트로 나타낸 트렌젝션의 크기.
  21. */
  22. constructor (socket, metadata = {}) {
  23. super()
  24.  
  25. /** @type {RTCSocket} */
  26. this.socket = socket
  27. this.metadata = metadata
  28. this.label = this.socket.label
  29. this.paused = new ObservableEntry(false)
  30. this.done = new ObservableEntry(false)
  31.  
  32. // 전송 상태 트레킹
  33. this.lastPausedTimestamp = 0
  34. this.pausedMilliSeconds = 0
  35. this.processed = new ObservableEntry(0) // byte or length
  36.  
  37. this.timeout = NaN
  38. this.logger = createLogger(`Transaction:${this.label}`)
  39.  
  40. this.initProgressTracking()
  41. }
  42.  
  43. async initProgressTracking () {
  44. // size가 설정되어 있지 않다면 progress tracking을 사용하지 않음
  45. if (this.metadata === undefined) {
  46. this.logger.warn('메타데이터가 undefined입니다. Progress Tracking 기능이 동작하지 않습니다.')
  47. return
  48. }
  49. if (typeof this.metadata.size !== 'number') {
  50. this.logger.warn('메타데이터의 size 필드가 숫자가 아니거나 정의되지 않았습니다. Progress Tracking 기능이 동작하지 않습니다.')
  51. return
  52. }
  53.  
  54. await wait(this.processed).toBeChanged()
  55.  
  56. // transaction writer 쪽에선 처음 시작부터 속도 측정시
  57. // 데이터 채널의 버퍼가 다 차기 전이라 속도가 비정상적으로 빠르게 측정되므로 1초 후 시작
  58. await new Promise(resolve => setTimeout(resolve, 1000))
  59.  
  60. if (this.done.get()) return
  61.  
  62. const processed = this.processed.get()
  63. this.progressTracker = progressTracker({
  64. min: processed,
  65. max: this.metadata.size + processed,
  66. historyTimeConstant: 10
  67. })
  68.  
  69. this.timeout = setInterval(() => {
  70. if (this.paused.get()) return
  71.  
  72. const timestamp = Date.now() - this.pausedMilliSeconds
  73. this.progressTracker.report(this.processed.get(), timestamp)
  74.  
  75. const report = {
  76. processed: this.processed.get(),
  77. progress: this.progress,
  78. eta: this.eta,
  79. speed: this.speed
  80. }
  81.  
  82. this.logger.debug('통계 데이터를 생성했습니다.', report)
  83. this.emit('report', report)
  84. }, 500)
  85. }
  86.  
  87. get eta () {
  88. if (!this.progressTracker) {
  89. return NaN
  90. }
  91.  
  92. if (this.paused.get()) {
  93. return Math.round(this.progressTracker.estimate(this.lastPausedTimestamp))
  94. }
  95.  
  96. if (this.processed.get() === this.metadata.size) {
  97. return 0
  98. }
  99.  
  100. return Math.round(this.progressTracker.estimate(Date.now() - this.pausedMilliSeconds)) // 결과는 초
  101. }
  102.  
  103. get progress () {
  104. return this.processed.get() / this.metadata.size
  105. }
  106.  
  107. get speed () {
  108. if (!this.progressTracker) {
  109. return 'NaNB/s'
  110. }
  111.  
  112. if (this.paused.get()) {
  113. return '0B/s'
  114. }
  115.  
  116. return prettyBytes(this.progressTracker.rate()) + '/s'
  117. }
  118.  
  119. pause () {
  120. this.logger.debug('트렌젝션이 일시중지되었습니다.')
  121. this.paused.set(true)
  122. this.lastPausedTimestamp = Date.now()
  123. }
  124.  
  125. resume () {
  126. this.logger.debug('트렌젝션이 재시작되었습니다.')
  127. this.paused.set(false)
  128. this.pausedMilliSeconds += (Date.now() - this.lastPausedTimestamp)
  129. }
  130.  
  131. stopReport () {
  132. this.logger.debug('통계 데이터 생성이 중단되었습니다.')
  133. if (isNaN(this.timeout)) return
  134. clearInterval(this.timeout)
  135. }
  136. }