crowdstrike-falcon

star 1

Complete CrowdStrike Falcon platform operations covering all modules: Falcon Prevent (EDR/AV), Spotlight (vulnerability management), EASM (external attack surface), Discover (asset inventory), Identity Protection, OverWatch (threat hunting), Intelligence, Insight XDR, LogScale, Horizon (CSPM), and Fusion SOAR. Use when working with CrowdStrike APIs, parsing Falcon data exports, building detections, analyzing incidents, vulnerability prioritization, or integrating Falcon with other security tools.

rthexton1964 By rthexton1964 schedule Updated 1/26/2026

name: crowdstrike-falcon description: "Complete CrowdStrike Falcon platform operations covering all modules: Falcon Prevent (EDR/AV), Spotlight (vulnerability management), EASM (external attack surface), Discover (asset inventory), Identity Protection, OverWatch (threat hunting), Intelligence, Insight XDR, LogScale, Horizon (CSPM), and Fusion SOAR. Use when working with CrowdStrike APIs, parsing Falcon data exports, building detections, analyzing incidents, vulnerability prioritization, or integrating Falcon with other security tools."

CrowdStrike Falcon Platform

Overview

Complete reference for CrowdStrike Falcon platform operations. Covers API integration, data parsing, detection engineering, and cross-module workflows.

Quick Reference

Module Primary Use API Base
Falcon Prevent EDR/AV, detections /detects/, /incidents/
Spotlight Vulnerability management /spotlight/
EASM External attack surface /fem/
Discover Asset inventory /discover/
Identity Identity protection /identity-protection/
Horizon Cloud security (CSPM) /cspm-registration/
LogScale Log management /humio/
Intel Threat intelligence /intel/
Fusion SOAR workflows /workflows/

Authentication

import requests

def get_falcon_token(client_id: str, client_secret: str, base_url: str = "https://api.crowdstrike.com") -> str:
    """Obtain OAuth2 token for Falcon API access."""
    response = requests.post(
        f"{base_url}/oauth2/token",
        data={"client_id": client_id, "client_secret": client_secret}
    )
    response.raise_for_status()
    return response.json()["access_token"]

def falcon_headers(token: str) -> dict:
    return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

Cloud regions:

  • US-1: api.crowdstrike.com
  • US-2: api.us-2.crowdstrike.com
  • EU-1: api.eu-1.crowdstrike.com
  • US-GOV-1: api.laggar.gcw.crowdstrike.com

Falcon Prevent (EDR)

Detection Queries

def get_detections(token: str, base_url: str, filter_query: str = None, limit: int = 100) -> list:
    """Query detections with FQL filter."""
    params = {"limit": limit}
    if filter_query:
        params["filter"] = filter_query
    
    # Get detection IDs
    resp = requests.get(
        f"{base_url}/detects/queries/detects/v1",
        headers=falcon_headers(token),
        params=params
    )
    detection_ids = resp.json().get("resources", [])
    
    if not detection_ids:
        return []
    
    # Get full details
    resp = requests.post(
        f"{base_url}/detects/entities/summaries/GET/v1",
        headers=falcon_headers(token),
        json={"ids": detection_ids}
    )
    return resp.json().get("resources", [])

# Common FQL filters
CRITICAL_DETECTIONS = "max_severity_displayname:'Critical'+status:'new'"
LAST_24H = "first_behavior:>='2024-01-01T00:00:00Z'"
RANSOMWARE = "behaviors.tactic:'Execution'+behaviors.technique:'Ransomware'"

Incident Management

def get_incidents(token: str, base_url: str, filter_query: str = None) -> list:
    """Query incidents with details."""
    params = {"filter": filter_query} if filter_query else {}
    
    resp = requests.get(
        f"{base_url}/incidents/queries/incidents/v1",
        headers=falcon_headers(token),
        params=params
    )
    incident_ids = resp.json().get("resources", [])
    
    if not incident_ids:
        return []
    
    resp = requests.post(
        f"{base_url}/incidents/entities/incidents/GET/v1",
        headers=falcon_headers(token),
        json={"ids": incident_ids}
    )
    return resp.json().get("resources", [])

def update_incident_status(token: str, base_url: str, incident_ids: list, status: str) -> dict:
    """Update incident status. Valid: new, in_progress, closed, reopened"""
    return requests.patch(
        f"{base_url}/incidents/entities/incident-actions/v1",
        headers=falcon_headers(token),
        json={"action_parameters": [{"name": "update_status", "value": status}], "ids": incident_ids}
    ).json()

Real-Time Response (RTR)

