crowdstrike-parser

star 1

Parse and transform CrowdStrike Falcon data exports including detections, incidents, Spotlight vulnerabilities, EASM assets, and LogScale query results. Use when processing CrowdStrike JSON/CSV exports, building reports from Falcon data, normalizing detection data, or integrating CrowdStrike outputs into security workflows.

rthexton1964 By rthexton1964 schedule Updated 1/26/2026

name: crowdstrike-parser description: "Parse and transform CrowdStrike Falcon data exports including detections, incidents, Spotlight vulnerabilities, EASM assets, and LogScale query results. Use when processing CrowdStrike JSON/CSV exports, building reports from Falcon data, normalizing detection data, or integrating CrowdStrike outputs into security workflows."

CrowdStrike Data Parser

Overview

Utility scripts and patterns for parsing CrowdStrike Falcon platform data exports across all modules.

Supported Data Types

Data Type Source Script
Detections Falcon Prevent/Insight XDR parse_detections.py
Incidents Incident Workbench parse_incidents.py
Vulnerabilities Spotlight parse_spotlight.py
External Assets EASM parse_easm.py
Host Inventory Discover parse_discover.py
Query Results LogScale parse_logscale.py

Quick Start

# Parse detection export
python scripts/parse_detections.py detections.json --output detections_normalized.json

# Parse Spotlight vulnerabilities with KEV enrichment
python scripts/parse_spotlight.py spotlight.json --enrich-kev --output vulns.json

# Convert to CSV for reporting
python scripts/parse_detections.py detections.json --format csv --output detections.csv

Detection Parsing

# scripts/parse_detections.py
"""
Parse CrowdStrike detection exports into normalized format.

Usage:
    python parse_detections.py <input_file> [--format json|csv] [--output <file>]
"""

import json
import csv
import sys
from typing import Generator
from datetime import datetime

def parse_detection_json(file_path: str) -> Generator[dict, None, None]:
    """Parse detection JSON export."""
    with open(file_path, 'r') as f:
        data = json.load(f)
    
    # Handle both array and wrapped formats
    detections = data if isinstance(data, list) else data.get("resources", [data])
    
    for det in detections:
        yield normalize_detection(det)

def normalize_detection(detection: dict) -> dict:
    """Normalize detection to standard schema."""
    device = detection.get("device", {})
    behaviors = detection.get("behaviors", [])
    
    # Extract unique tactics/techniques
    tactics = list(set(b.get("tactic") for b in behaviors if b.get("tactic")))
    techniques = list(set(b.get("technique") for b in behaviors if b.get("technique")))
    mitre_ids = list(set(b.get("technique_id") for b in behaviors if b.get("technique_id")))
    
    # Extract IOCs
    iocs = []
    for b in behaviors:
        if b.get("sha256"):
            iocs.append({"type": "sha256", "value": b["sha256"]})
        if b.get("md5"):
            iocs.append({"type": "md5", "value": b["md5"]})
        if b.get("ioc_value"):
            iocs.append({"type": b.get("ioc_type"), "value": b["ioc_value"]})
    
    return {
        "detection_id": detection.get("detection_id"),
        "severity": detection.get("max_severity_displayname"),
        "severity_score": detection.get("max_severity"),
        "status": detection.get("status"),
        "first_behavior": detection.get("first_behavior"),
        "last_behavior": detection.get("last_behavior"),
        "hostname": device.get("hostname"),
        "local_ip": device.get("local_ip"),
        "external_ip": device.get("external_ip"),
        "platform": device.get("platform_name"),
        "os_version": device.get("os_version"),
        "device_id": device.get("device_id"),
        "tactics": tactics,
        "techniques": techniques,
        "mitre_ids": mitre_ids,
        "iocs": iocs,
        "filenames": list(set(b.get("filename") for b in behaviors if b.get("filename"))),
        "cmdlines": [b.get("cmdline") for b in behaviors if b.get("cmdline")],
        "assigned_to": detection.get("assigned_to_name"),
        "raw_behaviors": behaviors
    }

