Skip to content

Commit

Permalink
perf: Add RLE encoding (#149)
Browse files Browse the repository at this point in the history
Co-authored-by: Julian Waller <[email protected]>
  • Loading branch information
ianshade and Julusian committed Aug 27, 2023
1 parent 56eb26c commit 4c06389
Show file tree
Hide file tree
Showing 16 changed files with 302 additions and 42 deletions.
16 changes: 12 additions & 4 deletions src/atem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -744,22 +744,30 @@ export class Atem extends BasicAtem {
return this.sendCommand(command)
}

public async uploadStill(index: number, data: Buffer, name: string, description: string): Promise<void> {
public async uploadStill(
index: number,
data: Buffer,
name: string,
description: string,
options?: DT.UploadStillEncodingOptions
): Promise<void> {
if (!this.state) return Promise.reject()
const resolution = Util.getVideoModeInfo(this.state.settings.videoMode)
if (!resolution) return Promise.reject()
return this.dataTransferManager.uploadStill(
index,
Util.convertRGBAToYUV422(resolution.width, resolution.height, data),
name,
description
description,
options
)
}

public async uploadClip(
index: number,
frames: Iterable<Buffer> | AsyncIterable<Buffer>,
name: string
name: string,
options?: DT.UploadStillEncodingOptions
): Promise<void> {
if (!this.state) return Promise.reject()
const resolution = Util.getVideoModeInfo(this.state.settings.videoMode)
Expand All @@ -769,7 +777,7 @@ export class Atem extends BasicAtem {
yield Util.convertRGBAToYUV422(resolution.width, resolution.height, frame)
}
}
return this.dataTransferManager.uploadClip(index, provideFrame(), name)
return this.dataTransferManager.uploadClip(index, provideFrame(), name, options)
}

public async uploadAudio(index: number, data: Buffer, name: string): Promise<void> {
Expand Down
6 changes: 2 additions & 4 deletions src/commands/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ function runTestForCommand(commandParser: CommandParser, i: number, testCase: Te

const versionName = ProtocolVersion[testCase.firstVersion] || `v${testCase.firstVersion}`

// if (testCase.name !== 'DCPC') return
// if (i !== 69) {
// return
// }
// if (testCase.name !== 'FTFD') return
// if (i !== 1673) return

let matchedCase = false
if (cmdConstructor) {
Expand Down
4 changes: 2 additions & 2 deletions src/dataTransfer/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ test('Still upload', async () => {
const newBuffer = Buffer.alloc(1920 * 1080 * 4, 0)

const manager = runDataTransferTest(spec)
await manager.uploadStill(2, newBuffer, 'some still', '')
await manager.uploadStill(2, newBuffer, 'some still', '', { disableRLE: true })

await new Promise((resolve) => setTimeout(resolve, 200))

Expand Down Expand Up @@ -91,7 +91,7 @@ test('clip upload', async () => {
const newBuffer = Buffer.alloc(1920 * 1080 * 4, 0)

const manager = runDataTransferTest(spec)
await manager.uploadClip(1, [newBuffer, newBuffer, newBuffer], 'clip file')
await manager.uploadClip(1, [newBuffer, newBuffer, newBuffer], 'clip file', { disableRLE: true })

await new Promise((resolve) => setTimeout(resolve, 200))

Expand Down
8 changes: 4 additions & 4 deletions src/dataTransfer/__tests__/upload-clip-sequence.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
{
"name": "DataTransferFileDescriptionCommand",
"properties": {
"fileHash": "\u0016V�������\u0016�\u0000��\u0010�",
"fileHash": "Flar6vr+iLSyFuMAorcQxQ==",
"transferId": 0
},
"direction": "send"
Expand Down Expand Up @@ -60037,7 +60037,7 @@
{
"name": "DataTransferFileDescriptionCommand",
"properties": {
"fileHash": "\u0016V�������\u0016�\u0000��\u0010�",
"fileHash": "Flar6vr+iLSyFuMAorcQxQ==",
"transferId": 1
},
"direction": "send"
Expand Down Expand Up @@ -119986,7 +119986,7 @@
{
"name": "DataTransferFileDescriptionCommand",
"properties": {
"fileHash": "\u0016V�������\u0016�\u0000��\u0010�",
"fileHash": "Flar6vr+iLSyFuMAorcQxQ==",
"transferId": 2
},
"direction": "send"
Expand Down Expand Up @@ -179966,4 +179966,4 @@
},
"direction": "recv"
}
]
]
4 changes: 2 additions & 2 deletions src/dataTransfer/__tests__/upload-still-sequence.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"properties": {
"name": "some still",
"description": "",
"fileHash": "\u0016V�������\u0016�\u0000��\u0010�",
"fileHash": "Flar6vr+iLSyFuMAorcQxQ==",
"transferId": 0
},
"direction": "send"
Expand Down Expand Up @@ -59971,4 +59971,4 @@
},
"direction": "recv"
}
]
]
4 changes: 2 additions & 2 deletions src/dataTransfer/__tests__/upload-wav-sequence.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"name": "DataTransferFileDescriptionCommand",
"properties": {
"name": "audio file",
"fileHash": "\u0012ج_�\u0014k����\fRb�P",
"fileHash": "EtisX5MUa4CiofcMUmK6UA==",
"transferId": 0
},
"direction": "send"
Expand Down Expand Up @@ -3044,4 +3044,4 @@
},
"direction": "recv"
}
]
]
6 changes: 5 additions & 1 deletion src/dataTransfer/dataTransferUploadAudio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ export default class DataTransferUploadAudio extends DataTransferUploadBuffer {
readonly #name: string

constructor(clipIndex: number, data: Buffer, name: string) {
super(data)
super({
encodedData: data,
rawDataLength: data.length,
hash: null,
})

this.#clipIndex = clipIndex
this.#name = name
Expand Down
47 changes: 40 additions & 7 deletions src/dataTransfer/dataTransferUploadBuffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,39 @@ import {
import * as crypto from 'crypto'
import { DataTransfer, ProgressTransferResult, DataTransferState } from './dataTransfer'
import debug0 = require('debug')
import * as Util from '../lib/atemUtil'

const debug = debug0('atem-connection:data-transfer:upload-buffer')

export interface UploadBufferInfo {
encodedData: Buffer
rawDataLength: number
hash: string | null
}

export function generateHashForBuffer(data: Buffer): string {
return data ? crypto.createHash('md5').update(data).digest('base64') : ''
}

export function generateBufferInfo(data: Buffer, shouldEncodeRLE: boolean): UploadBufferInfo {
return {
encodedData: shouldEncodeRLE ? Util.encodeRLE(data) : data,
rawDataLength: data.length,
hash: generateHashForBuffer(data),
}
}

export abstract class DataTransferUploadBuffer extends DataTransfer<void> {
protected readonly hash: string
protected readonly data: Buffer

#bytesSent = 0

constructor(data: Buffer) {
constructor(buffer: UploadBufferInfo) {
super()

this.data = data
this.hash = this.data ? crypto.createHash('md5').update(this.data).digest().toString() : ''
this.hash = buffer.hash ?? generateHashForBuffer(buffer.encodedData)
this.data = buffer.encodedData
}

protected abstract generateDescriptionCommand(transferId: number): ISerializableCommand
Expand Down Expand Up @@ -94,19 +113,33 @@ export abstract class DataTransferUploadBuffer extends DataTransfer<void> {
const commands: ISerializableCommand[] = []

// Take a little less because the atem does that?
const chunkSize = props.chunkSize - 4
// const chunkSize = props.chunkSize - 4
const chunkSize = Math.floor(props.chunkSize / 8) * 8

for (let i = 0; i < props.chunkCount; i++) {
// Make sure we don't end up with an empty slice
// Make sure the packet isn't empty
if (this.#bytesSent >= this.data.length) break

// Make sure the packet doesn't end in the middle of a RLE block
let shortenBy = 0
if (chunkSize + this.#bytesSent > this.data.length) {
// The last chunk can't end with a RLE header
shortenBy = this.#bytesSent + chunkSize - this.data.length
} else if (Util.RLE_HEADER === this.data.readBigUint64BE(this.#bytesSent + chunkSize - 8)) {
// RLE header starts 8 bytes before the end
shortenBy = 8
} else if (Util.RLE_HEADER === this.data.readBigUint64BE(this.#bytesSent + chunkSize - 16)) {
// RLE header starts 16 bytes before the end
shortenBy = 16
}

commands.push(
new DataTransferDataCommand({
transferId: props.transferId,
body: this.data.slice(this.#bytesSent, this.#bytesSent + chunkSize),
body: this.data.slice(this.#bytesSent, this.#bytesSent + chunkSize - shortenBy),
})
)
this.#bytesSent += chunkSize
this.#bytesSent += chunkSize - shortenBy
}

debug(`Generated ${commands.length} chunks for size ${chunkSize}`)
Expand Down
12 changes: 7 additions & 5 deletions src/dataTransfer/dataTransferUploadClip.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { IDeserializedCommand, ISerializableCommand } from '../commands/CommandBase'
import { DataTransferFileDescriptionCommand, DataTransferUploadRequestCommand } from '../commands/DataTransfer'
import { DataTransfer, DataTransferState, ProgressTransferResult } from './dataTransfer'
import { DataTransferUploadBuffer } from './dataTransferUploadBuffer'
import { MediaPoolClearClipCommand, MediaPoolSetClipCommand } from '../commands/Media'
import debug0 = require('debug')
import { DataTransferUploadBuffer, UploadBufferInfo } from './dataTransferUploadBuffer'

const debug = debug0('atem-connection:data-transfer:upload-clip')

Expand Down Expand Up @@ -119,21 +119,23 @@ export class DataTransferUploadClip extends DataTransfer<void> {
export class DataTransferUploadClipFrame extends DataTransferUploadBuffer {
readonly #clipIndex: number
readonly #frameIndex: number
readonly #dataLength: number

constructor(clipIndex: number, frameIndex: number, data: Buffer) {
super(data)
constructor(clipIndex: number, frameIndex: number, buffer: UploadBufferInfo) {
super(buffer)

this.#clipIndex = clipIndex
this.#frameIndex = frameIndex
this.#dataLength = buffer.rawDataLength
}

public async startTransfer(transferId: number): Promise<ProgressTransferResult> {
debug(`Start transfer ${transferId} (${this.data.length})`)
debug(`Start transfer ${transferId} (${this.#dataLength})`)
const command = new DataTransferUploadRequestCommand({
transferId: transferId,
transferStoreId: this.#clipIndex + 1,
transferIndex: this.#frameIndex,
size: this.data.length,
size: this.#dataLength,
mode: 1,
})

Expand Down
6 changes: 5 additions & 1 deletion src/dataTransfer/dataTransferUploadMacro.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import { DataTransferUploadBuffer } from './dataTransferUploadBuffer'

export class DataTransferUploadMacro extends DataTransferUploadBuffer {
constructor(public readonly macroIndex: number, public readonly data: Buffer, private name: string) {
super(data)
super({
encodedData: data,
rawDataLength: data.length,
hash: null,
})
}

public async startTransfer(transferId: number): Promise<ProgressTransferResult> {
Expand Down
6 changes: 5 additions & 1 deletion src/dataTransfer/dataTransferUploadMultiViewerLabel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ export default class DataTransferUploadMultiViewerLabel extends DataTransferUplo
readonly #sourceId: number

constructor(sourceId: number, data: Buffer) {
super(data)
super({
encodedData: data,
rawDataLength: data.length,
hash: null,
})

this.#sourceId = sourceId
}
Expand Down
10 changes: 6 additions & 4 deletions src/dataTransfer/dataTransferUploadStill.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
import { ISerializableCommand } from '../commands/CommandBase'
import { DataTransferUploadBuffer } from './dataTransferUploadBuffer'
import { DataTransferFileDescriptionCommand, DataTransferUploadRequestCommand } from '../commands/DataTransfer'
import { ProgressTransferResult, DataTransferState } from './dataTransfer'
import { DataTransferUploadBuffer, UploadBufferInfo } from './dataTransferUploadBuffer'

export default class DataTransferUploadStill extends DataTransferUploadBuffer {
readonly #stillIndex: number
readonly #name: string
readonly #description: string
readonly #dataLength: number

constructor(stillIndex: number, data: Buffer, name: string, description: string) {
super(data)
constructor(stillIndex: number, buffer: UploadBufferInfo, name: string, description: string) {
super(buffer)

this.#stillIndex = stillIndex
this.#name = name
this.#description = description
this.#dataLength = buffer.rawDataLength
}

public async startTransfer(transferId: number): Promise<ProgressTransferResult> {
const command = new DataTransferUploadRequestCommand({
transferId: transferId,
transferStoreId: 0,
transferIndex: this.#stillIndex,
size: this.data.length,
size: this.#dataLength,
mode: 1,
})

Expand Down
24 changes: 19 additions & 5 deletions src/dataTransfer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ import { DataTransferDownloadMacro } from './dataTransferDownloadMacro'
import { DataTransferUploadMacro } from './dataTransferUploadMacro'
import { LockObtainedCommand, LockStateUpdateCommand } from '../commands/DataTransfer'
import debug0 from 'debug'
import { generateBufferInfo } from './dataTransferUploadBuffer'

const MAX_PACKETS_TO_SEND_PER_TICK = 10
const MAX_PACKETS_TO_SEND_PER_TICK = 50
const MAX_TRANSFER_INDEX = (1 << 16) - 1 // Inclusive maximum

const debug = debug0('atem-connection:data-transfer:manager')

export interface UploadStillEncodingOptions {
disableRLE?: boolean
}

export class DataTransferManager {
#nextTransferIdInner = 0
readonly #nextTransferId = (): number => {
Expand Down Expand Up @@ -165,21 +170,30 @@ export class DataTransferManager {
}
}

public async uploadStill(index: number, data: Buffer, name: string, description: string): Promise<void> {
const transfer = new DataTransferUploadStill(index, data, name, description)
public async uploadStill(
index: number,
data: Buffer,
name: string,
description: string,
options?: UploadStillEncodingOptions
): Promise<void> {
const buffer = generateBufferInfo(data, !options?.disableRLE)
const transfer = new DataTransferUploadStill(index, buffer, name, description)
return this.#stillsLock.enqueue(transfer)
}

public async uploadClip(
index: number,
data: Iterable<Buffer> | AsyncIterable<Buffer>,
name: string
name: string,
options?: UploadStillEncodingOptions
): Promise<void> {
const provideFrame = async function* (): AsyncGenerator<DataTransferUploadClipFrame, undefined> {
let id = -1
for await (const frame of data) {
id++
yield new DataTransferUploadClipFrame(index, id, frame)
const buffer = generateBufferInfo(frame, !options?.disableRLE)
yield new DataTransferUploadClipFrame(index, id, buffer)
}
return undefined
}
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ import * as InputState from './state/input'
import * as MacroState from './state/macro'
import * as SettingsState from './state/settings'
export { VideoState, AudioState, MediaState, InfoState, InputState, MacroState, SettingsState }
export { UploadStillEncodingOptions } from './dataTransfer'
Loading

0 comments on commit 4c06389

Please sign in to comment.