Skip to content

Commit

Permalink
FAI-11030: Re-fetch additional fields when config is updated (#1518)
Browse files Browse the repository at this point in the history
* Sync additional fields stream when config is updated

* Move additional fields query and update to issues stream and augment state

* remove unused import

* Removed sync additional flags and simplified additional field syncs logic

* some nits

* nits + docs

* snap

---------

Co-authored-by: Yandry Perez Clemente <[email protected]>
  • Loading branch information
matiaslcoulougian and ypc-faros committed Jun 14, 2024
1 parent 6e9bb1b commit bfca372
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 85 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import {AirbyteRecord} from 'faros-airbyte-cdk';
import {IssueCompact} from 'faros-airbyte-common/lib/jira';
import {isString} from 'lodash';
import {Dictionary} from 'ts-essentials';

import {Converter, StreamContext} from '../converter';
import {Converter, DestinationRecord, StreamContext} from '../converter';

export interface SprintIssue {
id: number;
Expand Down Expand Up @@ -76,4 +77,21 @@ export abstract class JiraConverter extends Converter {
protected useBoardOwnership(ctx: StreamContext): boolean {
return this.jiraConfig(ctx).use_board_ownership ?? false;
}

protected convertAdditionalFieldsIssue(
issue: IssueCompact
): DestinationRecord {
const additionalFields: any[] = [];
for (const [name, value] of issue.additionalFields) {
additionalFields.push({name, value});
}
return {
model: 'tms_Task__Update',
record: {
where: {uid: issue.key, source: this.source},
mask: ['additionalFields'],
patch: {additionalFields},
},
};
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {AirbyteRecord} from 'faros-airbyte-cdk';
import {IssueCompact} from 'faros-airbyte-common/jira';

import {DestinationModel, DestinationRecord, StreamContext} from '../converter';
import {JiraConverter} from './common';
Expand All @@ -10,21 +11,7 @@ export class FarosIssueAdditionalFields extends JiraConverter {
record: AirbyteRecord,
ctx: StreamContext
): Promise<ReadonlyArray<DestinationRecord>> {
const issue = record.record.data;
const additionalFields: any[] = [];
for (const [name, value] of issue.additionalFields) {
additionalFields.push({name, value});
}

return [
{
model: 'tms_Task__Update',
record: {
where: {uid: issue.key, source: this.source},
mask: ['additionalFields'],
patch: {additionalFields},
},
},
];
const issue = record.record.data as IssueCompact;
return [this.convertAdditionalFieldsIssue(issue)];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ export class FarosIssues extends JiraConverter {
const source = this.streamName.source;
const results: DestinationRecord[] = [];

if (issue.updateAdditionalFields) {
return [this.convertAdditionalFieldsIssue(issue)];
}

// For next-gen projects, epic should be parent of issue with issue
// type Epic otherwise use the epic key from custom field in the issue
const epicKey =
Expand Down
2 changes: 1 addition & 1 deletion faros-airbyte-common/src/jira/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface IssueCompact {
readonly updated?: Date;
readonly boardId?: string;
readonly additionalFields?: ReadonlyArray<[string, string]>;
readonly updateAdditionalFields?: boolean;
}

export interface Issue extends IssueCompact {
Expand All @@ -31,7 +32,6 @@ export interface Issue extends IssueCompact {
readonly points?: number;
readonly epic?: string;
readonly sprintInfo?: SprintInfo;
readonly additionalFields: ReadonlyArray<[string, string]>;
readonly url: string;
readonly resolution: string;
readonly resolutionDate: Date;
Expand Down
12 changes: 3 additions & 9 deletions sources/jira-source/resources/spec.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,16 @@
"description": "Personal access token. See https://confluence.atlassian.com/enterprise/using-personal-access-tokens-1026032365.html for more information.",
"airbyte_secret": true
},
"sync_additional_fields": {
"order": 5,
"type": "boolean",
"title": "Sync Additional Fields",
"description": "Include additional fields on tasks.",
"default": false
},
"additional_fields": {
"order": 6,
"order": 5,
"type": "array",
"title": "Additional Fields",
"items": {
"type": "string"
},
"description": "Fields to include on tasks when sync of additional fields is enabled (defaults to all fields).",
"description": "Fields to include on tasks. Use '*' to include all fields.",
"examples": [
"*",
"Last Viewed",
"Priority"
]
Expand Down
9 changes: 3 additions & 6 deletions sources/jira-source/src/jira.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ export interface JiraConfig extends AirbyteConfig {
readonly username?: string;
readonly password?: string;
readonly token?: string;
readonly sync_additional_fields?: boolean;
readonly additional_fields?: ReadonlyArray<string>;
readonly additional_fields_array_limit?: number;
readonly reject_unauthorized?: boolean;
Expand Down Expand Up @@ -229,11 +228,8 @@ export class Jira {
logger: logger,
});

const addAllFields =
cfg.sync_additional_fields && (cfg.additional_fields ?? []).length === 0;
const additionalFields = new Set(
cfg.sync_additional_fields ? cfg.additional_fields ?? [] : []
);
const addAllFields = cfg.additional_fields?.includes('*');
const additionalFields = new Set(cfg.additional_fields ?? []);
// We always add the following custom fields since they
// are promoted to standard fields
additionalFields.add(DEV_FIELD_NAME);
Expand Down Expand Up @@ -800,6 +796,7 @@ export class Jira {
for await (const issue of issues) {
yield {
key: issue.key,
updated: issue.updated,
additionalFields: issueTransformer.extractAdditionalFields(issue),
};
}
Expand Down
14 changes: 13 additions & 1 deletion sources/jira-source/src/streams/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,19 @@ export type BoardStreamSlice = {
};

export type StreamState = {
readonly [projectOrBoard: string]: {cutoff: number};
readonly [projectOrBoard: string]: {
cutoff: number;
};
};

export type IssueStreamState = {
readonly [projectOrBoard: string]: {
cutoff: number;
additionalFields?: ReadonlyArray<string>;
// The timestamp of the earliest issue update we've seen so far
// This is how far back we need to go to update the additional fields for the issues we've already fetched
earliestIssueUpdateTimestamp?: number;
};
};

export enum RunMode {
Expand Down
74 changes: 59 additions & 15 deletions sources/jira-source/src/streams/faros_issues.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import {StreamKey, SyncMode} from 'faros-airbyte-cdk';
import {Issue} from 'faros-airbyte-common/jira';
import {Issue, IssueCompact} from 'faros-airbyte-common/jira';
import {Utils} from 'faros-js-client';
import {omit} from 'lodash';
import {omit, xor} from 'lodash';
import {Dictionary} from 'ts-essentials';

import {Jira} from '../jira';
import {JqlBuilder} from '../jql-builder';
import {
IssueStreamState,
ProjectStreamSlice,
StreamState,
StreamWithProjectSlices,
} from './common';

export class FarosIssues extends StreamWithProjectSlices {
projectKey: string;

getJsonSchema(): Dictionary<any, string> {
return require('../../resources/schemas/farosIssues.json');
}
Expand All @@ -31,34 +29,80 @@ export class FarosIssues extends StreamWithProjectSlices {
syncMode: SyncMode,
cursorField?: string[],
streamSlice?: ProjectStreamSlice,
streamState?: StreamState
): AsyncGenerator<Issue> {
streamState?: IssueStreamState
): AsyncGenerator<Issue | IssueCompact> {
const jira = await Jira.instance(this.config, this.logger);
this.projectKey = streamSlice?.project;
const projectKey = streamSlice?.project;
const projectState = streamState?.[projectKey];
const updateRange =
syncMode === SyncMode.INCREMENTAL
? this.getUpdateRange(streamState?.[this.projectKey]?.cutoff)
? this.getUpdateRange(projectState?.cutoff)
: this.getUpdateRange();

if (
projectState &&
xor(projectState.additionalFields, this.config.additional_fields ?? [])
.length > 0
) {
yield* this.getPreviousIssuesAdditionalFields(streamSlice, streamState);
}

for await (const issue of jira.getIssues(
new JqlBuilder()
.withProject(this.projectKey)
.withProject(projectKey)
.withDateRange(updateRange)
.build()
)) {
yield omit(issue, 'fields');
}
}

private async *getPreviousIssuesAdditionalFields(
streamSlice?: ProjectStreamSlice,
streamState?: IssueStreamState
): AsyncGenerator<IssueCompact> {
const jira = await Jira.instance(this.config, this.logger);
const projectKey = streamSlice?.project;
const projectState = streamState?.[projectKey];
const from = new Date(projectState.earliestIssueUpdateTimestamp);
const to = new Date(projectState.cutoff);

this.logger.info(
`Refetching additional fields for issues in project ${projectKey} from ${from} to ${to}, since additional fields have changed.`
);

for await (const issue of jira.getIssueCompactWithAdditionalFields(
new JqlBuilder().withProject(projectKey).withDateRange([from, to]).build()
)) {
yield {...issue, updateAdditionalFields: true};
}
}

getUpdatedState(
currentStreamState: StreamState,
latestRecord: Issue
): StreamState {
currentStreamState: IssueStreamState,
latestRecord: Issue,
slice: ProjectStreamSlice
): IssueStreamState {
const latestRecordCutoff = Utils.toDate(latestRecord?.updated ?? 0);
return this.getUpdatedStreamState(
const updatedState = this.getUpdatedStreamState(
latestRecordCutoff,
currentStreamState,
this.projectKey
slice.project
);

const earliestIssueUpdateTimestamp = Math.min(
currentStreamState?.[slice.project]?.earliestIssueUpdateTimestamp ??
Infinity,
latestRecordCutoff.getTime()
);

return {
...updatedState,
[slice.project]: {
...updatedState[slice.project],
additionalFields: this.config.additional_fields ?? [],
earliestIssueUpdateTimestamp,
},
};
}
}

0 comments on commit bfca372

Please sign in to comment.