def severity_to_score(severity: str) -> int:
    """Convert severity name to numeric score."""
    return {"Critical": 100, "High": 75, "Medium": 50, "Low": 25, "Informational": 10}.get(severity, 0)

def export_csv(detections: list, output_path: str):
    """Export detections to CSV."""
    fieldnames = [
        "detection_id", "severity", "severity_score", "status", "hostname",
        "local_ip", "platform", "first_behavior", "tactics", "techniques",
        "mitre_ids", "filenames"
    ]
    
    with open(output_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        for det in detections:
            row = det.copy()
            row["tactics"] = "; ".join(det.get("tactics", []))
            row["techniques"] = "; ".join(det.get("techniques", []))
            row["mitre_ids"] = "; ".join(det.get("mitre_ids", []))
            row["filenames"] = "; ".join(det.get("filenames", []))
            writer.writerow(row)

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Parse CrowdStrike detections")
    parser.add_argument("input", help="Input JSON file")
    parser.add_argument("--format", choices=["json", "csv"], default="json")
    parser.add_argument("--output", "-o", help="Output file path")
    args = parser.parse_args()
    
    detections = list(parse_detection_json(args.input))
    
    if args.format == "csv":
        export_csv(detections, args.output or "detections.csv")
    else:
        output = args.output or "detections_normalized.json"
        with open(output, 'w') as f:
            json.dump(detections, f, indent=2)
    
    print(f"Parsed {len(detections)} detections")

Spotlight Vulnerability Parsing

# scripts/parse_spotlight.py
"""
Parse CrowdStrike Spotlight vulnerability exports.

Usage:
    python parse_spotlight.py <input_file> [--enrich-kev] [--output <file>]
"""

import json
import csv
import requests
from typing import Generator

KEV_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"

def parse_spotlight_json(file_path: str) -> Generator[dict, None, None]:
    """Parse Spotlight vulnerability export."""
    with open(file_path, 'r') as f:
        data = json.load(f)
    
    vulns = data if isinstance(data, list) else data.get("resources", [data])
    
    for vuln in vulns:
        yield normalize_vulnerability(vuln)

def normalize_vulnerability(vuln: dict) -> dict:
    """Normalize Spotlight vulnerability to standard schema."""
    cve = vuln.get("cve", {})
    host = vuln.get("host_info", {})
    app = vuln.get("app", {})
    cisa = cve.get("cisa_info", {})
    
    return {
        "id": vuln.get("id"),
        "cve_id": cve.get("id"),
        "severity": cve.get("severity"),
        "cvss_score": cve.get("base_score"),
        "cvss_vector": cve.get("vector"),
        "exploit_status": cve.get("exploit_status"),
        "is_kev": cisa.get("is_cisa_kev", False),
        "kev_due_date": cisa.get("due_date"),
        "hostname": host.get("hostname"),
        "local_ip": host.get("local_ip"),
        "os_version": host.get("os_version"),
        "groups": host.get("groups", []),
        "tags": host.get("tags", []),
        "agent_version": host.get("agent_version"),
        "product": app.get("product_name_version"),
        "vendor": app.get("vendor"),
        "version": app.get("version"),
        "status": vuln.get("status"),
        "created": vuln.get("created_timestamp"),
        "updated": vuln.get("updated_timestamp"),
        "remediation": vuln.get("remediation", {}).get("action"),
        "suppression": vuln.get("suppression_info")
    }

def enrich_with_kev(vulns: list) -> list:
    """Enrich vulnerabilities with current KEV data."""
    # Fetch KEV catalog
    resp = requests.get(KEV_URL)
    kev_data = resp.json()
    kev_lookup = {v["cveID"]: v for v in kev_data.get("vulnerabilities", [])}
    
    for vuln in vulns:
        cve_id = vuln.get("cve_id")
        if cve_id and cve_id in kev_lookup:
            kev = kev_lookup[cve_id]
            vuln["is_kev"] = True
            vuln["kev_due_date"] = kev.get("dueDate")
            vuln["kev_action"] = kev.get("requiredAction")
            vuln["ransomware_related"] = kev.get("knownRansomwareCampaignUse") == "Known"
    
    return vulns

def calculate_risk_score(vuln: dict) -> float:
    """Calculate composite risk score."""
    score = 0.0
    cvss = vuln.get("cvss_score") or 0
    score += (cvss / 10) * 40  # CVSS: 40%
    
    if vuln.get("is_kev"):
        score += 25  # KEV: 25%
    
    if vuln.get("exploit_status") == "available":
        score += 20  # Exploit available: 20%
    
    # Severity bonus
    sev_bonus = {"CRITICAL": 15, "HIGH": 10, "MEDIUM": 5}.get(vuln.get("severity", "").upper(), 0)
    score += sev_bonus
    
    return min(score, 100)

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Parse CrowdStrike Spotlight vulnerabilities")
    parser.add_argument("input", help="Input JSON file")
    parser.add_argument("--enrich-kev", action="store_true", help="Enrich with CISA KEV data")
    parser.add_argument("--output", "-o", help="Output file path")
    args = parser.parse_args()
    
    vulns = list(parse_spotlight_json(args.input))
    
    if args.enrich_kev:
        vulns = enrich_with_kev(vulns)
    
    # Calculate risk scores
    for v in vulns:
        v["risk_score"] = calculate_risk_score(v)
    
    # Sort by risk score
    vulns.sort(key=lambda x: x["risk_score"], reverse=True)
    
    output = args.output or "spotlight_normalized.json"
    with open(output, 'w') as f:
        json.dump(vulns, f, indent=2)
    
    print(f"Parsed {len(vulns)} vulnerabilities")
    kev_count = sum(1 for v in vulns if v.get("is_kev"))
    print(f"KEV vulnerabilities: {kev_count}")

Incident Parsing

# scripts/parse_incidents.py
"""Parse CrowdStrike incident exports."""

import json
from typing import Generator

def parse_incident_json(file_path: str) -> Generator[dict, None, None]:
    """Parse incident JSON export."""
    with open(file_path, 'r') as f:
        data = json.load(f)
    
    incidents = data if isinstance(data, list) else data.get("resources", [data])
    
    for inc in incidents:
        yield normalize_incident(inc)

def normalize_incident(incident: dict) -> dict:
    """Normalize incident to standard schema."""
    return {
        "incident_id": incident.get("incident_id"),
        "incident_type": incident.get("incident_type"),
        "state": incident.get("state"),
        "status": incident.get("status"),
        "score": incident.get("fine_score"),
        "name": incident.get("name"),
        "description": incident.get("description"),
        "created": incident.get("created"),
        "start": incident.get("start"),
        "end": incident.get("end"),
        "tags": incident.get("tags", []),
        "assigned_to": incident.get("assigned_to"),
        "hosts": [
            {
                "device_id": h.get("device_id"),
                "hostname": h.get("hostname"),
                "local_ip": h.get("local_ip")
            }
            for h in incident.get("hosts", [])
        ],
        "users": incident.get("users", []),
        "objectives": incident.get("objectives", []),
        "tactics": incident.get("tactics", []),
        "techniques": incident.get("techniques", [])
    }

EASM Asset Parsing

# scripts/parse_easm.py
"""Parse CrowdStrike EASM (External Attack Surface) exports."""

import json
from typing import Generator

def parse_easm_json(file_path: str) -> Generator[dict, None, None]:
    """Parse EASM asset export."""
    with open(file_path, 'r') as f:
        data = json.load(f)
    
    assets = data if isinstance(data, list) else data.get("resources", [data])
    
    for asset in assets:
        yield normalize_easm_asset(asset)

def normalize_easm_asset(asset: dict) -> dict:
    """Normalize EASM asset to standard schema."""
    return {
        "id": asset.get("id"),
        "asset_type": asset.get("asset_type"),
        "asset": asset.get("asset"),
        "confidence": asset.get("confidence"),
        "discovery_date": asset.get("discovery_date"),
        "first_seen": asset.get("first_seen"),
        "last_seen": asset.get("last_seen"),
        "sources": asset.get("sources", []),
        "subsidiaries": asset.get("subsidiaries", []),
        "services": [
            {
                "port": s.get("port"),
                "protocol": s.get("protocol"),
                "service": s.get("service_name"),
                "version": s.get("version")
            }
            for s in asset.get("services", [])
        ],
        "vulnerabilities": [
            {
                "cve_id": v.get("cve_id"),
                "severity": v.get("severity")
            }
            for v in asset.get("vulnerabilities", [])
        ],
        "exposures": asset.get("exposures", [])
    }

CSV Export Utility

# scripts/export_csv.py
"""Generic CSV export for CrowdStrike data."""

import csv
import json
import sys

def flatten_dict(d: dict, parent_key: str = '', sep: str = '_') -> dict:
    """Flatten nested dictionary."""
    items = []
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep).items())
        elif isinstance(v, list):
            if v and isinstance(v[0], dict):
                items.append((new_key, json.dumps(v)))
            else:
                items.append((new_key, "; ".join(str(x) for x in v)))
        else:
            items.append((new_key, v))
    return dict(items)

