---
name: crowdstrike-parser
description: "Parse and transform CrowdStrike Falcon data exports including detections, incidents, Spotlight vulnerabilities, EASM assets, and LogScale query results. Use when processing CrowdStrike JSON/CSV exports, building reports from Falcon data, normalizing detection data, or integrating CrowdStrike outputs into security workflows."
---
# CrowdStrike Data Parser

## Overview

Utility scripts and patterns for parsing and normalizing CrowdStrike Falcon data exports from the modules listed below.

## Supported Data Types
| Data Type | Source | Script |
|---|---|---|
| Detections | Falcon Prevent/Insight XDR | parse_detections.py |
| Incidents | Incident Workbench | parse_incidents.py |
| Vulnerabilities | Spotlight | parse_spotlight.py |
| External Assets | EASM | parse_easm.py |
| Host Inventory | Discover | parse_discover.py |
| Query Results | LogScale | parse_logscale.py |
## Quick Start
# Parse detection export
python scripts/parse_detections.py detections.json --output detections_normalized.json
# Parse Spotlight vulnerabilities with KEV enrichment
python scripts/parse_spotlight.py spotlight.json --enrich-kev --output vulns.json
# Convert to CSV for reporting
python scripts/parse_detections.py detections.json --format csv --output detections.csv
## Detection Parsing
# scripts/parse_detections.py
"""
Parse CrowdStrike detection exports into normalized format.
Usage:
python parse_detections.py <input_file> [--format json|csv] [--output <file>]
"""
import json
import csv
import sys
from typing import Generator
from datetime import datetime
def parse_detection_json(file_path: str) -> Generator[dict, None, None]:
    """Yield one normalized record per detection found in a JSON export.

    The file may hold a bare JSON array of detections, an object that wraps
    them under ``"resources"``, or a single detection object.
    """
    with open(file_path, 'r') as handle:
        payload = json.load(handle)

    if isinstance(payload, list):
        records = payload
    else:
        # No "resources" wrapper: treat the object itself as one detection.
        records = payload.get("resources", [payload])

    yield from map(normalize_detection, records)
def normalize_detection(detection: dict) -> dict:
    """Normalize a raw detection into a flat, report-friendly record.

    Args:
        detection: One detection object from an export. ``device`` and
            ``behaviors`` may be missing or ``null``.

    Returns:
        A dict with identification, severity, host, tactic/technique and IOC
        fields, plus the untouched behavior list under ``raw_behaviors``.
        De-duplicated lists keep the order in which values first appear.
    """
    # `or` instead of a .get() default: an explicit JSON null is returned
    # as None by .get() and would otherwise crash the lookups below.
    device = detection.get("device") or {}
    behaviors = detection.get("behaviors") or []

    def _unique_values(field: str) -> list:
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # output is identical from run to run (iteration order of a set of
        # strings is not).
        return list(dict.fromkeys(b[field] for b in behaviors if b.get(field)))

    # Collect every hash / indicator mentioned by any behavior.
    iocs = []
    for behavior in behaviors:
        if behavior.get("sha256"):
            iocs.append({"type": "sha256", "value": behavior["sha256"]})
        if behavior.get("md5"):
            iocs.append({"type": "md5", "value": behavior["md5"]})
        if behavior.get("ioc_value"):
            iocs.append({"type": behavior.get("ioc_type"), "value": behavior["ioc_value"]})

    return {
        "detection_id": detection.get("detection_id"),
        "severity": detection.get("max_severity_displayname"),
        "severity_score": detection.get("max_severity"),
        "status": detection.get("status"),
        "first_behavior": detection.get("first_behavior"),
        "last_behavior": detection.get("last_behavior"),
        "hostname": device.get("hostname"),
        "local_ip": device.get("local_ip"),
        "external_ip": device.get("external_ip"),
        "platform": device.get("platform_name"),
        "os_version": device.get("os_version"),
        "device_id": device.get("device_id"),
        "tactics": _unique_values("tactic"),
        "techniques": _unique_values("technique"),
        "mitre_ids": _unique_values("technique_id"),
        "iocs": iocs,
        "filenames": _unique_values("filename"),
        # Command lines are intentionally not de-duplicated.
        "cmdlines": [b.get("cmdline") for b in behaviors if b.get("cmdline")],
        "assigned_to": detection.get("assigned_to_name"),
        "raw_behaviors": behaviors,
    }
