BigQuery example query

This example shows how to use a Flyte BigQueryTask to execute a query.

Note

To clone and run the example code on this page, see the Flytesnacks repo.

bigquery_plugin/bigquery_plugin_example.py
try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated

This is the world’s simplest query. Note that in order for registration to work properly, you’ll need to give your BigQuery task a name that’s unique across your project/domain for your Flyte installation.

bigquery_plugin/bigquery_plugin_example.py
import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

# Note that in order for registration to work properly, you'll need to give your
# BigQuery task a name that's unique across your project/domain for your Flyte installation.
bigquery_task_no_io = BigQueryTask(
    name="sql.bigquery.no_io",
    inputs={},
    query_template="SELECT 1",
    task_config=BigQueryConfig(ProjectID="flyte"),
)

Of course, in real-world applications we are usually more interested in using BigQuery to query a dataset. In this case, we use the crypto_dogecoin data, which is a public dataset in BigQuery.

Let’s look at how we can parameterize our query to filter results for a specific transaction version, provided as a user input.

bigquery_plugin/bigquery_plugin_example.py

@workflow
def no_io_wf():
    return bigquery_task_no_io()


DogeCoinDataset = Annotated[StructuredDataset, kwtypes(hash=str, size=int, block_number=int)]

bigquery_task_templatized_query = BigQueryTask(
    name="sql.bigquery.w_io",
    # Declare the inputs (and their types) that the query template can reference, e.g. @version.
    inputs=kwtypes(version=int),
    output_structured_dataset_type=DogeCoinDataset,
    task_config=BigQueryConfig(ProjectID="flyte"),
    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
)

In the task below, the StructuredDataset transformer converts the query result to a pandas DataFrame. We can also change “pd.DataFrame” to “pyarrow.Table” to convert the result to an Arrow table instead.

bigquery_plugin/bigquery_plugin_example.py
@task
def convert_bq_table_to_pandas_dataframe(sd: DogeCoinDataset) -> pd.DataFrame:
    return sd.open(pd.DataFrame).all()

Check the query result in the BigQuery console: https://console.cloud.google.com/bigquery
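In the templatized query, the @version placeholder and the user-provided version input share a name; the plugin appears to bind each declared input to the BigQuery named parameter of the same name. The sketch below is a hypothetical, stdlib-only helper (not part of flytekit or the BigQuery plugin) that lists the @name placeholders in a query template and compares them with the declared input names, which may catch a typo before registration. It uses a simplified regex that skips @@ system variables but does not understand string literals or comments.

```python
import re

# Hypothetical helper, not part of flytekit or the BigQuery plugin.
# Matches @name but not @@name (BigQuery system variables such as @@project_id).
# Simplification: it does not skip @ signs inside string literals or comments.
_NAMED_PARAM = re.compile(r"(?<!@)@([A-Za-z_][A-Za-z0-9_]*)")


def named_parameters(query_template: str) -> set[str]:
    """Return the set of @name placeholders used in a query template."""
    return set(_NAMED_PARAM.findall(query_template))


def mismatched_inputs(query_template: str, inputs: dict[str, type]) -> tuple[set[str], set[str]]:
    """Return (placeholders with no declared input, declared inputs never used)."""
    params = named_parameters(query_template)
    declared = set(inputs)
    return params - declared, declared - params


if __name__ == "__main__":
    query = (
        "SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` "
        "WHERE version = @version LIMIT 10;"
    )
    print(mismatched_inputs(query, {"version": int}))  # (set(), set())
    print(mismatched_inputs(query, {"ver": int}))  # ({'version'}, {'ver'})
```

Passing kwtypes(version=int) instead of a plain dict should also work, since kwtypes returns an ordered name-to-type mapping.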