def init_rtr_session(token: str, base_url: str, device_id: str) -> str:
    """Initialize RTR session on endpoint."""
    resp = requests.post(
        f"{base_url}/real-time-response/entities/sessions/v1",
        headers=falcon_headers(token),
        json={"device_id": device_id, "queue_offline": True}
    )
    return resp.json()["resources"][0]["session_id"]

def execute_rtr_command(token: str, base_url: str, session_id: str, 
                        base_command: str, command_string: str) -> dict:
    """Execute RTR command. base_command: ls, cd, cat, ps, netstat, etc."""
    return requests.post(
        f"{base_url}/real-time-response/entities/command/v1",
        headers=falcon_headers(token),
        json={
            "session_id": session_id,
            "base_command": base_command,
            "command_string": command_string
        }
    ).json()

Spotlight (Vulnerability Management)

Vulnerability Queries

def get_spotlight_vulnerabilities(token: str, base_url: str, 
                                   filter_query: str = None, facet: list = None) -> dict:
    """Query Spotlight vulnerabilities with optional faceting."""
    params = {"filter": filter_query or "status:'open'"}
    if facet:
        params["facet"] = facet  # e.g., ["cve.severity", "host_info.hostname"]
    
    resp = requests.get(
        f"{base_url}/spotlight/combined/vulnerabilities/v1",
        headers=falcon_headers(token),
        params=params
    )
    return resp.json()

# Common Spotlight FQL filters
CRITICAL_VULNS = "cve.severity:'CRITICAL'+status:'open'"
EXPLOITED_VULNS = "cve.exploit_status:'available'+status:'open'"
KEV_VULNS = "cve.cisa_info.is_cisa_kev:true+status:'open'"
RECENT_VULNS = "created_timestamp:>='2024-01-01'"
BY_HOST = "host_info.hostname:'HOSTNAME'"

Spotlight Data Export