def severity_to_score(severity: str) -> int:
    """Map a severity display name to its numeric score (0 if unrecognized)."""
    score_by_name = {
        "Critical": 100,
        "High": 75,
        "Medium": 50,
        "Low": 25,
        "Informational": 10,
    }
    if severity in score_by_name:
        return score_by_name[severity]
    return 0
def export_csv(detections: list, output_path: str):
    """Write normalized detections to ``output_path`` as CSV.

    List-valued columns are collapsed into a single "; "-separated cell;
    keys outside the fixed column set are ignored.
    """
    multi_value_columns = ("tactics", "techniques", "mitre_ids", "filenames")
    columns = [
        "detection_id", "severity", "severity_score", "status", "hostname",
        "local_ip", "platform", "first_behavior",
        *multi_value_columns,
    ]

    with open(output_path, 'w', newline='') as out_file:
        csv_writer = csv.DictWriter(out_file, fieldnames=columns, extrasaction='ignore')
        csv_writer.writeheader()
        for detection in detections:
            # Join each list column into one cell without touching the input record.
            joined = {
                name: "; ".join(detection.get(name, []))
                for name in multi_value_columns
            }
            csv_writer.writerow({**detection, **joined})
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse one export and write JSON or CSV.
    cli = argparse.ArgumentParser(description="Parse CrowdStrike detections")
    cli.add_argument("input", help="Input JSON file")
    cli.add_argument("--format", choices=["json", "csv"], default="json")
    cli.add_argument("--output", "-o", help="Output file path")
    options = cli.parse_args()

    # Materialize the generator so the record count can be reported afterwards.
    parsed = list(parse_detection_json(options.input))

    if options.format != "csv":
        destination = options.output or "detections_normalized.json"
        with open(destination, 'w') as out_file:
            json.dump(parsed, out_file, indent=2)
    else:
        export_csv(parsed, options.output or "detections.csv")

    print(f"Parsed {len(parsed)} detections")
## Spotlight Vulnerability Parsing
# scripts/parse_spotlight.py
"""
Parse CrowdStrike Spotlight vulnerability exports.
Usage:
python parse_spotlight.py <input_file> [--enrich-kev] [--output <file>]
"""
import json
import csv
import requests
from typing import Generator
# Location of the CISA Known Exploited Vulnerabilities catalog (JSON feed);
# downloaded by enrich_with_kev().
KEV_URL = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
def parse_spotlight_json(file_path: str) -> Generator[dict, None, None]:
    """Yield one normalized record per vulnerability in a Spotlight export.

    Accepts a bare JSON array, an object wrapping the items under
    ``"resources"``, or a single vulnerability object.
    """
    with open(file_path, 'r') as export_file:
        parsed = json.load(export_file)

    if not isinstance(parsed, list):
        # Unwrap "resources"; without it the object is a single vulnerability.
        parsed = parsed.get("resources", [parsed])

    yield from (normalize_vulnerability(item) for item in parsed)
def normalize_vulnerability(vuln: dict) -> dict:
    """Normalize a Spotlight vulnerability into a flat record.

    Args:
        vuln: One vulnerability object from an export. The nested ``cve``,
            ``host_info``, ``app``, ``cve.cisa_info`` and ``remediation``
            objects may each be missing or ``null``.

    Returns:
        A dict with CVE, KEV, host, application and lifecycle fields. Fields
        whose source is absent are ``None`` (``groups``/``tags`` default to
        ``[]`` and ``is_kev`` to ``False``).
    """
    # `or {}` instead of a .get() default: an explicit JSON null is returned
    # as None by .get() and would raise AttributeError on the next lookup.
    cve = vuln.get("cve") or {}
    host = vuln.get("host_info") or {}
    app = vuln.get("app") or {}
    cisa = cve.get("cisa_info") or {}
    remediation = vuln.get("remediation") or {}

    return {
        "id": vuln.get("id"),
        "cve_id": cve.get("id"),
        "severity": cve.get("severity"),
        "cvss_score": cve.get("base_score"),
        "cvss_vector": cve.get("vector"),
        "exploit_status": cve.get("exploit_status"),
        "is_kev": cisa.get("is_cisa_kev", False),
        "kev_due_date": cisa.get("due_date"),
        "hostname": host.get("hostname"),
        "local_ip": host.get("local_ip"),
        "os_version": host.get("os_version"),
        "groups": host.get("groups", []),
        "tags": host.get("tags", []),
        "agent_version": host.get("agent_version"),
        "product": app.get("product_name_version"),
        "vendor": app.get("vendor"),
        "version": app.get("version"),
        "status": vuln.get("status"),
        "created": vuln.get("created_timestamp"),
        "updated": vuln.get("updated_timestamp"),
        "remediation": remediation.get("action"),
        "suppression": vuln.get("suppression_info"),
    }
