Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(app-gen,worker): move add job to worker #5458

Merged
merged 5 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Injectable, Logger } from '@nestjs/common';
import { ObservabilityBackgroundTransactionEnum } from '@novu/shared';
import {
getSubscriberProcessWorkerOptions,
SubscriberJobBound,
SubscriberProcessWorkerService,
PinoLogger,
storage,
Expand All @@ -15,6 +14,8 @@ import {
IProcessSubscriberDataDto,
} from '@novu/application-generic';

import { SubscriberJobBound } from '../usecases/subscriber-job-bound/subscriber-job-bound.usecase';

const LOG_CONTEXT = 'SubscriberProcessWorker';

@Injectable()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
import { forwardRef, Inject, Injectable, Logger } from '@nestjs/common';
import { forwardRef, Inject, Injectable } from '@nestjs/common';

import { JobRepository, JobStatusEnum } from '@novu/dal';
import { DelayTypeEnum, ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum, StepTypeEnum } from '@novu/shared';
import {
DelayTypeEnum,
ExecutionDetailsSourceEnum,
ExecutionDetailsStatusEnum,
StepTypeEnum,
} from '@novu/shared';

import { ApiException } from '../../utils/exceptions';
import { AddJobCommand } from './add-job.command';
import { CalculateDelayService } from '../../services';
import { InstrumentUsecase } from '../../instrumentation';
import { DetailEnum } from '../create-execution-details';
import {
ApiException,
CalculateDelayService,
DetailEnum,
ExecutionLogRoute,
ExecutionLogRouteCommand,
} from '../execution-log-route';
InstrumentUsecase,
} from '@novu/application-generic';

import { AddJobCommand } from './add-job.command';

@Injectable()
export class AddDelayJob {
Expand Down Expand Up @@ -56,11 +51,7 @@ export class AddDelayJob {
: undefined,
});

