
WherobotsSqlOperator

Use the WherobotsSqlOperator to execute SQL queries on Wherobots Cloud against your datasets in your Wherobots catalogs.

Using the Operator

The WherobotsSqlOperator requires a sql argument, which can be a single SQL query string or a list of query strings. You can also optionally specify the runtime to use for your query.

Below is a simple example of using the operator.

simple-operator-example.py
from wherobots.db.runtime import Runtime
from wherobots.db.region import Region
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator

operator = WherobotsSqlOperator(
    # The region parameter establishes a connection to the specified cloud provider region.
    # Replace Region.AWS_US_WEST_2 with the desired AWS region, for example:
    # - For AWS US East (N. Virginia): region=Region.AWS_US_EAST_1
    region=Region.AWS_US_WEST_2,
    task_id="execute_query",
    # The runtime parameter specifies the compute resources allocated to the runtime environment.
    # Replace Runtime.TINY with the desired runtime size, for example:
    # - For a small runtime: runtime=Runtime.SMALL
    # - For a medium runtime: runtime=Runtime.MEDIUM
    runtime=Runtime.TINY,
    sql="""
    SELECT id, geometry, confidence, geohash
    FROM wherobots_open_data.overture.places_place
    LIMIT 100
    """,
    return_last=False,
)
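Because the sql argument also accepts a list of query strings, a single task can run several statements in sequence. A minimal sketch (the statements below are illustrative, not prescribed by the provider):

```python
# A list of SQL statements. Passed as the `sql` argument to
# WherobotsSqlOperator, they are executed in order by one task.
setup_queries = [
    # Create the target database if it does not exist yet (illustrative).
    "CREATE DATABASE IF NOT EXISTS wherobots.test_db",
    # Then query the Overture places table.
    "SELECT id, geometry FROM wherobots_open_data.overture.places_place LIMIT 10",
]

# Used as, e.g.:
# WherobotsSqlOperator(task_id="setup", sql=setup_queries, return_last=False)
```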

Runtime and region selection

You can choose the Wherobots runtime you want to use with the runtime parameter, passing in one of the Runtime enum values.

For guidance on runtime sizing and selection, see Runtimes.

Region parameter will become mandatory

To prepare for the expansion of Wherobots Cloud to new regions and cloud providers, the region parameter will become mandatory in a future SDK version.

Before this support for new regions is added, we will release an updated version of the SDK.

If you continue using an older SDK version, your existing Airflow tasks will still work. However, any task that does not specify the region parameter will run in the aws-us-west-2 region.

The following AWS regions are currently supported in the Wherobots Spatial SQL API:

Wherobots Parameter     AWS Region Name      AWS Region Code   Access
Region.AWS_US_WEST_2    Oregon               us-west-2         All Organization Editions
Region.AWS_EU_WEST_1    Ireland              eu-west-1         Paid Organizations Only
Region.AWS_US_EAST_1    Northern Virginia    us-east-1         Paid Organizations Only

You can see the runtimes available to your organization within the Start a Notebook dropdown in Wherobots Cloud.

Build ETL pipelines with the WherobotsSqlOperator

Loading or creating tables in the Wherobots catalog allows you to query, process, and work with your data using pure SQL. In this example, we'll use a SQL query to create a new table from the result of a query against an existing table of the Overture Maps public dataset.

First, create a new database in your wherobots catalog. You can execute these SQL queries using the Spatial SQL API or from a notebook.

CREATE DATABASE IF NOT EXISTS wherobots.test_db
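As a sketch, the statement above can be executed programmatically through the Spatial SQL API's Python driver. The connect() keyword arguments shown here (api_key, runtime, region) are an assumption about the wherobots-python-dbapi interface; verify them against the driver's documentation. The network call only runs when an API key is present in the environment.

```python
import os

# The statement from above, executed through the Wherobots Spatial SQL API.
CREATE_DB_SQL = "CREATE DATABASE IF NOT EXISTS wherobots.test_db"

# Assumption: the wherobots-python-dbapi driver exposes connect() with
# api_key/runtime/region keyword arguments; check the driver docs for
# the exact signature before relying on this.
if os.environ.get("WHEROBOTS_API_KEY"):
    from wherobots.db import connect
    from wherobots.db.region import Region
    from wherobots.db.runtime import Runtime

    with connect(
        api_key=os.environ["WHEROBOTS_API_KEY"],
        runtime=Runtime.TINY,
        region=Region.AWS_US_WEST_2,
    ) as conn:
        cursor = conn.cursor()
        cursor.execute(CREATE_DB_SQL)
```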

Now we build a new table called wherobots.test_db.top_100_hot_buildings_daily from the result of a query on tables in the wherobots_open_data catalog. The query finds the 100 buildings in the wherobots_open_data.overture.buildings_building table that contain the most points recorded in the wherobots_open_data.overture.places_place table on 2023-07-24.

CREATE TABLE wherobots.test_db.top_100_hot_buildings_daily AS
SELECT buildings.id, first(buildings.names) AS names, count(places.geometry), '2023-07-24' AS ts
FROM wherobots_open_data.overture.places_place places
     JOIN wherobots_open_data.overture.buildings_building buildings
     ON ST_CONTAINS(buildings.geometry, places.geometry)
WHERE places.updatetime >= '2023-07-24'
      AND places.updatetime < '2023-07-25'
      AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
      AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
GROUP BY 1
ORDER BY 3 DESC
LIMIT 100

Now you can query the resulting table to verify the results:

SELECT * FROM wherobots.test_db.top_100_hot_buildings_daily
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
|id                                                                  |first(names)                                                                                                                                                           |count(geometry)|ts        |
+--------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------+
|tmp_72313131373137393340342D333031342E373539343833333034343439      |{common -> [{value -> Grand Central Terminal, language -> local}]}                                                                                                     |624            |2023-07-24|
|tmp_7732373634313033373340312D333032302E333535353036393230383134    |{}                                                                                                                                                                     |476            |2023-07-24|
|tmp_77343332313331324034342D333436352E32313130363731393438343132    |{common -> [{value -> CF ν† λ‘ ν†  이튼 μ„Όν„°, language -> ko}, {value -> CF Toronto Eaton Centre, language -> en}, {value -> CF Toronto Eaton Centre, language -> local}]}|381            |2023-07-24|
|tmp_773337333136323234364031352D333437312E38313636303038323138393035|{common -> [{value -> Square One, language -> local}]}                                                                                                                 |360            |2023-07-24|
|tmp_7731393838373435394034312D333437342E31313337343135303734303133  |{common -> [{value -> Yorkdale Shopping Centre, language -> local}]}                                                                                                   |258            |2023-07-24|
|tmp_7733323836363031393440362D323939382E373635383131313635353434    |{common -> [{value -> Roosevelt Field Mall, language -> local}]}                                                                                                       |249            |2023-07-24|
|tmp_723238363531343040332D333035382E30383231323232313334323134      |{}

To turn this ETL into a daily process orchestrated by Apache Airflow, bring the query into the WherobotsSqlOperator definition in your DAG, change the CREATE TABLE ... AS into INSERT INTO ... so that each run appends a new day of data to your table, and use Apache Airflow's macros for the daily date range.

Below is an example DAG file. The macro variables {{ ds }} and {{ next_ds }} are replaced dynamically with the actual schedule times at run time.

example-DAG.py
import datetime

from airflow import DAG
from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator

from wherobots.db.region import Region
from wherobots.db.runtime import Runtime

with DAG(
    dag_id="example_wherobots_sql_dag",
    start_date=datetime.datetime.strptime("2023-07-24", "%Y-%m-%d"),
    schedule="@daily",
    catchup=True,
    max_active_runs=1,
):
    operator = WherobotsSqlOperator(
        region=Region.AWS_US_WEST_2,
        task_id="execute_query",
        wait_for_downstream=True,
        sql="""
        INSERT INTO wherobots.test_db.top_100_hot_buildings_daily
        SELECT buildings.id, first(buildings.names), count(places.geometry), '{{ ds }}' as ts
        FROM wherobots_open_data.overture.places_place places
             JOIN wherobots_open_data.overture.buildings_building buildings
             ON ST_CONTAINS(buildings.geometry, places.geometry)
        WHERE places.updatetime >= '{{ ds }}'
              AND places.updatetime < '{{ next_ds }}'
              AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
              AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
        GROUP BY 1
        ORDER BY 3 DESC
        LIMIT 100
        """,
        return_last=False,
    )
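To see how the {{ ds }} and {{ next_ds }} macros are filled in, you can mimic the substitution by hand. Airflow performs it with the Jinja templating engine before the query is submitted; the sample dates below correspond to the 2023-07-24 daily run and are illustrative.

```python
# A templated fragment of the query above.
templated_sql = (
    "WHERE places.updatetime >= '{{ ds }}' "
    "AND places.updatetime < '{{ next_ds }}'"
)

# Airflow renders this with Jinja from the run's data interval; here we
# substitute sample values for one daily run by hand to show the result.
rendered = (
    templated_sql
    .replace("{{ ds }}", "2023-07-24")
    .replace("{{ next_ds }}", "2023-07-25")
)
print(rendered)
# WHERE places.updatetime >= '2023-07-24' AND places.updatetime < '2023-07-25'
```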

Test your DAG file

There are two ways to test the DAG file: from the Airflow UI or with the pytest framework.

You can also refer to the official Apache Airflow guidance on DAG testing best practices.

Test in Airflow UI

You can put the DAG file into the $AIRFLOW_HOME/dags directory and trigger the DAG from the Airflow UI.

After triggering an example run of the DAG file, you will find the exact queries executed in the task logs.

  • If you launch your Apache Airflow instance through airflow standalone on macOS, you may need to set the following environment variable first:

    # To handle the issue https://bugs.python.org/issue28342
    export no_proxy=*
    # Then launch your Apache Airflow standalone instance
    airflow standalone
    
  • The second DAG run will fail because there is no data in the source tables after 2023-07-24.

Test using pytest

Pytest is an open-source testing framework for Python.

It can be used to write various types of software tests, including unit, integration, end-to-end, and functional tests. For more information on installing and using pytest, refer to the pytest PyPI page.

Example DAG with pytest

The following is an example Python file that demonstrates how to use your DAG with pytest:

example-DAG-with-pytest.py
import datetime
import uuid

import pendulum
import pytest
from airflow import DAG
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

from airflow_providers_wherobots.operators.sql import WherobotsSqlOperator
from wherobots.db.region import Region

TEST_DAG_ID = "my_custom_operator_dag" + str(uuid.uuid4())
TEST_TASK_ID = "my_custom_operator_task" + str(uuid.uuid4())
DATA_INTERVAL_START = pendulum.datetime(2023, 7, 24, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)


@pytest.fixture()
def dag():
    with DAG(
        dag_id=TEST_DAG_ID,
        start_date=datetime.datetime.strptime("2023-07-24", "%Y-%m-%d"),
        schedule="@daily",
        catchup=True,
        max_active_runs=1,
    ) as dag:
        operator = WherobotsSqlOperator(
            region=Region.AWS_US_WEST_2,
            task_id=TEST_TASK_ID,
            wait_for_downstream=True,
            sql="""
                INSERT INTO wherobots.test_db.top_100_hot_buildings_daily
                SELECT buildings.id, first(buildings.names), count(places.geometry), '{{ ds }}' as ts
                FROM wherobots_open_data.overture.places_place places
                     JOIN wherobots_open_data.overture.buildings_building buildings
                     ON ST_CONTAINS(buildings.geometry, places.geometry)
                WHERE places.updatetime >= '{{ ds }}'
                      AND places.updatetime < '{{ next_ds }}'
                      AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), places.geometry)
                      AND ST_CONTAINS(ST_PolygonFromEnvelope(-79.762152, 40.496103, -71.856214, 45.01585), buildings.geometry)
                GROUP BY 1
                ORDER BY 3 DESC
                LIMIT 100
                """,
            return_last=False,
        )
    return dag


def test_my_custom_operator_execute_no_trigger(dag):
    dagrun = dag.create_dagrun(
        state=DagRunState.RUNNING,
        execution_date=DATA_INTERVAL_START,
        data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
        start_date=DATA_INTERVAL_END,
        run_type=DagRunType.MANUAL,
    )
    ti = dagrun.get_task_instance(task_id=TEST_TASK_ID)
    ti.task = dag.get_task(task_id=TEST_TASK_ID)
    ti.run(ignore_ti_state=True)
    assert ti.state == TaskInstanceState.SUCCESS
    # Assert something related to tasks results.

Execute the test

To execute this test:

  1. Copy this DAG example into a Python file.
  2. Save the file with a name of your choosing (e.g., YOUR_DAG_FILE_EXAMPLE_NAME.py).
  3. Execute it using the command: pytest YOUR_DAG_FILE_EXAMPLE_NAME.py

It can take a few minutes for your WherobotsDB SQL session to initialize. Logs will appear once the test completes.