def json_to_csv(input_file: str, output_file: str):
    """Convert JSON array to CSV."""
    with open(input_file, 'r') as f:
        data = json.load(f)
    
    if not isinstance(data, list):
        data = data.get("resources", [data])
    
    if not data:
        print("No data to export")
        return
    
    # Flatten all records
    flattened = [flatten_dict(d) for d in data]
    
    # Get all unique keys
    all_keys = set()
    for record in flattened:
        all_keys.update(record.keys())
    
    fieldnames = sorted(all_keys)
    
    with open(output_file, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction='ignore')
        writer.writeheader()
        writer.writerows(flattened)
    
    print(f"Exported {len(flattened)} records to {output_file}")

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: export_csv.py <input.json> <output.csv>")
        sys.exit(1)
    json_to_csv(sys.argv[1], sys.argv[2])

LogScale Query Result Parsing

# scripts/parse_logscale.py
"""Parse CrowdStrike LogScale (Humio) query results."""

import json
from typing import Generator

def parse_logscale_results(file_path: str) -> Generator[dict, None, None]:
    """Parse LogScale query export."""
    with open(file_path, 'r') as f:
        data = json.load(f)
    
    # LogScale returns events in various formats
    events = data.get("events", data) if isinstance(data, dict) else data
    
    for event in events:
        yield normalize_logscale_event(event)

