Schedule examples#

This reference contains a variety of examples using Dagster schedules. Each example contains:

  • A summary
  • Additional notes
  • Links to relevant documentation
  • A list of the APIs used in the example

Defining basic schedules#

The following examples demonstrate how to define some basic schedules.

This example demonstrates how to define a schedule using ScheduleDefinition that will run a job every day at midnight. While this example uses op jobs (@job), the same approach will work with asset jobs (define_asset_job).

from dagster import ScheduleDefinition, job


@job
def my_job(): ...


# Runs my_job every day at midnight
basic_schedule = ScheduleDefinition(job=my_job, cron_schedule="0 0 * * *")
Notes: The cron_schedule argument accepts standard cron expressions. If the installed version of the croniter dependency is 1.0.12 or later, the argument also accepts the following:
  • @daily
  • @hourly
  • @monthly
Related docs
APIs in this example: @job, ScheduleDefinition

Emitting log messages from schedule evaluations#

This example demonstrates how to emit log messages from a schedule during its evaluation function. These logs will be visible in the UI when you inspect a tick in the schedule's tick history.

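from dagster import SkipReason, schedule


# my_job is the job defined in the basic schedule example above
@schedule(job=my_job, cron_schedule="* * * * *")
def logs_then_skips(context):
    context.log.info("Logging from a schedule!")
    return SkipReason("Nothing to do")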
Notes: Schedule logs are stored in your Dagster instance's compute log storage. To view schedule logs in the UI, make sure compute log storage is configured for your instance.
Related docs
APIs in this example: @schedule, ScheduleDefinition, SkipReason

Using resources in schedules#

This example demonstrates how to use resources in schedules. To specify a resource dependency, add the resource as a type-annotated parameter of the schedule's function.

from dagster import (
    schedule,
    ScheduleEvaluationContext,
    ConfigurableResource,
    job,
    RunRequest,
    Definitions,
)
from datetime import datetime

class DateFormatter(ConfigurableResource):
    format: str

    def strftime(self, dt: datetime) -> str:
        return dt.strftime(self.format)

@job
def process_data(): ...

@schedule(job=process_data, cron_schedule="* * * * *")
def process_data_schedule(
    context: ScheduleEvaluationContext,
    date_formatter: DateFormatter,
):
    formatted_date = date_formatter.strftime(context.scheduled_execution_time)

    return RunRequest(
        run_key=None,
        tags={"date": formatted_date},
    )

defs = Definitions(
    jobs=[process_data],
    schedules=[process_data_schedule],
    resources={"date_formatter": DateFormatter(format="%Y-%m-%d")},
)
Notes: All Dagster definitions, including schedules and resources, must be attached to a Definitions call.
Related docs
APIs in this example: @schedule, ScheduleEvaluationContext, ConfigurableResource, @job, RunRequest, Definitions

Configuring job behavior based on scheduled run time#

This example demonstrates how to use run config to vary the behavior of a job based on its scheduled run time.

from dagster import (
    OpExecutionContext,
    RunRequest,
    ScheduleEvaluationContext,
    job,
    op,
    schedule,
)


@op(config_schema={"scheduled_date": str})
def configurable_op(context: OpExecutionContext):
    # Log the date passed in through run config
    context.log.info(context.op_config["scheduled_date"])


@job
def configurable_job():
    configurable_op()


@schedule(job=configurable_job, cron_schedule="0 0 * * *")
def configurable_job_schedule(context: ScheduleEvaluationContext):
    # Format the tick's scheduled time and pass it to the op as config
    scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return RunRequest(
        run_key=None,
        run_config={
            "ops": {"configurable_op": {"config": {"scheduled_date": scheduled_date}}}
        },
        tags={"date": scheduled_date},
    )
Related docs: Op jobs
APIs in this example: @op, @job, @schedule, OpExecutionContext, ScheduleEvaluationContext, RunRequest

Customizing execution timezones#

This example demonstrates how to customize the timezone a schedule executes in. The schedule in this example will execute every day at 9:00 AM US/Pacific time (America/Los_Angeles).

my_timezone_schedule = ScheduleDefinition(
    job=my_job, cron_schedule="0 9 * * *", execution_timezone="America/Los_Angeles"
)
Notes
  • The @schedule decorator also accepts the execution_timezone argument.
  • Schedules without a set timezone run in UTC.
  • Schedules built from partitioned jobs execute in the timezone defined on the partitioned config.
APIs in this example: ScheduleDefinition
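
To illustrate why the timezone setting matters, the sketch below uses only the Python standard library (not Dagster) to show what 9:00 AM in America/Los_Angeles corresponds to in UTC at two points in the year. The helper utc_time_of_9am and the two dates are hypothetical choices made for this example.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

pacific = ZoneInfo("America/Los_Angeles")


def utc_time_of_9am(year: int, month: int, day: int) -> str:
    """Return the UTC wall-clock time that matches 9:00 AM Pacific on a date."""
    local = datetime(year, month, day, 9, 0, tzinfo=pacific)
    return local.astimezone(timezone.utc).strftime("%H:%M")


winter = utc_time_of_9am(2024, 1, 15)  # standard time, UTC-8
summer = utc_time_of_9am(2024, 7, 15)  # daylight saving time, UTC-7

print(winter)  # 17:00
print(summer)  # 16:00
```

A schedule left in UTC with a fixed cron expression would therefore drift by an hour relative to local Pacific time when daylight saving time starts or ends, while a schedule with execution_timezone set keeps firing at 9:00 AM local time.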

Constructing schedules for partitioned assets and jobs#

This section demonstrates how to use schedules with partitions. We'll cover:

  • Using a helper function to automatically construct schedules based on the partition's config
  • Using @schedule to manually construct schedules

Automatically constructing schedules#

The following examples demonstrate how to automatically construct schedules for partitioned assets and jobs using a helper function. These examples use build_schedule_from_partitioned_job, which will build a schedule with a cadence that matches the spacing of the partitions in the asset or job.

This approach can be used with time-based or static partitions.

Partitioned assets#

This example demonstrates how to automatically construct a schedule for a time-partitioned asset using build_schedule_from_partitioned_job.

from dagster import (
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
    DailyPartitionsDefinition,
)

daily_partition = DailyPartitionsDefinition(start_date="2024-05-20")


@asset(partitions_def=daily_partition)
def daily_asset(): ...


partitioned_asset_job = define_asset_job("partitioned_job", selection=[daily_asset])


asset_partitioned_schedule = build_schedule_from_partitioned_job(
    partitioned_asset_job,
)
Notes: If the partition has a timezone defined, the schedule will execute in the timezone specified on the partitioned config.
Related docs
APIs in this example: @asset, build_schedule_from_partitioned_job, define_asset_job, DailyPartitionsDefinition

Manually constructing schedules#

This example demonstrates how to manually construct a schedule for a job with static partitions using the @schedule decorator.

Using @schedule allows for more flexibility in determining which partitions the schedule should run than build_schedule_from_partitioned_job, which automatically creates the schedule based on the partitioned config.

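from dagster import schedule, RunRequest


# continent_job is a statically partitioned job and CONTINENTS is the list of
# its partition keys; both are assumed to be defined elsewhere
@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
    # Request one run per partition on every tick
    for c in CONTINENTS:
        yield RunRequest(run_key=c, partition_key=c)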
Related docs
APIs in this example: @schedule, RunRequest

Testing schedules#

Refer to the Testing schedules guide to view examples of tests alongside the schedules they target.


Want more inspiration?#

If you're looking for additional inspiration, we recommend:

  • Dagster Open Platform, which is Dagster Labs' open-source data platform. This full-sized project contains real assets and other Dagster features used by the Dagster Labs team.
  • GitHub Discussions, where you can ask questions and get inspired by the Dagster community.
  • The Awesome Dagster repository, which is a collection of all awesome things related to Dagster, including other users' projects, talks, articles, and more.