def enrich_with_kev(vulns: list, timeout: float = 30.0) -> list:
    """Enrich vulnerabilities in place with the current KEV catalog.

    Downloads the catalog from ``KEV_URL`` and, for every record whose
    ``cve_id`` appears in it, sets ``is_kev``, ``kev_due_date``,
    ``kev_action`` and ``ransomware_related``.

    Args:
        vulns: Normalized vulnerability records (mutated in place).
        timeout: Seconds to wait for the catalog download before giving up.

    Returns:
        The same ``vulns`` list, for call chaining.

    Raises:
        requests.RequestException: If the download times out, fails, or the
            server answers with an HTTP error status.
    """
    # Without a timeout requests waits indefinitely on a stalled connection.
    response = requests.get(KEV_URL, timeout=timeout)
    # Fail with a clear HTTP error instead of a JSON decode error on an error page.
    response.raise_for_status()
    catalog = response.json()

    # Index the catalog by CVE id; skip malformed entries that lack one.
    kev_lookup = {
        entry["cveID"]: entry
        for entry in catalog.get("vulnerabilities", [])
        if entry.get("cveID")
    }

    for vuln in vulns:
        cve_id = vuln.get("cve_id")
        if not cve_id or cve_id not in kev_lookup:
            continue
        kev = kev_lookup[cve_id]
        vuln["is_kev"] = True
        vuln["kev_due_date"] = kev.get("dueDate")
        vuln["kev_action"] = kev.get("requiredAction")
        vuln["ransomware_related"] = kev.get("knownRansomwareCampaignUse") == "Known"

    return vulns
def calculate_risk_score(vuln: dict) -> float:
    """Calculate a composite 0-100 risk score for a normalized vulnerability.

    Weights: CVSS base score up to 40 points, KEV membership 25, exploit
    available 20, plus a severity bonus (CRITICAL 15, HIGH 10, MEDIUM 5).

    Args:
        vuln: Normalized record; ``cvss_score`` and ``severity`` may be
            missing or ``None``.

    Returns:
        The score, capped at 100.
    """
    score = 0.0

    cvss = vuln.get("cvss_score") or 0
    score += (cvss / 10) * 40  # CVSS: 40%

    if vuln.get("is_kev"):
        score += 25  # KEV: 25%

    # NOTE(review): this compares against the string "available"; confirm the
    # export really carries a string here rather than a numeric status code.
    if vuln.get("exploit_status") == "available":
        score += 20  # Exploit available: 20%

    # The key is usually present with value None when the CVE carried no
    # severity, so a .get() default alone would not prevent None.upper().
    severity = str(vuln.get("severity") or "").upper()
    score += {"CRITICAL": 15, "HIGH": 10, "MEDIUM": 5}.get(severity, 0)

    return min(score, 100)
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse, optionally enrich, score, sort, write.
    cli = argparse.ArgumentParser(description="Parse CrowdStrike Spotlight vulnerabilities")
    cli.add_argument("input", help="Input JSON file")
    cli.add_argument("--enrich-kev", action="store_true", help="Enrich with CISA KEV data")
    cli.add_argument("--output", "-o", help="Output file path")
    options = cli.parse_args()

    records = list(parse_spotlight_json(options.input))
    if options.enrich_kev:
        records = enrich_with_kev(records)

    # Score after enrichment so KEV membership counts towards the score.
    for record in records:
        record["risk_score"] = calculate_risk_score(record)

    # Highest risk first.
    records.sort(key=lambda record: record["risk_score"], reverse=True)

    destination = options.output or "spotlight_normalized.json"
    with open(destination, 'w') as out_file:
        json.dump(records, out_file, indent=2)

    print(f"Parsed {len(records)} vulnerabilities")
    kev_total = len([record for record in records if record.get("is_kev")])
    print(f"KEV vulnerabilities: {kev_total}")
