name: building-ioc-enrichment-pipeline-with-opencti description: OpenCTI is an open-source platform for managing cyber threat intelligence knowledge, built on STIX 2.1 as its native data model. This skill covers building an automated IOC enrichment pipeline using O domain: cybersecurity subdomain: threat-intelligence tags:
- threat-intelligence
- cti
- ioc
- mitre-attack
- stix
- opencti
- enrichment
- virustotal version: '1.0' author: mahipal license: Apache-2.0 nist_csf:
- ID.RA-01
- ID.RA-05
- DE.CM-01
- DE.AE-02
Building IOC Enrichment Pipeline with OpenCTI
Overview
OpenCTI is an open-source platform for managing cyber threat intelligence knowledge, built on STIX 2.1 as its native data model. This skill covers building an automated IOC enrichment pipeline using OpenCTI's connector ecosystem to enrich indicators with context from VirusTotal, Shodan, AbuseIPDB, GreyNoise, and other sources. The pipeline automatically enriches newly ingested indicators, correlates them with known threat actors and campaigns, and scores them for analyst prioritization.
When to Use
- When deploying or configuring building ioc enrichment pipeline with opencti capabilities in your environment
- When establishing security controls aligned to compliance requirements
- When building or improving security architecture for this domain
- When conducting security assessments that require this implementation
Prerequisites
- Docker and Docker Compose for OpenCTI deployment
- Python 3.9+ with
pyctilibrary - API keys for enrichment services: VirusTotal, Shodan, AbuseIPDB, GreyNoise
- Understanding of STIX 2.1 data model and relationships
- ElasticSearch or OpenSearch for OpenCTI backend
- RabbitMQ or Redis for connector messaging
Key Concepts
This section covers key concepts for building ioc enrichment pipeline with opencti.
- Ensure all prerequisites are met before proceeding
- Follow the documented workflow steps in sequence
- Record results and any anomalies encountered during this phase
OpenCTI Architecture
OpenCTI uses a GraphQL API frontend backed by ElasticSearch for storage and Redis/RabbitMQ for connector communication. Data is natively stored as STIX 2.1 objects with relationships. Connectors are categorized as: External Import (feed ingestion), Internal Import (file parsing), Internal Enrichment (context addition), and Stream (real-time export).
Enrichment Connector Model
Internal enrichment connectors are triggered automatically when new observables are created or manually by analysts. Each connector receives STIX objects, queries external services, and returns STIX 2.1 bundles that augment the original observable with additional context, labels, and relationships.
Confidence Scoring
OpenCTI uses a 0-100 confidence scale for indicators. Enrichment connectors can update confidence scores based on external validation: VirusTotal detection ratios, Shodan exposure data, AbuseIPDB report counts, and GreyNoise classification results.
Workflow
- Scope the task — define objectives, boundaries, and success criteria
- Gather information — collect all necessary data and context before proceeding
- Execute the core workflow — follow the domain-specific steps methodically
- Validate results — verify outputs against expected outcomes or baselines
- Document findings — record results, anomalies, and recommendations
Step 1: Deploy OpenCTI with Docker Compose
# docker-compose.yml (key services)
version: '3'
services:
opencti:
image: opencti/platform:6.4.4
environment:
- APP__PORT=8080
- APP__ADMIN__EMAIL=admin@opencti.io
- APP__ADMIN__PASSWORD=ChangeMeNow
- APP__ADMIN__TOKEN=your-admin-token-uuid
- ELASTICSEARCH__URL=http://elasticsearch:9200
- MINIO__ENDPOINT=minio
- RABBITMQ__HOSTNAME=rabbitmq
ports:
- "8080:8080"
depends_on:
- elasticsearch
- minio
- rabbitmq
- redis
connector-virustotal:
image: opencti/connector-virustotal:6.4.4
environment:
- OPENCTI_URL=http://opencti:8080
- OPENCTI_TOKEN=your-admin-token-uuid
- CONNECTOR_ID=connector-virustotal-id
- CONNECTOR_NAME=VirusTotal
- CONNECTOR_SCOPE=StixFile,Artifact,IPv4-Addr,Domain-Name,Url
- CONNECTOR_AUTO=true
- VIRUSTOTAL_TOKEN=your-vt-api-key
- VIRUSTOTAL_MAX_TLP=TLP:AMBER
connector-shodan:
image: opencti/connector-shodan:6.4.4
environment:
- OPENCTI_URL=http://opencti:8080
- OPENCTI_TOKEN=your-admin-token-uuid
- CONNECTOR_ID=connector-shodan-id
- CONNECTOR_NAME=Shodan
- CONNECTOR_SCOPE=IPv4-Addr
- CONNECTOR_AUTO=true
- SHODAN_TOKEN=your-shodan-api-key
- SHODAN_MAX_TLP=TLP:AMBER
connector-abuseipdb:
image: opencti/connector-abuseipdb:6.4.4
environment:
- OPENCTI_URL=http://opencti:8080
- OPENCTI_TOKEN=your-admin-token-uuid
- CONNECTOR_ID=connector-abuseipdb-id
- CONNECTOR_NAME=AbuseIPDB
- CONNECTOR_SCOPE=IPv4-Addr
- CONNECTOR_AUTO=true
- ABUSEIPDB_API_KEY=your-abuseipdb-key
Step 2: Build Custom Enrichment Connector
import os
from pycti import OpenCTIConnectorHelper, get_config_variable
from stix2 import (
Bundle, Indicator, Note, Relationship,
IPv4Address, DomainName
)
import requests
class CustomEnrichmentConnector:
def __init__(self):
config = {
"opencti": {
"url": os.environ.get("OPENCTI_URL"),
"token": os.environ.get("OPENCTI_TOKEN"),
},
"connector": {
"id": os.environ.get("CONNECTOR_ID"),
"name": "CustomEnrichment",
"scope": "IPv4-Addr,Domain-Name,Url",
"auto": True,
"type": "INTERNAL_ENRICHMENT",
},
}
self.helper = OpenCTIConnectorHelper(config)
self.helper.listen(self._process_message)
def _process_message(self, data):
entity_id = data["entity_id"]
stix_object = self.helper.api.stix_cyber_observable.read(id=entity_id)
if not stix_object:
return "Observable not found"
observable_type = stix_object["entity_type"]
observable_value = stix_object.get("value", "")
enrichment_results = []
if observable_type == "IPv4-Addr":
enrichment_results = self._enrich_ip(observable_value, entity_id)
elif observable_type == "Domain-Name":
enrichment_results = self._enrich_domain(observable_value, entity_id)
if enrichment_results:
bundle = Bundle(objects=enrichment_results, allow_custom=True)
self.helper.send_stix2_bundle(bundle.serialize())
return "Enrichment completed"
def _enrich_ip(self, ip_address, entity_id):
"""Enrich IP address with GreyNoise, AbuseIPDB context."""
objects = []
# GreyNoise Community API
try:
gn_response = requests.get(
f"https://api.greynoise.io/v3/community/{ip_address}",
headers={"key": os.environ.get("GREYNOISE_API_KEY")},
timeout=30,
)
if gn_response.status_code == 200:
gn_data = gn_response.json()
classification = gn_data.get("classification", "unknown")
noise = gn_data.get("noise", False)
riot = gn_data.get("riot", False)
note_content = (
f"## GreyNoise Enrichment\n"
f"- Classification: {classification}\n"
f"- Internet Noise: {noise}\n"
f"- RIOT (Benign Service): {riot}\n"
f"- Name: {gn_data.get('name', 'N/A')}\n"
f"- Last Seen: {gn_data.get('last_seen', 'N/A')}"
)
note = Note(
content=note_content,
object_refs=[entity_id],
abstract=f"GreyNoise: {classification}",
allow_custom=True,
)
objects.append(note)
# Add labels based on classification
if classification == "malicious":
self.helper.api.stix_cyber_observable.add_label(
id=entity_id, label_name="greynoise:malicious"
)
elif riot:
self.helper.api.stix_cyber_observable.add_label(
id=entity_id, label_name="greynoise:benign-service"
)
except Exception as e:
self.helper.log_error(f"GreyNoise enrichment failed: {e}")
return objects
def _enrich_domain(self, domain, entity_id):
"""Enrich domain with WHOIS and DNS context."""
objects = []
try:
# Use SecurityTrails API for domain enrichment
st_response = requests.get(
f"https://api.securitytrails.com/v1/domain/{domain}",
headers={"APIKEY": os.environ.get("SECURITYTRAILS_API_KEY")},
timeout=30,
)
if st_response.status_code == 200:
st_data = st_response.json()
current_dns = st_data.get("current_dns", {})
a_records = [
r.get("ip") for r in current_dns.get("a", {}).get("values", [])
]
note_content = (
f"## SecurityTrails Enrichment\n"
f"- A Records: {', '.join(a_records)}\n"
f"- Alexa Rank: {st_data.get('alexa_rank', 'N/A')}\n"
f"- Hostname: {st_data.get('hostname', 'N/A')}"
)
note = Note(
content=note_content,
object_refs=[entity_id],
abstract=f"SecurityTrails: {domain}",
allow_custom=True,
)
objects.append(note)
except Exception as e:
self.helper.log_error(f"SecurityTrails enrichment failed: {e}")
return objects
if __name__ == "__main__":
connector = CustomEnrichmentConnector()
## When NOT to Use
- You need to test what you built (use performing-* skills)
- Task is about configuring existing systems (use configuring-* skills)
- You need to analyze the output (use analyzing-* skills)
- Task is about implementing vendor solutions (use implementing-* skills)
- You don't have infrastructure access
- Task requires compliance validation (use auditing-* skills)
## Red Flags
- Performing actions without explicit written authorization from the asset owner
- Testing against production systems without a defined scope and rules of engagement
- Acting on threat intelligence without validating source reliability
- Sharing classified or sensitive indicators without proper handling procedures
- Alerting threat actors to detection capabilities through visible response actions
## Verification
- All steps executed successfully against a test environment before production use
- Output documented with screenshots or logs demonstrating expected behavior
- Results validated against known-good baselines or reference implementations
- Documentation complete enough for another analyst to reproduce findings
## Process
1. Analyze the task requirements
2. Apply domain expertise
3. Verify output quality
## Anti-Rationalization
| Rationalization | Reality |
|---|---|
| "We are too small to be targeted" | Automated attacks target everyone. Size does not matter. |
| "Security slows us down" | A breach slows you down 100x more. Build security in from the start. |
| "We will fix it after launch" | Vulnerabilities in production are exploited within hours. Fix before deploy. |