Skip to content

Commit

Permalink
refactor(core): Rename ActiveWorkflowRunner to `ActiveWorkflowManag…
Browse files Browse the repository at this point in the history
…er` (no-changelog) (#9280)
  • Loading branch information
ivov committed May 6, 2024
1 parent 552cf8f commit 7b925ab
Show file tree
Hide file tree
Showing 25 changed files with 129 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ interface QueuedActivation {
}

@Service()
export class ActiveWorkflowRunner {
export class ActiveWorkflowManager {
private queuedActivations: { [workflowId: string]: QueuedActivation } = {};

constructor(
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import type {
StartNodeData,
} from 'n8n-workflow';

import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import type { ActiveWorkflowManager } from '@/ActiveWorkflowManager';

import type { WorkflowExecute } from 'n8n-core';

Expand Down Expand Up @@ -638,7 +638,7 @@ export interface N8nApp {
app: Application;
restEndpoint: string;
externalHooks: ExternalHooks;
activeWorkflowRunner: ActiveWorkflowRunner;
activeWorkflowManager: ActiveWorkflowManager;
}

export type UserSettings = Pick<User, 'id' | 'settings'>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { FindOptionsWhere } from '@n8n/typeorm';
import { In, Like, QueryFailedError } from '@n8n/typeorm';
import { v4 as uuid } from 'uuid';

import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import config from '@/config';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { ExternalHooks } from '@/ExternalHooks';
Expand Down Expand Up @@ -179,12 +179,12 @@ export = {
await replaceInvalidCredentials(updateData);
addNodeIds(updateData);

const workflowRunner = Container.get(ActiveWorkflowRunner);
const workflowManager = Container.get(ActiveWorkflowManager);

if (sharedWorkflow.workflow.active) {
// When workflow gets saved always remove it as the triggers could have been
// changed and so the changes would not take effect
await workflowRunner.remove(id);
await workflowManager.remove(id);
}

try {
Expand All @@ -197,7 +197,7 @@ export = {

if (sharedWorkflow.workflow.active) {
try {
await workflowRunner.add(sharedWorkflow.workflowId, 'update');
await workflowManager.add(sharedWorkflow.workflowId, 'update');
} catch (error) {
if (error instanceof Error) {
return res.status(400).json({ message: error.message });
Expand Down Expand Up @@ -236,7 +236,7 @@ export = {

if (!sharedWorkflow.workflow.active) {
try {
await Container.get(ActiveWorkflowRunner).add(sharedWorkflow.workflowId, 'activate');
await Container.get(ActiveWorkflowManager).add(sharedWorkflow.workflowId, 'activate');
} catch (error) {
if (error instanceof Error) {
return res.status(400).json({ message: error.message });
Expand Down Expand Up @@ -268,10 +268,10 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}

const workflowRunner = Container.get(ActiveWorkflowRunner);
const activeWorkflowManager = Container.get(ActiveWorkflowManager);

if (sharedWorkflow.workflow.active) {
await workflowRunner.remove(sharedWorkflow.workflowId);
await activeWorkflowManager.remove(sharedWorkflow.workflowId);

await setWorkflowAsInactive(sharedWorkflow.workflow);

Expand Down
16 changes: 8 additions & 8 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { jsonParse } from 'n8n-workflow';

import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { Server } from '@/Server';
import { EDITOR_UI_DIST_DIR, LICENSE_FEATURES } from '@/constants';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
Expand Down Expand Up @@ -57,7 +57,7 @@ export class Start extends BaseCommand {
}),
};

protected activeWorkflowRunner: ActiveWorkflowRunner;
protected activeWorkflowManager: ActiveWorkflowManager;

protected server = Container.get(Server);

Expand Down Expand Up @@ -92,14 +92,14 @@ export class Start extends BaseCommand {

try {
// Stop with trying to activate workflows that could not be activated
this.activeWorkflowRunner.removeAllQueuedWorkflowActivations();
this.activeWorkflowManager.removeAllQueuedWorkflowActivations();

Container.get(WaitTracker).stopTracking();

await this.externalHooks?.run('n8n.stop', []);

if (Container.get(OrchestrationService).isMultiMainSetupEnabled) {
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();

await Container.get(OrchestrationService).shutdown();
}
Expand Down Expand Up @@ -171,7 +171,7 @@ export class Start extends BaseCommand {
}

await super.init();
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
this.activeWorkflowManager = Container.get(ActiveWorkflowManager);

await this.initLicense();

Expand Down Expand Up @@ -212,11 +212,11 @@ export class Start extends BaseCommand {
orchestrationService.multiMainSetup
.on('leader-stepdown', async () => {
await this.license.reinit(); // to disable renewal
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();
})
.on('leader-takeover', async () => {
await this.license.reinit(); // to enable renewal
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflowManager.addAllTriggerAndPollerBasedWorkflows();
});
}

Expand Down Expand Up @@ -286,7 +286,7 @@ export class Start extends BaseCommand {
await this.initPruning();

// Start to get active workflows and run their triggers
await this.activeWorkflowRunner.init();
await this.activeWorkflowManager.init();

const editorUrl = Container.get(UrlService).baseUrl;
this.log(`\nEditor is now accessible via:\n${editorUrl}`);
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/src/controllers/debug.controller.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Get, RestController } from '@/decorators';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { OrchestrationService } from '@/services/orchestration.service';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';

@RestController('/debug')
export class DebugController {
constructor(
private readonly orchestrationService: OrchestrationService,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly workflowRepository: WorkflowRepository,
) {}

Expand All @@ -16,12 +16,12 @@ export class DebugController {
const leaderKey = await this.orchestrationService.multiMainSetup.fetchLeaderKey();

const triggersAndPollers = await this.workflowRepository.findIn(
this.activeWorkflowRunner.allActiveInMemory(),
this.activeWorkflowManager.allActiveInMemory(),
);

const webhooks = await this.workflowRepository.findWebhookBasedActiveWorkflows();

const activationErrors = await this.activeWorkflowRunner.getAllWorkflowActivationErrors();
const activationErrors = await this.activeWorkflowManager.getAllWorkflowActivationErrors();

return {
instanceId: this.orchestrationService.instanceId,
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/controllers/e2e.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { v4 as uuid } from 'uuid';
import config from '@/config';
import { SettingsRepository } from '@db/repositories/settings.repository';
import { UserRepository } from '@db/repositories/user.repository';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { License } from '@/License';
import { LICENSE_FEATURES, inE2ETests } from '@/constants';
Expand Down Expand Up @@ -87,7 +87,7 @@ export class E2EController {
license: License,
private readonly settingsRepo: SettingsRepository,
private readonly userRepo: UserRepository,
private readonly workflowRunner: ActiveWorkflowRunner,
private readonly workflowRunner: ActiveWorkflowManager,
private readonly mfaService: MfaService,
private readonly cacheService: CacheService,
private readonly push: Push,
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/controllers/users.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
UserRoleChangePayload,
UserSettingsUpdatePayload,
} from '@/requests';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import type { PublicUser, ITelemetryUserDeletionData } from '@/Interfaces';
import { AuthIdentity } from '@db/entities/AuthIdentity';
import { SharedCredentialsRepository } from '@db/repositories/sharedCredentials.repository';
Expand All @@ -36,7 +36,7 @@ export class UsersController {
private readonly sharedCredentialsRepository: SharedCredentialsRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly userRepository: UserRepository,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly authService: AuthService,
private readonly userService: UserService,
) {}
Expand Down Expand Up @@ -264,7 +264,7 @@ export class UsersController {
ownedSharedWorkflows.map(async ({ workflow }) => {
if (workflow.active) {
// deactivate before deleting
await this.activeWorkflowRunner.remove(workflow.id);
await this.activeWorkflowManager.remove(workflow.id);
}
return workflow;
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export class AddTriggerCountColumn1669823906994 implements ReversibleMigration {
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN triggerCount integer NOT NULL DEFAULT 0`,
);
// Table will be populated by n8n startup - see ActiveWorkflowRunner.ts
// Table will be populated by n8n startup - see ActiveWorkflowManager.ts
}

async down({ queryRunner, tablePrefix }: MigrationContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export class AddTriggerCountColumn1669823906995 implements ReversibleMigration {
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN "triggerCount" integer NOT NULL DEFAULT 0`,
);
// Table will be populated by n8n startup - see ActiveWorkflowRunner.ts
// Table will be populated by n8n startup - see ActiveWorkflowManager.ts
}

async down({ queryRunner, tablePrefix }: MigrationContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export class AddTriggerCountColumn1669823906993 implements ReversibleMigration {
await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` ADD COLUMN "triggerCount" integer NOT NULL DEFAULT 0`,
);
// Table will be populated by n8n startup - see ActiveWorkflowRunner.ts
// Table will be populated by n8n startup - see ActiveWorkflowManager.ts
}

async down({ queryRunner, tablePrefix }: MigrationContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import type { Variables } from '@db/entities/Variables';
import { SharedCredentials } from '@db/entities/SharedCredentials';
import type { WorkflowTagMapping } from '@db/entities/WorkflowTagMapping';
import type { TagEntity } from '@db/entities/TagEntity';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { In } from '@n8n/typeorm';
import { isUniqueConstraintError } from '@/ResponseHelper';
import type { SourceControlWorkflowVersionId } from './types/sourceControlWorkflowVersionId';
Expand Down Expand Up @@ -45,7 +45,7 @@ export class SourceControlImportService {
constructor(
private readonly logger: Logger,
private readonly variablesService: VariablesService,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly tagRepository: TagRepository,
instanceSettings: InstanceSettings,
) {
Expand Down Expand Up @@ -203,7 +203,7 @@ export class SourceControlImportService {
}

public async importWorkflowFromWorkFolder(candidates: SourceControlledFile[], userId: string) {
const workflowRunner = this.activeWorkflowRunner;
const workflowRunner = this.activeWorkflowManager;
const candidateIds = candidates.map((c) => c.id);
const existingWorkflows = await Container.get(WorkflowRepository).findByIds(candidateIds, {
fields: ['id', 'name', 'versionId', 'active'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { License } from '@/License';
import { Logger } from '@/Logger';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { Push } from '@/push';
import { TestWebhooks } from '@/TestWebhooks';
import { OrchestrationService } from '@/services/orchestration.service';
Expand Down Expand Up @@ -93,7 +93,7 @@ export async function handleCommandMessageMain(messageString: string) {
const { workflowId } = message.payload;

try {
await Container.get(ActiveWorkflowRunner).add(workflowId, 'activate', undefined, {
await Container.get(ActiveWorkflowManager).add(workflowId, 'activate', undefined, {
shouldPublish: false, // prevent leader re-publishing message
});

Expand Down Expand Up @@ -134,10 +134,10 @@ export async function handleCommandMessageMain(messageString: string) {

const { workflowId } = message.payload;

const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
const activeWorkflowManager = Container.get(ActiveWorkflowManager);

await activeWorkflowRunner.removeActivationError(workflowId);
await activeWorkflowRunner.removeWorkflowTriggersAndPollers(workflowId);
await activeWorkflowManager.removeActivationError(workflowId);
await activeWorkflowManager.removeWorkflowTriggersAndPollers(workflowId);

push.broadcast('workflowDeactivated', { workflowId });

Expand Down
10 changes: 5 additions & 5 deletions packages/cli/src/workflows/workflow.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { validateEntity } from '@/GenericHelpers';
import { ExternalHooks } from '@/ExternalHooks';
Expand Down Expand Up @@ -41,7 +41,7 @@ export class WorkflowService {
private readonly workflowHistoryService: WorkflowHistoryService,
private readonly orchestrationService: OrchestrationService,
private readonly externalHooks: ExternalHooks,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
) {}

async getMany(sharedWorkflowIds: string[], options?: ListQuery.Options) {
Expand Down Expand Up @@ -120,7 +120,7 @@ export class WorkflowService {
* will take effect only on removing and re-adding.
*/
if (shared.workflow.active) {
await this.activeWorkflowRunner.remove(workflowId);
await this.activeWorkflowManager.remove(workflowId);
}

const workflowSettings = workflow.settings ?? {};
Expand Down Expand Up @@ -200,7 +200,7 @@ export class WorkflowService {
// When the workflow is supposed to be active add it again
try {
await this.externalHooks.run('workflow.activate', [updatedWorkflow]);
await this.activeWorkflowRunner.add(
await this.activeWorkflowManager.add(
workflowId,
shared.workflow.active ? 'update' : 'activate',
);
Expand Down Expand Up @@ -245,7 +245,7 @@ export class WorkflowService {

if (sharedWorkflow.workflow.active) {
// deactivate before deleting
await this.activeWorkflowRunner.remove(workflowId);
await this.activeWorkflowManager.remove(workflowId);
}

const idsForDeletion = await this.executionRepository
Expand Down

0 comments on commit 7b925ab

Please sign in to comment.