# Data Pipelines
How evidence moves from upload to a signed, verifiable dossier.
## Stages at a glance
- **Submit & protect** — Users upload chats, screenshots, or receipts. The API normalizes content, detects language, deduplicates, and encrypts victim contact fields (Fernet) so only redacted data flows downstream.
- **Extract & enrich** — Document AI performs OCR; entity extraction finds wallets, emails, and phones, and sentiment is scored. The Gemini classifier applies the 5-axis taxonomy (scam intent, channel, social engineering, action, persona) with confidence thresholds. Related cases are linked by recurring accounts and campaign patterns.
- **Store safely** — Victim contact fields are Fernet-encrypted at rest; case metadata lands in Cloud SQL; evidence files go to versioned GCS buckets (raw + redacted copies).
- **Search & triage** — Embeddings are generated and indexed in Vertex AI Search for hybrid retrieval (semantic + keyword). SQL filters provide date, status, and type facets. Analysts search without exposing identities.
- **Report & sign** — The report generator assembles findings from case data and redacted evidence into a dossier PDF, signs it with a SHA-256 hash manifest and timestamp, and publishes via controlled links.
- **Feedback loop** — Analyst corrections and outcome signals feed back into the classifier, improving precision over time.
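The field-level encryption in the submit stage can be sketched with the `cryptography` library's Fernet recipe. The field names, key handling, and helper functions below are illustrative assumptions, not the project's actual code:

```python
# Sketch of Fernet encryption for victim contact fields.
# In production the key would come from a secret manager, not be generated inline.
from cryptography.fernet import Fernet

key = Fernet.generate_key()  # illustrative; real key is loaded, not generated
fernet = Fernet(key)

CONTACT_FIELDS = {"victim_email", "victim_phone"}  # hypothetical field names

def protect_record(record: dict) -> dict:
    """Return a copy with contact fields encrypted; other fields untouched."""
    out = dict(record)
    for field in CONTACT_FIELDS & record.keys():
        out[field] = fernet.encrypt(record[field].encode()).decode()
    return out

def reveal_record(record: dict) -> dict:
    """Decrypt contact fields (only services holding the key can do this)."""
    out = dict(record)
    for field in CONTACT_FIELDS & record.keys():
        out[field] = fernet.decrypt(record[field].encode()).decode()
    return out
```

Because only the ciphertext leaves this boundary, downstream stages (extraction, search, reporting) never see raw contact data.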
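The signing step can be illustrated with a minimal hash manifest: one SHA-256 digest per artifact plus a UTC timestamp. The manifest layout and function names are assumptions; only the SHA-256 hashing and timestamp mirror the description above:

```python
# Sketch of a dossier hash manifest: digest each artifact, stamp the result,
# and verify by recomputing. Layout is illustrative, not the actual format.
import hashlib
from datetime import datetime, timezone

def sha256_hex(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()

def build_manifest(artifacts: dict) -> dict:
    """Map each artifact name to its SHA-256 digest and stamp the manifest."""
    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "files": {name: sha256_hex(blob) for name, blob in artifacts.items()},
    }

def verify(manifest: dict, artifacts: dict) -> bool:
    """Recompute digests and compare against the manifest."""
    return all(
        sha256_hex(artifacts[name]) == digest
        for name, digest in manifest["files"].items()
    )
```

Anyone holding the published manifest can re-hash the dossier and detect tampering without access to the original case data.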
## Analytics aggregation (Sprint 3)
A periodic job (`analytics_aggregation.py`, scheduled every 4 hours) materializes raw data into four aggregate tables used by the Impact Dashboard and Campaign Intelligence views:

| Source tables | Aggregate table | Contents |
| --- | --- | --- |
| `cases` + `entities` | `entity_stats` | Entity-level risk and loss totals |
| `indicators` + `intake_records` | `indicator_stats` | Indicator freshness and counts |
| `threat_campaigns` + `cases` | `campaign_stats` | Campaign-level aggregates |
| `cases` + `intake_records` | `platform_kpis` | Weekly/monthly KPI snapshots |
The Impact Dashboard reads exclusively from these pre-computed tables, keeping analyst-facing queries fast and avoiding full-table scans on the transactional database. See `core/docs/design/threat_intelligence_analytics_tdd.md` for the full schema.
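As a sketch of what the entity-level materialization computes, assuming hypothetical column names (the real schema is in the TDD referenced above):

```python
# Illustrative shape of the entity_stats aggregation: fold joined
# cases + entities rows into per-entity risk and loss totals.
from collections import defaultdict

def aggregate_entity_stats(case_entity_rows):
    """rows: (entity_id, reported_loss, risk_score) tuples from a cases+entities join."""
    stats = defaultdict(lambda: {"case_count": 0, "total_loss": 0.0, "max_risk": 0.0})
    for entity_id, loss, risk in case_entity_rows:
        s = stats[entity_id]
        s["case_count"] += 1
        s["total_loss"] += loss
        s["max_risk"] = max(s["max_risk"], risk)
    return dict(stats)
```

In the actual job this would be a GROUP BY over the join rather than an in-memory fold, but the aggregates (count, sum, max per entity) are the same.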
## External enrichment sources (Sprint 5)
Three enrichment pipelines feed additional data into the analytics layer:
| Pipeline | Module | Trigger | Output |
| --- | --- | --- | --- |
| Passive DNS | `services/enrichment/passive_dns.py` | On-demand / API | Historical DNS records per domain |
| ASN Lookup | `services/enrichment/asn_lookup.py` | On-demand / API | Network name, CIDR, ASN, country |
| Takedown Verification | `worker/jobs/takedown_check.py` | Scheduled (12h) | `taken_down_at` on `entity_stats` |
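A minimal sketch of the takedown check, assuming a DNS-resolution probe and hypothetical field names (the actual logic in `worker/jobs/takedown_check.py` may probe differently):

```python
# Sketch: treat a domain as taken down when it no longer resolves,
# and stamp taken_down_at the first time the probe reports it gone.
import socket
from datetime import datetime, timezone

def is_taken_down(domain: str) -> bool:
    """Illustrative probe: DNS resolution failure counts as taken down."""
    try:
        socket.getaddrinfo(domain, 443)
        return False
    except socket.gaierror:
        return True

def check_entity(entity: dict, probe=is_taken_down) -> dict:
    """Set taken_down_at once, on the first run where the probe succeeds."""
    if entity.get("taken_down_at") is None and probe(entity["domain"]):
        entity["taken_down_at"] = datetime.now(timezone.utc).isoformat()
    return entity
```

Stamping the field only once preserves the original takedown time across subsequent 12-hour runs.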
The infrastructure clustering job (`worker/jobs/infrastructure_clustering.py`, default every 6 hours) discovers shared-hosting relationships by computing entity co-occurrence across cases and writing edges to `infrastructure_edges`.
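The co-occurrence computation behind that job can be sketched as pairwise edge counting; the identifiers and weighting below are illustrative, not the job's actual implementation:

```python
# Sketch: two entities share an undirected edge when they appear in the same
# case; the edge weight is the number of cases they share.
from collections import Counter
from itertools import combinations

def cooccurrence_edges(case_entities: dict) -> Counter:
    """case_entities maps case_id -> set of entity ids seen in that case."""
    edges = Counter()
    for entities in case_entities.values():
        for a, b in combinations(sorted(entities), 2):
            edges[(a, b)] += 1  # sorted() gives each pair a canonical order
    return edges
```

Edges with weight above some threshold would then be written to `infrastructure_edges` as candidate shared-hosting links.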
The watchlist check job (`worker/jobs/watchlist_check.py`, default every 30 minutes) monitors pinned entities for new case activity and loss threshold breaches, generating alerts in the `watchlist_alerts` table.
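The two alert conditions can be sketched as a comparison between the state saved at the last run and fresh stats; field names are assumptions, not the actual `watchlist_alerts` schema:

```python
# Sketch: alert on new case activity, and on a loss threshold crossed
# since the previous run (so the breach fires only once).
def evaluate_watchlist(entry: dict, current: dict) -> list:
    """entry: pinned-entity state from the last run; current: fresh stats."""
    alerts = []
    if current["case_count"] > entry["last_case_count"]:
        alerts.append({
            "entity_id": entry["entity_id"],
            "type": "new_case_activity",
            "new_cases": current["case_count"] - entry["last_case_count"],
        })
    threshold = entry.get("loss_threshold")
    if threshold is not None and current["total_loss"] >= threshold > entry["last_total_loss"]:
        alerts.append({"entity_id": entry["entity_id"], "type": "loss_threshold_breached"})
    return alerts
```

Comparing against the prior run's totals keeps the 30-minute schedule from re-raising the same breach every cycle.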
For job schedules and Docker image mapping, see Job Architecture. For PII safeguards applied during ingestion, see Security Model.