In Stocks_Extractor_v1.py, the entire DataFrame had to reside in the Pi's RAM. A single missing row shifts the index — the calculation for that day becomes wrong. You also defined calculate_rate_of_return twice; the second definition with .dropna() silently overrides the first.
The SQL LAG() function is more robust. PARTITION BY ticker ensures you never accidentally divide by the previous row's price if it belongs to a different ticker — a guarantee raw Pandas pct_change() only gives you after first sorting and grouping the frame by ticker.
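As a rough illustration (the helper name and sample rows are hypothetical, not from the actual extractor), here is what LAG() OVER (PARTITION BY ticker ORDER BY date) guarantees, emulated in plain Python: the previous price always comes from the same ticker, and the first row of each partition gets NULL.

```python
# Emulates LAG(close) OVER (PARTITION BY ticker ORDER BY date):
# the "previous" close is tracked per ticker, so a ticker boundary
# can never leak into the rate-of-return calculation.
def with_lag_returns(rows):
    prev_by_ticker = {}  # plays the role of PARTITION BY ticker
    out = []
    for row in sorted(rows, key=lambda r: (r["ticker"], r["date"])):
        prev = prev_by_ticker.get(row["ticker"])
        # First row of a partition has no predecessor -> None, like SQL NULL
        ror = None if prev is None else (row["close"] - prev) / prev * 100
        out.append({**row, "rate_of_return": ror})
        prev_by_ticker[row["ticker"]] = row["close"]
    return out

rows = [
    {"ticker": "MSFT", "date": "2024-01-02", "close": 370.0},
    {"ticker": "AAPL", "date": "2024-01-02", "close": 185.0},
    {"ticker": "AAPL", "date": "2024-01-03", "close": 186.85},
    {"ticker": "MSFT", "date": "2024-01-03", "close": 373.7},
]
returns = with_lag_returns(rows)  # AAPL and MSFT each start their own partition
```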
If T(A + B) = T(A) + T(B), the transform is additive. For rate of return, T(C)ₜ = (Cₜ − Cₜ₋₁) / Cₜ₋₁, and summing two price series changes the denominator, so in general T(A + B) ≠ T(A) + T(B).
If T(αC) = α·T(C), the transform is homogeneous. For a stock split (scale by α), the α cancels: T(αC) = (αCₜ − αCₜ₋₁) / (αCₜ₋₁) = T(C), not α·T(C).
Division by the previous price Cₜ₋₁ breaks linearity. Financially, this makes sense: if Stock A and Stock B both rise 10%, a portfolio of both also rises 10% — it doesn't add to 20%. The denominator is what makes the function non-linear.
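A quick numeric check of both properties, with illustrative figures:

```python
def rate_of_return(prev_close, close):
    """Simple one-period return: (C_t - C_{t-1}) / C_{t-1}."""
    return (close - prev_close) / prev_close

# Additivity fails: two stocks each up 10%...
r_a = rate_of_return(100.0, 110.0)          # 0.10
r_b = rate_of_return(50.0, 55.0)            # 0.10
# ...but a portfolio of one share of each is also up 10%, not 20%
r_portfolio = rate_of_return(150.0, 165.0)  # 0.10, not r_a + r_b

# Scaling every price by alpha (a split) leaves the return unchanged:
# degree-0 homogeneity, not the degree-1 kind linearity would require.
r_split = rate_of_return(0.5 * 100.0, 0.5 * 110.0)  # still 0.10
```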
Daily Return, Cumulative Return,
SMA_20, EMA_20
Four derived columns computed in Pandas during extraction. Stored in the DB as pre-calculated values — meaning a logic bug required re-fetching all history from yfinance to fix.
df['Rate of Return'] = df['Close'].pct_change() * 100
df = df.dropna(subset=['Rate of Return'])
Still calculated in Python — only the first row per ticker is affected. The new script's docstring explicitly notes that dbt's stocks_cleansing layer will sanitise any residual NaN/Inf on incremental loads.
daily_return, cumulative_return,
sma_20, ema_20, sector
Old version wrote these calculated and enriched columns directly to the database. New version intentionally strips them — the column mapping comment now reads "Columns intentionally excluded (now owned by dbt)".
from dotenv import load_dotenv
load_dotenv()
EMAIL_RECIPIENT = os.getenv("EMAIL_RECIPIENT")
EMAIL_NOTIFICATION_INTERVAL = 500
Email recipient moved to .env — no more hardcoded address. Notification interval raised from 100 → 500 tickers, reducing email noise for a 15k-ticker run (~30 emails instead of ~150).
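The interval arithmetic, sketched with a hypothetical loop (run_extraction and the notify callback stand in for the real extractor's code):

```python
EMAIL_NOTIFICATION_INTERVAL = 500

def run_extraction(tickers, notify, interval=EMAIL_NOTIFICATION_INTERVAL):
    for i, ticker in enumerate(tickers, start=1):
        # ... fetch OHLCV for `ticker` and upsert it here ...
        if i % interval == 0:
            notify(f"Processed {i}/{len(tickers)} tickers")

sent = []
run_extraction([f"TICK{n}" for n in range(15_000)], sent.append)
len(sent)  # 30 progress emails for a 15k-ticker run
```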
calculate_rate_of_return() defined twice
(lines 296 and 307 in old file)
The old file defined the function twice. The second silently overrode the first. The new file has a single, clean definition with a proper docstring explaining why .dropna() is intentional.
INSERT INTO integrated_stocks
(date, open, high, low, close,
volume, dividends, stock_splits,
ticker, rate_of_return)
Only raw OHLCV fields plus rate_of_return reach the database. All analytics — sector joins, moving averages, Bollinger Bands, MACD — are dbt's responsibility.
ROW_NUMBER() OVER (
PARTITION BY s.ticker, s.date
ORDER BY s.open
) as rn ... WHERE rn = 1
Left-joins integrated_stocks with stocks_info for sector/region. Deduplicates by (ticker, date) — if Python ever inserts a duplicate row, dbt silently picks one. Filters to S&P 500, Forex, Crypto, Indices, Weapons, Commodities, Japan, Brazil, USA.
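The "silently picks one" behaviour can be emulated in plain Python (the sample rows are hypothetical; the real logic lives in the dbt model):

```python
# Emulates ROW_NUMBER() OVER (PARTITION BY ticker, date ORDER BY open)
# ... WHERE rn = 1: for each (ticker, date) pair, keep only the row
# that sorts first by open.
def dedupe(rows):
    winners = {}
    for row in sorted(rows, key=lambda r: r["open"]):
        winners.setdefault((row["ticker"], row["date"]), row)  # first seen = rn 1
    return list(winners.values())

rows = [
    {"ticker": "AAPL", "date": "2024-01-02", "open": 184.9},
    {"ticker": "AAPL", "date": "2024-01-02", "open": 185.1},  # duplicate day
    {"ticker": "AAPL", "date": "2024-01-03", "open": 186.0},
]
deduped = dedupe(rows)  # two rows survive; the 184.9 open wins for 2024-01-02
```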
CASE
WHEN open = 'NaN'::float8
OR open = 'Infinity'::float8
OR open >= 1e14
THEN null ELSE open
END as open
Scrubs three float8 failure modes: NaN, ±Infinity, and overflow (≥1e14). Price columns → NULL. Volume → 0. The safe_numeric macro eliminates the scattered CASE blocks that would otherwise appear in every column of the mart.
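The macro itself is dbt SQL; as an illustration, its three checks mirror this Python guard:

```python
import math

# Python rendering of the safe_numeric checks: NaN, +/-Infinity, overflow.
# `default` chooses the replacement: None (NULL) for prices, 0 for volume.
def safe_numeric(value, overflow=1e14, default=None):
    if value is None or math.isnan(value) or math.isinf(value) or value >= overflow:
        return default
    return value

safe_numeric(float("nan"))             # None  (price columns -> NULL)
safe_numeric(float("inf"))             # None
safe_numeric(1e15)                     # None  (overflow)
safe_numeric(float("nan"), default=0)  # 0     (volume -> 0)
safe_numeric(186.85)                   # 186.85 passes through untouched
```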
CASE
WHEN ema_12_count >= 12
AND ema_26_count >= 26
THEN ema_12 - ema_26
ELSE null
END as macd_line
MACD is only emitted once there are at least 12 rows of history for the fast EMA and 26 for the slow one — the warm-up guard that guarantees accuracy. Python with a 30-day fetch window has no such guard, so the EMA recursion starts cold. The signal line requires an additional 9 confirmed MACD rows.
Your extractor fetches the last 60 days by default (timedelta(days=60)). That's enough rows for a 30-day SMA — but MACD's slow EMA uses 26 periods, and the signal line needs 9 more periods on top of that. More critically, EMAs are recursive: each value depends on the previous EMA, which depends on the one before it, going all the way back to the first trading day.
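The cold-start effect is easy to demonstrate. The sketch below uses a standard EMA recursion seeded with the first observation (an assumption about how an extraction-time Python version would initialise) and compares MACD over full history against the same formula fed only the last 30 rows:

```python
def ema(values, span):
    alpha = 2 / (span + 1)
    out, prev = [], None
    for v in values:
        # Each value depends on the previous EMA: the recursion never resets
        prev = v if prev is None else alpha * v + (1 - alpha) * prev
        out.append(prev)
    return out

def macd_line(closes, fast=12, slow=26):
    fast_ema, slow_ema = ema(closes, fast), ema(closes, slow)
    # Warm-up guard, as in the dbt model: no MACD until the slow EMA
    # has at least `slow` observations behind it
    return [f - s if i + 1 >= slow else None
            for i, (f, s) in enumerate(zip(fast_ema, slow_ema))]

closes = [float(t) for t in range(200)]  # synthetic, steadily rising series
full = macd_line(closes)                 # full history: converged recursion
short = macd_line(closes[-30:])          # 30-row fetch window: cold start
# full[-1] and short[-1] disagree even though both describe the same final day
```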
Stocks_Extractor.py — a lean ingestion engine. Hits yfinance, grabs OHLCV data, upserts into PostgreSQL.
Responsibility: networking, rate limiting, I/O. Nothing more. The Pi's CPU stays cool.
ON CONFLICT (ticker, date) DO UPDATE
Run it multiple times without fear — no duplicate rows, no broken dbt models downstream.
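The statement shape, sketched with an illustrative builder function (the column list is abbreviated; this is not the extractor's actual code):

```python
def build_upsert(table, columns, conflict_cols):
    placeholders = ", ".join(["%s"] * len(columns))
    # Every non-key column gets refreshed from the incoming row on conflict
    updates = ", ".join(
        f"{c} = EXCLUDED.{c}" for c in columns if c not in conflict_cols
    )
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(conflict_cols)}) DO UPDATE SET {updates}"
    )

sql = build_upsert(
    "integrated_stocks",
    ["date", "ticker", "close", "rate_of_return"],
    ["ticker", "date"],
)
# Re-running the extractor replays the same INSERTs; the conflict clause
# turns them into updates instead of duplicate rows.
```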
Filters the universe: S&P 500, Brazil, Japan. Deduplicates with ROW_NUMBER() OVER (PARTITION BY ticker, date).
If Python accidentally inserts a duplicate for the same day, dbt picks one and ignores the rest — Power BI never double-counts.
The safe_numeric macro — a defensive layer at the database level. Catches NaN and Infinity that yfinance occasionally emits and converts them to NULL.
Casts everything to numeric(18,4), eliminating float64 rounding artifacts like 0.30000000000000004 before they reach Power BI.
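The effect of the cast, sketched with Python's decimal module (PostgreSQL rounds numeric ties away from zero, which ROUND_HALF_UP matches for positive prices):

```python
from decimal import Decimal, ROUND_HALF_UP

def to_numeric_18_4(value):
    # Mirrors the numeric(18,4) cast: fixed four-decimal precision
    return Decimal(str(value)).quantize(Decimal("0.0001"), rounding=ROUND_HALF_UP)

raw = 0.1 + 0.2               # 0.30000000000000004 in float64
clean = to_numeric_18_4(raw)  # Decimal('0.3000')
```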
The "brain." Calculates all financial intelligence using the full historical database — no warm-up period issues, no re-fetching history from yfinance.
7, 14, 20, 30-day SMAs · Bollinger Bands · MACD Lines · Signal Lines · Histograms. All materialized as a table: Power BI reads static values, not running DAX calculations.
Pre-hooks configure work_mem and temp_buffers — PostgreSQL handles the sorting and partitioning efficiently.
Reads clean, pre-calculated data. No complex time-intelligence DAX. No heavy measures. Your 1,500+ user environment gets snappy dashboards because the hard math already happened in PostgreSQL.
Dagster orchestrates the sequence: Python extractor → dbt run → Power BI refresh trigger. Materialized as assets with full lineage visibility.
N-period SMAs computed across the full history. No warm-up problem: the database already holds every row since 1940-01-01 for your tickers.
Upper and lower bands at ±2 standard deviations from the moving average. When n=1 (new stock's first day), σ can produce NaN — safe_numeric catches it and returns NULL instead of crashing the Power BI refresh.
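A sketch of the band arithmetic and the single-row edge case, using a sample standard deviation (an assumption about the mart's exact formula):

```python
import statistics

def bollinger(closes, k=2):
    mid = sum(closes) / len(closes)
    if len(closes) < 2:
        # Sample std-dev is undefined for one observation; surface None
        # the way safe_numeric surfaces NULL instead of a NaN.
        return mid, None, None
    sd = statistics.stdev(closes)
    return mid, mid + k * sd, mid - k * sd

bollinger([185.0])  # (185.0, None, None): a new stock's first day
mid, upper, lower = bollinger([10.0, 12.0, 11.0, 13.0])
```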
Trend-following momentum. EMAs are recursive — accurate MACD requires history from the very first trading day. dbt has access to all of it. Extraction-time Python with a 30-day fetch window would produce a mathematically incorrect MACD.
stocks_staging and stocks_cleansing are both materialized='view' — they complete in seconds.
stocks_mart is materialized='table' against 46M rows — around 30 minutes.
Every command below deliberately stops short of it.
Materializes both views in sequence. Because stocks_cleansing refs stocks_staging, dbt respects the dependency order automatically. Both complete in under half a second.
Runs everything under models/staging/ as defined in dbt_project.yml. Safe because the mart lives under models/marts/ — a completely separate path.
Renders Jinja/macro templates into plain SQL under target/compiled/. Nothing touches the database — great for verifying the safe_numeric macro expansion and source references before committing.
Queries sources defined in sources.yml for freshness. Currently returns "Nothing to do" because no loaded_at_field is configured — a reminder to add one for stale-data alerting.
The + operator selects all downstream dependents of stocks_staging. The explicit --exclude ensures the mart never runs — future-proof as more models are added.
Full materialized='table' rebuild against the entire 46M-row history. Pre-hooks configure PostgreSQL memory before sorting begins. Record this separately and cut to the result.
The extractor's only job is to get raw OHLCV numbers into integrated_stocks safely and fast. Every financial calculation — moving averages, Bollinger Bands, MACD, rate of return — belongs in your dbt mart, where it runs against the full history, produces exact decimal results, and is one SQL edit away from being fixed or improved.