def export_spotlight_csv(vulnerabilities: list, output_path: str):
    """Export Spotlight data to CSV for reporting."""
    import csv
    
    fieldnames = [
        "cve_id", "severity", "cvss_score", "hostname", "os", 
        "app_name", "app_version", "exploit_status", "kev", 
        "created", "remediation"
    ]
    
    with open(output_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        
        for vuln in vulnerabilities:
            cve = vuln.get("cve", {})
            host = vuln.get("host_info", {})
            app = vuln.get("app", {})
            
            writer.writerow({
                "cve_id": cve.get("id"),
                "severity": cve.get("severity"),
                "cvss_score": cve.get("base_score"),
                "hostname": host.get("hostname"),
                "os": host.get("os_version"),
                "app_name": app.get("product_name_version"),
                "app_version": app.get("version"),
                "exploit_status": cve.get("exploit_status"),
                "kev": cve.get("cisa_info", {}).get("is_cisa_kev", False),
                "created": vuln.get("created_timestamp"),
                "remediation": vuln.get("remediation", {}).get("action")
            })

EASM (External Attack Surface)

def get_easm_assets(token: str, base_url: str, asset_type: str = None) -> list:
    """Query external assets. Types: domain, ip, certificate, web_asset"""
    params = {}
    if asset_type:
        params["filter"] = f"asset_type:'{asset_type}'"
    
    resp = requests.get(
        f"{base_url}/fem/queries/external-assets/v1",
        headers=falcon_headers(token),
        params=params
    )
    asset_ids = resp.json().get("resources", [])
    
    if not asset_ids:
        return []
    
    resp = requests.post(
        f"{base_url}/fem/entities/external-assets/v1",
        headers=falcon_headers(token),
        json={"ids": asset_ids}
    )
    return resp.json().get("resources", [])

def get_easm_exposures(token: str, base_url: str) -> list:
    """Get external exposures/vulnerabilities."""
    resp = requests.get(
        f"{base_url}/fem/queries/external-exposures/v1",
        headers=falcon_headers(token)
    )
    return resp.json().get("resources", [])

Discover (Asset Inventory)

def get_discover_hosts(token: str, base_url: str, filter_query: str = None) -> list:
    """Query Discover asset inventory."""
    params = {"filter": filter_query} if filter_query else {}
    
    resp = requests.get(
        f"{base_url}/discover/queries/hosts/v1",
        headers=falcon_headers(token),
        params=params
    )
    host_ids = resp.json().get("resources", [])
    
    if not host_ids:
        return []
    
    resp = requests.post(
        f"{base_url}/discover/entities/hosts/v1",
        headers=falcon_headers(token),
        json={"ids": host_ids}
    )
    return resp.json().get("resources", [])

# Common Discover filters
UNMANAGED = "entity_type:'unmanaged'"
MANAGED = "entity_type:'managed'"  
IOT_DEVICES = "entity_type:'iot'"
BY_SUBNET = "local_ip_addresses:'10.0.1.*'"

Horizon (CSPM)

def get_cspm_findings(token: str, base_url: str, cloud_provider: str = None) -> list:
    """Query cloud security posture findings."""
    params = {}
    if cloud_provider:
        params["filter"] = f"cloud_provider:'{cloud_provider}'"  # aws, azure, gcp
    
    resp = requests.get(
        f"{base_url}/cspm-registration/entities/policy-details/v1",
        headers=falcon_headers(token),
        params=params
    )
    return resp.json().get("resources", [])

def get_cloud_accounts(token: str, base_url: str) -> list:
    """List registered cloud accounts."""
    resp = requests.get(
        f"{base_url}/cspm-registration/entities/accounts/v1",
        headers=falcon_headers(token)
    )
    return resp.json().get("resources", [])

Intel (Threat Intelligence)

def search_intel_actors(token: str, base_url: str, query: str) -> list:
    """Search threat actors."""
    resp = requests.get(
        f"{base_url}/intel/queries/actors/v1",
        headers=falcon_headers(token),
        params={"q": query}
    )
    actor_ids = resp.json().get("resources", [])
    
    if not actor_ids:
        return []
    
    resp = requests.post(
        f"{base_url}/intel/entities/actors/v1",
        headers=falcon_headers(token),
        json={"ids": actor_ids}
    )
    return resp.json().get("resources", [])

def get_intel_indicators(token: str, base_url: str, indicator_type: str = None) -> list:
    """Get threat indicators. Types: hash_md5, hash_sha256, domain, ip_address, url"""
    params = {}
    if indicator_type:
        params["filter"] = f"type:'{indicator_type}'"
    
    resp = requests.get(
        f"{base_url}/intel/queries/indicators/v1",
        headers=falcon_headers(token),
        params=params
    )
    return resp.json().get("resources", [])

LogScale (Humio)

def query_logscale(token: str, base_url: str, repository: str, 
                   query: str, start: str = "24h", end: str = "now") -> dict:
    """Execute LogScale query."""
    return requests.post(
        f"{base_url}/humio/api/v1/repositories/{repository}/query",
        headers=falcon_headers(token),
        json={
            "queryString": query,
            "start": start,
            "end": end
        }
    ).json()

# Example LogScale queries
FAILED_LOGINS = 'event_simpleName=UserLogonFailed | groupBy([UserName, ComputerName])'
PROCESS_CREATION = 'event_simpleName=ProcessRollup2 | ImageFileName=/.*powershell.*/i'
NETWORK_CONNECTIONS = 'event_simpleName=NetworkConnectIP4 | RemotePort=443'

Fusion (SOAR Workflows)

def list_workflows(token: str, base_url: str) -> list:
    """List available Fusion workflows."""
    resp = requests.get(
        f"{base_url}/workflows/entities/definitions/v1",
        headers=falcon_headers(token)
    )
    return resp.json().get("resources", [])

def execute_workflow(token: str, base_url: str, definition_id: str, 
                     trigger_data: dict) -> dict:
    """Trigger a Fusion workflow."""
    return requests.post(
        f"{base_url}/workflows/entities/executions/v1",
        headers=falcon_headers(token),
        json={
            "definition_id": definition_id,
            "trigger": trigger_data
        }
    ).json()

FQL (Falcon Query Language) Reference

Operators

Operator Description Example
: Equals status:'open'
!: Not equals status!:'closed'
>, >=, <, <= Comparison cvss_score:>=7.0
+ AND severity:'Critical'+status:'open'
, OR severity:'Critical',severity:'High'
* Wildcard hostname:'web-*'
~ Contains description:~'ransomware'

Common Patterns

# Date ranges
"timestamp:>='2024-01-01T00:00:00Z'+timestamp:<='2024-01-31T23:59:59Z'"

# Multiple values
"severity:'Critical','High','Medium'"

# Nested fields
"behaviors.tactic:'Persistence'"

# Negation with wildcards
"hostname!:'test-*'"

Data Parsing Utilities

See references/data-formats.md for detailed parsing of:

  • Detection export JSON/CSV
  • Spotlight vulnerability exports
  • Incident report formats
  • LogScale query results
  • RTR script outputs

Rate Limits

Endpoint Category Requests/Minute
OAuth Token 500
Detections 6000
Incidents 6000
Spotlight 6000
RTR 100 per session
Intel 6000
LogScale Varies by repo

Implement exponential backoff on 429 responses.

Install via CLI
npx skills add https://github.com/rthexton1964/dotfiles --skill crowdstrike-falcon
Repository Details
star Stars 1
call_split Forks 0
navigation Branch main
article Path SKILL.md
More from Creator
rthexton1964
rthexton1964 Explore all skills →