Write a test for a Dagster asset job

I am trying to write a simple test for a Dagster job and I can't get it to work.

I am using dagster 1.3.6.

I have defined this job using the function dagster.define_asset_job:

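from dagster import AssetSelection, define_asset_job
from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition

my_job: UnresolvedAssetJobDefinition = define_asset_job(
    name='name_for_my_job',
    selection=AssetSelection.assets(
        source_asset_1,
        source_asset_2,
        asset_1,
        asset_2,
    ),
)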

Intuitive try

By reading the documentation, I figured that I had to call the execute_in_process method, which is defined in the JobDefinition class.

from my_package import my_job

def test_documentation():
    result = my_job.execute_in_process()
    assert result.success

But as I highlighted in the first code block, my_job is of type UnresolvedAssetJobDefinition, which has no execute_in_process method. Digging a bit more into the code, I see that there is a resolve method, which returns a JobDefinition.

So I wanted to use that, but you can't call resolve without parameters; you are required to provide asset_graph.

But it's exactly what I was trying to avoid. I don't want to provide the list of the assets/source assets, I want them to be deduced from the job definition.

Journey

I've seen that in addition to the UnresolvedAssetJobDefinition.resolve().execute_in_process(), I could look at the materialize_to_memory function; but I faced the same issue: I need to provide a list of assets.

I spent some time trying to get the assets out of the UnresolvedAssetJobDefinition. I've seen that there is a .selection property that allows me to get a KeysAssetSelection, which basically contains a list of AssetKey.

But I need a list of Union[AssetsDefinition, SourceAsset] and I don't know how to convert an AssetKey into an AssetsDefinition.

Last try

Here is my last try. You can see that I am just trying to wire things together; as an admission of weakness, I am not even trying to use the job definition to get the assets.

from typing import Mapping

import pytest
from dagster._core.definitions.asset_graph import AssetGraph
from my_package import my_job, source_asset_1, source_asset_2, asset_1, asset_2

# parquet_io_manager and DATA_FOLDER come from the project; their imports are omitted here.

@pytest.fixture
def test_resources() -> Mapping[str, object]:
    return {
        "parquet_io_manager": parquet_io_manager.configured({'data_path': DATA_FOLDER}),
    }

def test_my_job(test_resources: Mapping[str, object]):
    graph = AssetGraph.from_assets([source_asset_1, source_asset_2, asset_1, asset_2])
    job = my_job.resolve(asset_graph=graph)
    result = job.execute_in_process(resources=test_resources)
    assert result.success

But I can't quite get what I want. With this last example, I get the following error:

dagster._core.errors.DagsterInvalidSubsetError: AssetKey(s) {AssetKey(['source_asset_1']), AssetKey(['source_asset_2']), AssetKey(['asset_1']), AssetKey(['asset_2'])} were selected, but no AssetsDefinition objects supply these keys. Make sure all keys are spelled correctly, and all AssetsDefinitions are correctly added to the Definitions.

Help

I know that I can test each asset by importing and calling the function decorated with Dagster's @asset decorator. But I want to be able to launch all the assets from the job, without having to rewrite this test function whenever the job's selection changes.

Is this possible? Am I doing something wrong? I must be missing something obvious... any help would be appreciated.

Have a nice day!


1 Answer

The object that's produced by define_asset_job does not include object references to the asset definitions selected in the job.

This means that, to execute an asset job in-process, you need to somehow pass those asset definition object references.

One way to do this is through a Definitions object:

from dagster import Definitions, define_asset_job, load_assets_from_modules
from my_package import my_module
my_job = define_asset_job("my_job", ...)
all_assets = load_assets_from_modules([my_module])
defs = Definitions(assets=all_assets, jobs=[my_job])
result = defs.get_job_def("my_job").execute_in_process()