## Incident Parsing
# scripts/parse_incidents.py
"""Parse CrowdStrike incident exports."""
import json
from typing import Generator
def parse_incident_json(file_path: str) -> Generator[dict, None, None]:
    """Yield one normalized record per incident in a JSON export.

    Accepts a bare JSON array, an object wrapping the incidents under
    ``"resources"``, or a single incident object.
    """
    with open(file_path, 'r') as source:
        document = json.load(source)

    is_wrapped = not isinstance(document, list)
    # A non-list without "resources" is treated as one incident.
    entries = document.get("resources", [document]) if is_wrapped else document

    for entry in entries:
        yield normalize_incident(entry)
def normalize_incident(incident: dict) -> dict:
    """Project a raw incident onto the normalized incident schema.

    Scalar fields are copied as-is (``fine_score`` is exposed as ``score``),
    each host is reduced to its id, hostname and local IP, and absent list
    fields default to ``[]``.
    """
    record = {
        field: incident.get(field)
        for field in ("incident_id", "incident_type", "state", "status")
    }
    record["score"] = incident.get("fine_score")
    for field in ("name", "description", "created", "start", "end"):
        record[field] = incident.get(field)
    record["tags"] = incident.get("tags", [])
    record["assigned_to"] = incident.get("assigned_to")

    # Keep only the identifying attributes of each involved host.
    host_fields = ("device_id", "hostname", "local_ip")
    record["hosts"] = [
        {field: host.get(field) for field in host_fields}
        for host in incident.get("hosts", [])
    ]

    for field in ("users", "objectives", "tactics", "techniques"):
        record[field] = incident.get(field, [])
    return record
## EASM Asset Parsing
# scripts/parse_easm.py
"""Parse CrowdStrike EASM (External Attack Surface) exports."""
import json
from typing import Generator
def parse_easm_json(file_path: str) -> Generator[dict, None, None]:
    """Yield one normalized record per asset in an EASM JSON export.

    Accepts a bare JSON array, an object wrapping the assets under
    ``"resources"``, or a single asset object.
    """
    with open(file_path, 'r') as export:
        content = json.load(export)

    if isinstance(content, list):
        asset_list = content
    else:
        # Fall back to treating the object itself as the only asset.
        asset_list = content.get("resources", [content])

    for raw_asset in asset_list:
        yield normalize_easm_asset(raw_asset)
def normalize_easm_asset(asset: dict) -> dict:
    """Project a raw EASM asset onto the normalized asset schema.

    Scalar fields are copied as-is, each service is reduced to port,
    protocol, service name and version, each vulnerability to CVE id and
    severity, and absent list fields default to ``[]``.
    """
    scalar_fields = (
        "id", "asset_type", "asset", "confidence",
        "discovery_date", "first_seen", "last_seen",
    )
    normalized = {field: asset.get(field) for field in scalar_fields}
    normalized["sources"] = asset.get("sources", [])
    normalized["subsidiaries"] = asset.get("subsidiaries", [])

    services = []
    for svc in asset.get("services", []):
        services.append({
            "port": svc.get("port"),
            "protocol": svc.get("protocol"),
            "service": svc.get("service_name"),  # renamed from service_name
            "version": svc.get("version"),
        })
    normalized["services"] = services

    findings = []
    for finding in asset.get("vulnerabilities", []):
        findings.append({
            "cve_id": finding.get("cve_id"),
            "severity": finding.get("severity"),
        })
    normalized["vulnerabilities"] = findings

    normalized["exposures"] = asset.get("exposures", [])
    return normalized
## CSV Export Utility
# scripts/export_csv.py
"""Generic CSV export for CrowdStrike data."""
import csv
import json
import sys
def flatten_dict(d: dict, parent_key: str = '', sep: str = '_') -> dict:
    """Flatten a nested dict into a single level with joined key names.

    Nested dict keys are joined with ``sep``. A non-empty list whose first
    item is a dict is stored as a JSON string; any other list is stored as
    its items joined with "; ". All other values are kept unchanged.
    """
    flat = {}
    for key, value in d.items():
        path = key if not parent_key else f"{parent_key}{sep}{key}"

        if isinstance(value, dict):
            # Recurse, carrying the accumulated key prefix.
            flat.update(flatten_dict(value, path, sep))
        elif not isinstance(value, list):
            flat[path] = value
        elif value and isinstance(value[0], dict):
            flat[path] = json.dumps(value)
        else:
            flat[path] = "; ".join(str(item) for item in value)
    return flat
