js/WritableTransaction.js
import Transaction from './Transaction.js'
import ChunkProducer from './ChunkProducer.js'
import { ObservableEntry, wait, waitAll } from './util/ObservableEntry.js'
// 한번에 큰 arraybuffer를 전송시에도 채널이 터질 수 있음
// 따라서 데이터를 청크로 끊어서 보내야 함
const CHUNK_SIZE = 200 * 1024 // 200KB
/*
전체 파이프 구조:
source -> chunkingStream -> writable --- socket --- readable -> destination
- 전송 완료: writable close 불림 -> socket에서 done 이벤트 발생 -> readable에서 데이터 채널 close 이벤트 기다림 -> 스트림 닫기
-> 데이터 채널 닫음(done 이벤트와 바이너리 데이터의 전송 순서는 지켜지지 않으므로 readable에서 닫을 수 없음)
- 보내는 쪽에서 중단할때: writable abort 불림 -> socket에서 abort 이벤트 발생 -> readable에서 에러 발생시키고 소켓 닫음
- 받는 쪽에서 중단할때: readable cancel 불림 -> socket에서 cancel 이벤트 발생 -> writable에서 에러 발생시키고 소켓 닫음
- 일시정지(readable): readable에서 'pause', 'resume' 이벤트 발생 -> writable에서 받아서 흐름 조절
*/
// /**
// * @typedef {import('./RTCSocket.js').default} RTCSocket
// */
export default class WritableTransaction extends Transaction {
/**
* 트렌젝션을 만듭니다.
* @param {RTCSocket} socket 데이터 전송에 사용할 RTCSocket
* @param {object} [metadata] 상대에게 전송할 메타데이터. 트렌젝션이 만들어진 후 `metadata` 속성으로 읽을 수 있습니다. Progress Tracking을 사용하려면 `size` 속성이 필요합니다. 그 이외의 속성은 임의로 추가할 수 있습니다.
* @param {number} [metadata.size] 바이트로 나타낸 트렌젝션의 크기.
*/
constructor (socket, metadata = {}) {
super(socket, metadata)
this.readableBufferFull = new ObservableEntry(false)
this.aborted = false
this.canceled = false
const writable = new WritableStream({
start: controller => {
const isDone = () => this.processed.get() === metadata.size
// cancel 이벤트 오면 에러 발생시켜서 스트림을 멈춤
socket.once('cancel', errMsg => {
this.canceled = true
socket.close()
controller.error(new Error('상대방이 트렌젝션의 ReadableStream을 Cancel했습니다.' + errMsg))
})
socket.once('close', () => {
this.stopReport()
if (this.aborted || this.canceled || isDone()) return
controller.error(new Error('예상치 못하게 소켓이 닫혔습니다.'))
})
// readable측에서 요청하는 pause / resume 이벤트 받기
socket.on('pause', () => {
this.logger.debug('상대방이 트렌젝션 일시중지를 요청했습니다.')
super.pause()
})
socket.on('resume', () => {
this.logger.debug('상대방이 트렌젝션 재시작을 요청했습니다.')
super.resume()
})
socket.on('buffer-full', () => {
this.logger.debug('상대방의 스트림 버퍼가 가득 찼습니다.')
this.readableBufferFull.set(true)
})
socket.on('pull', () => {
this.logger.debug('상대방의 스트림 버퍼에 공간이 확보되었습니다.')
this.readableBufferFull.set(false)
})
},
/**
* @param {Uint8Array} data
*/
write: async data => {
// 일시정지 기능
if (this.paused.get() || this.readableBufferFull.get()) {
await waitAll(wait => {
wait(this.paused).toBe(false)
wait(this.readableBufferFull).toBe(false)
})
}
await socket.write(data.buffer)
await wait(socket.ready).toBe(true)
this.processed.set(this.processed.get() + data.length)
},
close: async () => {
// 여기는 위 write가 완료되어야 호출되므로 일단 모든 메시지가 데이터 채널의 버퍼로 들어간 상태
// 데이터 채널의 버퍼가 비면 닫기(close() 시 버퍼에 있는 메시지는 전송될지 확신할 수 없음)
// if (socket.dataChannel.bufferedAmount > 0) {
// console.log(`[Transaction:${this.label}] 소켓이 닫히기를 기다리는 중`)
// socket.dataChannel.bufferedAmountLowThreshold = 0
// await once(socket.dataChannel, 'bufferedamountlow')
// }
// // 전송 완료 이벤트 전달
// // ready-to-close 이벤트를 받는 이유: 그냥 닫으면 done 이벤트가 아에 전송이 안되는 경우가 발생
// socket.writeEvent('done')
// await once(socket, 'ready-to-close')
// socket.close()
this.done.set(true)
this.logger.log('⚡ 전송이 완료되었습니다.')
},
// abort되면 abort 이벤트 전달
abort: reason => {
this.aborted = true
if (reason instanceof Error) {
socket.writeEvent('abort', reason.message)
} else {
socket.writeEvent('abort', reason)
}
this.logger.warn('트렌젝션의 WritableStream이 Abort되었습니다. 이유: ', reason)
}
})
this.abortController = new AbortController()
// 이렇게 하면 pipeTo(transactionWriter.stream)처럼 사용 가능
// 뒤의 catch()문은 abort시 에러가 두군데에서 발생하는데(여기와 this.stream에 pipeTo 한 부분)
// 여기서 에러가 발생하지 않게 하기 위한 것임
// eslint-disable-next-line no-undef
const chunkingStream = new TransformStream(new ChunkProducer(CHUNK_SIZE))
chunkingStream.readable.pipeTo(writable, { signal: this.abortController.signal }).catch(() => {})
this.stream = chunkingStream.writable
}
/**
* 트렌젝션을 중지합니다.
*/
stop () {
// stream의 write 메소드가 resolve 되어야지 abort가 정상적으로 처리됨
// 따라서 강제로 상태 업데이트
this.paused.set(false)
this.readableBufferFull = false
this.abortController.abort('보내는 쪽에서 stop() 메소드를 호출했습니다.')
}
pause () {
super.pause()
this.socket.writeEvent('pause')
}
resume () {
super.resume()
this.socket.writeEvent('resume')
}
}