Home Reference Source

js/ReadableTransaction.js

  1. import Transaction from './Transaction.js'
  2.  
  3. export default class ReadableTransaction extends Transaction {
  4. /**
  5. * 트렌젝션을 만듭니다.
  6. * @param {RTCSocket} socket 데이터 전송에 사용할 RTCSocket
  7. * @param {object} [metadata] 상대에게 전송할 메타데이터. 트렌젝션이 만들어진 후 `metadata` 속성으로 읽을 수 있습니다. Progress Tracking을 사용하려면 `size` 속성이 필요합니다. 그 이외의 속성은 임의로 추가할 수 있습니다.
  8. * @param {number} [metadata.size] 바이트로 나타낸 트렌젝션의 크기.
  9. */
  10. constructor (socket, metadata = {}) {
  11. super(socket, metadata)
  12.  
  13. this.bufferFullInformed = false
  14. this.aborted = false
  15. this.canceled = false
  16. this.cancelReason = null
  17.  
  18. this.stream = new ReadableStream({
  19. start: controller => {
  20. const isDone = () => this.processed.get() === metadata.size
  21.  
  22. socket.on('data', data => {
  23. if (!(data instanceof ArrayBuffer)) return
  24.  
  25. controller.enqueue(data)
  26. this.processed.set(this.processed.get() + data.byteLength)
  27.  
  28. if (controller.desiredSize < 0 && !this.bufferFullInformed) {
  29. this.bufferFullInformed = true
  30. socket.writeEvent('buffer-full')
  31. this.logger.debug('ReadableStream 버퍼가 가득 찼습니다. 전송 일시중지를 요청했습니다.')
  32. }
  33.  
  34. if (isDone()) {
  35. socket.close()
  36. controller.close()
  37. this.done.set(true)
  38. this.logger.log('⚡ 전송이 완료되었습니다.')
  39. }
  40. })
  41.  
  42. socket.once('close', () => {
  43. this.stopReport()
  44. if (this.aborted || isDone()) return
  45.  
  46. if (this.canceled) {
  47. if (this.cancelReason instanceof Error) {
  48. controller.error(this.cancelReason)
  49. } else {
  50. controller.error('트렌젝션이 Cancel되었습니다. 이유: ' + this.cancelReason)
  51. }
  52. }
  53.  
  54. controller.error(new Error('예상치 못하게 소켓이 닫혔습니다.'))
  55. })
  56.  
  57. socket.once('abort', errMsg => {
  58. this.aborted = true
  59. socket.close()
  60. controller.error(new Error('상대방이 트렌젝션의 WritableStream을 Abort했습니다.' + errMsg))
  61. })
  62.  
  63. // writer측에서 일시정지/재개됬을때
  64. socket.on('pause', () => {
  65. this.logger.debug('상대방이 트렌젝션 일시중지를 요청했습니다.')
  66. super.pause()
  67. })
  68. socket.on('resume', () => {
  69. this.logger.debug('상대방이 트렌젝션 재시작을 요청했습니다.')
  70. super.resume()
  71. })
  72. },
  73. pull: () => {
  74. if (!this.bufferFullInformed) return
  75.  
  76. socket.writeEvent('pull')
  77. this.bufferFullInformed = false
  78. this.logger.debug('ReadableStream 버퍼가 공간이 확보되어서 전송 재개를 요청했습니다.')
  79. },
  80. cancel: reason => {
  81. this.cancel(reason)
  82. }
  83. }, new ByteLengthQueuingStrategy({ highWaterMark: 10 * 1024 * 1024 /* 10 MB */ }))
  84. }
  85.  
  86. cancel (reason = '') {
  87. this.canceled = true
  88. this.cancelReason = reason
  89.  
  90. if (reason instanceof Error) {
  91. this.socket.writeEvent('cancel', reason.message)
  92. } else {
  93. this.socket.writeEvent('cancel', reason)
  94. }
  95.  
  96. this.logger.warn('트렌젝션의 ReadableStream이 Cancel되었습니다. 이유: ', reason)
  97. }
  98.  
  99. stop () {
  100. this.cancel('받는 쪽에서 stop() 메소드를 호출했습니다.')
  101. }
  102.  
  103. pause () {
  104. super.pause()
  105. this.socket.writeEvent('pause')
  106. }
  107.  
  108. resume () {
  109. super.resume()
  110. this.socket.writeEvent('resume')
  111. }
  112. }