Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create an asset factory to generate FERC1 output tables #3557

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from

Conversation

hfireborn
Copy link

draft of asset factories for #3147.

Based on the tutorial in https://dagster.io/blog/python-factory-patterns, creates 3 assets using a factory for: out_ferc1__yearly_purchased_power_and_exchanges_sched326
out_ferc1__yearly_plant_in_service_sched204
out_ferc1__yearly_balance_sheet_assets_sched110

all of which just merge and then organize columns.

a few questions:

  1. Is this the type of solution you were looking for?
  2. double checking: Is it ok to omit the organize_cols parts of those functions?
  3. We are currently omitting the pd.DataFrame input type specification in these assets, is this ok?

@zaneselvans zaneselvans added community ferc1 Anything having to do with FERC Form 1 dagster Issues related to our use of the Dagster orchestrator labels Apr 12, 2024
@zaneselvans zaneselvans linked an issue Apr 12, 2024 that may be closed by this pull request
Copy link
Member

@zaneselvans zaneselvans left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is generally the shape of the solution we're looking for, but see my inline comments in the PR.

It looks like you don't have the pre-commit hooks installed locally, or the linters set up in VS Code, as the pre-commit checks and unit tests are failing on the PR, so you'll want to get that set up. The documentation build is also failing. It looks like you've got an undefined variable.

You'll definitely need to try running the ETL via Dagster locally to know whether your changes are working, and debug issues. You'll probably need to run it many many times, but you can just run the portion of the DAG that you're working on, in this case the FERC 1 related parts.

Comment on lines 261 to 270
# pudl.helpers.organize_cols,
# [
# "report_year",
# "utility_id_ferc1",
# "utility_id_pudl",
# "utility_name_ferc1",
# "seller_name",
# "record_id",
# ],
# )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the organize_cols can and should be removed from all of the asset definitions in this module, since the database schema is now what determines the ordering of the columns.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zaneselvans If all of the helpers.organize_cols can be removed, is it necessary to make an asset factory for this? It looks to me like all of the functions are essentially doing the same thing then; they take two DataFrames as inputs, merge them on something hard-coded and the same across all the functions, and then return.

Is there a reason why we can't we just simplify them all into one function and change where the functions are called?

Copy link
Member

@zaneselvans zaneselvans Apr 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do need an asset factory that pumps out the function definitions, because those @asset decorated function definitions and their inputs and outputs are what define the dependency graph in Dagster. It might help to read some background on Dagster's software defined assets or some of their examples, and also to try running the ETL through the Dagster UI locally if you haven't already. It looks like they've got some videos too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course each of the assets (functions) defined by the factory will be very similar (which is why we want to use a factory) but it's the additional information from the decorator or the arguments and return value that stitch the functions and the data flowing through them together.

# draft asset_factory
def generate_asset_factory(spec):
@asset(io_manager_key="pudl_io_manager", compute_kind="Python")
def _asset() -> pd.DataFrame:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function which is used to define the asset needs to have arguments, because that's how Dagster infers the dependencies between assets. For more complex cases they can also be defined with arguments in the asset decorator, but I don't think that's necessary here. You can't just add them inside the inner function definition.

Search for "_asset_factory" in the code base to find other examples of how we're using this pattern.

{
"name": "out_ferc1__yearly_purchased_power_and_exchanges_sched326",
"df1": "core_ferc1__yearly_purchased_power_and_exchanges_sched326",
"df2": "core_pudl__assn_ferc1_pudl_utilities",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that in all cases, df2 is the same. So it doesn't need to be part of these inputs. It can be hard-coded as one of the input assets that are used by the asset definition (I think it's used in all of the FERC 1 output assets)

