01 — Mathematics

The formulas,
before & after

Before — Python / Pandas

Rate of Return via pct_change()

In Stocks_Extractor_v1.py, the entire DataFrame had to reside in the Pi's RAM. A single missing row shifts the previous-row reference — pct_change() silently computes that day's return against the price from two days back. You also defined calculate_rate_of_return twice; the second definition, with .dropna(), silently overrides the first.

R_t = ( Close_t / Close_(t-1) − 1 ) × 100

# Python implementation
df['return'] = df['Close'].pct_change() * 100
df = df.dropna() # lose the first row
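A quick sketch of that failure mode (synthetic prices, not the extractor's real data): when one trading day is missing, pct_change() silently fuses two daily moves into a single return.

```python
# Sketch of the missing-row failure mode (synthetic prices, not real data):
# pct_change() divides by the previous *row*, so a dropped trading day
# silently turns one daily return into a two-day return.
import pandas as pd

full = pd.Series([100.0, 110.0, 121.0], name="Close")   # +10% each day
gappy = full.drop(1).reset_index(drop=True)             # middle day missing

full_ret = (full.pct_change() * 100).round(2)    # NaN, 10.0, 10.0
gappy_ret = (gappy.pct_change() * 100).round(2)  # NaN, 21.0 (two days fused)
```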
After — dbt / PostgreSQL LAG()

Rate of Return via Window Function

The SQL LAG() function is more robust: PARTITION BY ticker ensures you never accidentally divide by the previous row's price if it belongs to a different ticker — something raw Pandas pct_change() can't guarantee without grouping by ticker first.

R_t = ( Close_t / LAG(Close, 1) OVER (PARTITION BY ticker ORDER BY date) − 1 ) × 100

-- dbt stocks_mart.sql
(close / LAG(close, 1) OVER (
  PARTITION BY ticker
  ORDER BY date
) - 1) * 100 AS return_pct
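For comparison, the closest Pandas equivalent of the partitioned window is a groupby before pct_change(); this toy frame (illustrative tickers and prices) shows why the naive version bleeds across ticker boundaries.

```python
import pandas as pd

df = pd.DataFrame({
    "ticker": ["AAA", "AAA", "BBB", "BBB"],
    "close":  [100.0, 110.0, 50.0, 55.0],
})

# Naive: BBB's first return divides by AAA's last close (wrong by construction).
df["naive"] = df["close"].pct_change() * 100

# Partitioned: pct_change() restarts per ticker, mirroring PARTITION BY ticker.
df["partitioned"] = df.groupby("ticker")["close"].pct_change() * 100
```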

Is the transformation linear?

Test 1 — Additivity

If T(A + B) = T(A) + T(B), the transform is additive. For rate of return:

T(A+B) = ((A_t + B_t) − (A_(t-1) + B_(t-1))) / (A_(t-1) + B_(t-1))

T(A)+T(B) = (A_t − A_(t-1)) / A_(t-1) + (B_t − B_(t-1)) / B_(t-1)

T(A+B) ≠ T(A) + T(B)
Fails additivity

Test 2 — Homogeneity

If T(αC) = α·T(C), the transform is homogeneous. For a stock split (scale by α):

T(αC) = (αC_t − αC_(t-1)) / αC_(t-1)
      = (C_t − C_(t-1)) / C_(t-1)      (α cancels out)
      = T(C)

T(αC) = T(C) ≠ α·T(C)
Fails homogeneity
Verdict

The rate-of-return transformation is non-linear

Division by the previous price C_(t-1) breaks linearity. Financially, this makes sense: if Stock A and Stock B both rise 10%, a portfolio of both also rises 10% — it doesn't add to 20%. The denominator is what makes the function non-linear.
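Both tests can be spot-checked numerically; this sketch uses arbitrary toy prices, and the helper T() is purely illustrative.

```python
# Numeric spot-check of the two linearity tests (arbitrary toy prices;
# T() is an illustrative helper, not code from the pipeline).
def T(prev, curr):
    """Rate of return between two prices, as a fraction."""
    return (curr - prev) / prev

A_prev, A_curr = 100.0, 110.0   # Stock A: +10%
B_prev, B_curr = 200.0, 230.0   # Stock B: +15%
alpha = 2.0                     # price scaling, e.g. a split adjustment

# Additivity fails: the return of the summed prices is a weighted average,
# not the sum of the individual returns.
lhs = T(A_prev + B_prev, A_curr + B_curr)    # 40/300, about 0.1333
rhs = T(A_prev, A_curr) + T(B_prev, B_curr)  # 0.10 + 0.15 = 0.25

# Homogeneity fails: scaling both prices by alpha leaves the return unchanged.
scaled = T(alpha * A_prev, alpha * A_curr)   # alpha cancels: still 0.10
```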

02 — File Changes

What actually
changed in the files

Stocks_Extractor.py — old → new

Removed — calculate_metrics()
Daily Return, Cumulative Return,
SMA_20, EMA_20

Four derived columns computed in Pandas during extraction. Stored in the DB as pre-calculated values — meaning a logic bug required re-fetching all history from yfinance to fix.

Kept — calculate_rate_of_return()
df['Rate of Return'] = df['Close'].pct_change() * 100
df = df.dropna(subset=['Rate of Return'])

Still calculated in Python — only the first row per ticker is affected. The new script's docstring explicitly notes that dbt's stocks_cleansing layer will sanitise any residual NaN/Inf on incremental loads.

Removed — prepare_dataframe_for_db() columns
daily_return, cumulative_return,
sma_20, ema_20, sector

Old version wrote these calculated and enriched columns directly to the database. New version intentionally strips them — the column mapping comment now reads "Columns intentionally excluded (now owned by dbt)".

Added — dotenv + EMAIL_NOTIFICATION_INTERVAL
from dotenv import load_dotenv
load_dotenv()
EMAIL_RECIPIENT = os.getenv("EMAIL_RECIPIENT")
EMAIL_NOTIFICATION_INTERVAL = 500

Email recipient moved to .env — no more hardcoded address. Notification interval raised from 100 → 500 tickers, reducing email noise for a 15k-ticker run (~30 emails instead of ~150).

Removed — duplicate function definition
calculate_rate_of_return() defined twice
(lines 296 and 307 in old file)

The old file defined the function twice. The second silently overrode the first. The new file has a single, clean definition with a proper docstring explaining why .dropna() is intentional.

Simplified — insert is now truly raw
INSERT INTO integrated_stocks
(date, open, high, low, close,
volume, dividends, stock_splits,
ticker, rate_of_return)

Only raw OHLCV fields plus rate_of_return reach the database. All analytics — sector joins, moving averages, Bollinger Bands, MACD — are dbt's responsibility.

dbt models — what each file actually does

stocks_staging.sql
ROW_NUMBER() OVER (
  PARTITION BY s.ticker, s.date
  ORDER BY s.open
) as rn ... WHERE rn = 1

Left-joins integrated_stocks with stocks_info for sector/region. Deduplicates by (ticker, date) — if Python ever inserts a duplicate row, dbt silently picks one. Filters to S&P 500, Forex, Crypto, Indices, Weapons, Commodities, Japan, Brazil, USA.
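A Pandas mirror of that dedup rule, useful as a sanity check against a toy frame (the real deduplication happens in SQL; the column values here are invented):

```python
import pandas as pd

# Toy frame with a duplicated (ticker, date) pair; open values are invented.
df = pd.DataFrame({
    "ticker": ["AAA", "AAA", "AAA"],
    "date":   ["2024-01-02", "2024-01-02", "2024-01-03"],
    "open":   [101.0, 100.0, 102.0],
})

# Mirror of ROW_NUMBER() OVER (PARTITION BY ticker, date ORDER BY open)
# ... WHERE rn = 1: sort by the tie-breaker, keep the first row per key.
deduped = (
    df.sort_values("open")
      .drop_duplicates(subset=["ticker", "date"], keep="first")
      .sort_values("date")
)
```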

stocks_cleansing.sql + safe_numeric macro
CASE
  WHEN open = 'NaN'::float8
  OR open = 'Infinity'::float8
  OR open >= 1e14
  THEN null ELSE open
END as open

Scrubs three float8 failure modes: NaN, ±Infinity, and overflow (≥1e14). Price columns → NULL. Volume → 0. The safe_numeric macro eliminates the scattered CASE blocks that would otherwise appear in every column of the mart.
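A pure-Python replica of the rule, handy for unit-testing the policy outside the warehouse (the function name and signature are illustrative, not the macro's actual interface):

```python
import math

# Pure-Python replica of the cleansing rule: NaN, +/-Infinity, or
# overflow (>= 1e14) become the fallback (NULL for prices, 0 for volume).
# The function name and signature are illustrative, not the macro's API.
def safe_numeric(value, fallback=None, overflow=1e14):
    if value is None or math.isnan(value) or math.isinf(value) or value >= overflow:
        return fallback
    return value

clean_open = safe_numeric(float("nan"))                # None, like a price column
clean_volume = safe_numeric(float("inf"), fallback=0)  # 0, like the volume column
kept = safe_numeric(123.45)                            # passes through untouched
```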

stocks_mart.sql — key guard
CASE
  WHEN ema_12_count >= 12
  AND ema_26_count >= 26
  THEN ema_12 - ema_26
  ELSE null
END as macd_line

MACD is only emitted once there are at least 12 and 26 rows of history — the warm-up guard. This is what guarantees accuracy. Python with a short fetch window (60 days by default) has no such guard, and the EMA recursion starts cold. The signal line requires an additional 9 confirmed MACD rows.

Key insight — the short-window problem

Why Python's 60-day fetch produces incorrect indicators

Your extractor fetches the last 60 days by default (timedelta(days=60)). That's enough rows for a 30-day SMA — but MACD's slow EMA uses 26 periods, and the signal line needs 9 more periods on top of that. More critically, EMAs are recursive: each value depends on the previous EMA, which depends on the one before it, going all the way back to the first trading day.

-- Python, 60-day window, day 1 of the window:
EMA_t = Close_t × k + EMA_(t-1) × (1 − k)
-- EMA_(t-1) is unknown → cold start → wrong for every row

-- dbt stocks_mart.sql, against full history (~46M rows):
CASE WHEN ema_26_count >= 26 THEN ema_12 - ema_26 ELSE null END
-- Only emits a value once the warm-up period is fully satisfied
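The cold-start effect is easy to demonstrate with pandas' ewm() on a synthetic series: the same EMA call returns a different present-day value when it only sees the last 60 rows, because the recursion is seeded from the wrong starting point.

```python
import pandas as pd

# Cold-start demonstration on a synthetic rising series: the "current" EMA
# differs depending on whether the recursion saw the full history or only
# the last 60 rows (the short fetch window).
prices = pd.Series(range(100, 400), dtype=float)  # 300 synthetic closes

full_ema = prices.ewm(span=26, adjust=False).mean().iloc[-1]
cold_ema = prices.tail(60).ewm(span=26, adjust=False).mean().iloc[-1]

gap = abs(full_ema - cold_ema)  # nonzero: the cold start never fully decays
```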
03 — Pipeline Architecture

Your ELT
data flow

Python
Extract & Load

Stocks_Extractor.py — a lean ingestion engine. Hits yfinance, grabs OHLCV data, upserts into PostgreSQL.

Responsibility: networking, rate limiting, I/O. Nothing more. The Pi's CPU stays cool.

ON CONFLICT (ticker, date) DO UPDATE

Run it multiple times without fear — no duplicate rows, no broken dbt models downstream.
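The idempotency claim can be demonstrated with the stdlib sqlite3 module, which accepts the same ON CONFLICT clause as PostgreSQL (toy table loosely mirroring integrated_stocks; this is a sketch, not the production insert):

```python
import sqlite3

# Idempotent upsert sketch using stdlib sqlite3 (toy table and values).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE integrated_stocks (
        ticker TEXT,
        date   TEXT,
        close  REAL,
        PRIMARY KEY (ticker, date)
    )
""")

upsert = """
    INSERT INTO integrated_stocks (ticker, date, close)
    VALUES (?, ?, ?)
    ON CONFLICT (ticker, date) DO UPDATE SET close = excluded.close
"""

# Run the same load twice: the second pass updates in place.
for _ in range(2):
    conn.execute(upsert, ("AAPL", "2024-01-02", 185.64))

count = conn.execute("SELECT COUNT(*) FROM integrated_stocks").fetchone()[0]
```

Running the loop a third time would still leave exactly one row: the upsert is idempotent by construction.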

dbt
stocks_staging

Filters the universe: S&P 500, Brazil, Japan. Deduplicates with ROW_NUMBER() OVER (PARTITION BY ticker, date).

If Python accidentally inserts a duplicate for the same day, dbt picks one and ignores the rest — Power BI never double-counts.

dbt
stocks_cleansing

The safe_numeric macro — a defensive layer at the database level. Catches NaN and Infinity that yfinance occasionally emits and converts them to NULL.

Casts everything to numeric(18,4), eliminating float64 rounding errors like 0.30000000000000004 before they reach Power BI.
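Python's Decimal is the in-process stand-in for PostgreSQL's exact numeric type; a minimal sketch of the artifact and its fix:

```python
from decimal import Decimal, ROUND_HALF_UP

# The float64 artifact the cast guards against:
artifact = 0.1 + 0.2            # 0.30000000000000004 in binary float64

# Quantizing a Decimal to four decimal places mirrors numeric(18,4).
exact = Decimal(str(artifact)).quantize(Decimal("0.0001"), rounding=ROUND_HALF_UP)
```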

dbt
stocks_mart

The "brain." Calculates all financial intelligence using the full historical database — no warm-up period issues, no re-fetching history from yfinance.

7, 14, 20, 30-day SMAs · Bollinger Bands · MACD Lines · Signal Lines · Histograms. All materialized as a table: Power BI reads static values, not running DAX calculations.

Pre-hooks configure work_mem and temp_buffers — PostgreSQL handles the sorting and partitioning efficiently.

Power BI
Reports

Reads clean, pre-calculated data. No complex time-intelligence DAX. No heavy measures. Your 1,500+ user environment gets snappy dashboards because the hard math already happened in PostgreSQL.

Dagster orchestrates the sequence: Python extractor → dbt run → Power BI refresh trigger. Materialized as assets with full lineage visibility.

04 — Financial Indicators in dbt

The math that
lives in SQL

Moving Averages

N-period SMAs computed across the full history. No warm-up problem: the database already holds every row since 1940-01-01 for your tickers.

SMA_n(t) = Σ Close_i / n,  i = t−n+1 … t

AVG(close) OVER (PARTITION BY ticker
  ORDER BY date
  ROWS BETWEEN (n-1) PRECEDING
  AND CURRENT ROW)
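The Pandas mirror of this window, with min_periods reproducing the NULL-like warm-up rows (toy series):

```python
import pandas as pd

# Rolling-window mirror of the SQL SMA frame (toy series): min_periods=3
# reproduces the NULL-like NaN rows during the warm-up period.
close = pd.Series([10.0, 12.0, 11.0, 13.0, 14.0])
sma_3 = close.rolling(window=3, min_periods=3).mean()
# rows 0-1 are NaN; row 2 is (10 + 12 + 11) / 3 = 11.0
```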

Bollinger Bands

Upper and lower bands at ±2 standard deviations from the moving average. When n=1 (new stock's first day), σ can produce NaN — safe_numeric catches it and returns NULL instead of crashing the Power BI refresh.

Middle = SMA_20 = μ
σ = √( Σ(x − μ)² / n )
Upper = μ + 2σ
Lower = μ − 2σ
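The same bands in pandas (toy series; ddof=0 matches the population σ used in the formula, and n=3 keeps the warm-up short):

```python
import pandas as pd

# Bollinger Band sketch over a toy series; ddof=0 gives the population
# standard deviation, matching the sqrt(sum((x - mu)^2) / n) form.
close = pd.Series([20.0, 21.0, 19.0, 22.0, 20.0, 23.0])
n = 3

mid = close.rolling(n).mean()
sigma = close.rolling(n).std(ddof=0)
upper = mid + 2 * sigma
lower = mid - 2 * sigma
```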

MACD

Trend-following momentum. EMAs are recursive — an accurate MACD requires history from the very first trading day. dbt has access to all of it. Extraction-time Python with a short fetch window would produce a mathematically incorrect MACD.

MACD = EMA_12 − EMA_26
Signal = EMA_9(MACD)
Histogram = MACD − Signal
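A pandas sketch of the recursive EMA-based MACD on a synthetic rising series (adjust=False gives the standard recursive EMA form):

```python
import pandas as pd

# EMA-based MACD sketch on a synthetic rising series.
close = pd.Series(range(1, 101), dtype=float)

ema_12 = close.ewm(span=12, adjust=False).mean()
ema_26 = close.ewm(span=26, adjust=False).mean()
macd = ema_12 - ema_26
signal = macd.ewm(span=9, adjust=False).mean()
histogram = macd - signal
```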
05 — Architecture Decision

ETL vs ELT
the tradeoffs

Comparison axis            ETL — Extract, Transform, Load        ELT — Extract, Load, Transform

History needed for calc    Must fetch extra history every run    Uses history already in DB
Bug fixing                 Re-extract all tickers, all years     Fix SQL, run dbt — minutes
Pi CPU/RAM load            High — Pandas is memory hungry        Low — Pi just moves data
Logic visibility           Hidden in Python code                 Visible SQL + dbt docs lineage
NaN / edge case handling   Pandas NaN propagates to DB           safe_numeric macro, NULL in PBI
Float precision            float64 rounding artifacts            numeric(18,4) — exact
Power BI perf              Heavy DAX, slower reports             Static values, instant reads
Reprocessing cost          API calls, rate-limit risk            Zero API calls to fix logic
06 — dbt Commands

What you can run
safely

stocks_staging and stocks_cleansing are both materialized='view' — they complete in seconds. stocks_mart is a materialized='table' built against 46M rows — around 30 minutes. Every command below stops short of it.

view
stocks_staging
instant
view
stocks_cleansing
instant
table
stocks_mart
~30 min — skip
✓ Safe to run
Run both views
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt run --select stocks_staging stocks_cleansing
13:55:56 Running with dbt=1.11.7
13:55:56 Registered adapter: postgres=1.10.0
13:55:57 Found 4 models, 3 sources, 465 macros
13:55:57 Concurrency: 2 threads (target='dev')
13:55:57 1 of 2 START sql view model dbt_stocks.stocks_staging ........... [RUN]
13:55:57 1 of 2 OK created sql view model dbt_stocks.stocks_staging ...... [CREATE VIEW in 0.17s]
13:55:57 2 of 2 START sql view model dbt_stocks.stocks_cleansing ......... [RUN]
13:55:57 2 of 2 OK created sql view model dbt_stocks.stocks_cleansing .... [CREATE VIEW in 0.05s]
13:55:57 Finished running 2 view models in 0 hours 0 minutes and 0.45 seconds.

13:55:57 Completed successfully

13:55:57 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=2

Materializes both views in sequence. Because stocks_cleansing refs stocks_staging, dbt respects the dependency order automatically. Both complete in under half a second.

✓ Safe to run
Target the staging folder
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt run --select staging.*
13:56:15 Running with dbt=1.11.7
13:56:16 Registered adapter: postgres=1.10.0
13:56:16 Found 4 models, 3 sources, 465 macros
13:56:16 Concurrency: 2 threads (target='dev')
13:56:16 1 of 2 START sql view model dbt_stocks.stocks_staging ........... [RUN]
13:56:16 1 of 2 OK created sql view model dbt_stocks.stocks_staging ...... [CREATE VIEW in 0.15s]
13:56:16 2 of 2 START sql view model dbt_stocks.stocks_cleansing ......... [RUN]
13:56:17 2 of 2 OK created sql view model dbt_stocks.stocks_cleansing .... [CREATE VIEW in 0.05s]
13:56:17 Finished running 2 view models in 0 hours 0 minutes and 0.38 seconds.

13:56:17 Completed successfully

13:56:17 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=2

Runs everything under models/staging/ as defined in dbt_project.yml. Safe because the mart lives under models/marts/ — a completely separate path.

◎ No DB writes
Compile — validate SQL only
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt compile --select stocks_staging stocks_cleansing
13:56:41 Running with dbt=1.11.7
13:56:41 Registered adapter: postgres=1.10.0
13:56:41 Found 4 models, 3 sources, 465 macros
13:56:41 Concurrency: 2 threads (target='dev')
13:56:41 Compiled node 'stocks_staging' is:

with base as (
  select
    s.ticker, s.date, s.open, s.low, s.high,
    s.close, s.volume, s.rate_of_return,
    coalesce(i.sector, 'Others') as sector,
    row_number() over (partition by s.ticker,
      s.date order by s.open) as rn
  from "stocks_db"."public"."integrated_stocks" s
  left join "stocks_db"."public"."stocks_info" i
    on s.ticker = i.ticker
  where s.ticker in (select ticker from
    "stocks_db"."public"."sp500_tickers")
  or i.country_region in ('Japan', 'Brazil', 'USA')
)
select ticker, date, open, low, high, close,
  volume, rate_of_return, sector, country_region
from base where rn = 1

Renders Jinja/macro templates into plain SQL under target/compiled/. Nothing touches the database — great for verifying the safe_numeric macro expansion and source references before committing.

◎ No DB writes
Check source freshness
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt source freshness
13:56:51 Running with dbt=1.11.7
13:56:51 Registered adapter: postgres=1.10.0
13:56:51 Found 4 models, 3 sources, 465 macros
13:56:51 Nothing to do. Try checking your model configs
13:56:51 and model specification args
13:56:51 Done.

# No loaded_at_field configured in sources.yml yet —
# add it to get freshness warnings when ingestion stalls

Queries sources defined in sources.yml for freshness. Currently returns "Nothing to do" because no loaded_at_field is configured — a reminder to add one for stale-data alerting.

✓ Safe to run
Exclude mart explicitly
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt run --select stocks_staging+ \
--exclude stocks_mart
13:57:03 Running with dbt=1.11.7
13:57:03 Registered adapter: postgres=1.10.0
13:57:03 Found 4 models, 3 sources, 465 macros
13:57:03 Concurrency: 2 threads (target='dev')
13:57:03 1 of 2 START sql view model dbt_stocks.stocks_staging ........... [RUN]
13:57:04 1 of 2 OK created sql view model dbt_stocks.stocks_staging ...... [CREATE VIEW in 0.14s]
13:57:04 2 of 2 START sql view model dbt_stocks.stocks_cleansing ......... [RUN]
13:57:04 2 of 2 OK created sql view model dbt_stocks.stocks_cleansing .... [CREATE VIEW in 0.05s]
13:57:04 Finished running 2 view models in 0 hours 0 minutes and 0.36 seconds.

13:57:04 Completed successfully

13:57:04 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 NO-OP=0 TOTAL=2

The + operator selects all downstream dependents of stocks_staging. The explicit --exclude ensures the mart never runs — future-proof as more models are added.

✗ Skip on camera
Full mart rebuild
(.dbt-env) ricardo@main-pi:~/stocks_dbt
$ dbt run --select stocks_mart
Running with dbt=1.11.7
Found 4 models, 3 sources, 465 macros
Concurrency: 2 threads (target='dev')

1 of 1 START sql table model dbt_stocks.stocks_mart ............. [RUN]
# SET LOCAL work_mem = '512MB'
# SET LOCAL temp_buffers = '256MB'
# Sorting 46M rows across all window partitions...
# SMA 7/14/20/30 · Bollinger Bands · MACD · Signal · Histogram
# ~30 minutes · pre-recorded for the video

1 of 1 OK created sql table model dbt_stocks.stocks_mart ....... [CREATE TABLE in ~1800s]

Full materialized='table' rebuild against the entire 46M-row history. Pre-hooks configure PostgreSQL memory before sorting begins. Record this separately and cut to the result.

Keep Python dumb.
Let dbt be the brain.

The extractor's only job is to get raw OHLCV numbers into integrated_stocks safely and fast. Every financial calculation — moving averages, Bollinger Bands, MACD, rate of return — belongs in your dbt mart, where it runs against the full history, produces exact decimal results, and is one SQL edit away from being fixed or improved.

✓ dbt for transformations ✓ Python for ingestion only ✓ Dagster orchestrates both