def normalize_logscale_event(event: dict) -> dict:
    """Normalize LogScale event to standard schema."""
    return {
        "timestamp": event.get("@timestamp") or event.get("timestamp"),
        "event_type": event.get("event_simpleName"),
        "aid": event.get("aid"),
        "hostname": event.get("ComputerName"),
        "username": event.get("UserName"),
        "process_name": event.get("ImageFileName"),
        "command_line": event.get("CommandLine"),
        "sha256": event.get("SHA256HashData"),
        "parent_process": event.get("ParentImageFileName"),
        "remote_ip": event.get("RemoteIP"),
        "remote_port": event.get("RemotePort"),
        "local_ip": event.get("LocalIP"),
        "raw": event
    }

Batch Processing

#!/bin/bash
# scripts/batch_process.sh - Process multiple CrowdStrike exports

INPUT_DIR="${1:-.}"
OUTPUT_DIR="${2:-./processed}"

mkdir -p "$OUTPUT_DIR"

# Process all detection files
for f in "$INPUT_DIR"/detections*.json; do
    [ -f "$f" ] || continue
    base=$(basename "$f" .json)
    python parse_detections.py "$f" -o "$OUTPUT_DIR/${base}_normalized.json"
done

# Process all Spotlight files
for f in "$INPUT_DIR"/spotlight*.json; do
    [ -f "$f" ] || continue
    base=$(basename "$f" .json)
    python parse_spotlight.py "$f" --enrich-kev -o "$OUTPUT_DIR/${base}_normalized.json"
done

echo "Processing complete. Output in $OUTPUT_DIR"
Install via CLI
npx skills add https://github.com/rthexton1964/dotfiles --skill crowdstrike-parser
Repository Details
star Stars 1
call_split Forks 0
navigation Branch main
article Path SKILL.md
More from Creator
rthexton1964
rthexton1964 Explore all skills →