Commit a0c9c39

Add databricks version of module 3
Signed-off-by: Danny Chiao <[email protected]>
1 parent 802de24 commit a0c9c39


46 files changed: +604 -9 lines changed

Diff for: .gitignore

+3-1
@@ -8,4 +8,6 @@ terraform.tfstate.backup
 *.iml
 **/feast-postgres-data/*
 **/airflow_demo/airflow_home/*
-.vscode/*
+.vscode/*
+**/derby.log
+**/metastore_db/*

Diff for: README.md

+7-6

Diff for: module_3_db/README.md

+312

Diff for: module_3_db/airflow_demo/dag.py

+70
@@ -0,0 +1,70 @@
import os
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from feast import RepoConfig, FeatureStore
from feast.infra.offline_stores.contrib.spark_offline_store.spark import (
    SparkOfflineStoreConfig,
)
from feast.repo_config import RegistryConfig
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
import pendulum

with DAG(
    dag_id="feature_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    description="A dbt + Feast DAG",
    schedule="@daily",
    catchup=False,
    tags=["feast"],
) as dag:
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="""
            cd ${AIRFLOW_HOME}; dbt test --models "aggregate_transaction_features"
            """,
        dag=dag,
    )

    # Generate new transformed feature values
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="""
            cd ${AIRFLOW_HOME}; dbt run --models "aggregate_transaction_features"
            """,
        dag=dag,
    )

    # Use Feast to make these feature values available in a low latency store
    @task()
    def materialize(data_interval_start=None, data_interval_end=None):
        repo_config = RepoConfig(
            registry=RegistryConfig(
                registry_type="sql",
                path="postgresql://postgres:mysecretpassword@[YOUR-RDS-ENDPOINT:PORT]/feast",
            ),
            project="feast_demo",
            provider="local",
            offline_store=SparkOfflineStoreConfig(
                spark_conf={
                    "spark.ui.enabled": "false",
                    "spark.eventLog.enabled": "false",
                    "spark.sql.catalogImplementation": "hive",
                    "spark.sql.parser.quotedRegexColumnNames": "true",
                    "spark.sql.session.timeZone": "UTC",
                }
            ),
            online_store=DynamoDBOnlineStoreConfig(region="us-west-1"),
            entity_key_serialization_version=2,
        )
        # Needed for Mac OS users because of a seg fault in requests for standalone Airflow (not needed in prod)
        os.environ["NO_PROXY"] = "*"
        os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
        store = FeatureStore(config=repo_config)
        # Add 1 hr overlap to account for late data
        # Note: normally, you'll probably have different feature views with different freshness requirements,
        # instead of materializing all feature views every day.
        store.materialize(data_interval_start.subtract(hours=1), data_interval_end)

    # Setup DAG
    dbt_test >> dbt_run >> materialize()

Diff for: module_3_db/airflow_demo/setup_airflow.sh

+33
@@ -0,0 +1,33 @@
# Airflow needs a home. `~/airflow` is the default, but you can put it
# somewhere else if you prefer (optional)
export AIRFLOW_HOME=$(pwd)/airflow_home
export AIRFLOW__CORE__LOAD_EXAMPLES=False
# TODO: UPDATE THIS
export AIRFLOW_CONN_DATABRICKS_DEFAULT='databricks://@host-url?token=yourtoken'

# Cleanup previous state, if it exists
rm -rf $AIRFLOW_HOME

# Install Airflow using the constraints file
AIRFLOW_VERSION=2.4.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.7
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.4.0/constraints-3.7.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Setup Feast dags
mkdir -p $AIRFLOW_HOME/dags
cp dag.py $AIRFLOW_HOME/dags

# Setup dbt dags
cd ../dbt/feast_demo
cp -R * $AIRFLOW_HOME
cd $AIRFLOW_HOME

# The Standalone command will initialise the database, make a user,
# and start all components for you.
airflow standalone

# Visit localhost:8080 in the browser and use the admin account details
# shown on the terminal to login.

Diff for: module_3_db/architecture.png

164 KB
File renamed without changes.

Diff for: module_3_db/dbt/feast_demo/dbt_project.yml

+29
@@ -0,0 +1,29 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'feast_demo'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'feast_demo'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
macro-paths: ["macros"]

target-path: "target"  # directory which will store compiled SQL files
clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"

models:
  feast_demo:
    example:
      materialized: view
      location_root: '[YOUR S3 BUCKET/FOLDER]'
      file_format: 'delta'

Diff for: module_3_db/dbt/feast_demo/macros/macro.sql

+7
@@ -0,0 +1,7 @@
{% macro prev_day_partition() %}
(SELECT MAX(timestamp)::date FROM {{ this }})
{% endmacro %}

{% macro next_day_partition() %}
(SELECT date_add(MAX(timestamp)::date, 1) FROM {{ this }})
{% endmacro %}

Diff for: module_3_db/dbt/feast_demo/models/example/aggregate_transaction_features.sql

+42
@@ -0,0 +1,42 @@
{{
    config(
        materialized='incremental',
        file_format='parquet',
        incremental_strategy='append'
    )
}}

SELECT *
FROM
    (
        SELECT
            user_id,
            to_timestamp(timestamp) AS timestamp,
            SUM(amt) OVER (
                PARTITION BY user_id
                ORDER BY to_timestamp(timestamp)
                RANGE BETWEEN INTERVAL 1 day PRECEDING AND CURRENT ROW
            ) AS amt_sum_1d_10m,
            AVG(amt) OVER (
                PARTITION BY user_id
                ORDER BY to_timestamp(timestamp)
                RANGE BETWEEN INTERVAL 1 day PRECEDING AND CURRENT ROW
            ) AS amt_mean_1d_10m
        FROM demo_fraud_v2.transactions
        {% if is_incremental() %}
        WHERE
            partition_0 BETWEEN date_format({{ prev_day_partition() }}, "yyyy") AND date_format({{ next_day_partition() }}, "yyyy") AND
            partition_1 BETWEEN date_format({{ prev_day_partition() }}, "MM") AND date_format({{ next_day_partition() }}, "MM") AND
            partition_2 BETWEEN date_format({{ prev_day_partition() }}, "dd") AND date_format({{ next_day_partition() }}, "dd")
        {% else %}
        -- Hack to simulate we started on 2021-06-01
        WHERE
            partition_0 = "2022" AND
            partition_1 = "04" AND
            partition_2 = "20"
        {% endif %}
    )
{% if is_incremental() %}
-- Need to only produce values in this window
WHERE timestamp > (SELECT MAX(timestamp) FROM {{ this }})
{% endif %}

Diff for: module_3_db/dbt/feast_demo/models/example/schema.yml

+11
@@ -0,0 +1,11 @@

version: 2

models:
  - name: aggregate_transaction_features
    description: ""
    columns:
      - name: "user_id"
        description: "The primary key for this table"
        tests:
          - not_null
File renamed without changes.

Diff for: module_3_db/demo_images/db_ghf.png

494 KB

Diff for: module_3_db/demo_images/db_gof.png

245 KB

Diff for: module_3_db/demo_images/db_materialize.png

201 KB

Diff for: module_3_db/demo_images/db_stream_agg.png

450 KB

Diff for: module_3_db/demo_images/dbt.png

122 KB

Diff for: module_3_db/demo_images/df_stream_setup.png

558 KB

Diff for: module_3_db/feature_repo/data_sources.py

+22
@@ -0,0 +1,22 @@
from feast import PushSource
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

# Historical log of transactions stream
transactions_source = SparkSource(
    name="transactions_source",
    table="demo_fraud_v2.transactions",
    timestamp_field="timestamp",
)

# Precomputed aggregate transaction feature values (batch / stream)
aggregate_transactions_source = PushSource(
    name="transactions_1d",
    batch_source=SparkSource(
        name="transactions_1d_batch",
        table="demo_fraud_v2.aggregate_transaction_features",
        timestamp_field="timestamp",
        tags={"dbtModel": "models/example/aggregate_transaction_features.sql"},
    ),
)
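
The `transactions_1d` PushSource is how fresh, stream-computed aggregates reach the online store between batch materializations. A minimal sketch of such a push, assuming it runs from module_3_db/feature_repo; the sample row values are made up and the columns simply mirror the feature view schema:

from datetime import datetime

import pandas as pd

from feast import FeatureStore
from feast.data_source import PushMode

store = FeatureStore(repo_path=".")  # assumption: run from module_3_db/feature_repo
event_df = pd.DataFrame(
    {
        "user_id": ["user_1"],             # entity join key (hypothetical value)
        "timestamp": [datetime.utcnow()],  # event timestamp expected by the source
        "amt_sum_1d_10m": [1234.56],       # feature values computed by the stream job
        "amt_mean_1d_10m": [205.76],
    }
)
# Write the row to the online store via the push source defined above
store.push("transactions_1d", event_df, to=PushMode.ONLINE)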

Diff for: module_3_db/feature_repo/entities.py

+9
@@ -0,0 +1,9 @@
from feast import (
    Entity,
)

user = Entity(
    name="user",
    join_keys=["user_id"],
    description="user id",
)

Diff for: module_3_db/feature_repo/feature_services.py

+9
@@ -0,0 +1,9 @@
from feast import FeatureService

from features import *

feature_service_1 = FeatureService(
    name="model_v1",
    features=[user_transaction_amount_metrics],
)

Diff for: module_3_db/feature_repo/feature_store.yaml

+17
@@ -0,0 +1,17 @@
project: feast_demo
provider: aws
registry: # where this repo's metadata is stored
  registry_type: sql
  path: postgresql://postgres:mysecretpassword@[your-rds-instance]:5432/feast
online_store: # low latency online storage
  type: dynamodb
  region: us-west-1
offline_store:
  type: spark
  spark_conf: # Note: pip install -U "databricks-connect"
    spark.ui.enabled: "false"
    spark.eventLog.enabled: "false"
    spark.sql.catalogImplementation: "hive"
    spark.sql.parser.quotedRegexColumnNames: "true"
    spark.sql.session.timeZone: "UTC"
entity_key_serialization_version: 2
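
After filling in the RDS and S3 placeholders, the repo is registered with `feast apply` from module_3_db/feature_repo. A small, hypothetical smoke test to confirm what landed in the registry afterwards (the repo path is an assumption):

from feast import FeatureStore

store = FeatureStore(repo_path=".")  # assumption: run from module_3_db/feature_repo
print([fv.name for fv in store.list_feature_views()])      # expect: ['user_transaction_amount_metrics']
print([fs.name for fs in store.list_feature_services()])   # expect: ['model_v1']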

Diff for: module_3_db/feature_repo/features.py

+26
@@ -0,0 +1,26 @@
from datetime import timedelta

from feast import (
    FeatureView,
    Field,
)
from feast.types import String, Float64

from data_sources import *
from entities import *

user_transaction_amount_metrics = FeatureView(
    name="user_transaction_amount_metrics",
    description="User transaction features",
    entities=[user],
    ttl=timedelta(seconds=8640000000),
    schema=[
        Field(name="user_id", dtype=String),
        Field(name="amt_sum_1d_10m", dtype=Float64),
        Field(name="amt_mean_1d_10m", dtype=Float64),
    ],
    online=True,
    source=aggregate_transactions_source,
    tags={"production": "True"},
)
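
Once the Airflow DAG has materialized this feature view into DynamoDB, a serving path can read the values through the `model_v1` feature service. A minimal sketch, assuming the repo path and a sample user_id:

from feast import FeatureStore

store = FeatureStore(repo_path=".")  # assumption: module_3_db/feature_repo
online_features = store.get_online_features(
    features=store.get_feature_service("model_v1"),
    entity_rows=[{"user_id": "user_1"}],  # hypothetical entity key
).to_dict()
print(online_features)  # user_id plus amt_sum_1d_10m and amt_mean_1d_10m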

Diff for: module_3_db/orchestrate.png

217 KB

Diff for: module_3_db/sample_db_notebook.ipynb

+1
Large diffs are not rendered by default.

Diff for: module_3/README.md renamed to module_3_sf/README.md

+2-2

Diff for: module_3_sf/airflow.png

247 KB
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

Diff for: module_3_sf/dbt/feast_demo/.gitignore

+4
@@ -0,0 +1,4 @@

target/
dbt_packages/
logs/
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