## example/draft factory pattern
specs = [
{
"name": "out_ferc1__yearly_purchased_power_and_exchanges_sched326",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that in all cases, there's an easy way to construct the input (core) and output asset names, given just a single string, (purchased_power_and_exchanges_sched326 in this case) -- they all have the same prefix structure. So you only need to store that string.

"name": "out_ferc1__yearly_purchased_power_and_exchanges_sched326",
"df1": "core_ferc1__yearly_purchased_power_and_exchanges_sched326",
"df2": "core_pudl__assn_ferc1_pudl_utilities",
"mg": "utility_id_ferc1",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, they all seem to get merged on the same column, so this doesn't need to be part of the specification -- it can just be hard-coded inside the inner factory function definition (unless there are some tables that need a different merge key)

@zaneselvans zaneselvans changed the title #3147 draft1 Create an asset factory to generate FERC1 output tables Apr 16, 2024
@hfireborn
Copy link
Author

We've created an asset factory that appeared to work locally on dagster. When trying to push our code, we ran into some errors getting our codebase up to date that we are currently resolving.

@hfireborn
Copy link
Author

We've created an asset factory that appeared to work locally on dagster. When trying to push our code, we ran into some errors getting our codebase up to date that we are currently resolving.

@zaneselvans For some reason we are failing unit tests when we try to commit that seem unrelated to the code in output/ferc1.py we were working with.

=========================== short test summary info ============================
FAILED test/unit/workspace/datastore_test.py::TestDatapackageDescriptor::test_get_partition_filters - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestDatapackageDescriptor::test_get_resource_path - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestDatapackageDescriptor::test_get_resources_filtering - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestDatapackageDescriptor::test_json_string_representation - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestDatapackageDescriptor::test_modernize_zenodo_legacy_api_url - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestZenodoFetcher::test_doi_format_is_correct - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestZenodoFetcher::test_doi_of_prod_epacems_matches - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestZenodoFetcher::test_get_descriptor_http_calls - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestZenodoFetcher::test_get_known_datasets - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestZenodoFetcher::test_get_resource - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestZenodoFetcher::test_get_resource_with_invalid_checksum - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestZenodoFetcher::test_get_resource_with_nonexistent_resource_fails - TypeError: Package.metadata_validate() missing 1 required positional argume...
FAILED test/unit/workspace/datastore_test.py::TestZenodoFetcher::test_get_unknown_dataset - TypeError: Package.metadata_validate() missing 1 required positional argume...
= 13 failed, 1367 passed, 1 skipped, 3 deselected, 9 xfailed, 44 warnings in 32.77s =

conda-lock...........................................(no files to check)Skipped

We thought it was a package update issue, guessing that we were pulling code from the remote but not updating the packages that accompany them, so we tried:

make conda-clean
make conda-lock.yml
make install-pudl

but we are still receiving these errors. Not sure where to go from here, any help is appreciated!

@zaneselvans
Copy link
Member

Those unit test errors look like they result from having an older version of the frictionless package installed in your environment. We recently updated from v4 to v5.

I would remove any local changes to the conda lockfiles, remove your existing pudl-dev environment, make sure your local branch has all the most recent changes from main upstream, and rebuild the conda environment, which should look something like:

git checkout -- environments
mamba deactivate
mamba env remove -n pudl-dev
git pull
make install-pudl
mamba activate pudl-dev

Given the changes you're making, you shouldn't need to run make conda-clean or make conda-lock.yml, which will update/change the locked dependencies.

Comment on lines +449 to +453
specs = [
{"name": "ferc1__yearly_purchased_power_and_exchanges_sched326"},
{"name": "ferc1__yearly_plant_in_service_sched204"},
{"name": "ferc1__yearly_balance_sheet_assets_sched110"},
]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be stored in a separate variable or a dictionary. The list of AssetsDefinitions can be built using a list comprehension that iterates over a list of strings (the base table name).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the specs variable and just list a list of the table names with no prefix in a list comprehension to generate the assets.

Comment on lines 457 to 476
def generate_asset_factory(spec) -> AssetsDefinition:
var_name = "core_" + spec["name"]
core_ = globals()[var_name]

@asset(
name=f"_out_{get_core_ferc1_asset_description(spec["name"])}",
io_manager_key="pudl_io_manager",
compute_kind="Python",
)
def _asset(
core_: pd.DataFrame,
core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
"""Generate a dataframe for {} asset specification.""".format(spec["name"])
return_df = core_.merge(
core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
)
return return_df

return _asset
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A more specific name would be helpful, and I think this can be as simple as just taking a string as input:

Suggested change
def generate_asset_factory(spec) -> AssetsDefinition:
var_name = "core_" + spec["name"]
core_ = globals()[var_name]
@asset(
name=f"_out_{get_core_ferc1_asset_description(spec["name"])}",
io_manager_key="pudl_io_manager",
compute_kind="Python",
)
def _asset(
core_: pd.DataFrame,
core_pudl__assn_ferc1_pudl_utilities: pd.DataFrame,
) -> pd.DataFrame:
"""Generate a dataframe for {} asset specification.""".format(spec["name"])
return_df = core_.merge(
core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
)
return return_df
return _asset
def ferc1_output_asset_factory(table_name: str) -> AssetsDefinition:
@asset(
name=f"out_ferc1__{table_name}",
ins={
core_pudl__assn_ferc1_pudl_utilities: AssetIn(),
f"core_ferc1__{table_name}": AssetIn(),
},
io_manager_key="pudl_io_manager",
compute_kind="Python",
)
def _ferc1_output_asset(**kwargs) -> pd.DataFrame:
f"""Generate a dataframe for out_ferc1__{table_name} asset specification."""
return kwargs[core_ferc1__{table_name}].merge(
core_pudl__assn_ferc1_pudl_utilities, on="utility_id_ferc1"
)
return _ferc1_output_asset

Comment on lines 482 to 492
def create_generated_assets() -> list[AssetsDefinition]:
"""Create a list of generated FERC Form 1 assets.

Returns:
A list of :class:`AssetsDefinitions` where each asset is an generated FERC Form 1
table.
"""
return [generate_asset_factory(**kwargs) for kwargs in specs]


exploded_ferc1_assets = create_generated_assets()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think a whole separate function is needed here, we can generate the AssetsDefinitions for dagster to pick up on module import with something like:

ferc1_output_assets = [
    ferc1_output_asset_factory(table_name) for table name in
    [
        "yearly_purchased_power_and_exchanges_sched326",
        "yearly_plant_in_service_sched204",
        "yearly_balance_sheet_assets_sched110",
    ]
]

@hfireborn
Copy link
Author

@zaneselvans our most recent commit from right before our meeting has some updated changes to it as well as fixing the merge conflicts, if you are able to take a look at that.

Comment on lines +449 to +453
specs = [
{"name": "ferc1__yearly_purchased_power_and_exchanges_sched326"},
{"name": "ferc1__yearly_plant_in_service_sched204"},
{"name": "ferc1__yearly_balance_sheet_assets_sched110"},
]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the specs variable and just list a list of the table names with no prefix in a list comprehension to generate the assets.

def _asset(
**kwargs: dict[str, pd.DataFrame],
) -> pd.DataFrame:
"""Generate a dataframe for {} asset specification.""".format(spec["name"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use f-string formatting here.

)
return out_ferc1__yearly_balance_sheet_assets_sched110

def create_generated_assets() -> list[AssetsDefinition]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this function and use a list comprehension that iterates of the table names with no prefix to call the asset factory function.

"record_id",
],
# draft asset_factory
def generate_asset_factory(spec) -> AssetsDefinition:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename to ferc1_output_asset_factory and have the factory depend on a string, which is the table name with no prefix, rather than spec.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community dagster Issues related to our use of the Dagster orchestrator ferc1 Anything having to do with FERC Form 1
Projects
Status: In progress
Development

Successfully merging this pull request may close these issues.

Consolidate ferc1 outputs using Dagster asset factories
2 participants