Data Pipelines

How evidence moves from upload to a signed, verifiable dossier.

*(Data pipeline diagram — Mermaid source omitted.)*
Stages at a glance

  1. Submit & protect — Users upload chats, screenshots, or receipts. The API normalizes content, detects language, deduplicates, and encrypts victim contact fields (Fernet) so only redacted data flows downstream.

  2. Extract & enrich — Document AI performs OCR; entity extraction identifies wallets, emails, and phone numbers, and sentiment analysis scores message tone. The Gemini classifier applies the 5-axis taxonomy (scam intent, channel, social engineering, action, persona) with confidence thresholds. Related cases are linked by recurring accounts and campaign patterns.

  3. Store safely — Victim contact fields are Fernet-encrypted at rest; case metadata lands in Cloud SQL; evidence files go to versioned GCS buckets (raw + redacted copies).

  4. Search & triage — Embeddings are generated and indexed in Vertex AI Search for hybrid retrieval (semantic + keyword). SQL filters provide date, status, and type facets. Analysts search without exposing identities.

  5. Report & sign — The report generator assembles findings from case data and redacted evidence into a dossier PDF, signs it with a SHA-256 hash manifest and timestamp, and publishes via controlled links.

  6. Feedback loop — Analyst corrections and outcome signals feed back into the classifier, improving precision over time.
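The "Submit & protect" stage can be sketched as follows. This is a minimal illustration of Fernet field-level encryption using the `cryptography` package; the key handling, field names, and record shape are assumptions, not the production schema (a real deployment would load the key from a secret store).

```python
# Sketch: encrypt only the victim contact field so non-PII metadata
# stays queryable while downstream consumers see only ciphertext.
from cryptography.fernet import Fernet

key = Fernet.generate_key()  # illustrative; load from a secret store in practice
fernet = Fernet(key)

record = {"victim_email": "victim@example.com", "case_id": "C-1042"}

# Replace the plaintext contact field with its Fernet token.
token = fernet.encrypt(record["victim_email"].encode())
stored = {"victim_email_enc": token, "case_id": record["case_id"]}

# Only holders of the key can recover the original value.
assert fernet.decrypt(stored["victim_email_enc"]).decode() == "victim@example.com"
```

Because Fernet is authenticated encryption, any tampering with the stored token is detected at decryption time rather than silently producing garbage.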
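The "Report & sign" stage's hash manifest can be sketched with the standard library. The file names and manifest layout below are illustrative assumptions; only the use of SHA-256 plus a timestamp comes from the description above.

```python
# Sketch: build a SHA-256 manifest over redacted evidence blobs and
# stamp it with a UTC timestamp, as the dossier signing step describes.
import hashlib
from datetime import datetime, timezone

def build_manifest(files: dict[str, bytes]) -> dict:
    """Hash each evidence blob and record when the manifest was generated."""
    entries = {name: hashlib.sha256(blob).hexdigest() for name, blob in files.items()}
    return {
        "files": entries,
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }

manifest = build_manifest({"chat_log.txt": b"redacted chat", "receipt.png": b"\x89PNG"})
```

Anyone holding the dossier can recompute each file's SHA-256 digest and compare it against the manifest to verify that no evidence was altered after signing.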

Analytics aggregation (Sprint 3)

A periodic job (analytics_aggregation.py, scheduled every 4 hours) materializes raw data into four aggregate tables used by the Impact Dashboard and Campaign Intelligence views:

| Source tables | Target table | Purpose |
| --- | --- | --- |
| cases + entities | entity_stats | Entity-level risk and loss totals |
| indicators + intake_records | indicator_stats | Indicator freshness and counts |
| threat_campaigns + cases | campaign_stats | Campaign-level aggregates |
| cases + intake_records | platform_kpis | Weekly/monthly KPI snapshots |

The Impact Dashboard reads exclusively from these pre-computed tables, keeping analyst-facing queries fast and avoiding full-table scans on the transactional database. See core/docs/design/threat_intelligence_analytics_tdd.md for the full schema.
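The cases-to-entity_stats rollup can be sketched as a simple aggregation. The column names below are assumptions inferred from the table descriptions, not the actual schema in the TDD.

```python
# Sketch: materialize entity-level aggregates (case count, loss total,
# max risk) from joined case/entity rows, as the 4-hourly job does.
from collections import defaultdict

case_rows = [
    {"entity_id": "wallet:0xabc", "loss_usd": 1200, "risk": 0.9},
    {"entity_id": "wallet:0xabc", "loss_usd": 300, "risk": 0.7},
    {"entity_id": "email:a@b.c", "loss_usd": 50, "risk": 0.2},
]

entity_stats = defaultdict(lambda: {"case_count": 0, "loss_total": 0, "max_risk": 0.0})
for row in case_rows:
    stats = entity_stats[row["entity_id"]]
    stats["case_count"] += 1
    stats["loss_total"] += row["loss_usd"]
    stats["max_risk"] = max(stats["max_risk"], row["risk"])
```

Pre-computing these rows every few hours is what lets the dashboard serve a single indexed lookup per entity instead of re-aggregating the transactional tables on each page load.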

External enrichment sources (Sprint 5)

Three enrichment pipelines feed additional data into the analytics layer:

| Source | Module | Trigger | Output |
| --- | --- | --- | --- |
| Passive DNS | services/enrichment/passive_dns.py | On-demand / API | Historical DNS records per domain |
| ASN Lookup | services/enrichment/asn_lookup.py | On-demand / API | Network name, CIDR, ASN, country |
| Takedown Verification | worker/jobs/takedown_check.py | Scheduled (12h) | taken_down_at on entity_stats |
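The shape of an ASN-lookup result can be sketched from the output column above (network name, CIDR, ASN, country). The dataclass, static lookup table, and function name are illustrative assumptions; the real module may query an external service.

```python
# Sketch: map an IP to its ASN record by CIDR containment.
from dataclasses import dataclass
from ipaddress import ip_address, ip_network
from typing import Optional

@dataclass
class AsnRecord:
    asn: int
    name: str
    cidr: str
    country: str

# Tiny static table standing in for a real ASN database.
ASN_TABLE = [AsnRecord(64500, "EXAMPLE-NET", "203.0.113.0/24", "US")]

def lookup(ip: str) -> Optional[AsnRecord]:
    """Return the first record whose CIDR contains the IP, else None."""
    return next((r for r in ASN_TABLE if ip_address(ip) in ip_network(r.cidr)), None)
```

Keyed by CIDR containment rather than exact IP, this kind of lookup lets many scam-hosting IPs resolve to one network owner for analytics.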

The infrastructure clustering job (worker/jobs/infrastructure_clustering.py, default every 6 hours) discovers shared-hosting relationships by computing entity co-occurrence across cases and writing edges to infrastructure_edges.
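The co-occurrence computation behind the clustering job can be sketched as follows. The edge threshold and input shape are assumptions for illustration; only the idea of counting entity pairs that appear in the same case comes from the description above.

```python
# Sketch: count entity co-occurrence across cases and keep pairs seen
# together often enough to suggest shared infrastructure.
from collections import Counter
from itertools import combinations

case_entities = {
    "C-1": ["1.2.3.4", "scam-a.example", "scam-b.example"],
    "C-2": ["1.2.3.4", "scam-a.example"],
}

edges = Counter()
for entities in case_entities.values():
    # Sort so each unordered pair gets one canonical key.
    for a, b in combinations(sorted(set(entities)), 2):
        edges[(a, b)] += 1

# Threshold of 2 cases is an illustrative assumption.
infrastructure_edges = {pair: n for pair, n in edges.items() if n >= 2}
```

Here the IP and the first domain co-occur in both cases and survive the threshold, while the pair seen only once is dropped as noise.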

The watchlist check job (worker/jobs/watchlist_check.py, default every 30 minutes) monitors pinned entities for new case activity and loss threshold breaches, generating alerts in the watchlist_alerts table.
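The loss-threshold half of the watchlist check can be sketched as a filter over pinned entities. Field names, thresholds, and the alert shape are illustrative assumptions.

```python
# Sketch: emit an alert for each pinned entity whose aggregated loss
# has crossed its configured threshold.
watchlist = [
    {"entity_id": "wallet:0xabc", "loss_threshold": 1000},
    {"entity_id": "email:a@b.c", "loss_threshold": 5000},
]
current_loss = {"wallet:0xabc": 1500, "email:a@b.c": 200}

alerts = [
    {"entity_id": w["entity_id"], "loss": current_loss[w["entity_id"]]}
    for w in watchlist
    if current_loss.get(w["entity_id"], 0) >= w["loss_threshold"]
]
```

Each alert row would then be written to the watchlist_alerts table for analysts to triage.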

For job schedules and Docker image mapping, see Job Architecture. For PII safeguards applied during ingestion, see Security Model.
