external_code.py which is a standalone Python script that you want to orchestrate with Dagster.
dagster_code.py which includes a Dagster asset and other Dagster definitions.
In this section, you'll learn how to modify the standalone Python script to work with Dagster Pipes in order to stream information back to Dagster. To do this, you'll:
Step 1: Make Dagster context available in external code
Getting external code to send information back to Dagster via Dagster Pipes requires adding a few lines of code:
Imports from dagster-pipes
A call that connects to Dagster Pipes: open_dagster_pipes initializes the Dagster Pipes context that can be used to stream information back to Dagster. We recommend calling this function near the entry point of a pipes session.
The with open_dagster_pipes(): statement uses a Python context manager, which handles setup and cleanup around a specific block of code. Context managers suit tasks that need an initial setup and a final teardown, like opening and closing connections. Here, the context manager initializes and then closes the Dagster Pipes connection.
An instance of the Dagster Pipes context, obtained via PipesContext.get. You can access information like partition_key and asset_key through this context object. Refer to the API documentation for more information.
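The setup and teardown guarantee described above can be shown with a plain-Python sketch. In it, fake_pipes_session is a hypothetical stand-in for open_dagster_pipes, built with contextlib. It only illustrates the context-manager pattern and does not reproduce what Dagster Pipes does internally. Notice that the teardown step still runs when main() raises:

```python
from contextlib import contextmanager
from typing import Iterator, List

# Records lifecycle events so their order is visible.
events: List[str] = []


@contextmanager
def fake_pipes_session() -> Iterator[str]:
    # Hypothetical stand-in for open_dagster_pipes(): setup runs on entry...
    events.append("open connection")
    try:
        yield "session"
    finally:
        # ...and teardown runs on exit, even if the body raised.
        events.append("close connection")


def main() -> None:
    events.append("run main")
    raise RuntimeError("simulated failure in external code")


try:
    with fake_pipes_session():
        main()
except RuntimeError:
    # The exception still reaches the caller after cleanup has happened.
    events.append("error propagated to caller")

print(events)
```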
In our sample Python script, the changes would look like the following:
```python
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    print(f"processing total {total_orders} orders")


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
```
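Once main() has the context object, it can read information about the current run. The helper below is a hypothetical sketch. It assumes PipesContext exposes run_id, is_asset_step, asset_keys, is_partition_step, and partition_key; check the dagster-pipes API reference for the exact behavior. The helper takes the context as a parameter, so you can exercise it offline with a stub object:

```python
from types import SimpleNamespace


def describe_scope(context) -> str:
    """Build a one-line summary of what a Pipes context says is in scope.

    `context` is expected to be a dagster_pipes.PipesContext (assumption);
    any object with the same attributes works, which keeps this testable.
    """
    parts = [f"run={context.run_id}"]
    # Assumption: asset_keys / partition_key raise when nothing of that kind
    # is in scope, so the is_* flags are checked before reading them.
    if context.is_asset_step:
        parts.append("assets=" + ",".join(context.asset_keys))
    if context.is_partition_step:
        # Assumes a single partition is in scope.
        parts.append(f"partition={context.partition_key}")
    return ", ".join(parts)


# Offline demo: a stub with the same attribute names as PipesContext.
stub = SimpleNamespace(
    run_id="abc123",
    is_asset_step=True,
    asset_keys=["orders"],
    is_partition_step=False,
    partition_key=None,
)
print(describe_scope(stub))
```

Inside main(), after context = PipesContext.get(), a call such as context.log.info(describe_scope(context)) would send this summary to Dagster.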
The Dagster Pipes context offers built-in logging that streams log messages back to Dagster. Instead of printing to standard output, you can use the context.log logger on PipesContext to send log messages to Dagster. In this case, we’re sending an info-level log message:
```python
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    context.log.info(f"processing total {total_orders} orders")


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
```
The log messages will then show up on the Run details page of the Dagster UI. You can filter by log level to view only info-level messages:
Click the Levels filter next to the log filter field. This will present a dropdown of all log levels.
Select the info checkbox and deselect the others. This will show only the logs marked as info level.
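The info call is only one of several levels. The following sketch assumes context.log behaves like a standard logging.Logger, so warning and debug are available alongside info. It emits messages at different levels, which the Levels filter can then separate. log_order_summary is a hypothetical helper. A plain stdlib logger stands in for context.log so the example runs without Dagster:

```python
import logging
from typing import List, Optional, Tuple


def log_order_summary(log: logging.Logger, item_ids: List[Optional[int]]) -> None:
    """Emit order statistics at several levels through any stdlib-style logger."""
    log.info(f"processing total {len(item_ids)} orders")
    missing = sum(1 for item_id in item_ids if item_id is None)
    if missing:
        # Logged at WARNING, so it can be isolated with the Levels filter.
        log.warning(f"{missing} of {len(item_ids)} orders have no item_id")
    log.debug("order summary complete")


class ListHandler(logging.Handler):
    """Collects (level, message) pairs so the output can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: List[Tuple[str, str]] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append((record.levelname, record.getMessage()))


# Offline demo: a plain stdlib logger stands in for context.log.
demo_log = logging.getLogger("pipes_demo")
demo_log.setLevel(logging.DEBUG)
demo_log.propagate = False
handler = ListHandler()
demo_log.addHandler(handler)

log_order_summary(demo_log, [432, None, 878])
print(handler.records)
```

In the external script, you would pass context.log as the log argument.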
Sometimes, you may want to log information from your external code as structured metadata shown in the Dagster UI. Dagster Pipes context also comes with the ability to log structured metadata back to Dagster.
In this example, we’re passing a piece of metadata named total_orders to the metadata parameter of the PipesContext.report_asset_materialization method. This payload is sent from the external process back to Dagster:
```python
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster
    context.report_asset_materialization(metadata={"total_orders": total_orders})


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
```
Then, total_orders will show up in the UI as structured metadata:
This metadata will also be displayed on the Events tab of the Asset Details page in the UI:
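The metadata argument can carry more than one entry. The sketch below builds several values in a hypothetical helper, build_order_metadata, then checks that the payload survives json.dumps. That check reflects an assumption that Pipes messages are JSON-encoded. The same assumption is why the helper converts values to plain Python ints instead of leaving NumPy scalars from pandas:

```python
import json
from typing import Dict, List, Union

# Plain Python scalars; assumption: Pipes messages are JSON-encoded,
# so every value must survive json.dumps.
MetadataValue = Union[int, float, str]


def build_order_metadata(order_ids: List[int], item_ids: List[int]) -> Dict[str, MetadataValue]:
    """Summarize a batch of orders as metadata for report_asset_materialization."""
    return {
        "total_orders": len(order_ids),
        "distinct_items": len(set(item_ids)),
        # int() guards against NumPy integers if the inputs came from pandas.
        "max_order_id": int(max(order_ids)) if order_ids else 0,
    }


metadata = build_order_metadata([1, 2], [432, 878])
# Sanity check: the payload must be JSON-serializable before it is sent.
print(json.dumps(metadata))
```

In main(), this would become context.report_asset_materialization(metadata=build_order_metadata(orders_df["order_id"].tolist(), orders_df["item_id"].tolist())). tolist() returns plain Python values rather than NumPy scalars.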
Dagster allows you to define and execute data quality checks on assets. Refer to the Asset Checks documentation for more information.
If your asset has data quality checks defined, you can report to Dagster that an asset check has been performed via PipesContext.report_asset_check:
```python
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster
    context.report_asset_materialization(metadata={"total_orders": total_orders})
    # report data quality check result back to Dagster
    context.report_asset_check(
        passed=bool(orders_df["item_id"].notnull().all()),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
```
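report_asset_check can also carry a severity and metadata alongside passed. The sketch below assumes PipesContext.report_asset_check accepts severity and metadata parameters, with "WARN" and "ERROR" as the accepted severities; verify this against the dagster-pipes API reference. no_empty_order_check is a hypothetical helper. It returns keyword arguments, which keeps the check logic testable without a Dagster run:

```python
from typing import Any, Dict


def no_empty_order_check(null_count: int, total_rows: int) -> Dict[str, Any]:
    """Build keyword arguments for context.report_asset_check (hypothetical helper)."""
    return {
        "check_name": "no_empty_order_check",
        "passed": null_count == 0,
        # Assumption: "ERROR" is the default severity and "WARN" the alternative.
        "severity": "WARN",
        # Extra context shown with the check result in the UI.
        "metadata": {"null_item_ids": null_count, "rows_checked": total_rows},
    }


print(no_empty_order_check(0, 2))
print(no_empty_order_check(1, 2))
```

In main(), you could call context.report_asset_check(**no_empty_order_check(int(orders_df["item_id"].isnull().sum()), len(orders_df))). The check name is expected to match a check defined for the asset on the Dagster side.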
At this point, your two files should look like the following:
```python
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster
    context.report_asset_materialization(metadata={"total_orders": total_orders})
    # report data quality check result back to Dagster
    context.report_asset_check(
        passed=bool(orders_df["item_id"].notnull().all()),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
```
In this tutorial, you learned how to access the Dagster Pipes context, stream log messages from the external process, and send structured events back to Dagster.
What's next? From here, you can:
Learn about other capabilities for executing external code in a subprocess via Dagster Pipes in the Subprocess reference