diff --git a/src/atem.ts b/src/atem.ts index 404139f1f..5294508a4 100644 --- a/src/atem.ts +++ b/src/atem.ts @@ -744,7 +744,13 @@ export class Atem extends BasicAtem { return this.sendCommand(command) } - public async uploadStill(index: number, data: Buffer, name: string, description: string): Promise { + public async uploadStill( + index: number, + data: Buffer, + name: string, + description: string, + options?: DT.UploadStillEncodingOptions + ): Promise { if (!this.state) return Promise.reject() const resolution = Util.getVideoModeInfo(this.state.settings.videoMode) if (!resolution) return Promise.reject() @@ -752,14 +758,16 @@ export class Atem extends BasicAtem { index, Util.convertRGBAToYUV422(resolution.width, resolution.height, data), name, - description + description, + options ) } public async uploadClip( index: number, frames: Iterable | AsyncIterable, - name: string + name: string, + options?: DT.UploadStillEncodingOptions ): Promise { if (!this.state) return Promise.reject() const resolution = Util.getVideoModeInfo(this.state.settings.videoMode) @@ -769,7 +777,7 @@ export class Atem extends BasicAtem { yield Util.convertRGBAToYUV422(resolution.width, resolution.height, frame) } } - return this.dataTransferManager.uploadClip(index, provideFrame(), name) + return this.dataTransferManager.uploadClip(index, provideFrame(), name, options) } public async uploadAudio(index: number, data: Buffer, name: string): Promise { diff --git a/src/commands/__tests__/index.spec.ts b/src/commands/__tests__/index.spec.ts index 10a98748b..acbbab28e 100644 --- a/src/commands/__tests__/index.spec.ts +++ b/src/commands/__tests__/index.spec.ts @@ -43,10 +43,8 @@ function runTestForCommand(commandParser: CommandParser, i: number, testCase: Te const versionName = ProtocolVersion[testCase.firstVersion] || `v${testCase.firstVersion}` - // if (testCase.name !== 'DCPC') return - // if (i !== 69) { - // return - // } + // if (testCase.name !== 'FTFD') return + // if (i !== 1673) return let matchedCase = false if (cmdConstructor) { diff --git a/src/dataTransfer/__tests__/index.spec.ts b/src/dataTransfer/__tests__/index.spec.ts index 8fc7293ea..bd8db2a8a 100644 --- a/src/dataTransfer/__tests__/index.spec.ts +++ b/src/dataTransfer/__tests__/index.spec.ts @@ -63,7 +63,7 @@ test('Still upload', async () => { const newBuffer = Buffer.alloc(1920 * 1080 * 4, 0) const manager = runDataTransferTest(spec) - await manager.uploadStill(2, newBuffer, 'some still', '') + await manager.uploadStill(2, newBuffer, 'some still', '', { disableRLE: true }) await new Promise((resolve) => setTimeout(resolve, 200)) @@ -91,7 +91,7 @@ test('clip upload', async () => { const newBuffer = Buffer.alloc(1920 * 1080 * 4, 0) const manager = runDataTransferTest(spec) - await manager.uploadClip(1, [newBuffer, newBuffer, newBuffer], 'clip file') + await manager.uploadClip(1, [newBuffer, newBuffer, newBuffer], 'clip file', { disableRLE: true }) await new Promise((resolve) => setTimeout(resolve, 200)) diff --git a/src/dataTransfer/__tests__/upload-clip-sequence.json b/src/dataTransfer/__tests__/upload-clip-sequence.json index b8647eca4..56157030d 100644 --- a/src/dataTransfer/__tests__/upload-clip-sequence.json +++ b/src/dataTransfer/__tests__/upload-clip-sequence.json @@ -52,7 +52,7 @@ { "name": "DataTransferFileDescriptionCommand", "properties": { - "fileHash": "\u0016V�������\u0016�\u0000��\u0010�", + "fileHash": "Flar6vr+iLSyFuMAorcQxQ==", "transferId": 0 }, "direction": "send" @@ -60037,7 +60037,7 @@ { "name": "DataTransferFileDescriptionCommand", "properties": { - "fileHash": "\u0016V�������\u0016�\u0000��\u0010�", + "fileHash": "Flar6vr+iLSyFuMAorcQxQ==", "transferId": 1 }, "direction": "send" @@ -119986,7 +119986,7 @@ { "name": "DataTransferFileDescriptionCommand", "properties": { - "fileHash": "\u0016V�������\u0016�\u0000��\u0010�", + "fileHash": "Flar6vr+iLSyFuMAorcQxQ==", "transferId": 2 }, "direction": "send" @@ -179966,4 +179966,4 @@ }, "direction": "recv" } -] \ No newline at end of file +] diff --git a/src/dataTransfer/__tests__/upload-still-sequence.json b/src/dataTransfer/__tests__/upload-still-sequence.json index 4547cbc03..5f9a036fc 100644 --- a/src/dataTransfer/__tests__/upload-still-sequence.json +++ b/src/dataTransfer/__tests__/upload-still-sequence.json @@ -47,7 +47,7 @@ "properties": { "name": "some still", "description": "", - "fileHash": "\u0016V�������\u0016�\u0000��\u0010�", + "fileHash": "Flar6vr+iLSyFuMAorcQxQ==", "transferId": 0 }, "direction": "send" @@ -59971,4 +59971,4 @@ }, "direction": "recv" } -] \ No newline at end of file +] diff --git a/src/dataTransfer/__tests__/upload-wav-sequence.json b/src/dataTransfer/__tests__/upload-wav-sequence.json index ecd53bbda..b46bd6308 100644 --- a/src/dataTransfer/__tests__/upload-wav-sequence.json +++ b/src/dataTransfer/__tests__/upload-wav-sequence.json @@ -46,7 +46,7 @@ "name": "DataTransferFileDescriptionCommand", "properties": { "name": "audio file", - "fileHash": "\u0012ج_�\u0014k����\fRb�P", + "fileHash": "EtisX5MUa4CiofcMUmK6UA==", "transferId": 0 }, "direction": "send" @@ -3044,4 +3044,4 @@ }, "direction": "recv" } -] \ No newline at end of file +] diff --git a/src/dataTransfer/dataTransferUploadAudio.ts b/src/dataTransfer/dataTransferUploadAudio.ts index 4582fa3fe..b356cda6e 100644 --- a/src/dataTransfer/dataTransferUploadAudio.ts +++ b/src/dataTransfer/dataTransferUploadAudio.ts @@ -8,7 +8,11 @@ export default class DataTransferUploadAudio extends DataTransferUploadBuffer { readonly #name: string constructor(clipIndex: number, data: Buffer, name: string) { - super(data) + super({ + encodedData: data, + rawDataLength: data.length, + hash: null, + }) this.#clipIndex = clipIndex this.#name = name diff --git a/src/dataTransfer/dataTransferUploadBuffer.ts b/src/dataTransfer/dataTransferUploadBuffer.ts index fcae388cb..be33de037 100644 --- a/src/dataTransfer/dataTransferUploadBuffer.ts +++ b/src/dataTransfer/dataTransferUploadBuffer.ts @@ -10,20 +10,39 @@ import { import * as crypto from 'crypto' import { DataTransfer, ProgressTransferResult, DataTransferState } from './dataTransfer' import debug0 = require('debug') +import * as Util from '../lib/atemUtil' const debug = debug0('atem-connection:data-transfer:upload-buffer') +export interface UploadBufferInfo { + encodedData: Buffer + rawDataLength: number + hash: string | null +} + +export function generateHashForBuffer(data: Buffer): string { + return data ? crypto.createHash('md5').update(data).digest('base64') : '' +} + +export function generateBufferInfo(data: Buffer, shouldEncodeRLE: boolean): UploadBufferInfo { + return { + encodedData: shouldEncodeRLE ? Util.encodeRLE(data) : data, + rawDataLength: data.length, + hash: generateHashForBuffer(data), + } +} + export abstract class DataTransferUploadBuffer extends DataTransfer { protected readonly hash: string protected readonly data: Buffer #bytesSent = 0 - constructor(data: Buffer) { + constructor(buffer: UploadBufferInfo) { super() - this.data = data - this.hash = this.data ? crypto.createHash('md5').update(this.data).digest().toString() : '' + this.hash = buffer.hash ?? generateHashForBuffer(buffer.encodedData) + this.data = buffer.encodedData } protected abstract generateDescriptionCommand(transferId: number): ISerializableCommand @@ -94,19 +113,33 @@ export abstract class DataTransferUploadBuffer extends DataTransfer { const commands: ISerializableCommand[] = [] // Take a little less because the atem does that? - const chunkSize = props.chunkSize - 4 + // const chunkSize = props.chunkSize - 4 + const chunkSize = Math.floor(props.chunkSize / 8) * 8 for (let i = 0; i < props.chunkCount; i++) { - // Make sure we don't end up with an empty slice + // Make sure the packet isn't empty if (this.#bytesSent >= this.data.length) break + // Make sure the packet doesn't end in the middle of a RLE block + let shortenBy = 0 + if (chunkSize + this.#bytesSent > this.data.length) { + // The last chunk can't end with a RLE header + shortenBy = this.#bytesSent + chunkSize - this.data.length + } else if (Util.RLE_HEADER === this.data.readBigUint64BE(this.#bytesSent + chunkSize - 8)) { + // RLE header starts 8 bytes before the end + shortenBy = 8 + } else if (Util.RLE_HEADER === this.data.readBigUint64BE(this.#bytesSent + chunkSize - 16)) { + // RLE header starts 16 bytes before the end + shortenBy = 16 + } + commands.push( new DataTransferDataCommand({ transferId: props.transferId, - body: this.data.slice(this.#bytesSent, this.#bytesSent + chunkSize), + body: this.data.slice(this.#bytesSent, this.#bytesSent + chunkSize - shortenBy), }) ) - this.#bytesSent += chunkSize + this.#bytesSent += chunkSize - shortenBy } debug(`Generated ${commands.length} chunks for size ${chunkSize}`) diff --git a/src/dataTransfer/dataTransferUploadClip.ts b/src/dataTransfer/dataTransferUploadClip.ts index f50e7cd32..d9edae3c4 100644 --- a/src/dataTransfer/dataTransferUploadClip.ts +++ b/src/dataTransfer/dataTransferUploadClip.ts @@ -1,9 +1,9 @@ import { IDeserializedCommand, ISerializableCommand } from '../commands/CommandBase' import { DataTransferFileDescriptionCommand, DataTransferUploadRequestCommand } from '../commands/DataTransfer' import { DataTransfer, DataTransferState, ProgressTransferResult } from './dataTransfer' -import { DataTransferUploadBuffer } from './dataTransferUploadBuffer' import { MediaPoolClearClipCommand, MediaPoolSetClipCommand } from '../commands/Media' import debug0 = require('debug') +import { DataTransferUploadBuffer, UploadBufferInfo } from './dataTransferUploadBuffer' const debug = debug0('atem-connection:data-transfer:upload-clip') @@ -119,21 +119,23 @@ export class DataTransferUploadClip extends DataTransfer { export class DataTransferUploadClipFrame extends DataTransferUploadBuffer { readonly #clipIndex: number readonly #frameIndex: number + readonly #dataLength: number - constructor(clipIndex: number, frameIndex: number, data: Buffer) { - super(data) + constructor(clipIndex: number, frameIndex: number, buffer: UploadBufferInfo) { + super(buffer) this.#clipIndex = clipIndex this.#frameIndex = frameIndex + this.#dataLength = buffer.rawDataLength } public async startTransfer(transferId: number): Promise { - debug(`Start transfer ${transferId} (${this.data.length})`) + debug(`Start transfer ${transferId} (${this.#dataLength})`) const command = new DataTransferUploadRequestCommand({ transferId: transferId, transferStoreId: this.#clipIndex + 1, transferIndex: this.#frameIndex, - size: this.data.length, + size: this.#dataLength, mode: 1, }) diff --git a/src/dataTransfer/dataTransferUploadMacro.ts b/src/dataTransfer/dataTransferUploadMacro.ts index a8f5269c6..d8a641709 100644 --- a/src/dataTransfer/dataTransferUploadMacro.ts +++ b/src/dataTransfer/dataTransferUploadMacro.ts @@ -5,7 +5,11 @@ import { DataTransferUploadBuffer } from './dataTransferUploadBuffer' export class DataTransferUploadMacro extends DataTransferUploadBuffer { constructor(public readonly macroIndex: number, public readonly data: Buffer, private name: string) { - super(data) + super({ + encodedData: data, + rawDataLength: data.length, + hash: null, + }) } public async startTransfer(transferId: number): Promise { diff --git a/src/dataTransfer/dataTransferUploadMultiViewerLabel.ts b/src/dataTransfer/dataTransferUploadMultiViewerLabel.ts index 329dcec4e..d313dfe29 100644 --- a/src/dataTransfer/dataTransferUploadMultiViewerLabel.ts +++ b/src/dataTransfer/dataTransferUploadMultiViewerLabel.ts @@ -7,7 +7,11 @@ export default class DataTransferUploadMultiViewerLabel extends DataTransferUplo readonly #sourceId: number constructor(sourceId: number, data: Buffer) { - super(data) + super({ + encodedData: data, + rawDataLength: data.length, + hash: null, + }) this.#sourceId = sourceId } diff --git a/src/dataTransfer/dataTransferUploadStill.ts b/src/dataTransfer/dataTransferUploadStill.ts index 7aa24ead6..4ae3e7fbc 100644 --- a/src/dataTransfer/dataTransferUploadStill.ts +++ b/src/dataTransfer/dataTransferUploadStill.ts @@ -1,19 +1,21 @@ import { ISerializableCommand } from '../commands/CommandBase' -import { DataTransferUploadBuffer } from './dataTransferUploadBuffer' import { DataTransferFileDescriptionCommand, DataTransferUploadRequestCommand } from '../commands/DataTransfer' import { ProgressTransferResult, DataTransferState } from './dataTransfer' +import { DataTransferUploadBuffer, UploadBufferInfo } from './dataTransferUploadBuffer' export default class DataTransferUploadStill extends DataTransferUploadBuffer { readonly #stillIndex: number readonly #name: string readonly #description: string + readonly #dataLength: number - constructor(stillIndex: number, data: Buffer, name: string, description: string) { - super(data) + constructor(stillIndex: number, buffer: UploadBufferInfo, name: string, description: string) { + super(buffer) this.#stillIndex = stillIndex this.#name = name this.#description = description + this.#dataLength = buffer.rawDataLength } public async startTransfer(transferId: number): Promise { @@ -21,7 +23,7 @@ export default class DataTransferUploadStill extends DataTransferUploadBuffer { transferId: transferId, transferStoreId: 0, transferIndex: this.#stillIndex, - size: this.data.length, + size: this.#dataLength, mode: 1, }) diff --git a/src/dataTransfer/index.ts b/src/dataTransfer/index.ts index d0e65f2c9..a62716b59 100644 --- a/src/dataTransfer/index.ts +++ b/src/dataTransfer/index.ts @@ -10,12 +10,17 @@ import { DataTransferDownloadMacro } from './dataTransferDownloadMacro' import { DataTransferUploadMacro } from './dataTransferUploadMacro' import { LockObtainedCommand, LockStateUpdateCommand } from '../commands/DataTransfer' import debug0 from 'debug' +import { generateBufferInfo } from './dataTransferUploadBuffer' -const MAX_PACKETS_TO_SEND_PER_TICK = 10 +const MAX_PACKETS_TO_SEND_PER_TICK = 50 const MAX_TRANSFER_INDEX = (1 << 16) - 1 // Inclusive maximum const debug = debug0('atem-connection:data-transfer:manager') +export interface UploadStillEncodingOptions { + disableRLE?: boolean +} + export class DataTransferManager { #nextTransferIdInner = 0 readonly #nextTransferId = (): number => { @@ -165,21 +170,30 @@ export class DataTransferManager { } } - public async uploadStill(index: number, data: Buffer, name: string, description: string): Promise { - const transfer = new DataTransferUploadStill(index, data, name, description) + public async uploadStill( + index: number, + data: Buffer, + name: string, + description: string, + options?: UploadStillEncodingOptions + ): Promise { + const buffer = generateBufferInfo(data, !options?.disableRLE) + const transfer = new DataTransferUploadStill(index, buffer, name, description) return this.#stillsLock.enqueue(transfer) } public async uploadClip( index: number, data: Iterable | AsyncIterable, - name: string + name: string, + options?: UploadStillEncodingOptions ): Promise { const provideFrame = async function* (): AsyncGenerator { let id = -1 for await (const frame of data) { id++ - yield new DataTransferUploadClipFrame(index, id, frame) + const buffer = generateBufferInfo(frame, !options?.disableRLE) + yield new DataTransferUploadClipFrame(index, id, buffer) } return undefined } diff --git a/src/index.ts b/src/index.ts index 4ca875f66..8475e9859 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,3 +15,4 @@ import * as InputState from './state/input' import * as MacroState from './state/macro' import * as SettingsState from './state/settings' export { VideoState, AudioState, MediaState, InfoState, InputState, MacroState, SettingsState } +export { UploadStillEncodingOptions } from './dataTransfer' diff --git a/src/lib/__tests__/atemUtil.spec.ts b/src/lib/__tests__/atemUtil.spec.ts new file mode 100644 index 000000000..6a72bc583 --- /dev/null +++ b/src/lib/__tests__/atemUtil.spec.ts @@ -0,0 +1,136 @@ +import { encodeRLE } from '../atemUtil' + +describe('RLE', () => { + test('no repetitions', () => { + const source = `abababababababab\ +cdcdcdcdcdcdcdcd\ +abababababababab\ +cdcdcdcdcdcdcdcd` + const encoded = encodeRLE(Buffer.from(source, 'hex')) + expect(encoded.toString('hex')).toEqual(source) + }) + + test('two repetitions', () => { + const source = `abababababababab\ +abababababababab\ +cdcdcdcdcdcdcdcd\ +0000000000000000\ +1111111111111111` + const encoded = encodeRLE(Buffer.from(source, 'hex')) + expect(encoded.toString('hex')).toEqual(source) + }) + + test('three repetitions', () => { + const source = `abababababababab\ +abababababababab\ +abababababababab\ +cdcdcdcdcdcdcdcd\ +0000000000000000\ +1111111111111111` + const encoded = encodeRLE(Buffer.from(source, 'hex')) + expect(encoded.toString('hex')).toEqual(source) + }) + + test('four repetitions', () => { + const source = `abababababababab\ +abababababababab\ +abababababababab\ +abababababababab\ +cdcdcdcdcdcdcdcd\ +0000000000000000\ +1111111111111111` + const expectation = `fefefefefefefefe\ +0000000000000004\ +abababababababab\ +cdcdcdcdcdcdcdcd\ +0000000000000000\ +1111111111111111` + const encoded = encodeRLE(Buffer.from(source, 'hex')) + expect(encoded.toString('hex')).toEqual(expectation) + }) + + test('five repetitions at the beginning', () => { + const source = `abababababababab\ +abababababababab\ +abababababababab\ +abababababababab\ +abababababababab\ +cdcdcdcdcdcdcdcd\ +0000000000000000\ +1111111111111111` + const expectation = `fefefefefefefefe\ +0000000000000005\ +abababababababab\ +cdcdcdcdcdcdcdcd\ +0000000000000000\ +1111111111111111` + const encoded = encodeRLE(Buffer.from(source, 'hex')) + expect(encoded.toString('hex')).toEqual(expectation) + }) + + test('five repetitions in the midddle', () => { + const source = `2323232323232323\ +abababababababab\ +abababababababab\ +abababababababab\ +abababababababab\ +abababababababab\ +cdcdcdcdcdcdcdcd\ +0000000000000000\ +1111111111111111` + const expectation = `2323232323232323\ +fefefefefefefefe\ +0000000000000005\ +abababababababab\ +cdcdcdcdcdcdcdcd\ +0000000000000000\ +1111111111111111` + const encoded = encodeRLE(Buffer.from(source, 'hex')) + expect(encoded.toString('hex')).toEqual(expectation) + }) + + test('five repetitions in the midddle #2', () => { + const source = `2323232323232323\ +abababababababab\ +abababababababab\ +abababababababab\ +abababababababab\ +abababababababab\ +cdcdcdcdcdcdcdcd` + const expectation = `2323232323232323\ +fefefefefefefefe\ +0000000000000005\ +abababababababab\ +cdcdcdcdcdcdcdcd` + const encoded = encodeRLE(Buffer.from(source, 'hex')) + expect(encoded.toString('hex')).toEqual(expectation) + }) + + test('five repetitions at the end', () => { + const source = `2323232323232323\ +abababababababab\ +abababababababab\ +abababababababab\ +abababababababab\ +abababababababab` + const expectation = `2323232323232323\ +fefefefefefefefe\ +0000000000000005\ +abababababababab` + const encoded = encodeRLE(Buffer.from(source, 'hex')) + expect(encoded.toString('hex')).toEqual(expectation) + }) + + test('only five repetitions', () => { + const source = `abababababababab\ +abababababababab\ +abababababababab\ +abababababababab\ +abababababababab` + const expectation = `fefefefefefefefe\ +0000000000000005\ +abababababababab` + const encoded = encodeRLE(Buffer.from(source, 'hex')) + expect(encoded.toString('hex')).toEqual(expectation) + }) +}) diff --git a/src/lib/atemUtil.ts b/src/lib/atemUtil.ts index 4b9fee164..9664bd256 100644 --- a/src/lib/atemUtil.ts +++ b/src/lib/atemUtil.ts @@ -69,6 +69,60 @@ export function convertRGBAToYUV422(width: number, height: number, data: Buffer) return buffer } +export const RLE_HEADER = 0xfefefefefefefefen + +export function encodeRLE(data: Buffer): Buffer { + const result = Buffer.alloc(data.length) + let lastBlock = data.readBigUInt64BE() + let identicalCount = 0 + let differentCount = 0 + let resultOffset = -8 + + for (let sourceOffset = 8; sourceOffset < data.length; sourceOffset += 8) { + const block = data.readBigUInt64BE(sourceOffset) + + if (block === lastBlock) { + ++identicalCount + if (differentCount) { + data.copy(result, resultOffset + 8, sourceOffset - 8 * (differentCount + 1), sourceOffset - 8) + resultOffset += differentCount * 8 + differentCount = 0 + } + lastBlock = block + continue + } + if (identicalCount > 2) { + result.writeBigUInt64BE(RLE_HEADER, (resultOffset += 8)) + result.writeBigUInt64BE(BigInt(identicalCount + 1), (resultOffset += 8)) + result.writeBigUInt64BE(lastBlock, (resultOffset += 8)) + } else if (identicalCount > 0) { + for (let i = 0; i <= identicalCount; ++i) { + result.writeBigUInt64BE(lastBlock, (resultOffset += 8)) + } + } else { + ++differentCount + } + lastBlock = block + identicalCount = 0 + } + + if (identicalCount > 2) { + result.writeBigUInt64BE(RLE_HEADER, (resultOffset += 8)) + result.writeBigUInt64BE(BigInt(identicalCount + 1), (resultOffset += 8)) + result.writeBigUInt64BE(lastBlock, (resultOffset += 8)) + } else if (identicalCount > 0) { + for (let i = 0; i <= identicalCount; ++i) { + result.writeBigUInt64BE(lastBlock, (resultOffset += 8)) + } + } else { + ++differentCount + data.copy(result, resultOffset + 8, data.length - 8 * differentCount, data.length) + resultOffset += differentCount * 8 + } + + return result.slice(0, resultOffset + 8) +} + export interface VideoModeInfo { format: Enums.VideoFormat width: number