def json_to_csv(input_file: str, output_file: str):
    """Flatten the records of a JSON file and write them out as CSV.

    The input may be a JSON array, an object wrapping the records under
    ``"resources"``, or a single record. Columns are the sorted union of all
    flattened keys. Nothing is written when there are no records.
    """
    with open(input_file, 'r') as src:
        payload = json.load(src)

    records = payload if isinstance(payload, list) else payload.get("resources", [payload])
    if not records:
        print("No data to export")
        return

    rows = list(map(flatten_dict, records))
    # Records may carry different keys, so take the union for the header.
    columns = sorted({name for row in rows for name in row})

    with open(output_file, 'w', newline='') as dst:
        csv_writer = csv.DictWriter(dst, fieldnames=columns, extrasaction='ignore')
        csv_writer.writeheader()
        for row in rows:
            csv_writer.writerow(row)

    print(f"Exported {len(rows)} records to {output_file}")
if __name__ == "__main__":
    # Exactly two positional arguments are required: input JSON, output CSV.
    cli_args = sys.argv[1:]
    if len(cli_args) != 2:
        print("Usage: export_csv.py <input.json> <output.csv>")
        sys.exit(1)

    source_path, target_path = cli_args
    json_to_csv(source_path, target_path)
## LogScale Query Result Parsing
# scripts/parse_logscale.py
"""Parse CrowdStrike LogScale (Humio) query results."""
import json
from typing import Generator
def parse_logscale_results(file_path: str) -> Generator[dict, None, None]:
    """Yield one normalized record per event in a LogScale query export.

    Accepts a bare JSON array of events, an object wrapping them under
    ``"events"``, or a single event object.

    Args:
        file_path: Path to the JSON export.

    Yields:
        The result of ``normalize_logscale_event`` for each event.
    """
    with open(file_path, 'r') as f:
        data = json.load(f)

    if not isinstance(data, dict):
        events = data
    elif "events" in data:
        # Tolerate an explicit null under "events".
        events = data["events"] or []
    else:
        # A dict without "events" is a single event. Iterating the dict
        # itself would yield its string keys, which cannot be normalized.
        events = [data]

    for event in events:
        yield normalize_logscale_event(event)
def normalize_logscale_event(event: dict) -> dict:
    """Project a raw LogScale event onto the normalized event schema.

    The timestamp is taken from ``@timestamp``, falling back to
    ``timestamp``; the original event is kept under ``raw``.
    """
    # (normalized name, source field) pairs, in output order.
    field_map = (
        ("event_type", "event_simpleName"),
        ("aid", "aid"),
        ("hostname", "ComputerName"),
        ("username", "UserName"),
        ("process_name", "ImageFileName"),
        ("command_line", "CommandLine"),
        ("sha256", "SHA256HashData"),
        ("parent_process", "ParentImageFileName"),
        ("remote_ip", "RemoteIP"),
        ("remote_port", "RemotePort"),
        ("local_ip", "LocalIP"),
    )

    normalized = {"timestamp": event.get("@timestamp") or event.get("timestamp")}
    for target, source in field_map:
        normalized[target] = event.get(source)
    normalized["raw"] = event
    return normalized
## Batch Processing
#!/bin/bash
# scripts/batch_process.sh - Process multiple CrowdStrike exports
#
# Usage: batch_process.sh [INPUT_DIR] [OUTPUT_DIR]
#   INPUT_DIR   directory searched for detections*.json and spotlight*.json (default: .)
#   OUTPUT_DIR  directory that receives <name>_normalized.json files (default: ./processed)

INPUT_DIR="${1:-.}"
OUTPUT_DIR="${2:-./processed}"

# Resolve the parsers relative to this script so the batch works from any
# working directory, not only when started inside scripts/.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

mkdir -p "$OUTPUT_DIR"

# Process all detection files
for f in "$INPUT_DIR"/detections*.json; do
    # An unmatched glob stays literal; skip it (and anything that is not a file).
    [ -f "$f" ] || continue
    base=$(basename "$f" .json)
    python "$SCRIPT_DIR/parse_detections.py" "$f" -o "$OUTPUT_DIR/${base}_normalized.json" \
        || echo "Warning: failed to process $f" >&2
done

# Process all Spotlight files
for f in "$INPUT_DIR"/spotlight*.json; do
    [ -f "$f" ] || continue
    base=$(basename "$f" .json)
    python "$SCRIPT_DIR/parse_spotlight.py" "$f" --enrich-kev -o "$OUTPUT_DIR/${base}_normalized.json" \
        || echo "Warning: failed to process $f" >&2
done

echo "Processing complete. Output in $OUTPUT_DIR"