js/ReadableTransaction.js
- import Transaction from './Transaction.js'
-
- export default class ReadableTransaction extends Transaction {
- /**
- * 트렌젝션을 만듭니다.
- * @param {RTCSocket} socket 데이터 전송에 사용할 RTCSocket
- * @param {object} [metadata] 상대에게 전송할 메타데이터. 트렌젝션이 만들어진 후 `metadata` 속성으로 읽을 수 있습니다. Progress Tracking을 사용하려면 `size` 속성이 필요합니다. 그 이외의 속성은 임의로 추가할 수 있습니다.
- * @param {number} [metadata.size] 바이트로 나타낸 트렌젝션의 크기.
- */
- constructor (socket, metadata = {}) {
- super(socket, metadata)
-
- this.bufferFullInformed = false
- this.aborted = false
- this.canceled = false
- this.cancelReason = null
-
- this.stream = new ReadableStream({
- start: controller => {
- const isDone = () => this.processed.get() === metadata.size
-
- socket.on('data', data => {
- if (!(data instanceof ArrayBuffer)) return
-
- controller.enqueue(data)
- this.processed.set(this.processed.get() + data.byteLength)
-
- if (controller.desiredSize < 0 && !this.bufferFullInformed) {
- this.bufferFullInformed = true
- socket.writeEvent('buffer-full')
- this.logger.debug('ReadableStream 버퍼가 가득 찼습니다. 전송 일시중지를 요청했습니다.')
- }
-
- if (isDone()) {
- socket.close()
- controller.close()
- this.done.set(true)
- this.logger.log('⚡ 전송이 완료되었습니다.')
- }
- })
-
- socket.once('close', () => {
- this.stopReport()
- if (this.aborted || isDone()) return
-
- if (this.canceled) {
- if (this.cancelReason instanceof Error) {
- controller.error(this.cancelReason)
- } else {
- controller.error('트렌젝션이 Cancel되었습니다. 이유: ' + this.cancelReason)
- }
- }
-
- controller.error(new Error('예상치 못하게 소켓이 닫혔습니다.'))
- })
-
- socket.once('abort', errMsg => {
- this.aborted = true
- socket.close()
- controller.error(new Error('상대방이 트렌젝션의 WritableStream을 Abort했습니다.' + errMsg))
- })
-
- // writer측에서 일시정지/재개됬을때
- socket.on('pause', () => {
- this.logger.debug('상대방이 트렌젝션 일시중지를 요청했습니다.')
- super.pause()
- })
- socket.on('resume', () => {
- this.logger.debug('상대방이 트렌젝션 재시작을 요청했습니다.')
- super.resume()
- })
- },
- pull: () => {
- if (!this.bufferFullInformed) return
-
- socket.writeEvent('pull')
- this.bufferFullInformed = false
- this.logger.debug('ReadableStream 버퍼가 공간이 확보되어서 전송 재개를 요청했습니다.')
- },
- cancel: reason => {
- this.cancel(reason)
- }
- }, new ByteLengthQueuingStrategy({ highWaterMark: 10 * 1024 * 1024 /* 10 MB */ }))
- }
-
- cancel (reason = '') {
- this.canceled = true
- this.cancelReason = reason
-
- if (reason instanceof Error) {
- this.socket.writeEvent('cancel', reason.message)
- } else {
- this.socket.writeEvent('cancel', reason)
- }
-
- this.logger.warn('트렌젝션의 ReadableStream이 Cancel되었습니다. 이유: ', reason)
- }
-
- stop () {
- this.cancel('받는 쪽에서 stop() 메소드를 호출했습니다.')
- }
-
- pause () {
- super.pause()
- this.socket.writeEvent('pause')
- }
-
- resume () {
- super.resume()
- this.socket.writeEvent('resume')
- }
- }