GCP + PySpark (dagster-gcp-pyspark)

Google BigQuery

This library provides an integration between the BigQuery database and the PySpark data processing library.

dagster_gcp_pyspark.BigQueryPySparkIOManager IOManagerDefinition

Config Schema:
project (dagster.StringSource):

The GCP project to use.

dataset (Union[dagster.StringSource, None], optional):

Name of the BigQuery dataset to use. If not provided, the last prefix before the asset name will be used.

Default Value: None

location (Union[dagster.StringSource, None], optional):

The GCP location. Note: When using PySpark DataFrames, the default location of the project will be used. A custom location can be specified in your SparkSession configuration.

Default Value: None

gcp_credentials (Union[dagster.StringSource, None], optional):

GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64

Default Value: None
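
As an alternative to the shell command, the key can be encoded in Python. The following is a minimal sketch using only the standard library; the helper name encode_service_account_key and the placeholder key are made up for illustration and are not part of dagster-gcp-pyspark.

```python
import base64
import json


def encode_service_account_key(key_json: str) -> str:
    """Return the base64 encoding of a service account key as one line of ASCII text."""
    # b64encode does not insert line breaks, so the result is safe to store
    # in a single-line environment variable.
    return base64.b64encode(key_json.encode("utf-8")).decode("ascii")


# Made-up placeholder, not a real credential. Real key files are JSON
# documents that span several lines, which is what indent=2 imitates here.
fake_key = json.dumps({"type": "service_account", "project_id": "example"}, indent=2)
encoded = encode_service_account_key(fake_key)
```

The encoded string can then be stored in an environment variable and passed to the gcp_credentials configuration value.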

temporary_gcs_bucket (Union[dagster.StringSource, None], optional):

When using PySpark DataFrames, optionally specify a temporary GCS bucket to store data. If not provided, data will be directly written to BigQuery.

Default Value: None

timeout (Union[Float, None], optional):

When using Pandas DataFrames, optionally specify a timeout for the BigQuery queries (loading and reading from tables).

Default Value: None

An I/O manager definition that reads inputs from and writes PySpark DataFrames to BigQuery.

Returns:

IOManagerDefinition

Examples

import pyspark.sql
from dagster import Definitions, EnvVar, asset
from dagster_gcp_pyspark import BigQueryPySparkIOManager

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": BigQueryPySparkIOManager(project=EnvVar("GCP_PROJECT"))
    }
)

You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": BigQueryPySparkIOManager(project=EnvVar("GCP_PROJECT"), dataset="my_dataset")
    }
)

On individual assets, you can also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pyspark.sql.DataFrame:
    ...

@asset(
    # note that the key needs to be "schema"
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_other_table() -> pyspark.sql.DataFrame:
    ...

For ops, the dataset can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pyspark.sql.DataFrame:
    ...

If none of these is provided, the dataset will default to “public”.
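
The lookup order described in this section can be summarized in a few lines of plain Python. This is only a sketch of the rules as stated here, not the I/O manager's actual implementation; resolve_dataset and its parameter names are hypothetical, and how the real I/O manager handles conflicting settings is not covered.

```python
from typing import Optional, Sequence


def resolve_dataset(
    key_prefix: Sequence[str] = (),
    metadata_schema: Optional[str] = None,
    configured_dataset: Optional[str] = None,
) -> str:
    """Pick the BigQuery dataset using the precedence described in the text."""
    # 1. A "schema" metadata entry takes precedence over key_prefix.
    if metadata_schema:
        return metadata_schema
    # 2. Otherwise the last prefix before the asset name is used.
    if key_prefix:
        return key_prefix[-1]
    # 3. Otherwise fall back to the dataset configured on the I/O manager.
    if configured_dataset:
        return configured_dataset
    # 4. With nothing specified, the default is "public".
    return "public"
```

For example, resolve_dataset(key_prefix=["my_dataset"], metadata_schema="other") picks "other", because metadata wins over the key prefix.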

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    # my_table will just contain the data from column "a"
    ...
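
Conceptually, the “columns” metadata narrows what is read from the table. The sketch below shows one way such a selection could be turned into a query string; build_select is a hypothetical helper written for illustration, and the query that the I/O manager actually issues may differ.

```python
from typing import Optional, Sequence


def build_select(dataset: str, table: str, columns: Optional[Sequence[str]] = None) -> str:
    """Build a SELECT statement limited to the requested columns (all columns if none given)."""
    column_list = ", ".join(columns) if columns else "*"
    return f"SELECT {column_list} FROM {dataset}.{table}"


# Mirrors the asset above: only column "a" of my_dataset.my_table is requested.
query = build_select("my_dataset", "my_table", ["a"])
```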

If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the “gcp_credentials” configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64
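
The temporary-file behavior described above can be pictured with a small context manager. This is a standard-library sketch of the described lifecycle (decode, write, set the variable, clean up), not Dagster's implementation; temporary_gcp_credentials is a hypothetical name and the key used below is a made-up placeholder.

```python
import base64
import os
import tempfile
from contextlib import contextmanager
from typing import Iterator

ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS"


@contextmanager
def temporary_gcp_credentials(encoded_key: str) -> Iterator[str]:
    """Decode a base64 key into a temp file and point ENV_VAR at it while the block runs."""
    decoded = base64.b64decode(encoded_key)
    # delete=False so the file survives the close and can be read by other code.
    with tempfile.NamedTemporaryFile("wb", suffix=".json", delete=False) as key_file:
        key_file.write(decoded)
        path = key_file.name
    os.environ[ENV_VAR] = path
    try:
        yield path
    finally:
        # Mirror the described cleanup: unset the variable and delete the file.
        os.environ.pop(ENV_VAR, None)
        os.remove(path)


# Placeholder key content, not a real credential.
placeholder = base64.b64encode(b'{"type": "service_account"}').decode("ascii")
with temporary_gcp_credentials(placeholder) as key_path:
    print(f"{ENV_VAR} points at {key_path}")
```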

class dagster_gcp_pyspark.BigQueryPySparkTypeHandler

Plugin for the BigQuery I/O Manager that can store and load PySpark DataFrames as BigQuery tables.

Examples

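from typing import Sequence

import pyspark.sql
from dagster import Definitions, EnvVar, asset
# DbTypeHandler lives in an internal module; the path may change between Dagster versions
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_gcp import BigQueryIOManager
from dagster_gcp_pyspark import BigQueryPySparkTypeHandler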

class MyBigQueryIOManager(BigQueryIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [BigQueryPySparkTypeHandler()]

@asset(
    key_prefix=["my_dataset"]  # my_dataset will be used as the dataset in BigQuery
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MyBigQueryIOManager(project=EnvVar("GCP_PROJECT"))
    }
)

Legacy

dagster_gcp_pyspark.bigquery_pyspark_io_manager IOManagerDefinition

Config Schema:
project (dagster.StringSource):

The GCP project to use.

dataset (Union[dagster.StringSource, None], optional):

Name of the BigQuery dataset to use. If not provided, the last prefix before the asset name will be used.

Default Value: None

location (Union[dagster.StringSource, None], optional):

The GCP location. Note: When using PySpark DataFrames, the default location of the project will be used. A custom location can be specified in your SparkSession configuration.

Default Value: None

gcp_credentials (Union[dagster.StringSource, None], optional):

GCP authentication credentials. If provided, a temporary file will be created with the credentials and GOOGLE_APPLICATION_CREDENTIALS will be set to the temporary file. To avoid issues with newlines in the keys, you must base64 encode the key. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64

Default Value: None

temporary_gcs_bucket (Union[dagster.StringSource, None], optional):

When using PySpark DataFrames, optionally specify a temporary GCS bucket to store data. If not provided, data will be directly written to BigQuery.

Default Value: None

timeout (Union[Float, None], optional):

When using Pandas DataFrames, optionally specify a timeout for the BigQuery queries (loading and reading from tables).

Default Value: None

An I/O manager definition that reads inputs from and writes PySpark DataFrames to BigQuery.

Returns:

IOManagerDefinition

Examples

import pyspark.sql
from dagster import Definitions, asset
from dagster_gcp_pyspark import bigquery_pyspark_io_manager

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": bigquery_pyspark_io_manager.configured({
            "project" : {"env": "GCP_PROJECT"}
        })
    }
)

You can set a default dataset to store the assets using the dataset configuration value of the BigQuery I/O Manager. This dataset will be used if no other dataset is specified directly on an asset or op.

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": bigquery_pyspark_io_manager.configured({
            "project": {"env": "GCP_PROJECT"},
            "dataset": "my_dataset"
        })
    }
)

On individual assets, you can also specify the dataset where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

@asset(
    key_prefix=["my_dataset"]  # will be used as the dataset in BigQuery
)
def my_table() -> pyspark.sql.DataFrame:
    ...

@asset(
    # note that the key needs to be "schema"
    metadata={"schema": "my_dataset"}  # will be used as the dataset in BigQuery
)
def my_other_table() -> pyspark.sql.DataFrame:
    ...

For ops, the dataset can be specified by including a “schema” entry in output metadata.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pyspark.sql.DataFrame:
    ...

If none of these is provided, the dataset will default to “public”.

To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    # my_table will just contain the data from column "a"
    ...

If you cannot upload a file to your Dagster deployment, or otherwise cannot authenticate with GCP via a standard method, you can provide a service account key as the “gcp_credentials” configuration. Dagster will store this key in a temporary file and set GOOGLE_APPLICATION_CREDENTIALS to point to the file. After the run completes, the file will be deleted, and GOOGLE_APPLICATION_CREDENTIALS will be unset. The key must be base64 encoded to avoid issues with newlines in the keys. You can retrieve the base64 encoded key with this shell command: cat $GOOGLE_APPLICATION_CREDENTIALS | base64