Neo4j für Forensische Analysen - Von Grundlagen bis zur Berichtserstellung
Neo4j ist eine leistungsstarke Graphdatenbank, die sich hervorragend für forensische Analysen eignet, bei denen Beziehungen zwischen Entitäten (Personen, Dateien, Prozesse, Netzwerkverbindungen) im Mittelpunkt stehen. Dieser umfassende Leitfaden führt Sie von den Grundlagen bis zu komplexen forensischen Analyseszenarien.
Warum Neo4j für Forensik?
Graphdatenbanken sind ideal für forensische Untersuchungen, da sie:
- Beziehungen natürlich modellieren - Verbindungen zwischen Verdächtigen, Geräten und Aktivitäten
- Mustererkennung ermöglichen - Anomalien und verdächtige Strukturen in komplexen Datenstrukturen
- Zeitbasierte Analysen unterstützen - Zeitleisten-Rekonstruktion und temporale Beziehungen
- Skalierbare Abfragen bieten - Millionen von Verbindungen effizient durchsuchen
- Visualisierung vereinfachen - Komplexe Zusammenhänge grafisch darstellen
Grundlegende Konzepte für Forensiker
Knoten (Nodes)
Repräsentieren Entitäten in Ihrer Untersuchung:
// Verschiedene forensische Entitäten
(:Person {name: "John Doe", id: "emp001"})
(:Computer {hostname: "DESKTOP-ABC123", ip: "192.168.1.100"})
(:File {path: "C:\\Users\\john\\suspicious.exe", hash: "a1b2c3..."})
(:Process {name: "cmd.exe", pid: 1234})
(:NetworkConnection {src_ip: "192.168.1.100", dst_ip: "185.199.108.153"})
Beziehungen (Relationships)
Verbindungen zwischen Entitäten mit Eigenschaften:
// Forensische Beziehungen
(:Person)-[:LOGGED_INTO {timestamp: "2024-01-15T10:30:00"}]->(:Computer)
(:Process)-[:CREATED {timestamp: "2024-01-15T10:31:15"}]->(:File)
(:Computer)-[:CONNECTED_TO {port: 443, protocol: "HTTPS"}]->(:NetworkConnection)
Grundoperationen - Knoten und Beziehungen verwalten
Knoten erstellen (CREATE)
Einzelne Knoten erstellen:
// Person erstellen
CREATE (p:Person {
name: "Max Mustermann",
employee_id: "E001",
department: "IT",
email: "max.mustermann@firma.de"
})
// Computer erstellen
CREATE (c:Computer {
hostname: "DESKTOP-IT001",
ip: "192.168.1.50",
os: "Windows 10",
mac_address: "00:11:22:33:44:55"
})
// Datei erstellen
CREATE (f:File {
path: "C:\\temp\\verdaechtig.exe",
hash_md5: "5d41402abc4b2a76b9719d911017c592",
size: 2048,
created_time: datetime("2024-01-15T14:30:00")
})
Mehrere Knoten gleichzeitig:
CREATE
(p:Person {name: "Anna Schmidt", id: "E002"}),
(c:Computer {hostname: "LAPTOP-HR001", ip: "192.168.1.75"}),
(f:File {path: "C:\\Users\\anna\\dokument.pdf", size: 1024})
Beziehungen erstellen
Einfache Beziehung:
MATCH (p:Person {name: "Max Mustermann"})
MATCH (c:Computer {hostname: "DESKTOP-IT001"})
CREATE (p)-[:LOGGED_INTO {timestamp: datetime(), session_id: "12345"}]->(c)
Knoten und Beziehung in einem Schritt:
CREATE (p:Person {name: "Thomas Weber"})-[:OWNS]->(c:Computer {hostname: "LAPTOP-TW001"})
Mehrere Beziehungen:
MATCH (p:Person {name: "Max Mustermann"})
MATCH (c1:Computer {hostname: "DESKTOP-IT001"})
MATCH (c2:Computer {hostname: "SERVER-001"})
CREATE
(p)-[:ACCESSED {timestamp: datetime(), access_type: "remote"}]->(c1),
(p)-[:ADMINISTERED {timestamp: datetime(), privileges: "full"}]->(c2)
Eigenschaften aktualisieren (SET)
Einzelne Eigenschaft ändern:
MATCH (p:Person {name: "Max Mustermann"})
SET p.status = "under_investigation"
Mehrere Eigenschaften aktualisieren:
MATCH (c:Computer {hostname: "DESKTOP-IT001"})
SET c.status = "compromised",
c.last_scan = datetime(),
c.security_level = "high_risk"
Eigenschaft zu Beziehung hinzufügen:
MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
WHERE p.name = "Max Mustermann" AND c.hostname = "DESKTOP-IT001"
SET login.logout_time = datetime(),
login.session_duration = duration("PT2H30M")
Knoten und Beziehungen zusammenführen (MERGE)
Knoten zusammenführen (erstellt nur wenn nicht vorhanden):
MERGE (p:Person {employee_id: "E001"})
ON CREATE SET p.name = "Max Mustermann", p.created = datetime()
ON MATCH SET p.last_seen = datetime()
Beziehung zusammenführen:
MATCH (p:Person {name: "Max Mustermann"})
MATCH (c:Computer {hostname: "DESKTOP-IT001"})
MERGE (p)-[login:LOGGED_INTO {date: date()}]->(c)
ON CREATE SET login.first_login = datetime()
ON MATCH SET login.last_login = datetime()
Löschen (DELETE)
Beziehung löschen:
MATCH (p:Person {name: "Max Mustermann"})-[login:LOGGED_INTO]->(c:Computer)
DELETE login
Knoten löschen (nur wenn keine Beziehungen):
MATCH (f:File {path: "C:\\temp\\test.txt"})
DELETE f
Knoten mit allen Beziehungen löschen:
MATCH (p:Person {name: "Test User"})
DETACH DELETE p
Mehrere Knoten nach Kriterium:
MATCH (f:File)
WHERE f.size < 1024 AND f.temporary = true
DETACH DELETE f
Eigenschaften entfernen (REMOVE)
Einzelne Eigenschaft entfernen:
MATCH (p:Person {name: "Max Mustermann"})
REMOVE p.temp_status
Label entfernen:
MATCH (n:TempNode)
REMOVE n:TempNode
Von einfachen zu komplexen Abfragen
Level 1: Grundlegende MATCH-Abfragen
Alle Computer in der Untersuchung finden:
MATCH (c:Computer)
RETURN c.hostname, c.ip, c.os
Bestimmte Datei suchen:
MATCH (f:File {name: "suspicious.exe"})
RETURN f.path, f.hash, f.size
Personen mit bestimmten Eigenschaften:
MATCH (p:Person)
WHERE p.department = "IT"
RETURN p.name, p.email
Level 2: Einfache Beziehungsabfragen
Wer hat sich an welchen Computern angemeldet?
MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
RETURN p.name, c.hostname, login.timestamp
ORDER BY login.timestamp DESC
Welche Dateien wurden von einem bestimmten Prozess erstellt?
MATCH (proc:Process {name: "powershell.exe"})-[:CREATED]->(f:File)
RETURN proc.pid, f.path, f.created_time
Level 3: Mustersuche und Pfade
Verdächtige Dateierstellung-Ketten finden:
MATCH path = (p:Person)-[:LOGGED_INTO]->(c:Computer)<-[:RUNNING_ON]-(proc:Process)-[:CREATED]->(f:File)
WHERE f.path CONTAINS ".exe"
RETURN path
Netzwerkverbindungen von kompromittierten Systemen:
MATCH (c:Computer {status: "compromised"})-[:INITIATED]->(conn:NetworkConnection)
WHERE conn.dst_port IN [80, 443, 8080]
RETURN c.hostname, conn.dst_ip, conn.dst_port, conn.timestamp
Level 4: Aggregation und Statistiken
Anmeldestatistiken pro Benutzer:
MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
RETURN p.name,
count(login) as login_count,
collect(DISTINCT c.hostname) as computers_accessed,
min(login.timestamp) as first_login,
max(login.timestamp) as last_login
ORDER BY login_count DESC
Häufigste Netzwerkziele identifizieren:
MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
RETURN conn.dst_ip,
count(*) as connection_count,
collect(DISTINCT c.hostname) as source_computers
ORDER BY connection_count DESC
LIMIT 10
Level 5: Zeitbasierte Analysen
Aktivitäten in einem bestimmten Zeitfenster:
MATCH (p:Person)-[r:LOGGED_INTO]->(c:Computer)
WHERE datetime(r.timestamp) >= datetime("2024-01-15T08:00:00")
AND datetime(r.timestamp) <= datetime("2024-01-15T18:00:00")
RETURN p.name, c.hostname, r.timestamp
ORDER BY r.timestamp
Zeitleiste einer kompromittierten Sitzung:
MATCH (c:Computer {hostname: "DESKTOP-ABC123"})
MATCH (c)<-[r]-(entity)
WHERE r.timestamp IS NOT NULL
RETURN type(entity) as entity_type,
entity.name as entity_name,
type(r) as relationship_type,
r.timestamp
ORDER BY r.timestamp
Level 6: Erweiterte Pfadanalysen
Seitliche Bewegung erkennen:
MATCH path = (start:Computer)<-[:LOGGED_INTO]-(p:Person)-[:LOGGED_INTO]->(end:Computer)
WHERE start <> end
AND datetime(path.relationships[0].timestamp) < datetime(path.relationships[1].timestamp)
RETURN p.name,
start.hostname as from_computer,
end.hostname as to_computer,
length(path) as hop_count
Dateiverteilung verfolgen:
MATCH path = (origin:File)-[:COPIED_TO|:MOVED_TO*1..5]->(destination:File)
WHERE origin.hash = destination.hash
RETURN path, length(path) as propagation_steps
Level 7: Komplexe Untersuchungsmuster
Insider-Bedrohung erkennen:
MATCH (p:Person)-[:LOGGED_INTO]->(c:Computer)
MATCH (c)-[:ACCESSED]->(f:File)
WHERE f.classification = "confidential"
AND NOT (p)-[:AUTHORIZED_FOR]->(f)
WITH p, count(f) as unauthorized_access_count
WHERE unauthorized_access_count > 5
RETURN p.name, p.department, unauthorized_access_count
Command & Control Kommunikation identifizieren:
MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
WHERE conn.dst_port IN [80, 443, 8080, 53]
WITH conn.dst_ip as suspicious_ip, count(*) as freq, collect(DISTINCT c.hostname) as computers
WHERE freq > 100 AND size(computers) > 1
MATCH (compromised:Computer)-[:CONNECTED_TO]->(cc_conn:NetworkConnection {dst_ip: suspicious_ip})
RETURN suspicious_ip, freq, computers, collect(cc_conn.timestamp) as connection_times
APOC - Erweiterte Funktionen für Forensiker
APOC (Awesome Procedures on Cypher) erweitert Neo4j um wichtige forensische Funktionen.
Zeitfunktionen
Zeitstempel-Konvertierung:
// Unix-Timestamp zu datetime
RETURN apoc.date.format(1642248600, 's', 'yyyy-MM-dd HH:mm:ss') as formatted_time
// Zeitbereich-Abfragen
MATCH (event:Event)
WHERE apoc.date.parse(event.timestamp, 's', 'yyyy-MM-dd HH:mm:ss')
BETWEEN apoc.date.parse('2024-01-15 08:00:00', 's', 'yyyy-MM-dd HH:mm:ss')
AND apoc.date.parse('2024-01-15 18:00:00', 's', 'yyyy-MM-dd HH:mm:ss')
RETURN event
Datenimport für forensische Artefakte
CSV-Import von Windows Ereignisprotokollen:
CALL apoc.load.csv('file:///windows_events.csv') YIELD map
CREATE (e:Event {
event_id: toInteger(map.EventID),
timestamp: datetime(map.TimeGenerated),
computer: map.ComputerName,
user: map.User,
description: map.Message
})
JSON-Import von Netzwerk-Protokollen:
CALL apoc.load.json('file:///network_traffic.json') YIELD value
UNWIND value.connections as conn
CREATE (nc:NetworkConnection {
src_ip: conn.source_ip,
dst_ip: conn.destination_ip,
dst_port: conn.destination_port,
protocol: conn.protocol,
timestamp: datetime(conn.timestamp),
bytes_transferred: conn.bytes
})
Graphalgorithmen für Untersuchungen
Kürzester Pfad zwischen verdächtigen Entitäten:
MATCH (start:Person {name: "Verdächtiger A"}), (end:File {classification: "streng_geheim"})
CALL apoc.algo.dijkstra(start, end, 'ACCESSED|SHARED|COPIED', 'weight') YIELD path, weight
RETURN path, weight
PageRank für wichtige Akteure identifizieren:
CALL apoc.algo.pageRank(['Person'], ['COMMUNICATED_WITH', 'SHARED_FILE'])
YIELD node, score
RETURN node.name as person, score
ORDER BY score DESC
LIMIT 10
Forensische Datenmodellierung
Typische Entitätstypen
// Personen und Identitäten
CREATE (p:Person {name: "Max Mustermann", employee_id: "E001", department: "Finance"})
CREATE (u:UserAccount {username: "mmustermann", domain: "FIRMA", sid: "S-1-5-21-..."})
// Systeme und Hardware
CREATE (c:Computer {hostname: "WS001", ip: "192.168.1.100", os: "Windows 10"})
CREATE (m:Mobile {imei: "123456789012345", number: "+1234567890"})
// Dateien und Artefakte
CREATE (f:File {path: "C:\\temp\\malware.exe", hash_md5: "5d41402a...", size: 1024})
CREATE (r:RegistryKey {path: "HKLM\\Software\\Microsoft\\..."})
// Netzwerk und Kommunikation
CREATE (ip:IPAddress {address: "192.168.1.100", geolocation: "Internal"})
CREATE (e:Email {subject: "Re: Projekt X", timestamp: datetime()})
// Prozesse und Aktivitäten
CREATE (proc:Process {name: "cmd.exe", pid: 1234, command_line: "cmd.exe /c dir"})
CREATE (event:Event {event_id: 4624, source: "Security", timestamp: datetime()})
Beziehungstypen für forensische Analysen
// Benutzer-System-Interaktionen
CREATE (person)-[:OWNS]->(computer)
CREATE (user)-[:LOGGED_INTO {timestamp: datetime(), session_id: "12345"}]->(computer)
// Dateioperationen
CREATE (process)-[:CREATED {timestamp: datetime()}]->(file)
CREATE (process)-[:MODIFIED {timestamp: datetime(), action: "write"}]->(file)
CREATE (process)-[:DELETED {timestamp: datetime()}]->(file)
// Netzwerkaktivitäten
CREATE (computer)-[:CONNECTED_TO {timestamp: datetime(), protocol: "TCP"}]->(ip)
CREATE (process)-[:INITIATED_CONNECTION]->(network_connection)
// Kommunikation
CREATE (person)-[:SENT {timestamp: datetime()}]->(email)
CREATE (person)-[:RECEIVED {timestamp: datetime()}]->(email)
// Zeitliche Beziehungen
CREATE (event1)-[:FOLLOWED_BY {duration: duration("PT5M")}]->(event2)
CREATE (activity)-[:OCCURRED_DURING]->(time_period)
Zeitleisten-Analyse mit Neo4j
Zeitbasierte Ereignisrekonstruktion
Chronologische Ereignissequenz:
MATCH (e:Event)
WHERE e.computer = "DESKTOP-ABC123"
AND e.timestamp >= datetime("2024-01-15T10:00:00")
AND e.timestamp <= datetime("2024-01-15T12:00:00")
RETURN e.timestamp, e.event_id, e.description, e.user
ORDER BY e.timestamp
Überlappende Aktivitäten erkennen:
MATCH (e1:Event)-[:OCCURRED_ON]->(c:Computer)<-[:OCCURRED_ON]-(e2:Event)
WHERE e1.timestamp <= e2.timestamp <= e1.end_timestamp
AND e1 <> e2
RETURN e1.description as activity1,
e2.description as activity2,
c.hostname,
e1.timestamp as start1,
e2.timestamp as start2
Zeitliche Mustererkennung
Wiederkehrende verdächtige Aktivitäten:
MATCH (p:Process)-[:CREATED]->(f:File)
WHERE f.path CONTAINS "temp"
WITH p.name as process_name,
duration.between(min(f.created_time), max(f.created_time)) as time_span,
count(f) as file_count
WHERE file_count > 10 AND time_span < duration("PT1H")
RETURN process_name, file_count, time_span
Visualisierung und Berichtserstellung
Grafische Darstellung im Neo4j Browser
Neo4j Browser bietet eingebaute Visualisierung für forensische Analysen. Hier lernen Sie, wie Sie Ihre Daten direkt grafisch darstellen.
Grundlegende Visualisierungsabfragen
Alle Beziehungen zwischen Personen und Computern anzeigen:
MATCH (p:Person)-[r:LOGGED_INTO]->(c:Computer)
RETURN p, r, c
LIMIT 50
Netzwerk um einen verdächtigen Benutzer:
MATCH (p:Person {name: "Max Mustermann"})-[r]-(connected)
RETURN p, r, connected
Dateierstellung-Ketten visualisieren:
MATCH path = (proc:Process)-[:CREATED]->(f:File)-[:COPIED_TO]->(f2:File)
RETURN path
LIMIT 25
Netzwerkverbindungen mit Zeitfilter:
MATCH (c:Computer)-[conn:CONNECTED_TO]->(target)
WHERE datetime(conn.timestamp) >= datetime("2024-01-15T10:00:00")
RETURN c, conn, target
LIMIT 100
Visualisierung anpassen
Knoten-Farben nach Typ festlegen: Im Neo4j Browser können Sie die Darstellung anpassen:
- Klicken Sie auf einen Knoten-Typ (z.B.
:Person
) in der Legende - Wählen Sie Farbe und Größe
- Legen Sie die Beschriftung fest (z.B.
name
oderhostname
)
Beziehungs-Labels anzeigen:
MATCH (p:Person)-[r:LOGGED_INTO {session_id: "12345"}]->(c:Computer)
RETURN p, r, c
Klicken Sie auf den Beziehungstyp in der Legende und aktivieren Sie “Caption” für Eigenschaften wie timestamp
.
Graph-Layout optimieren:
- Hierarchical Layout: Für Zeitleisten und Prozess-Bäume
- Force Layout: Für allgemeine Netzwerk-Visualisierung
- Grid Layout: Für strukturierte Darstellungen
Forensik-spezifische Visualisierungen
Kompromittierungsweg darstellen:
MATCH path = (attacker:Person)-[:LOGGED_INTO]->(entry:Computer)-[:CONNECTED_TO*1..3]-(target:Computer)
WHERE entry.security_level = "low" AND target.classification = "sensitive"
RETURN path
Zeitbasierte Aktivitätsketten:
MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
MATCH (c)<-[:RUNNING_ON]-(proc:Process)-[created:CREATED]->(f:File)
WHERE datetime(login.timestamp) <= datetime(created.timestamp) <= datetime(login.timestamp) + duration("PT1H")
RETURN p, login, c, proc, created, f
Anomalie-Cluster identifizieren:
MATCH (p:Person)-[:LOGGED_INTO]->(c:Computer)
WITH p, count(c) as computer_count
WHERE computer_count > 5 // Benutzer mit Zugriff auf viele Computer
MATCH (p)-[r:LOGGED_INTO]->(c:Computer)
RETURN p, r, c
Visualisierungsfilter für große Datensätze
Aktivitäten nach Zeitfenster begrenzen:
MATCH (entity)-[r]-(connected)
WHERE r.timestamp >= datetime("2024-01-15T08:00:00")
AND r.timestamp <= datetime("2024-01-15T18:00:00")
RETURN entity, r, connected
LIMIT 200
Nach Wichtigkeit filtern:
MATCH (p:Person)-[r]->(entity)
WHERE p.risk_level = "high" OR entity.classification = "confidential"
RETURN p, r, entity
LIMIT 100
Schrittweise Expansion:
// Schritt 1: Zentrale Person finden
MATCH (p:Person {name: "Verdächtiger"})
RETURN p
// Schritt 2: Direkte Verbindungen erweitern
MATCH (p:Person {name: "Verdächtiger"})-[r]-(level1)
RETURN p, r, level1
// Schritt 3: Zweite Ebene hinzufügen
MATCH (p:Person {name: "Verdächtiger"})-[r1]-(level1)-[r2]-(level2)
WHERE type(r2) IN ['SHARED_FILE', 'COMMUNICATED_WITH']
RETURN p, r1, level1, r2, level2
LIMIT 50
Interaktive Untersuchung
Doppelklick-Expansion verwenden:
- Starten Sie mit einer einfachen Abfrage
- Doppelklicken Sie auf interessante Knoten
- Neo4j erweitert automatisch die Verbindungen
- Verwenden Sie Rechtsklick → “Dismiss” um Knoten auszublenden
Graph-Export aus dem Browser:
- Klicken Sie auf das Kamera-Symbol für Screenshots
- Oder verwenden Sie den Export-Button für SVG/PNG
- Für Berichte: Speichern als PNG mit Titel und Zeitstempel
Ermittlungsberichte erstellen
Zusammenfassung verdächtiger Aktivitäten:
MATCH (p:Person)-[r:LOGGED_INTO]->(c:Computer)
WHERE datetime(r.timestamp) >= datetime("2024-01-15T00:00:00")
WITH p, c, count(r) as login_count, collect(r.timestamp) as login_times
WHERE login_count > 20 // Verdächtig viele Anmeldungen
RETURN p.name as suspect,
p.department,
c.hostname as accessed_computer,
login_count,
login_times[0] as first_login,
login_times[-1] as last_login
ORDER BY login_count DESC
Netzwerk-Anomalien-Bericht:
MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
WHERE conn.dst_port NOT IN [80, 443, 53, 22, 23, 25, 110, 143, 993, 995]
WITH conn.dst_ip as suspicious_ip,
conn.dst_port as suspicious_port,
count(*) as frequency,
collect(DISTINCT c.hostname) as source_computers
WHERE frequency > 5
RETURN suspicious_ip,
suspicious_port,
frequency,
source_computers,
size(source_computers) as affected_computers_count
ORDER BY frequency DESC
Export für externe Werkzeuge
Gephi-Format für Visualisierung:
// Knoten für Gephi
MATCH (n)
RETURN id(n) as Id,
labels(n)[0] as Label,
n.name as Name,
CASE labels(n)[0]
WHEN 'Person' THEN 'red'
WHEN 'Computer' THEN 'blue'
WHEN 'File' THEN 'green'
ELSE 'gray'
END as Color
UNION
// Kanten für Gephi
MATCH (a)-[r]->(b)
RETURN id(a) as Source,
id(b) as Target,
type(r) as Type,
r.timestamp as Timestamp
Leistungsoptimierung für große Datensätze
Indizierung strategisch einsetzen
// Zeitstempel-Indizes für Zeitleisten-Analysen
CREATE INDEX FOR (e:Event) ON (e.timestamp)
CREATE INDEX FOR (l:Login) ON (l.timestamp)
// Eindeutige Bezeichner
CREATE CONSTRAINT FOR (f:File) REQUIRE f.hash IS UNIQUE
CREATE CONSTRAINT FOR (p:Person) REQUIRE p.employee_id IS UNIQUE
// Zusammengesetzte Indizes für häufige Abfragen
CREATE INDEX FOR (c:Computer) ON (c.hostname, c.ip)
Abfrageoptimierung
PROFILE verwenden für Leistungsanalyse:
PROFILE
MATCH (p:Person)-[:LOGGED_INTO]->(c:Computer)
WHERE p.department = "IT"
RETURN p.name, c.hostname
Begrenzung für große Ergebnismengen:
MATCH (f:File)
WHERE f.size > 100000000 // Dateien größer als 100MB
RETURN f.path, f.size
ORDER BY f.size DESC
LIMIT 100
Häufige Probleme und Lösungen
Speicheroptimierung
# Neo4j-Konfiguration für große Datensätze
# Speicher-Einstellungen in neo4j.conf
dbms.memory.heap.initial_size=2G
dbms.memory.heap.max_size=8G
dbms.memory.pagecache.size=4G
Datenimport-Leistung
// Stapelimport für große CSV-Dateien
:auto USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM 'file:///large_dataset.csv' AS row
CREATE (e:Event {
timestamp: datetime(row.timestamp),
event_id: toInteger(row.event_id),
computer: row.computer,
description: row.description
})
Fortgeschrittene Techniken
Maschinelles Lernen Integration
// Anomalieerkennung durch statistische Analyse
MATCH (p:Person)-[l:LOGGED_INTO]->(c:Computer)
WITH p, count(l) as login_frequency
WITH avg(login_frequency) as avg_logins, stdev(login_frequency) as std_logins
MATCH (p:Person)-[l:LOGGED_INTO]->(c:Computer)
WITH p, count(l) as user_logins, avg_logins, std_logins
WHERE user_logins > (avg_logins + 2 * std_logins)
RETURN p.name as potential_anomaly, user_logins, avg_logins
Korrelationsanalyse
// Zeitliche Korrelation zwischen Ereignissen
MATCH (e1:Event), (e2:Event)
WHERE e1.computer = e2.computer
AND abs(duration.between(e1.timestamp, e2.timestamp).seconds) < 300 // 5 Minuten
AND e1 <> e2
RETURN e1.event_id, e2.event_id, e1.timestamp, e2.timestamp,
duration.between(e1.timestamp, e2.timestamp) as time_diff
Installation und Einrichtung
Docker-Installation (empfohlen)
# Neo4j mit APOC-Plugin für Forensik-Umgebungen
docker run \
--name neo4j-forensics \
-p7474:7474 -p7687:7687 \
-d \
-v $HOME/neo4j/data:/data \
-v $HOME/neo4j/logs:/logs \
-v $HOME/neo4j/import:/var/lib/neo4j/import \
--env NEO4J_AUTH=neo4j/forensics123 \
--env NEO4J_PLUGINS='["apoc"]' \
neo4j:latest
Native Installation
# Ubuntu/Debian
wget -O - https://debian.neo4j.com/neotechnology.gpg.key | sudo apt-key add -
echo 'deb https://debian.neo4j.com stable 4.4' | sudo tee /etc/apt/sources.list.d/neo4j.list
sudo apt update
sudo apt install neo4j
# APOC Plugin herunterladen
wget https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/4.4.0.18/apoc-4.4.0.18-all.jar
sudo mv apoc-4.4.0.18-all.jar /var/lib/neo4j/plugins/
# Neo4j starten
sudo systemctl enable neo4j
sudo systemctl start neo4j
Erste Konfiguration
# Standardpasswort ändern
cypher-shell -u neo4j -p neo4j
ALTER USER neo4j SET PASSWORD 'IhrSicheresPasswort';
# Speicher für forensische Arbeitslasten konfigurieren
sudo nano /etc/neo4j/neo4j.conf
Wichtige Konfigurationseinstellungen:
# Speicherzuweisung
dbms.memory.heap.initial_size=4G
dbms.memory.heap.max_size=8G
dbms.memory.pagecache.size=4G
# APOC-Konfiguration
dbms.security.procedures.unrestricted=apoc.*
dbms.security.procedures.allowlist=apoc.*
# Import-Verzeichnis
dbms.directories.import=/var/lib/neo4j/import
Zugriff:
- Web-Schnittstelle:
http://localhost:7474
- Bolt-Protokoll:
bolt://localhost:7687
Zusammenfassung und bewährte Praktiken
Empfohlener Arbeitsablauf
- Datenmodell entwerfen - Entitäten und Beziehungen für Ihren Fall definieren
- Daten importieren - Strukturiert mit APOC oder LOAD CSV
- Indizes erstellen - Leistung für häufige Abfragen optimieren
- Explorative Analyse - Von einfachen zu komplexen Abfragen voranschreiten
- Muster identifizieren - Anomalien und verdächtige Strukturen erkennen
- Zeitleiste rekonstruieren - Chronologische Ereignissequenzen erstellen
- Berichte generieren - Ergebnisse strukturiert dokumentieren
- Visualisieren - Komplexe Beziehungen grafisch darstellen
Wichtige Erkenntnisse
- Graphdatenbanken eignen sich hervorragend für forensische Analysen aufgrund ihrer natürlichen Beziehungsmodellierung
- Cypher ermöglicht intuitive Abfragen die komplexe ermittlerische Fragen beantworten
- APOC erweitert die Funktionalität um zeitbasierte Analysen und Datenimport
- Leistungsoptimierung ist kritisch bei großen forensischen Datensätzen
- Strukturierte Herangehensweise führt zu effektiven Untersuchungsergebnissen
Neo4j bietet forensischen Analysten ein mächtiges Werkzeug zur Untersuchung komplexer digitaler Spuren und zur Aufdeckung versteckter Zusammenhänge in ihren Fällen.
Neo4j für Forensische Analysen - Von Grundlagen bis zur Berichtserstellung
Neo4j ist eine leistungsstarke Graphdatenbank, die sich hervorragend für forensische Analysen eignet, bei denen Beziehungen zwischen Entitäten (Personen, Dateien, Prozesse, Netzwerkverbindungen) im Mittelpunkt stehen. Dieser umfassende Leitfaden führt Sie von den Grundlagen bis zu komplexen forensischen Analyseszenarien.
Warum Neo4j für Forensik?
Graphdatenbanken sind ideal für forensische Untersuchungen, da sie:
- Beziehungen natürlich modellieren - Verbindungen zwischen Verdächtigen, Geräten und Aktivitäten
- Mustererkennung ermöglichen - Anomalien und verdächtige Strukturen in komplexen Datenstrukturen
- Zeitbasierte Analysen unterstützen - Zeitleisten-Rekonstruktion und temporale Beziehungen
- Skalierbare Abfragen bieten - Millionen von Verbindungen effizient durchsuchen
- Visualisierung vereinfachen - Komplexe Zusammenhänge grafisch darstellen
Grundlegende Konzepte für Forensiker
Knoten (Nodes)
Repräsentieren Entitäten in Ihrer Untersuchung:
// Verschiedene forensische Entitäten
(:Person {name: "John Doe", id: "emp001"})
(:Computer {hostname: "DESKTOP-ABC123", ip: "192.168.1.100"})
(:File {path: "C:\\Users\\john\\suspicious.exe", hash: "a1b2c3..."})
(:Process {name: "cmd.exe", pid: 1234})
(:NetworkConnection {src_ip: "192.168.1.100", dst_ip: "185.199.108.153"})
Beziehungen (Relationships)
Verbindungen zwischen Entitäten mit Eigenschaften:
// Forensische Beziehungen
(:Person)-[:LOGGED_INTO {timestamp: "2024-01-15T10:30:00"}]->(:Computer)
(:Process)-[:CREATED {timestamp: "2024-01-15T10:31:15"}]->(:File)
(:Computer)-[:CONNECTED_TO {port: 443, protocol: "HTTPS"}]->(:NetworkConnection)
Grundoperationen - Knoten und Beziehungen verwalten
Knoten erstellen (CREATE)
Einzelne Knoten erstellen:
// Person erstellen
CREATE (p:Person {
name: "Max Mustermann",
employee_id: "E001",
department: "IT",
email: "max.mustermann@firma.de"
})
// Computer erstellen
CREATE (c:Computer {
hostname: "DESKTOP-IT001",
ip: "192.168.1.50",
os: "Windows 10",
mac_address: "00:11:22:33:44:55"
})
// Datei erstellen
CREATE (f:File {
path: "C:\\temp\\verdaechtig.exe",
hash_md5: "5d41402abc4b2a76b9719d911017c592",
size: 2048,
created_time: datetime("2024-01-15T14:30:00")
})
Mehrere Knoten gleichzeitig:
CREATE
(p:Person {name: "Anna Schmidt", id: "E002"}),
(c:Computer {hostname: "LAPTOP-HR001", ip: "192.168.1.75"}),
(f:File {path: "C:\\Users\\anna\\dokument.pdf", size: 1024})
Beziehungen erstellen
Einfache Beziehung:
MATCH (p:Person {name: "Max Mustermann"})
MATCH (c:Computer {hostname: "DESKTOP-IT001"})
CREATE (p)-[:LOGGED_INTO {timestamp: datetime(), session_id: "12345"}]->(c)
Knoten und Beziehung in einem Schritt:
CREATE (p:Person {name: "Thomas Weber"})-[:OWNS]->(c:Computer {hostname: "LAPTOP-TW001"})
Mehrere Beziehungen:
MATCH (p:Person {name: "Max Mustermann"})
MATCH (c1:Computer {hostname: "DESKTOP-IT001"})
MATCH (c2:Computer {hostname: "SERVER-001"})
CREATE
(p)-[:ACCESSED {timestamp: datetime(), access_type: "remote"}]->(c1),
(p)-[:ADMINISTERED {timestamp: datetime(), privileges: "full"}]->(c2)
Eigenschaften aktualisieren (SET)
Einzelne Eigenschaft ändern:
MATCH (p:Person {name: "Max Mustermann"})
SET p.status = "under_investigation"
Mehrere Eigenschaften aktualisieren:
MATCH (c:Computer {hostname: "DESKTOP-IT001"})
SET c.status = "compromised",
c.last_scan = datetime(),
c.security_level = "high_risk"
Eigenschaft zu Beziehung hinzufügen:
MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
WHERE p.name = "Max Mustermann" AND c.hostname = "DESKTOP-IT001"
SET login.logout_time = datetime(),
login.session_duration = duration("PT2H30M")
Knoten und Beziehungen zusammenführen (MERGE)
Knoten zusammenführen (erstellt nur wenn nicht vorhanden):
MERGE (p:Person {employee_id: "E001"})
ON CREATE SET p.name = "Max Mustermann", p.created = datetime()
ON MATCH SET p.last_seen = datetime()
Beziehung zusammenführen:
MATCH (p:Person {name: "Max Mustermann"})
MATCH (c:Computer {hostname: "DESKTOP-IT001"})
MERGE (p)-[login:LOGGED_INTO {date: date()}]->(c)
ON CREATE SET login.first_login = datetime()
ON MATCH SET login.last_login = datetime()
Löschen (DELETE)
Beziehung löschen:
MATCH (p:Person {name: "Max Mustermann"})-[login:LOGGED_INTO]->(c:Computer)
DELETE login
Knoten löschen (nur wenn keine Beziehungen):
MATCH (f:File {path: "C:\\temp\\test.txt"})
DELETE f
Knoten mit allen Beziehungen löschen:
MATCH (p:Person {name: "Test User"})
DETACH DELETE p
Mehrere Knoten nach Kriterium:
MATCH (f:File)
WHERE f.size < 1024 AND f.temporary = true
DETACH DELETE f
Eigenschaften entfernen (REMOVE)
Einzelne Eigenschaft entfernen:
MATCH (p:Person {name: "Max Mustermann"})
REMOVE p.temp_status
Label entfernen:
MATCH (n:TempNode)
REMOVE n:TempNode
Von einfachen zu komplexen Abfragen
Level 1: Grundlegende MATCH-Abfragen
Alle Computer in der Untersuchung finden:
MATCH (c:Computer)
RETURN c.hostname, c.ip, c.os
Bestimmte Datei suchen:
MATCH (f:File {name: "suspicious.exe"})
RETURN f.path, f.hash, f.size
Personen mit bestimmten Eigenschaften:
MATCH (p:Person)
WHERE p.department = "IT"
RETURN p.name, p.email
Level 2: Einfache Beziehungsabfragen
Wer hat sich an welchen Computern angemeldet?
MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
RETURN p.name, c.hostname, login.timestamp
ORDER BY login.timestamp DESC
Welche Dateien wurden von einem bestimmten Prozess erstellt?
MATCH (proc:Process {name: "powershell.exe"})-[:CREATED]->(f:File)
RETURN proc.pid, f.path, f.created_time
Level 3: Mustersuche und Pfade
Verdächtige Dateierstellung-Ketten finden:
MATCH path = (p:Person)-[:LOGGED_INTO]->(c:Computer)<-[:RUNNING_ON]-(proc:Process)-[:CREATED]->(f:File)
WHERE f.path CONTAINS ".exe"
RETURN path
Netzwerkverbindungen von kompromittierten Systemen:
MATCH (c:Computer {status: "compromised"})-[:INITIATED]->(conn:NetworkConnection)
WHERE conn.dst_port IN [80, 443, 8080]
RETURN c.hostname, conn.dst_ip, conn.dst_port, conn.timestamp
Level 4: Aggregation und Statistiken
Anmeldestatistiken pro Benutzer:
MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
RETURN p.name,
count(login) as login_count,
collect(DISTINCT c.hostname) as computers_accessed,
min(login.timestamp) as first_login,
max(login.timestamp) as last_login
ORDER BY login_count DESC
Häufigste Netzwerkziele identifizieren:
MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
RETURN conn.dst_ip,
count(*) as connection_count,
collect(DISTINCT c.hostname) as source_computers
ORDER BY connection_count DESC
LIMIT 10
Level 5: Zeitbasierte Analysen
Aktivitäten in einem bestimmten Zeitfenster:
MATCH (p:Person)-[r:LOGGED_INTO]->(c:Computer)
WHERE datetime(r.timestamp) >= datetime("2024-01-15T08:00:00")
AND datetime(r.timestamp) <= datetime("2024-01-15T18:00:00")
RETURN p.name, c.hostname, r.timestamp
ORDER BY r.timestamp
Zeitleiste einer kompromittierten Sitzung:
MATCH (c:Computer {hostname: "DESKTOP-ABC123"})
MATCH (c)<-[r]-(entity)
WHERE r.timestamp IS NOT NULL
RETURN type(entity) as entity_type,
entity.name as entity_name,
type(r) as relationship_type,
r.timestamp
ORDER BY r.timestamp
Level 6: Erweiterte Pfadanalysen
Seitliche Bewegung erkennen:
MATCH path = (start:Computer)<-[:LOGGED_INTO]-(p:Person)-[:LOGGED_INTO]->(end:Computer)
WHERE start <> end
AND datetime(path.relationships[0].timestamp) < datetime(path.relationships[1].timestamp)
RETURN p.name,
start.hostname as from_computer,
end.hostname as to_computer,
length(path) as hop_count
Dateiverteilung verfolgen:
MATCH path = (origin:File)-[:COPIED_TO|:MOVED_TO*1..5]->(destination:File)
WHERE origin.hash = destination.hash
RETURN path, length(path) as propagation_steps
Level 7: Komplexe Untersuchungsmuster
Insider-Bedrohung erkennen:
MATCH (p:Person)-[:LOGGED_INTO]->(c:Computer)
MATCH (c)-[:ACCESSED]->(f:File)
WHERE f.classification = "confidential"
AND NOT (p)-[:AUTHORIZED_FOR]->(f)
WITH p, count(f) as unauthorized_access_count
WHERE unauthorized_access_count > 5
RETURN p.name, p.department, unauthorized_access_count
Command & Control Kommunikation identifizieren:
MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
WHERE conn.dst_port IN [80, 443, 8080, 53]
WITH conn.dst_ip as suspicious_ip, count(*) as freq, collect(DISTINCT c.hostname) as computers
WHERE freq > 100 AND size(computers) > 1
MATCH (compromised:Computer)-[:CONNECTED_TO]->(cc_conn:NetworkConnection {dst_ip: suspicious_ip})
RETURN suspicious_ip, freq, computers, collect(cc_conn.timestamp) as connection_times
APOC - Erweiterte Funktionen für Forensiker
APOC (Awesome Procedures on Cypher) erweitert Neo4j um wichtige forensische Funktionen.
Zeitfunktionen
Zeitstempel-Konvertierung:
// Unix-Timestamp zu datetime
RETURN apoc.date.format(1642248600, 's', 'yyyy-MM-dd HH:mm:ss') as formatted_time
// Zeitbereich-Abfragen
MATCH (event:Event)
WHERE apoc.date.parse(event.timestamp, 's', 'yyyy-MM-dd HH:mm:ss')
BETWEEN apoc.date.parse('2024-01-15 08:00:00', 's', 'yyyy-MM-dd HH:mm:ss')
AND apoc.date.parse('2024-01-15 18:00:00', 's', 'yyyy-MM-dd HH:mm:ss')
RETURN event
Datenimport für forensische Artefakte
CSV-Import von Windows Ereignisprotokollen:
CALL apoc.load.csv('file:///windows_events.csv') YIELD map
CREATE (e:Event {
event_id: toInteger(map.EventID),
timestamp: datetime(map.TimeGenerated),
computer: map.ComputerName,
user: map.User,
description: map.Message
})
JSON-Import von Netzwerk-Protokollen:
CALL apoc.load.json('file:///network_traffic.json') YIELD value
UNWIND value.connections as conn
CREATE (nc:NetworkConnection {
src_ip: conn.source_ip,
dst_ip: conn.destination_ip,
dst_port: conn.destination_port,
protocol: conn.protocol,
timestamp: datetime(conn.timestamp),
bytes_transferred: conn.bytes
})
Graphalgorithmen für Untersuchungen
Kürzester Pfad zwischen verdächtigen Entitäten:
MATCH (start:Person {name: "Verdächtiger A"}), (end:File {classification: "streng_geheim"})
CALL apoc.algo.dijkstra(start, end, 'ACCESSED|SHARED|COPIED', 'weight') YIELD path, weight
RETURN path, weight
PageRank für wichtige Akteure identifizieren:
CALL apoc.algo.pageRank(['Person'], ['COMMUNICATED_WITH', 'SHARED_FILE'])
YIELD node, score
RETURN node.name as person, score
ORDER BY score DESC
LIMIT 10
Forensische Datenmodellierung
Typische Entitätstypen
// Personen und Identitäten
CREATE (p:Person {name: "Max Mustermann", employee_id: "E001", department: "Finance"})
CREATE (u:UserAccount {username: "mmustermann", domain: "FIRMA", sid: "S-1-5-21-..."})
// Systeme und Hardware
CREATE (c:Computer {hostname: "WS001", ip: "192.168.1.100", os: "Windows 10"})
CREATE (m:Mobile {imei: "123456789012345", number: "+1234567890"})
// Dateien und Artefakte
CREATE (f:File {path: "C:\\temp\\malware.exe", hash_md5: "5d41402a...", size: 1024})
CREATE (r:RegistryKey {path: "HKLM\\Software\\Microsoft\\..."})
// Netzwerk und Kommunikation
CREATE (ip:IPAddress {address: "192.168.1.100", geolocation: "Internal"})
CREATE (e:Email {subject: "Re: Projekt X", timestamp: datetime()})
// Prozesse und Aktivitäten
CREATE (proc:Process {name: "cmd.exe", pid: 1234, command_line: "cmd.exe /c dir"})
CREATE (event:Event {event_id: 4624, source: "Security", timestamp: datetime()})
Beziehungstypen für forensische Analysen
// Benutzer-System-Interaktionen
CREATE (person)-[:OWNS]->(computer)
CREATE (user)-[:LOGGED_INTO {timestamp: datetime(), session_id: "12345"}]->(computer)
// Dateioperationen
CREATE (process)-[:CREATED {timestamp: datetime()}]->(file)
CREATE (process)-[:MODIFIED {timestamp: datetime(), action: "write"}]->(file)
CREATE (process)-[:DELETED {timestamp: datetime()}]->(file)
// Netzwerkaktivitäten
CREATE (computer)-[:CONNECTED_TO {timestamp: datetime(), protocol: "TCP"}]->(ip)
CREATE (process)-[:INITIATED_CONNECTION]->(network_connection)
// Kommunikation
CREATE (person)-[:SENT {timestamp: datetime()}]->(email)
CREATE (person)-[:RECEIVED {timestamp: datetime()}]->(email)
// Zeitliche Beziehungen
CREATE (event1)-[:FOLLOWED_BY {duration: duration("PT5M")}]->(event2)
CREATE (activity)-[:OCCURRED_DURING]->(time_period)
Zeitleisten-Analyse mit Neo4j
Zeitbasierte Ereignisrekonstruktion
Chronologische Ereignissequenz:
MATCH (e:Event)
WHERE e.computer = "DESKTOP-ABC123"
AND e.timestamp >= datetime("2024-01-15T10:00:00")
AND e.timestamp <= datetime("2024-01-15T12:00:00")
RETURN e.timestamp, e.event_id, e.description, e.user
ORDER BY e.timestamp
Überlappende Aktivitäten erkennen:
MATCH (e1:Event)-[:OCCURRED_ON]->(c:Computer)<-[:OCCURRED_ON]-(e2:Event)
WHERE e1.timestamp <= e2.timestamp <= e1.end_timestamp
AND e1 <> e2
RETURN e1.description as activity1,
e2.description as activity2,
c.hostname,
e1.timestamp as start1,
e2.timestamp as start2
Zeitliche Mustererkennung
Wiederkehrende verdächtige Aktivitäten:
MATCH (p:Process)-[:CREATED]->(f:File)
WHERE f.path CONTAINS "temp"
WITH p.name as process_name,
duration.between(min(f.created_time), max(f.created_time)) as time_span,
count(f) as file_count
WHERE file_count > 10 AND time_span < duration("PT1H")
RETURN process_name, file_count, time_span
Berichtserstellung und Visualisierung
Ermittlungsberichte erstellen
Zusammenfassung verdächtiger Aktivitäten:
MATCH (p:Person)-[r:LOGGED_INTO]->(c:Computer)
WHERE datetime(r.timestamp) >= datetime("2024-01-15T00:00:00")
WITH p, c, count(r) as login_count, collect(r.timestamp) as login_times
WHERE login_count > 20 // Verdächtig viele Anmeldungen
RETURN p.name as suspect,
p.department,
c.hostname as accessed_computer,
login_count,
login_times[0] as first_login,
login_times[-1] as last_login
ORDER BY login_count DESC
Netzwerk-Anomalien-Bericht:
MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
WHERE conn.dst_port NOT IN [80, 443, 53, 22, 23, 25, 110, 143, 993, 995]
WITH conn.dst_ip as suspicious_ip,
conn.dst_port as suspicious_port,
count(*) as frequency,
collect(DISTINCT c.hostname) as source_computers
WHERE frequency > 5
RETURN suspicious_ip,
suspicious_port,
frequency,
source_computers,
size(source_computers) as affected_computers_count
ORDER BY frequency DESC
Export für externe Werkzeuge
Gephi-Format für Visualisierung:
// Knoten für Gephi
MATCH (n)
RETURN id(n) as Id,
labels(n)[0] as Label,
n.name as Name,
CASE labels(n)[0]
WHEN 'Person' THEN 'red'
WHEN 'Computer' THEN 'blue'
WHEN 'File' THEN 'green'
ELSE 'gray'
END as Color
UNION
// Kanten für Gephi
MATCH (a)-[r]->(b)
RETURN id(a) as Source,
id(b) as Target,
type(r) as Type,
r.timestamp as Timestamp
Leistungsoptimierung für große Datensätze
Indizierung strategisch einsetzen
// Zeitstempel-Indizes für Zeitleisten-Analysen
CREATE INDEX FOR (e:Event) ON (e.timestamp)
CREATE INDEX FOR (l:Login) ON (l.timestamp)
// Eindeutige Bezeichner
CREATE CONSTRAINT FOR (f:File) REQUIRE f.hash IS UNIQUE
CREATE CONSTRAINT FOR (p:Person) REQUIRE p.employee_id IS UNIQUE
// Zusammengesetzte Indizes für häufige Abfragen
CREATE INDEX FOR (c:Computer) ON (c.hostname, c.ip)
Abfrageoptimierung
PROFILE verwenden für Leistungsanalyse:
PROFILE
MATCH (p:Person)-[:LOGGED_INTO]->(c:Computer)
WHERE p.department = "IT"
RETURN p.name, c.hostname
Begrenzung für große Ergebnismengen:
MATCH (f:File)
WHERE f.size > 100000000 // Dateien größer als 100MB
RETURN f.path, f.size
ORDER BY f.size DESC
LIMIT 100
Häufige Probleme und Lösungen
Speicheroptimierung
# Neo4j-Konfiguration für große Datensätze
# Speicher-Einstellungen in neo4j.conf
dbms.memory.heap.initial_size=2G
dbms.memory.heap.max_size=8G
dbms.memory.pagecache.size=4G
Datenimport-Leistung
// Stapelimport für große CSV-Dateien
:auto USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM 'file:///large_dataset.csv' AS row
CREATE (e:Event {
timestamp: datetime(row.timestamp),
event_id: toInteger(row.event_id),
computer: row.computer,
description: row.description
})
Fortgeschrittene Techniken
Maschinelles Lernen Integration
// Anomalieerkennung durch statistische Analyse
MATCH (p:Person)-[l:LOGGED_INTO]->(c:Computer)
WITH p, count(l) as login_frequency
WITH avg(login_frequency) as avg_logins, stdev(login_frequency) as std_logins
MATCH (p:Person)-[l:LOGGED_INTO]->(c:Computer)
WITH p, count(l) as user_logins, avg_logins, std_logins
WHERE user_logins > (avg_logins + 2 * std_logins)
RETURN p.name as potential_anomaly, user_logins, avg_logins
Korrelationsanalyse
// Zeitliche Korrelation zwischen Ereignissen
MATCH (e1:Event), (e2:Event)
WHERE e1.computer = e2.computer
AND abs(duration.between(e1.timestamp, e2.timestamp).seconds) < 300 // 5 Minuten
AND e1 <> e2
RETURN e1.event_id, e2.event_id, e1.timestamp, e2.timestamp,
duration.between(e1.timestamp, e2.timestamp) as time_diff
Installation und Einrichtung
Docker-Installation (empfohlen)
# Neo4j mit APOC-Plugin für Forensik-Umgebungen
docker run \
--name neo4j-forensics \
-p7474:7474 -p7687:7687 \
-d \
-v $HOME/neo4j/data:/data \
-v $HOME/neo4j/logs:/logs \
-v $HOME/neo4j/import:/var/lib/neo4j/import \
--env NEO4J_AUTH=neo4j/forensics123 \
--env NEO4J_PLUGINS='["apoc"]' \
neo4j:latest
Native Installation
# Ubuntu/Debian
wget -O - https://debian.neo4j.com/neotechnology.gpg.key | sudo apt-key add -
echo 'deb https://debian.neo4j.com stable 4.4' | sudo tee /etc/apt/sources.list.d/neo4j.list
sudo apt update
sudo apt install neo4j
# APOC Plugin herunterladen
wget https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/4.4.0.18/apoc-4.4.0.18-all.jar
sudo mv apoc-4.4.0.18-all.jar /var/lib/neo4j/plugins/
# Neo4j starten
sudo systemctl enable neo4j
sudo systemctl start neo4j
Erste Konfiguration
# Standardpasswort ändern
cypher-shell -u neo4j -p neo4j
ALTER USER neo4j SET PASSWORD 'IhrSicheresPasswort';
# Speicher für forensische Arbeitslasten konfigurieren
sudo nano /etc/neo4j/neo4j.conf
Wichtige Konfigurationseinstellungen:
# Speicherzuweisung
dbms.memory.heap.initial_size=4G
dbms.memory.heap.max_size=8G
dbms.memory.pagecache.size=4G
# APOC-Konfiguration
dbms.security.procedures.unrestricted=apoc.*
dbms.security.procedures.allowlist=apoc.*
# Import-Verzeichnis
dbms.directories.import=/var/lib/neo4j/import
Zugriff:
- Web-Schnittstelle:
http://localhost:7474
- Bolt-Protokoll:
bolt://localhost:7687
Zusammenfassung und bewährte Praktiken
Empfohlener Arbeitsablauf
- Datenmodell entwerfen - Entitäten und Beziehungen für Ihren Fall definieren
- Daten importieren - Strukturiert mit APOC oder LOAD CSV
- Indizes erstellen - Leistung für häufige Abfragen optimieren
- Explorative Analyse - Von einfachen zu komplexen Abfragen voranschreiten
- Muster identifizieren - Anomalien und verdächtige Strukturen erkennen
- Zeitleiste rekonstruieren - Chronologische Ereignissequenzen erstellen
- Berichte generieren - Ergebnisse strukturiert dokumentieren
- Visualisieren - Komplexe Beziehungen grafisch darstellen
Wichtige Erkenntnisse
- Graphdatenbanken eignen sich hervorragend für forensische Analysen aufgrund ihrer natürlichen Beziehungsmodellierung
- Cypher ermöglicht intuitive Abfragen die komplexe ermittlerische Fragen beantworten
- APOC erweitert die Funktionalität um zeitbasierte Analysen und Datenimport
- Leistungsoptimierung ist kritisch bei großen forensischen Datensätzen
- Strukturierte Herangehensweise führt zu effektiven Untersuchungsergebnissen
Neo4j bietet forensischen Analysten ein mächtiges Werkzeug zur Untersuchung komplexer digitaler Spuren und zur Aufdeckung versteckter Zusammenhänge in ihren Fällen.