dbt Pipeline Documentation  ·  ELT Architecture

Stocks Pipeline
From Git to dbt

How raw yfinance OHLCV data flows from Python extraction into PostgreSQL, through a three-layer dbt pipeline, and out as clean, pre-calculated financial indicators ready for Power BI and the Go API. The case for keeping Python dumb and letting SQL be the brain.

dbt 1.11.7 · PostgreSQL (46M rows) · Python (yfinance) · stocks_mart (~14K tickers)

The formulas, before & after

Why moving rate-of-return from Pandas into SQL window functions produces more reliable results across 14,000+ tickers.

Before — Python / Pandas

Rate of Return via pct_change()

In Stocks_Extractor_v1.py, the entire DataFrame had to reside in the Pi's RAM. pct_change() compares each row with whatever row precedes it, so a single missing row makes that day's return span the gap instead of one trading day. The function was also defined twice; the second definition, the one with .dropna(), silently overrode the first.

R_t = ( Close_t / Close_{t-1} − 1 ) × 100

# Python implementation
df['return'] = df['Close'].pct_change() * 100
df = df.dropna() # lose the first row per ticker
After — dbt / PostgreSQL LAG()

Rate of Return via Window Function

The SQL LAG() version is more robust. PARTITION BY ticker guarantees a row is never divided by the previous row's price when that row belongs to a different ticker, and ORDER BY date fixes the row order. A bare Pandas pct_change() gives neither guarantee unless the frame is sorted and grouped by ticker first.

R_t = ( Close_t / LAG(Close, 1) OVER (PARTITION BY ticker ORDER BY date) − 1 ) × 100

-- dbt stocks_mart.sql
(close / LAG(close, 1) OVER (
  PARTITION BY ticker
  ORDER BY date
) - 1) * 100 AS return_pct
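
To see the partition boundary in action, here is a minimal sketch that runs the same LAG() expression against two made-up tickers. It uses Python's built-in sqlite3 module as a stand-in for PostgreSQL (an assumption for portability; it needs SQLite 3.25 or newer for window functions), and the prices table and its rows are invented for the demo.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL here (an assumption for the demo);
# LAG() ... OVER (PARTITION BY ... ORDER BY ...) behaves the same way in both.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prices (ticker TEXT, date TEXT, close REAL)")
conn.executemany(
    "INSERT INTO prices VALUES (?, ?, ?)",
    [
        ("AAA", "2024-01-02", 100.0),
        ("AAA", "2024-01-03", 110.0),
        ("BBB", "2024-01-02", 50.0),
        ("BBB", "2024-01-03", 45.0),
    ],
)

# Same expression as the mart snippet: previous close comes from the same ticker only.
rows = conn.execute(
    """
    SELECT ticker, date,
           (close / LAG(close, 1) OVER (
               PARTITION BY ticker
               ORDER BY date
           ) - 1) * 100 AS return_pct
    FROM prices
    ORDER BY ticker, date
    """
).fetchall()

for row in rows:
    print(row)
```

The first row of each ticker comes back as NULL (None in Python) because LAG() finds no earlier row inside that partition, so BBB's first return is never computed against AAA's last close.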

Is the transformation linear?

Test 1 — Additivity

If T(A + B) = T(A) + T(B), the transform is additive. For rate of return:

T(A + B) = (A_t + B_t − A_{t-1} − B_{t-1}) / (A_{t-1} + B_{t-1})

T(A) + T(B) = (A_t − A_{t-1}) / A_{t-1} + (B_t − B_{t-1}) / B_{t-1}

T(A+B) ≠ T(A)+T(B)
Fails additivity

Test 2 — Homogeneity

If T(αC) = α·T(C), the transform is homogeneous. Scale the whole price series by α (as a split adjustment applied to every row does):

T(αC) = (αC_t − αC_{t-1}) / (αC_{t-1})
      = (C_t − C_{t-1}) / C_{t-1}      (α cancels)
      = T(C)

T(αC) = T(C) ≠ α·T(C) for α ≠ 1
Fails homogeneity
Verdict

The rate-of-return transformation is non-linear

Division by the previous price Ct-1 breaks linearity. Financially this makes sense: if Stock A and B both rise 10%, a portfolio of both also rises 10% — it doesn't add to 20%. The denominator is what makes the function non-linear.
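
Both tests can be checked numerically. The sketch below uses invented prices and a hypothetical helper named rate_of_return; it only illustrates the algebra above and is not code from the pipeline.

```python
def rate_of_return(prev_close, close):
    """Simple return between two consecutive closes, as a fraction."""
    return (close - prev_close) / prev_close

# Two made-up price moves: A and B each rise 10%.
a_prev, a_now = 100.0, 110.0
b_prev, b_now = 50.0, 55.0

# Additivity test: T(A + B) versus T(A) + T(B)
portfolio = rate_of_return(a_prev + b_prev, a_now + b_now)
summed = rate_of_return(a_prev, a_now) + rate_of_return(b_prev, b_now)

# Homogeneity test: T(alpha * A) versus alpha * T(A)
alpha = 4.0
scaled = rate_of_return(alpha * a_prev, alpha * a_now)

print(portfolio, summed, scaled)
```

The portfolio return stays at 10% while the summed returns give 20%, and scaling every price by 4 leaves the return at 10% rather than 40%.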


What actually changed in the files

The extractor was slimmed to raw OHLCV plus rate_of_return. All other financial logic was moved to dbt.

Stocks_Extractor.py: old vs. new

Removed — calculate_metrics()
Daily Return, Cumulative Return,
SMA_20, EMA_20

Four derived columns computed in Pandas during extraction. Stored in the DB as pre-calculated values — a logic bug required re-fetching all history from yfinance to fix.

Kept — calculate_rate_of_return()
df['Rate of Return'] = df['Close'].pct_change() * 100
df = df.dropna(subset=['Rate of Return'])

Still calculated in Python. The .dropna() only removes the first row per ticker, which has no previous close to compare against. dbt's stocks_cleansing layer sanitises any residual NaN/Inf on incremental loads.

Removed — prepare_dataframe_for_db() columns
daily_return, cumulative_return,
sma_20, ema_20, sector

Old version wrote these calculated and enriched columns directly to the database. New version strips them — column mapping comment now reads "Columns intentionally excluded (now owned by dbt)".

Added — dotenv + EMAIL_NOTIFICATION_INTERVAL
import os

from dotenv import load_dotenv
load_dotenv()  # read EMAIL_RECIPIENT from .env
EMAIL_RECIPIENT = os.getenv("EMAIL_RECIPIENT")
EMAIL_NOTIFICATION_INTERVAL = 500  # tickers between notification emails

Email recipient moved to .env — no hardcoded address. Notification interval raised 100 → 500 tickers (~30 emails instead of ~150 on a 15K-ticker run).

Removed — duplicate function definition
calculate_rate_of_return() defined twice
(lines 296 and 307 in old file)

The old file defined the function twice; the second silently overrode the first. The new file has a single definition with a docstring explaining why .dropna() is intentional.

Simplified — insert is now truly raw
INSERT INTO integrated_stocks
(date, open, high, low, close,
volume, dividends, stock_splits,
ticker, rate_of_return)

Only raw OHLCV fields plus rate_of_return reach the database. All analytics — sector joins, moving averages, Bollinger Bands, MACD — are dbt's responsibility.

dbt models — what each file actually does

stocks_staging.sql
ROW_NUMBER() OVER (
  PARTITION BY s.ticker, s.date
  ORDER BY s.open
) as rn ... WHERE rn = 1

Left-joins integrated_stocks with stocks_info for sector/region. Deduplicates by (ticker, date). Filters to S&P 500, Forex, Crypto, Indices, Japan, Brazil, USA.
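
A minimal sketch of the deduplication step, run through Python's sqlite3 as a stand-in for PostgreSQL (an assumption; window functions need SQLite 3.25 or newer). The table is trimmed to three columns and the rows are invented, so this shows the pattern rather than the real staging model.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE integrated_stocks (ticker TEXT, date TEXT, open REAL)")
conn.executemany(
    "INSERT INTO integrated_stocks VALUES (?, ?, ?)",
    [
        ("AAA", "2024-01-02", 100.0),
        ("AAA", "2024-01-02", 100.5),  # accidental duplicate for the same day
        ("AAA", "2024-01-03", 101.0),
    ],
)

# Number the rows inside each (ticker, date) group and keep only the first one.
deduped = conn.execute(
    """
    SELECT ticker, date, open
    FROM (
        SELECT s.*,
               ROW_NUMBER() OVER (
                   PARTITION BY s.ticker, s.date
                   ORDER BY s.open
               ) AS rn
        FROM integrated_stocks s
    )
    WHERE rn = 1
    ORDER BY ticker, date
    """
).fetchall()

print(deduped)
```

Because the window is ordered by open, the surviving row in a duplicated group is the one with the lowest open; a different ORDER BY would pick a different survivor.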

stocks_cleansing.sql + safe_numeric macro
CASE
  WHEN open = 'NaN'::float8
  OR open = 'Infinity'::float8
  OR open = '-Infinity'::float8
  OR open >= 1e14
  THEN null ELSE open
END as open

Scrubs three float8 failure modes: NaN, ±Infinity, overflow (≥1e14). Price columns → NULL. Volume → 0. The safe_numeric macro eliminates scattered CASE blocks across every mart column.
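
The scrub rule is easy to reason about outside SQL. The following is a hedged Python sketch of the same three checks; the function name scrub and its fallback parameter are hypothetical and only mirror the behaviour described above (prices become NULL, volume becomes 0).

```python
import math

OVERFLOW_LIMIT = 1e14  # threshold quoted in the CASE expression above

def scrub(value, fallback=None):
    """Return `fallback` for NaN, +/-Infinity, or values >= 1e14; otherwise the value."""
    if value is None:
        return fallback
    if math.isnan(value) or math.isinf(value) or value >= OVERFLOW_LIMIT:
        return fallback
    return value

# Price columns fall back to None (NULL); volume falls back to 0.
print(scrub(float("nan")))               # price-style: None
print(scrub(float("-inf"), fallback=0))  # volume-style: 0
print(scrub(101.25))                     # clean values pass through
```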

stocks_mart.sql — key guard
CASE
  WHEN ema_12_count >= 12
  AND ema_26_count >= 26
  THEN ema_12 - ema_26
  ELSE null
END as macd_line

MACD is only emitted once both counters clear their thresholds (≥12 and ≥26 rows of history); this is the warm-up guard. The signal line requires an additional 9 confirmed MACD rows. The Python extractor, working from a short fetch window, had no such guard.

Key insight: the short fetch window problem

Why Python's short fetch window produces incorrect indicators

Your extractor fetches the last 60 days by default (timedelta(days=60)), which is roughly 40 trading days for equities. That covers a 30-day SMA for only the most recent rows, and MACD's slow EMA uses 26 periods with the signal line needing 9 more on top of that. More critically, EMAs are recursive: each value depends on the previous EMA, going all the way back to the first trading day.

-- Python, 60-day window, day 1 of the window:
EMA_t = Close_t × k + EMA_{t-1} × (1 − k),  k = 2 / (n + 1)
-- EMA_{t-1} is unknown → cold start → every row in the window inherits the seed error (it decays but never reaches zero)

-- dbt stocks_mart.sql, against full history (~46M rows):
CASE WHEN ema_26_count >= 26 THEN ema_12 - ema_26 ELSE null END
-- Only emits a value once the warm-up period is fully satisfied
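
The cold-start effect can be measured with a small experiment. This sketch compares an EMA computed over a long synthetic history with one restarted inside a 40-row window. The price series is invented, the 40-row window is an assumed stand-in for a 60-calendar-day fetch, and seeding the EMA with the first close is one common convention, not necessarily what the old extractor did.

```python
def ema_series(closes, span):
    """Recursive EMA seeded with the first close: EMA_t = k*close_t + (1-k)*EMA_{t-1}."""
    k = 2 / (span + 1)
    out = [closes[0]]
    for close in closes[1:]:
        out.append(close * k + out[-1] * (1 - k))
    return out

# Synthetic history: 300 closes drifting upward (illustrative numbers only).
history = [100 + 0.5 * i for i in range(300)]
window = history[-40:]

full = ema_series(history, span=26)[-40:]  # EMA with the whole history behind it
cold = ema_series(window, span=26)         # EMA restarted inside the window

# Absolute gap between the two, row by row across the window.
errors = [abs(f - c) for f, c in zip(full, cold)]
print(round(errors[0], 3), round(errors[-1], 3))
```

The gap shrinks by a factor of (1 − k) on every row, so it is largest on the first day of the window and is still non-zero on the last.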

Your ELT data flow

Four stages from raw extraction to Power BI. Python is responsible for networking and I/O; apart from the rate_of_return column it still writes, every computation lives in SQL.

Python
Extract & Load

Stocks_Extractor.py — a lean ingestion engine. Hits yfinance, grabs OHLCV data, upserts into PostgreSQL. Responsibility: networking, rate limiting, I/O. Nothing more. The Pi's CPU stays cool.

ON CONFLICT (ticker, date) DO UPDATE

Run it multiple times without fear — no duplicate rows, no broken dbt models downstream.
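
A minimal sketch of why the upsert makes reruns safe, again using sqlite3 as a stand-in for PostgreSQL (an assumption; SQLite accepts the same ON CONFLICT ... DO UPDATE form from version 3.24). The table is cut down to three columns, and the primary key on (ticker, date) is assumed, since the clause needs a unique constraint to target.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE integrated_stocks (
        ticker TEXT NOT NULL,
        date   TEXT NOT NULL,
        close  REAL,
        PRIMARY KEY (ticker, date)
    )
    """
)

UPSERT = """
    INSERT INTO integrated_stocks (ticker, date, close)
    VALUES (?, ?, ?)
    ON CONFLICT (ticker, date) DO UPDATE SET close = excluded.close
"""

def load(batch):
    """Insert new rows; overwrite rows whose (ticker, date) already exists."""
    conn.executemany(UPSERT, batch)
    conn.commit()

load([("AAA", "2024-01-02", 100.0), ("AAA", "2024-01-03", 110.0)])
load([("AAA", "2024-01-03", 111.0)])  # rerun that carries a corrected close

stored = conn.execute(
    "SELECT ticker, date, close FROM integrated_stocks ORDER BY date"
).fetchall()
print(stored)
```

The second load updates the existing row in place, so the table still holds two rows.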

dbt
stocks_staging

Filters the universe to the tracked groups (S&P 500, Forex, Crypto, Indices, Japan, Brazil, USA). Deduplicates with ROW_NUMBER() OVER (PARTITION BY ticker, date).

If Python accidentally inserts a duplicate for the same day, dbt picks one and ignores the rest — Power BI never double-counts.

dbt
stocks_cleansing

The safe_numeric macro — a defensive layer at the database level. Catches NaN and Infinity that yfinance occasionally emits and converts them to NULL.

Casts everything to numeric(18,4), which rounds away float64 representation artifacts like 0.30000000000000004 before they reach Power BI.
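
The effect of the cast can be approximated in Python with the decimal module. This is a sketch only: the helper name to_numeric_18_4 is hypothetical, and it mimics just the 4-decimal rounding, not PostgreSQL's precision limit of 18 digits.

```python
from decimal import Decimal, ROUND_HALF_UP

def to_numeric_18_4(value):
    """Round a float to 4 decimal places, approximating a cast to numeric(18,4)."""
    return Decimal(repr(value)).quantize(Decimal("0.0001"), rounding=ROUND_HALF_UP)

raw = 0.1 + 0.2               # float64 cannot represent 0.3 exactly
clean = to_numeric_18_4(raw)  # fixed four-decimal value

print(repr(raw), clean)
```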

dbt
stocks_mart

The "brain." Calculates all financial intelligence using the full historical database — no warm-up period issues, no re-fetching history from yfinance.

7, 14, 20, 30-day SMAs · Bollinger Bands · MACD Lines · Signal Lines · Histograms. All materialized as a table: Power BI reads static values, not running DAX calculations.

Pre-hooks configure work_mem and temp_buffers — PostgreSQL handles sorting and partitioning efficiently.

Power BI
Reports

Reads clean, pre-calculated data. No complex time-intelligence DAX. No heavy measures. Your 1,500+ user environment gets snappy dashboards because the hard math already happened in PostgreSQL.

Crontab (0 1 * * *) orchestrates the sequence: Python extractor → dbt build → Power BI refresh trigger.


The math that lives in SQL

Three core indicator families, all computed against the full 46M-row history using PostgreSQL window functions.

Moving Averages

N-period SMAs computed across the full history. No warm-up problem: the database already holds every row since the earliest trading date for each ticker.

SMA_n(t) = (1/n) × Σ Close_i, for i = t−n+1 … t
AVG(close) OVER (PARTITION BY ticker
  ORDER BY date
  ROWS BETWEEN (n-1) PRECEDING
  AND CURRENT ROW)
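
A small runnable version of the windowed average, with n = 3 and four invented rows. It runs on sqlite3 as a stand-in for PostgreSQL (an assumption; the frame clause behaves the same way in both).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prices (ticker TEXT, date TEXT, close REAL)")
conn.executemany(
    "INSERT INTO prices VALUES (?, ?, ?)",
    [("AAA", f"2024-01-0{d}", float(c)) for d, c in [(1, 10), (2, 20), (3, 30), (4, 40)]],
)

# 3-period SMA: the frame covers the current row plus the two rows before it.
sma_3 = [
    row[0]
    for row in conn.execute(
        """
        SELECT AVG(close) OVER (
                   PARTITION BY ticker
                   ORDER BY date
                   ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
               ) AS sma_3
        FROM prices
        ORDER BY ticker, date
        """
    )
]
print(sma_3)
```

Note that the first n − 1 rows of each ticker average over fewer than n closes, because the frame simply contains whatever rows exist so far.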

Bollinger Bands

Upper and lower bands at ±2 standard deviations from the moving average. On a new stock's first day the window holds a single row, so σ is degenerate (zero for a population formula, undefined for a sample one); if a NaN results, safe_numeric catches it and returns NULL instead of crashing the Power BI refresh.

Middle = μ = SMA_20
σ = √( Σ(x − μ)² / n )
Upper = μ + (2 × σ)
Lower = μ − (2 × σ)
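
The band formulas translate directly into a few lines of Python. The helper name bollinger is hypothetical, and returning None when fewer than n closes exist is an assumed policy for the sketch, not a statement about what the mart does.

```python
from statistics import fmean, pstdev

def bollinger(closes, n=20, width=2.0):
    """Return (lower, middle, upper) for the last n closes, or None if fewer than n."""
    if len(closes) < n:
        return None
    window = closes[-n:]
    mu = fmean(window)          # middle band: the n-period SMA
    sigma = pstdev(window, mu)  # population standard deviation, dividing by n
    return (mu - width * sigma, mu, mu + width * sigma)

closes = [float(i) for i in range(1, 21)]  # invented series 1..20
lower, middle, upper = bollinger(closes)
print(lower, middle, upper)
```

pstdev matches the σ formula above because it divides by n; swapping in statistics.stdev would divide by n − 1 and widen the bands slightly.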

MACD

Trend-following momentum. EMAs are recursive, so an accurate MACD needs history reaching back to the first trading day, and dbt has access to all of it. Python working from a short fetch window starts each EMA cold, so its MACD stays biased until the seed error has decayed.

MACD = EMA_12 − EMA_26
Signal = EMA_9(MACD)
Histogram = MACD − Signal
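
As a rough illustration of the three formulas together with a warm-up guard, here is a pure-Python sketch. The function names, the first-value EMA seed, and the choice to emit None during warm-up are assumptions made for the example; the mart's actual SQL is not reproduced here.

```python
def ema_series(values, span):
    """Recursive EMA seeded with the first value."""
    k = 2 / (span + 1)
    out = [values[0]]
    for v in values[1:]:
        out.append(v * k + out[-1] * (1 - k))
    return out

def macd(closes, fast=12, slow=26, signal=9):
    """Return (macd, signal, histogram) per row; fields are None during warm-up."""
    if not closes:
        return []
    ema_fast = ema_series(closes, fast)
    ema_slow = ema_series(closes, slow)
    rows, confirmed = [], []  # confirmed: MACD values that passed the guard
    for i in range(len(closes)):
        if i + 1 < slow:  # fewer than `slow` rows of history so far
            rows.append((None, None, None))
            continue
        line = ema_fast[i] - ema_slow[i]
        confirmed.append(line)
        if len(confirmed) < signal:  # signal needs `signal` confirmed MACD rows
            rows.append((line, None, None))
            continue
        sig = ema_series(confirmed, signal)[-1]
        rows.append((line, sig, line - sig))
    return rows

result = macd([100.0 + i for i in range(40)])  # invented upward-drifting closes
print(result[24], result[25], result[33])
```

With 40 rows, the MACD line first appears on row 26 and the signal line and histogram on row 34, nine confirmed MACD values later.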

ETL vs ELT — the tradeoffs

Why moving all transformation logic into dbt changes how you debug, maintain, and scale the pipeline.

Comparison axis          | ETL (Extract, Transform, Load)     | ELT (Extract, Load, Transform)
-------------------------|------------------------------------|--------------------------------
History needed for calc  | Must fetch extra history every run | Uses history already in DB
Bug fixing               | Re-extract all tickers, all years  | Fix SQL, run dbt (minutes)
Pi CPU/RAM load          | High: Pandas is memory hungry      | Low: Pi just moves data
Logic visibility         | Hidden in Python code              | Visible SQL + dbt docs lineage
NaN / edge case handling | Pandas NaN propagates to DB        | safe_numeric macro, NULL in PBI
Float precision          | float64 rounding artifacts         | numeric(18,4), exact
Power BI perf            | Heavy DAX, slower reports          | Static values, instant reads
Reprocessing cost        | API calls, rate-limit risk         | Zero API calls to fix logic

What you can run safely

stocks_staging and stocks_cleansing are both materialized='view', so they complete in seconds. stocks_mart is a materialized='table' against 46M rows and takes around 30 minutes. Every command below except the last stops before it.

stocks_staging · view · instant
stocks_cleansing · view · instant
stocks_mart · table · ~30 min (skip)
✓ Safe to run
Run both views
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt run --select stocks_staging stocks_cleansing
1 of 2 OK created sql view model dbt_stocks.stocks_staging ...... [CREATE VIEW in 0.17s]
2 of 2 OK created sql view model dbt_stocks.stocks_cleansing .... [CREATE VIEW in 0.05s]

Completed successfully
Done. PASS=2 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=2

Materializes both views in sequence. dbt respects the dependency order automatically — stocks_cleansing always runs after stocks_staging.

✓ Safe to run
Target the staging folder
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt run --select staging.*
1 of 2 OK created sql view model dbt_stocks.stocks_staging ...... [CREATE VIEW in 0.15s]
2 of 2 OK created sql view model dbt_stocks.stocks_cleansing .... [CREATE VIEW in 0.05s]

Completed successfully
Done. PASS=2 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=2

Runs everything under models/staging/. Safe because the mart lives under models/marts/ — a completely separate path.

◎ No DB writes
Compile — validate SQL only
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt compile --select stocks_staging stocks_cleansing
Found 4 models, 3 sources, 465 macros
Compiled node 'stocks_staging' is:

with base as (
  select s.ticker, s.date, s.open, ...
  from "stocks_db"."public"."integrated_stocks" s
  left join "stocks_db"."public"."stocks_info" i
    on s.ticker = i.ticker
)

Renders Jinja/macro templates into plain SQL under target/compiled/. Nothing touches the database — great for verifying safe_numeric macro expansion before committing.

✓ Safe to run
Exclude mart explicitly
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt run --select stocks_staging+ \
--exclude stocks_mart
1 of 2 OK created sql view model dbt_stocks.stocks_staging ...... [CREATE VIEW in 0.14s]
2 of 2 OK created sql view model dbt_stocks.stocks_cleansing .... [CREATE VIEW in 0.05s]

Completed successfully
Done. PASS=2 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=2

The + operator selects all downstream dependents of stocks_staging. Explicit --exclude ensures the mart never runs — future-proof as more models are added.

✗ Skip on camera
Full mart rebuild — ~30 minutes
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt run --select stocks_mart
Running with dbt=1.11.7 · Found 4 models, 3 sources, 465 macros
1 of 1 START sql table model dbt_stocks.stocks_mart ............. [RUN]
# SET LOCAL work_mem = '512MB'; SET LOCAL temp_buffers = '256MB'
# Sorting 46M rows across all window partitions...
# SMA 7/14/20/30 · Bollinger Bands · MACD · Signal · Histogram
# ~30 minutes · pre-recorded for the video

1 of 1 OK created sql table model dbt_stocks.stocks_mart ....... [CREATE TABLE in ~1800s]

Full materialized='table' rebuild against the entire 46M-row history. Pre-hooks configure PostgreSQL memory before sorting begins. Record this separately and cut to the result.


Keep Python dumb.
Let dbt be the brain.

The extractor's only job is to get raw OHLCV numbers into integrated_stocks safely and fast. Every financial calculation — moving averages, Bollinger Bands, MACD, rate of return — belongs in your dbt mart, where it runs against the full history, produces exact decimal results, and is one SQL edit away from being fixed.

✓ dbt for transformations ✓ Python for ingestion only ✓ Crontab orchestrates both