Skip to content

Commit

Permalink
Support workflow status listener (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed May 22, 2024
1 parent f28cda9 commit 5bc0ebf
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 1 deletion.
28 changes: 28 additions & 0 deletions examples/workflow_status_listner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import time
import uuid

from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models import StartWorkflowRequest, RerunWorkflowRequest, TaskResult
from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from conductor.client.workflow.task.http_task import HttpTask
from conductor.client.workflow.task.wait_task import WaitTask


def main():
api_config = Configuration()
clients = OrkesClients(configuration=api_config)

workflow = ConductorWorkflow(name='workflow_status_listener_demo', version=1,
executor=clients.get_workflow_executor())
workflow >> HttpTask(task_ref_name='http_ref', http_input={
'uri': 'https://orkes-api-tester.orkesconductor.com/api'
})
workflow.enable_status_listener('kafka:abcd')
workflow.register(overwrite=True)
print(f'Registered {workflow.name}')


if __name__ == '__main__':
main()
16 changes: 15 additions & 1 deletion src/conductor/client/http/models/workflow_def.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class WorkflowDef(object):
'schema_version': 'int',
'restartable': 'bool',
'workflow_status_listener_enabled': 'bool',
'workflow_status_listener_sink': 'str',
'owner_email': 'str',
'timeout_policy': 'str',
'timeout_seconds': 'int',
Expand All @@ -58,6 +59,7 @@ class WorkflowDef(object):
'schema_version': 'schemaVersion',
'restartable': 'restartable',
'workflow_status_listener_enabled': 'workflowStatusListenerEnabled',
'workflow_status_listener_sink': 'workflowStatusListenerSink',
'owner_email': 'ownerEmail',
'timeout_policy': 'timeoutPolicy',
'timeout_seconds': 'timeoutSeconds',
Expand All @@ -68,6 +70,7 @@ class WorkflowDef(object):
def __init__(self, owner_app=None, create_time=None, update_time=None, created_by=None, updated_by=None, name=None,
description=None, version=None, tasks=None, input_parameters=None, output_parameters: dict = {},
failure_workflow=None, schema_version=None, restartable=None, workflow_status_listener_enabled=None,
workflow_status_listener_sink=None,
owner_email=None, timeout_policy=None, timeout_seconds=None, variables=None,
input_template=None): # noqa: E501
"""WorkflowDef - a model defined in Swagger""" # noqa: E501
Expand All @@ -86,6 +89,7 @@ def __init__(self, owner_app=None, create_time=None, update_time=None, created_b
self._schema_version = None
self._restartable = None
self._workflow_status_listener_enabled = None
self._workflow_status_listener_sink = None
self._owner_email = None
self._timeout_policy = None
self._timeout_seconds = None
Expand Down Expand Up @@ -119,7 +123,9 @@ def __init__(self, owner_app=None, create_time=None, update_time=None, created_b
if restartable is not None:
self.restartable = restartable
if workflow_status_listener_enabled is not None:
self.workflow_status_listener_enabled = workflow_status_listener_enabled
self._workflow_status_listener_enabled = workflow_status_listener_enabled
if workflow_status_listener_sink is not None:
self._workflow_status_listener_sink = workflow_status_listener_sink
if owner_email is not None:
self.owner_email = owner_email
if timeout_policy is not None:
Expand Down Expand Up @@ -445,6 +451,14 @@ def workflow_status_listener_enabled(self, workflow_status_listener_enabled):

self._workflow_status_listener_enabled = workflow_status_listener_enabled

@property
def workflow_status_listener_sink(self):
return self._workflow_status_listener_sink

@workflow_status_listener_sink.setter
def workflow_status_listener_sink(self, workflow_status_listener_sink):
self._workflow_status_listener_sink = workflow_status_listener_sink

@property
def owner_email(self):
"""Gets the owner_email of this WorkflowDef. # noqa: E501
Expand Down
2 changes: 2 additions & 0 deletions src/conductor/client/http/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def request(self, method, url, query_params=None, headers=None,
request_body = '{}'
if body is not None:
request_body = json.dumps(body)
if isinstance(body, str):
request_body = request_body.strip('"')
r = self.connection.request(
method, url,
data=request_body,
Expand Down
12 changes: 12 additions & 0 deletions src/conductor/client/workflow/conductor_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ def __init__(self,
self._input_template = {}
self._variables = {}
self._restartable = True
self._workflow_status_listener_enabled = False
self._workflow_status_listener_sink = None

@property
def name(self) -> str:
Expand Down Expand Up @@ -101,6 +103,14 @@ def restartable(self, restartable: bool) -> Self:
self._restartable = deepcopy(restartable)
return self

def enable_status_listener(self, sink_name: bool) -> Self:
self._workflow_status_listener_sink = sink_name
self._workflow_status_listener_enabled = True

def disable_status_listener(self) -> Self:
self._workflow_status_listener_sink = None
self._workflow_status_listener_enabled = False

# Workflow output follows similar structure as task input
# See https://conductor.netflix.com/how-tos/Tasks/task-inputs.html for more details
def output_parameters(self, output_parameters: Dict[str, Any]) -> Self:
Expand Down Expand Up @@ -257,6 +267,8 @@ def to_workflow_def(self) -> WorkflowDef:
timeout_seconds=self._timeout_seconds,
variables=self._variables,
input_template=self._input_template,
workflow_status_listener_enabled=self._workflow_status_listener_enabled,
workflow_status_listener_sink=self._workflow_status_listener_sink
)

def to_workflow_task(self):
Expand Down

0 comments on commit 5bc0ebf

Please sign in to comment.