Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Rename ActiveWorkflowRunner to ActiveWorkflowManager (no-changelog) #9280

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ interface QueuedActivation {
}

@Service()
export class ActiveWorkflowRunner {
export class ActiveWorkflowManager {
private queuedActivations: { [workflowId: string]: QueuedActivation } = {};

constructor(
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/Interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import type {
StartNodeData,
} from 'n8n-workflow';

import type { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import type { ActiveWorkflowManager } from '@/ActiveWorkflowManager';

import type { WorkflowExecute } from 'n8n-core';

Expand Down Expand Up @@ -638,7 +638,7 @@ export interface N8nApp {
app: Application;
restEndpoint: string;
externalHooks: ExternalHooks;
activeWorkflowRunner: ActiveWorkflowRunner;
activeWorkflowManager: ActiveWorkflowManager;
}

export type UserSettings = Pick<User, 'id' | 'settings'>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { FindOptionsWhere } from '@n8n/typeorm';
import { In, Like, QueryFailedError } from '@n8n/typeorm';
import { v4 as uuid } from 'uuid';

import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import config from '@/config';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { ExternalHooks } from '@/ExternalHooks';
Expand Down Expand Up @@ -179,7 +179,7 @@ export = {
await replaceInvalidCredentials(updateData);
addNodeIds(updateData);

const workflowRunner = Container.get(ActiveWorkflowRunner);
const workflowRunner = Container.get(ActiveWorkflowManager);
despairblue marked this conversation as resolved.
Show resolved Hide resolved

if (sharedWorkflow.workflow.active) {
// When workflow gets saved always remove it as the triggers could have been
Expand Down Expand Up @@ -236,7 +236,7 @@ export = {

if (!sharedWorkflow.workflow.active) {
try {
await Container.get(ActiveWorkflowRunner).add(sharedWorkflow.workflowId, 'activate');
await Container.get(ActiveWorkflowManager).add(sharedWorkflow.workflowId, 'activate');
} catch (error) {
if (error instanceof Error) {
return res.status(400).json({ message: error.message });
Expand Down Expand Up @@ -268,7 +268,7 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}

const workflowRunner = Container.get(ActiveWorkflowRunner);
const workflowRunner = Container.get(ActiveWorkflowManager);
despairblue marked this conversation as resolved.
Show resolved Hide resolved

if (sharedWorkflow.workflow.active) {
await workflowRunner.remove(sharedWorkflow.workflowId);
Expand Down
16 changes: 8 additions & 8 deletions packages/cli/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { jsonParse } from 'n8n-workflow';

import config from '@/config';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { Server } from '@/Server';
import { EDITOR_UI_DIST_DIR, LICENSE_FEATURES } from '@/constants';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
Expand Down Expand Up @@ -57,7 +57,7 @@ export class Start extends BaseCommand {
}),
};

protected activeWorkflowRunner: ActiveWorkflowRunner;
protected activeWorkflowManager: ActiveWorkflowManager;

protected server = Container.get(Server);

Expand Down Expand Up @@ -92,14 +92,14 @@ export class Start extends BaseCommand {

try {
// Stop with trying to activate workflows that could not be activated
this.activeWorkflowRunner.removeAllQueuedWorkflowActivations();
this.activeWorkflowManager.removeAllQueuedWorkflowActivations();

Container.get(WaitTracker).stopTracking();

await this.externalHooks?.run('n8n.stop', []);

if (Container.get(OrchestrationService).isMultiMainSetupEnabled) {
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();

await Container.get(OrchestrationService).shutdown();
}
Expand Down Expand Up @@ -171,7 +171,7 @@ export class Start extends BaseCommand {
}

await super.init();
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
this.activeWorkflowManager = Container.get(ActiveWorkflowManager);

await this.initLicense();

Expand Down Expand Up @@ -211,10 +211,10 @@ export class Start extends BaseCommand {

orchestrationService.multiMainSetup
.on('leader-stepdown', async () => {
await this.activeWorkflowRunner.removeAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflowManager.removeAllTriggerAndPollerBasedWorkflows();
})
.on('leader-takeover', async () => {
await this.activeWorkflowRunner.addAllTriggerAndPollerBasedWorkflows();
await this.activeWorkflowManager.addAllTriggerAndPollerBasedWorkflows();
});
}

Expand Down Expand Up @@ -284,7 +284,7 @@ export class Start extends BaseCommand {
await this.initPruning();

// Start to get active workflows and run their triggers
await this.activeWorkflowRunner.init();
await this.activeWorkflowManager.init();

const editorUrl = Container.get(UrlService).baseUrl;
this.log(`\nEditor is now accessible via:\n${editorUrl}`);
Expand Down
8 changes: 4 additions & 4 deletions packages/cli/src/controllers/debug.controller.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Get, RestController } from '@/decorators';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { OrchestrationService } from '@/services/orchestration.service';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';

@RestController('/debug')
export class DebugController {
constructor(
private readonly orchestrationService: OrchestrationService,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly workflowRepository: WorkflowRepository,
) {}

Expand All @@ -16,12 +16,12 @@ export class DebugController {
const leaderKey = await this.orchestrationService.multiMainSetup.fetchLeaderKey();

const triggersAndPollers = await this.workflowRepository.findIn(
this.activeWorkflowRunner.allActiveInMemory(),
this.activeWorkflowManager.allActiveInMemory(),
);

const webhooks = await this.workflowRepository.findWebhookBasedActiveWorkflows();

const activationErrors = await this.activeWorkflowRunner.getAllWorkflowActivationErrors();
const activationErrors = await this.activeWorkflowManager.getAllWorkflowActivationErrors();

return {
instanceId: this.orchestrationService.instanceId,
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/controllers/e2e.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { v4 as uuid } from 'uuid';
import config from '@/config';
import { SettingsRepository } from '@db/repositories/settings.repository';
import { UserRepository } from '@db/repositories/user.repository';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { License } from '@/License';
import { LICENSE_FEATURES, inE2ETests } from '@/constants';
Expand Down Expand Up @@ -85,7 +85,7 @@ export class E2EController {
license: License,
private readonly settingsRepo: SettingsRepository,
private readonly userRepo: UserRepository,
private readonly workflowRunner: ActiveWorkflowRunner,
private readonly workflowRunner: ActiveWorkflowManager,
private readonly mfaService: MfaService,
private readonly cacheService: CacheService,
private readonly push: Push,
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/controllers/users.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
UserRoleChangePayload,
UserSettingsUpdatePayload,
} from '@/requests';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import type { PublicUser, ITelemetryUserDeletionData } from '@/Interfaces';
import { AuthIdentity } from '@db/entities/AuthIdentity';
import { SharedCredentialsRepository } from '@db/repositories/sharedCredentials.repository';
Expand All @@ -36,7 +36,7 @@ export class UsersController {
private readonly sharedCredentialsRepository: SharedCredentialsRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly userRepository: UserRepository,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly authService: AuthService,
private readonly userService: UserService,
) {}
Expand Down Expand Up @@ -264,7 +264,7 @@ export class UsersController {
ownedSharedWorkflows.map(async ({ workflow }) => {
if (workflow.active) {
// deactivate before deleting
await this.activeWorkflowRunner.remove(workflow.id);
await this.activeWorkflowManager.remove(workflow.id);
}
return workflow;
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export class AddTriggerCountColumn1669823906994 implements ReversibleMigration {
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN triggerCount integer NOT NULL DEFAULT 0`,
);
// Table will be populated by n8n startup - see ActiveWorkflowRunner.ts
// Table will be populated by n8n startup - see ActiveWorkflowManager.ts
}

async down({ queryRunner, tablePrefix }: MigrationContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export class AddTriggerCountColumn1669823906995 implements ReversibleMigration {
await queryRunner.query(
`ALTER TABLE ${tablePrefix}workflow_entity ADD COLUMN "triggerCount" integer NOT NULL DEFAULT 0`,
);
// Table will be populated by n8n startup - see ActiveWorkflowRunner.ts
// Table will be populated by n8n startup - see ActiveWorkflowManager.ts
}

async down({ queryRunner, tablePrefix }: MigrationContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export class AddTriggerCountColumn1669823906993 implements ReversibleMigration {
await queryRunner.query(
`ALTER TABLE \`${tablePrefix}workflow_entity\` ADD COLUMN "triggerCount" integer NOT NULL DEFAULT 0`,
);
// Table will be populated by n8n startup - see ActiveWorkflowRunner.ts
// Table will be populated by n8n startup - see ActiveWorkflowManager.ts
}

async down({ queryRunner, tablePrefix }: MigrationContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import type { Variables } from '@db/entities/Variables';
import { SharedCredentials } from '@db/entities/SharedCredentials';
import type { WorkflowTagMapping } from '@db/entities/WorkflowTagMapping';
import type { TagEntity } from '@db/entities/TagEntity';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { In } from '@n8n/typeorm';
import { isUniqueConstraintError } from '@/ResponseHelper';
import type { SourceControlWorkflowVersionId } from './types/sourceControlWorkflowVersionId';
Expand Down Expand Up @@ -45,7 +45,7 @@ export class SourceControlImportService {
constructor(
private readonly logger: Logger,
private readonly variablesService: VariablesService,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly tagRepository: TagRepository,
instanceSettings: InstanceSettings,
) {
Expand Down Expand Up @@ -203,7 +203,7 @@ export class SourceControlImportService {
}

public async importWorkflowFromWorkFolder(candidates: SourceControlledFile[], userId: string) {
const workflowRunner = this.activeWorkflowRunner;
const workflowRunner = this.activeWorkflowManager;
const candidateIds = candidates.map((c) => c.id);
const existingWorkflows = await Container.get(WorkflowRepository).findByIds(candidateIds, {
fields: ['id', 'name', 'versionId', 'active'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { MessageEventBus } from '@/eventbus/MessageEventBus/MessageEventBus';
import { ExternalSecretsManager } from '@/ExternalSecrets/ExternalSecretsManager.ee';
import { License } from '@/License';
import { Logger } from '@/Logger';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { Push } from '@/push';
import { TestWebhooks } from '@/TestWebhooks';
import { OrchestrationService } from '@/services/orchestration.service';
Expand Down Expand Up @@ -93,7 +93,7 @@ export async function handleCommandMessageMain(messageString: string) {
const { workflowId } = message.payload;

try {
await Container.get(ActiveWorkflowRunner).add(workflowId, 'activate', undefined, {
await Container.get(ActiveWorkflowManager).add(workflowId, 'activate', undefined, {
shouldPublish: false, // prevent leader re-publishing message
});

Expand Down Expand Up @@ -134,10 +134,10 @@ export async function handleCommandMessageMain(messageString: string) {

const { workflowId } = message.payload;

const activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
const activeWorkflowManager = Container.get(ActiveWorkflowManager);

await activeWorkflowRunner.removeActivationError(workflowId);
await activeWorkflowRunner.removeWorkflowTriggersAndPollers(workflowId);
await activeWorkflowManager.removeActivationError(workflowId);
await activeWorkflowManager.removeWorkflowTriggersAndPollers(workflowId);

push.broadcast('workflowDeactivated', { workflowId });

Expand Down
10 changes: 5 additions & 5 deletions packages/cli/src/workflows/workflow.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowTagMappingRepository } from '@db/repositories/workflowTagMapping.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { validateEntity } from '@/GenericHelpers';
import { ExternalHooks } from '@/ExternalHooks';
Expand Down Expand Up @@ -41,7 +41,7 @@ export class WorkflowService {
private readonly workflowHistoryService: WorkflowHistoryService,
private readonly orchestrationService: OrchestrationService,
private readonly externalHooks: ExternalHooks,
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
private readonly activeWorkflowManager: ActiveWorkflowManager,
) {}

async getMany(sharedWorkflowIds: string[], options?: ListQuery.Options) {
Expand Down Expand Up @@ -120,7 +120,7 @@ export class WorkflowService {
* will take effect only on removing and re-adding.
*/
if (shared.workflow.active) {
await this.activeWorkflowRunner.remove(workflowId);
await this.activeWorkflowManager.remove(workflowId);
}

const workflowSettings = workflow.settings ?? {};
Expand Down Expand Up @@ -200,7 +200,7 @@ export class WorkflowService {
// When the workflow is supposed to be active add it again
try {
await this.externalHooks.run('workflow.activate', [updatedWorkflow]);
await this.activeWorkflowRunner.add(
await this.activeWorkflowManager.add(
workflowId,
shared.workflow.active ? 'update' : 'activate',
);
Expand Down Expand Up @@ -245,7 +245,7 @@ export class WorkflowService {

if (sharedWorkflow.workflow.active) {
// deactivate before deleting
await this.activeWorkflowRunner.remove(workflowId);
await this.activeWorkflowManager.remove(workflowId);
}

const idsForDeletion = await this.executionRepository
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow';
import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow';

import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';
import { ExternalHooks } from '@/ExternalHooks';
import { Push } from '@/push';
import { SecretsHelper } from '@/SecretsHelpers';
Expand Down Expand Up @@ -47,15 +47,15 @@ Object.assign(loader.loadedNodes, {
const webhookService = mockInstance(WebhookService);
const externalHooks = mockInstance(ExternalHooks);

let runner: ActiveWorkflowRunner;
let runner: ActiveWorkflowManager;

let createActiveWorkflow: () => Promise<WorkflowEntity>;
let createInactiveWorkflow: () => Promise<WorkflowEntity>;

beforeAll(async () => {
await testDb.init();

runner = Container.get(ActiveWorkflowRunner);
runner = Container.get(ActiveWorkflowManager);
despairblue marked this conversation as resolved.
Show resolved Hide resolved

const owner = await createOwner();
createActiveWorkflow = async () => await createWorkflow({ active: true }, owner);
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/test/integration/auth.mw.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { ActiveWorkflowManager } from '@/ActiveWorkflowManager';

import type { SuperAgentTest } from 'supertest';
import * as utils from './shared/utils/';
import { createUser } from './shared/db/users';
import { mockInstance } from '../shared/mocking';

describe('Auth Middleware', () => {
mockInstance(ActiveWorkflowRunner);
mockInstance(ActiveWorkflowManager);

const testServer = utils.setupTestServer({
endpointGroups: ['me', 'auth', 'owner', 'users', 'invitations'],
Expand Down