
default parameters are not coerced when running a flow from a deployment #13273

Open · 4 tasks done
masonmenges opened this issue May 7, 2024 · 0 comments
Labels
bug Something isn't working

Comments

@masonmenges (Contributor) commented May 7, 2024

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

When running a deployment, if the flow function has Pydantic type-hinted inputs and no parameters are specified on the deployment, the default values from the function signature are not coerced into the Pydantic model named in the function definition.

@flow(
    name="test_flow"
)
def test_flow(
    input: SampleContract = {"field_1": ["val1", "val2"], "field_2": [1, 2]}
) -> SampleContract2:
    logger = get_run_logger()
    logger.info(f"Data at Start of Flow: {input}")

    output = add_fruits(input)

    logger.info(f"Data at End of Flow: {output}")

    return SampleContract2.model_validate(output)

The above code completes successfully when tested locally, since the default input is coerced into the model. Deployment runs also complete successfully when input parameters are specified on the deployment, since those inputs are validated prior to executing the flow run. However, running this deployment on a schedule or as a quick run fails, because the default inputs are never coerced into the Pydantic model.
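
For reference, the coercion that happens on a local run is ordinary Pydantic dict-to-model validation. A minimal standalone sketch (redeclaring the SampleContract model from the reproduction below so it runs on its own):

from typing import List
from pydantic import BaseModel

class SampleContract(BaseModel):
    field_1: List[str]
    field_2: List[int]

# Explicit validation coerces a plain dict into the model. This is effectively
# what happens to flow parameters on a local run, or when parameters are passed
# to the deployment, but not for defaults on a scheduled or quick run.
coerced = SampleContract.model_validate({"field_1": ["val1", "val2"], "field_2": [1, 2]})
assert isinstance(coerced, SampleContract)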

It makes sense to set a default that already matches the annotated type, like this, and in general that is something we should encourage:

@flow(
    name="test_flow"
)
def test_flow(
    input: SampleContract = SampleContract(field_1 = ["val1", "val2"], field_2= [1, 2])
) -> SampleContract2:
    logger = get_run_logger()
    logger.info(f"Data at Start of Flow: {input}")

    output = add_fruits(input)

    logger.info(f"Data at End of Flow: {output}")

    return SampleContract2.model_validate(output)

However, this validation step is performed when running the flow locally and when passing parameters to the deployment at runtime, so the experience can be jarring for an end user who expects validation to behave the same way when the flow is deployed without parameters and relies on the defaults from the function signature.
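
Until defaults are coerced consistently, one defensive pattern (an illustration only, not an official recommendation) is to validate the parameter at the top of the flow body, using the models and task from the reproduction below; this makes the dict default safe in every execution path:

@flow(
    name="test_flow_workaround"  # hypothetical variant of the flow above
)
def test_flow_workaround(
    input: SampleContract = {"field_1": ["val1", "val2"], "field_2": [1, 2]}
) -> SampleContract2:
    # Coerce manually so the dict default behaves the same whether or not the
    # deployment supplies parameters and Prefect runs its validation step.
    if not isinstance(input, SampleContract):
        input = SampleContract.model_validate(input)
    logger = get_run_logger()
    logger.info(f"Data at Start of Flow: {input}")
    output = add_fruits(input)
    logger.info(f"Data at End of Flow: {output}")
    return SampleContract2.model_validate(output)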

Reproduction

from pydantic import BaseModel
from typing import List
from prefect import flow, task, get_run_logger

class SampleContract(BaseModel):
    field_1: List[str]
    field_2: List[int]

class SampleContract2(BaseModel):
    fruit: List[str]

@task
def add_fruits(input: SampleContract) -> SampleContract2:
    # Dump the incoming model to a plain dict and add a fruit list; the flow
    # validates this dict into SampleContract2 on return.
    output = input.model_dump()
    output["fruit"] = ["apple", "banana"]
    return output

@flow(
    name="test_flow"
)
def test_flow(
    input: SampleContract = {"field_1": ["val1", "val2"], "field_2": [1, 2]}
) -> SampleContract2:
    logger = get_run_logger()
    logger.info(f"Data at Start of Flow: {input}")

    output = add_fruits(input)

    logger.info(f"Data at End of Flow: {output}")

    return SampleContract2.model_validate(output)
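
Calling the flow locally (e.g. by appending the lines below to flows/schema_validation.py) succeeds, since the dict default is coerced on a local call:

if __name__ == "__main__":
    # Local invocation: the default is validated against the type hint, so
    # add_fruits receives a SampleContract instance and the run completes.
    test_flow()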

Error

# Error from a scheduled run
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/prefect/engine.py", line 2101, in orchestrate_task_run
    result = await call.aresult()
             ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 327, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 352, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/prefect/mm2-sanbox-main/flows/schema_validation.py", line 14, in add_fruits
    output = input.model_dump()
             ^^^^^^^^^^^^^^^^
AttributeError: 'dict' object has no attribute 'model_dump'
01:50:03 PM | add_fruits-0 | prefect.task_runs
Finished in state Failed("Task run encountered an exception AttributeError: 'dict' object has no attribute 'model_dump'")
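
The traceback boils down to the task receiving the raw dict default instead of a SampleContract instance; a two-line illustration outside Prefect:

raw = {"field_1": ["val1", "val2"], "field_2": [1, 2]}
raw.model_dump()  # AttributeError: 'dict' object has no attribute 'model_dump'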

Versions

Version:             2.18.1
API version:         0.8.4
Python version:      3.12.2
Git commit:          8cff545a
Built:               Thu, Apr 25, 2024 3:40 PM
OS/Arch:             darwin/arm64
Profile:             masonsandbox
Server type:         cloud

Additional context

Example deployment YAML for deploying the reproduction code; note that the parameters field on the deployment is deliberately left blank.

name: mm2-sanbox
prefect-version: 2.18.1

build:
- prefect_docker.deployments.steps.build_docker_image:
    id: build_image
    requires: prefect-docker>=0.3.1
    image_name: some_docker_repo_name
    tag: schemavalidation
    dockerfile: ./Dockerfile

# push section allows you to manage if and how this project is uploaded to remote locations
push:
- prefect_docker.deployments.steps.push_docker_image:
    requires: prefect-docker>=0.3.1
    image_name: '{{ build_image.image_name }}'
    tag: '{{ build_image.tag }}'

# pull section allows you to provide instructions for cloning this project in remote locations
pull:
- prefect.deployments.steps.git_clone:
    repository: https://github.com/masonmenges/mm2-sanbox.git
    branch: main
    access_token: null

# the deployments section allows you to provide configuration for deploying flows
deployments:
- name: schema_validation_fail
  version: null
  tags: []
  description: null
  schedule: {}
  flow_name: null
  entrypoint: flows/schema_validation.py:test_flow
  parameters: {}
  work_pool:
    name: k8s-minikube-test
    work_queue_name: null
    job_variables:
      image: '{{ build_image.image }}'
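
For contrast, triggering the same deployment with explicit parameters goes through validation and succeeds. A sketch, assuming the deployment name from the YAML above:

from prefect.deployments import run_deployment

# Explicit parameters are validated against the flow signature before the run
# starts, so the dict is coerced into SampleContract and the run completes.
flow_run = run_deployment(
    name="test_flow/schema_validation_fail",
    parameters={"input": {"field_1": ["val1", "val2"], "field_2": [1, 2]}},
)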
@masonmenges masonmenges added bug Something isn't working needs:triage Needs feedback from the Prefect product team labels May 7, 2024
@zhen0 zhen0 removed the needs:triage Needs feedback from the Prefect product team label May 20, 2024