await this.jobRepository.updateStatus(
command.environmentId,
data._id,
JobStatusEnum.DELAYED
);
await this.jobRepository.updateStatus(command.environmentId, data._id, JobStatusEnum.DELAYED);
} catch (error: any) {
await this.executionLogRoute.execute(
ExecutionLogRouteCommand.create({
Expand All @@ -74,11 +65,7 @@ export class AddDelayJob {
})
);

await this.jobRepository.updateStatus(
command.environmentId,
data._id,
JobStatusEnum.CANCELED
);
await this.jobRepository.updateStatus(command.environmentId, data._id, JobStatusEnum.CANCELED);

throw error;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { IsDefined } from 'class-validator';
import { JobEntity } from '@novu/dal';

import { EnvironmentWithUserCommand } from '../../commands';
import { EnvironmentWithUserCommand } from '@novu/application-generic';

export class AddJobCommand extends EnvironmentWithUserCommand {
@IsDefined()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,25 @@ import { AddDelayJob } from './add-delay-job.usecase';
import { MergeOrCreateDigestCommand } from './merge-or-create-digest.command';
import { MergeOrCreateDigest } from './merge-or-create-digest.usecase';
import { AddJobCommand } from './add-job.command';
import { validateDigest } from './validation';
import { ModuleRef } from '@nestjs/core';
import {
CalculateDelayService,
ConditionsFilter,
ConditionsFilterCommand,
DetailEnum,
} from '../../usecases';
import {
CalculateDelayService,
JobsOptions,
StandardQueueService,
} from '../../services';
import { LogDecorator } from '../../logging';
import { InstrumentUsecase } from '../../instrumentation';
import { validateDigest } from './validation';
import {
ExecutionLogRoute,
ExecutionLogRouteCommand,
} from '../execution-log-route';
import { ModuleRef } from '@nestjs/core';
import {
ExecuteOutput,
IChimeraDigestResponse,
IFilterVariables,
InstrumentUsecase,
IUseCaseInterfaceInline,
JobsOptions,
LogDecorator,
requireInject,
} from '../../utils/require-inject';
import { IFilterVariables } from '../../utils/filter-processing-details';
StandardQueueService,
ExecuteOutput,
} from '@novu/application-generic';

export enum BackoffStrategiesEnum {
WEBHOOK_FILTER_BACKOFF = 'webhookFilterBackoff',
Expand Down Expand Up @@ -73,26 +67,19 @@ export class AddJob {
Logger.debug(`Job contents for job ${job._id}`, job, LOG_CONTEXT);

if (!job) {
Logger.warn(
`Job ${job._id} was null in both the input and search`,
LOG_CONTEXT
);
Logger.warn(`Job was null in both the input and search`, LOG_CONTEXT);

return;
}

Logger.log(
`Scheduling New Job ${job._id} of type: ${job.type}`,
LOG_CONTEXT
);
Logger.log(`Scheduling New Job ${job._id} of type: ${job.type}`, LOG_CONTEXT);

let digestAmount: number | undefined;
let delayAmount: number | undefined = undefined;

let filtered = false;
let filterVariables: IFilterVariables | undefined;
if (
[StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(
job.type as StepTypeEnum
)
) {
if ([StepTypeEnum.DELAY, StepTypeEnum.DIGEST].includes(job.type as StepTypeEnum)) {
const shouldRun = await this.conditionsFilter.filter(
ConditionsFilterCommand.create({
filters: job.step.filters || [],
Expand All @@ -106,105 +93,89 @@ export class AddJob {

filterVariables = shouldRun.variables;
filtered = !shouldRun.passed;
}

let digestAmount: number | undefined;
let digestCreationResult: DigestCreationResultEnum | undefined;
if (job.type === StepTypeEnum.DIGEST) {
const resonateResponse = await this.resonateUsecase.execute<
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraDigestResponse>
>({
...command,
variables: filterVariables,
});

validateDigest(job);

digestAmount = this.calculateDelayService.calculateDelay({
stepMetadata: job.digest,
payload: job.payload,
overrides: job.overrides,
// TODO: Remove fallback after other digest types are implemented.
chimeraResponse: resonateResponse
? { type: DigestTypeEnum.REGULAR, ...resonateResponse.outputs }
: undefined,
});
let digestCreationResult: DigestCreationResultEnum | undefined;
if (job.type === StepTypeEnum.DIGEST) {
const resonateResponse = await this.resonateUsecase.execute<
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraDigestResponse>
>({
...command,
variables: filterVariables,
});

Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT);
validateDigest(job);

digestCreationResult = await this.mergeOrCreateDigestUsecase.execute(
MergeOrCreateDigestCommand.create({
job,
filtered,
chimeraData: resonateResponse?.outputs,
})
);
digestAmount = this.calculateDelayService.calculateDelay({
stepMetadata: job.digest,
payload: job.payload,
overrides: job.overrides,
chimeraResponse: this.fallbackToRegularDigest(resonateResponse?.outputs),
});

if (digestCreationResult === DigestCreationResultEnum.MERGED) {
Logger.log('Digest was merged, queueing next job', LOG_CONTEXT);
Logger.debug(`Digest step amount is: ${digestAmount}`, LOG_CONTEXT);

return;
}
digestCreationResult = await this.mergeOrCreateDigestUsecase.execute(
MergeOrCreateDigestCommand.create({
job,
filtered,
chimeraData: resonateResponse?.outputs,
})
);

if (digestCreationResult === DigestCreationResultEnum.SKIPPED) {
const nextJobToSchedule = await this.jobRepository.findOne({
_environmentId: command.environmentId,
_parentId: job._id,
});
if (digestCreationResult === DigestCreationResultEnum.MERGED) {
Logger.log('Digest was merged, queueing next job', LOG_CONTEXT);

if (!nextJobToSchedule) {
return;
}

await this.execute({
userId: job._userId,
environmentId: job._environmentId,
organizationId: command.organizationId,
jobId: nextJobToSchedule._id,
job: nextJobToSchedule,
});
if (digestCreationResult === DigestCreationResultEnum.SKIPPED) {
const nextJobToSchedule = await this.jobRepository.findOne({
_environmentId: command.environmentId,
_parentId: job._id,
});

return;
}
}
if (!nextJobToSchedule) {
return;
}

let delayAmount: number | undefined = undefined;
await this.execute({
userId: job._userId,
environmentId: job._environmentId,
organizationId: command.organizationId,
jobId: nextJobToSchedule._id,
job: nextJobToSchedule,
});

if (job.type === StepTypeEnum.DELAY) {
const resonateResponse = await this.resonateUsecase.execute<
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraDigestResponse>
>({
...command,
variables: filterVariables,
});
return;
}
}

command.chimeraResponse = resonateResponse;
delayAmount = await this.addDelayJob.execute(command);
if (job.type === StepTypeEnum.DELAY) {
const resonateResponse = await this.resonateUsecase.execute<
AddJobCommand & { variables: IFilterVariables },
ExecuteOutput<IChimeraDigestResponse>
>({
...command,
variables: filterVariables,
});

Logger.debug(`Delay step Amount is: ${delayAmount}`, LOG_CONTEXT);
command.chimeraResponse = resonateResponse;
delayAmount = await this.addDelayJob.execute(command);

if (delayAmount === undefined) {
Logger.warn(
`Delay Amount does not exist on a delay job ${job._id}`,
LOG_CONTEXT
);
Logger.debug(`Delay step Amount is: ${delayAmount}`, LOG_CONTEXT);

return;
if (delayAmount === undefined) {
Logger.warn(`Delay Amount does not exist on a delay job ${job._id}`, LOG_CONTEXT);

return;
}
}
}

if (digestAmount === undefined && delayAmount === undefined) {
Logger.verbose(
`Updating status to queued for job ${job._id}`,
LOG_CONTEXT
);
await this.jobRepository.updateStatus(
command.environmentId,
job._id,
JobStatusEnum.QUEUED
);
Logger.verbose(`Updating status to queued for job ${job._id}`, LOG_CONTEXT);
await this.jobRepository.updateStatus(command.environmentId, job._id, JobStatusEnum.QUEUED);
}

await this.executionLogRoute.execute(
Expand All @@ -218,18 +189,30 @@ export class AddJob {
})
);

const delay = filtered ? 0 : digestAmount ?? delayAmount;
const delay = (filtered ? 0 : digestAmount ?? delayAmount) ?? 0;

if ((digestAmount || delayAmount) && filtered) {
Logger.verbose(
`Delay for job ${job._id} will be 0 because job was filtered`,
LOG_CONTEXT
);
Logger.verbose(`Delay for job ${job._id} will be 0 because job was filtered`, LOG_CONTEXT);
}

await this.queueJob(job, delay);
}

/*
* Fallback to regular digest type.
* This is a temporary solution until other digest types are implemented.
*/
private fallbackToRegularDigest(outputs: IChimeraDigestResponse | undefined): IChimeraDigestResponse | undefined {
let resonateResponseOutput: IChimeraDigestResponse | undefined = undefined;

if (outputs) {
const { type, ...resonateResponseOutputsOmitType } = outputs;
resonateResponseOutput = { type: DigestTypeEnum.REGULAR, ...resonateResponseOutputsOmitType };
}

return resonateResponseOutput;
}

public async queueJob(job: JobEntity, delay: number) {
Logger.verbose(`Adding Job ${job._id} to Queue`, LOG_CONTEXT);
const stepContainsWebhookFilter = this.stepContainsFilter(job, 'webhook');
Expand All @@ -250,11 +233,7 @@ export class AddJob {
_userId: job._userId,
};

Logger.verbose(
jobData,
'Going to add a minimal job in Standard Queue',
LOG_CONTEXT
);
Logger.verbose(jobData, 'Going to add a minimal job in Standard Queue', LOG_CONTEXT);

await this.standardQueueService.add({
name: job._id,
Expand All @@ -276,10 +255,7 @@ export class AddJob {
await this.executionLogRoute.execute(
ExecutionLogRouteCommand.create({
...ExecutionLogRouteCommand.getDetailsFromJob(job),
detail:
job.type === StepTypeEnum.DELAY
? DetailEnum.STEP_DELAYED
: DetailEnum.STEP_DIGESTED,
detail: job.type === StepTypeEnum.DELAY ? DetailEnum.STEP_DELAYED : DetailEnum.STEP_DIGESTED,
source: ExecutionDetailsSourceEnum.INTERNAL,
status: ExecutionDetailsStatusEnum.PENDING,
isTest: false,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { IsDefined, IsOptional } from 'class-validator';
import { JobEntity } from '@novu/dal';

import { BaseCommand } from '../../commands/base.command';
import { IChimeraDigestResponse } from '../../utils/require-inject';
import { JobEntity } from '@novu/dal';
import { BaseCommand, IChimeraDigestResponse } from '@novu/application-generic';

export class MergeOrCreateDigestCommand extends BaseCommand {
@IsDefined()
Expand Down