🕸️ Neo4j für Forensische Analysen - Von Grundlagen bis zur Berichtserstellung

Umfassender Leitfaden für den Einsatz von Neo4j in digitalen forensischen Untersuchungen - von einfachen Abfragen bis zu komplexen Beziehungsanalysen und Berichtserstellung

Zurück

Neo4j für Forensische Analysen - Von Grundlagen bis zur Berichtserstellung

Neo4j ist eine leistungsstarke Graphdatenbank, die sich hervorragend für forensische Analysen eignet, bei denen Beziehungen zwischen Entitäten (Personen, Dateien, Prozesse, Netzwerkverbindungen) im Mittelpunkt stehen. Dieser umfassende Leitfaden führt Sie von den Grundlagen bis zu komplexen forensischen Analyseszenarien.

Warum Neo4j für Forensik?

Graphdatenbanken sind ideal für forensische Untersuchungen, da sie:

  • Beziehungen natürlich modellieren - Verbindungen zwischen Verdächtigen, Geräten und Aktivitäten
  • Mustererkennung ermöglichen - Anomalien und verdächtige Strukturen in komplexen Datenstrukturen
  • Zeitbasierte Analysen unterstützen - Zeitleisten-Rekonstruktion und temporale Beziehungen
  • Skalierbare Abfragen bieten - Millionen von Verbindungen effizient durchsuchen
  • Visualisierung vereinfachen - Komplexe Zusammenhänge grafisch darstellen

Grundlegende Konzepte für Forensiker

Knoten (Nodes)

Repräsentieren Entitäten in Ihrer Untersuchung:

// Verschiedene forensische Entitäten
(:Person {name: "John Doe", id: "emp001"})
(:Computer {hostname: "DESKTOP-ABC123", ip: "192.168.1.100"})
(:File {path: "C:\\Users\\john\\suspicious.exe", hash: "a1b2c3..."})
(:Process {name: "cmd.exe", pid: 1234})
(:NetworkConnection {src_ip: "192.168.1.100", dst_ip: "185.199.108.153"})

Beziehungen (Relationships)

Verbindungen zwischen Entitäten mit Eigenschaften:

// Forensische Beziehungen
(:Person)-[:LOGGED_INTO {timestamp: "2024-01-15T10:30:00"}]->(:Computer)
(:Process)-[:CREATED {timestamp: "2024-01-15T10:31:15"}]->(:File)
(:Computer)-[:CONNECTED_TO {port: 443, protocol: "HTTPS"}]->(:NetworkConnection)

Grundoperationen - Knoten und Beziehungen verwalten

Knoten erstellen (CREATE)

Einzelne Knoten erstellen:

// Person erstellen
CREATE (p:Person {
    name: "Max Mustermann", 
    employee_id: "E001", 
    department: "IT",
    email: "max.mustermann@firma.de"
})

// Computer erstellen
CREATE (c:Computer {
    hostname: "DESKTOP-IT001",
    ip: "192.168.1.50",
    os: "Windows 10",
    mac_address: "00:11:22:33:44:55"
})

// Datei erstellen
CREATE (f:File {
    path: "C:\\temp\\verdaechtig.exe",
    hash_md5: "5d41402abc4b2a76b9719d911017c592",
    size: 2048,
    created_time: datetime("2024-01-15T14:30:00")
})

Mehrere Knoten gleichzeitig:

CREATE 
  (p:Person {name: "Anna Schmidt", id: "E002"}),
  (c:Computer {hostname: "LAPTOP-HR001", ip: "192.168.1.75"}),
  (f:File {path: "C:\\Users\\anna\\dokument.pdf", size: 1024})

Beziehungen erstellen

Einfache Beziehung:

MATCH (p:Person {name: "Max Mustermann"})
MATCH (c:Computer {hostname: "DESKTOP-IT001"})
CREATE (p)-[:LOGGED_INTO {timestamp: datetime(), session_id: "12345"}]->(c)

Knoten und Beziehung in einem Schritt:

CREATE (p:Person {name: "Thomas Weber"})-[:OWNS]->(c:Computer {hostname: "LAPTOP-TW001"})

Mehrere Beziehungen:

MATCH (p:Person {name: "Max Mustermann"})
MATCH (c1:Computer {hostname: "DESKTOP-IT001"})
MATCH (c2:Computer {hostname: "SERVER-001"})
CREATE 
  (p)-[:ACCESSED {timestamp: datetime(), access_type: "remote"}]->(c1),
  (p)-[:ADMINISTERED {timestamp: datetime(), privileges: "full"}]->(c2)

Eigenschaften aktualisieren (SET)

Einzelne Eigenschaft ändern:

MATCH (p:Person {name: "Max Mustermann"})
SET p.status = "under_investigation"

Mehrere Eigenschaften aktualisieren:

MATCH (c:Computer {hostname: "DESKTOP-IT001"})
SET c.status = "compromised",
    c.last_scan = datetime(),
    c.security_level = "high_risk"

Eigenschaft zu Beziehung hinzufügen:

MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
WHERE p.name = "Max Mustermann" AND c.hostname = "DESKTOP-IT001"
SET login.logout_time = datetime(),
    login.session_duration = duration("PT2H30M")

Knoten und Beziehungen zusammenführen (MERGE)

Knoten zusammenführen (erstellt nur wenn nicht vorhanden):

MERGE (p:Person {employee_id: "E001"})
ON CREATE SET p.name = "Max Mustermann", p.created = datetime()
ON MATCH SET p.last_seen = datetime()

Beziehung zusammenführen:

MATCH (p:Person {name: "Max Mustermann"})
MATCH (c:Computer {hostname: "DESKTOP-IT001"})
MERGE (p)-[login:LOGGED_INTO {date: date()}]->(c)
ON CREATE SET login.first_login = datetime()
ON MATCH SET login.last_login = datetime()

Löschen (DELETE)

Beziehung löschen:

MATCH (p:Person {name: "Max Mustermann"})-[login:LOGGED_INTO]->(c:Computer)
DELETE login

Knoten löschen (nur wenn keine Beziehungen):

MATCH (f:File {path: "C:\\temp\\test.txt"})
DELETE f

Knoten mit allen Beziehungen löschen:

MATCH (p:Person {name: "Test User"})
DETACH DELETE p

Mehrere Knoten nach Kriterium:

MATCH (f:File)
WHERE f.size < 1024 AND f.temporary = true
DETACH DELETE f

Eigenschaften entfernen (REMOVE)

Einzelne Eigenschaft entfernen:

MATCH (p:Person {name: "Max Mustermann"})
REMOVE p.temp_status

Label entfernen:

MATCH (n:TempNode)
REMOVE n:TempNode

Von einfachen zu komplexen Abfragen

Level 1: Grundlegende MATCH-Abfragen

Alle Computer in der Untersuchung finden:

MATCH (c:Computer)
RETURN c.hostname, c.ip, c.os

Bestimmte Datei suchen:

MATCH (f:File {name: "suspicious.exe"})
RETURN f.path, f.hash, f.size

Personen mit bestimmten Eigenschaften:

MATCH (p:Person)
WHERE p.department = "IT"
RETURN p.name, p.email

Level 2: Einfache Beziehungsabfragen

Wer hat sich an welchen Computern angemeldet?

MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
RETURN p.name, c.hostname, login.timestamp
ORDER BY login.timestamp DESC

Welche Dateien wurden von einem bestimmten Prozess erstellt?

MATCH (proc:Process {name: "powershell.exe"})-[:CREATED]->(f:File)
RETURN proc.pid, f.path, f.created_time

Level 3: Mustersuche und Pfade

Verdächtige Dateierstellung-Ketten finden:

MATCH path = (p:Person)-[:LOGGED_INTO]->(c:Computer)<-[:RUNNING_ON]-(proc:Process)-[:CREATED]->(f:File)
WHERE f.path CONTAINS ".exe"
RETURN path

Netzwerkverbindungen von kompromittierten Systemen:

MATCH (c:Computer {status: "compromised"})-[:INITIATED]->(conn:NetworkConnection)
WHERE conn.dst_port IN [80, 443, 8080]
RETURN c.hostname, conn.dst_ip, conn.dst_port, conn.timestamp

Level 4: Aggregation und Statistiken

Anmeldestatistiken pro Benutzer:

MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
RETURN p.name, 
       count(login) as login_count,
       collect(DISTINCT c.hostname) as computers_accessed,
       min(login.timestamp) as first_login,
       max(login.timestamp) as last_login
ORDER BY login_count DESC

Häufigste Netzwerkziele identifizieren:

MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
RETURN conn.dst_ip, 
       count(*) as connection_count,
       collect(DISTINCT c.hostname) as source_computers
ORDER BY connection_count DESC
LIMIT 10

Level 5: Zeitbasierte Analysen

Aktivitäten in einem bestimmten Zeitfenster:

MATCH (p:Person)-[r:LOGGED_INTO]->(c:Computer)
WHERE datetime(r.timestamp) >= datetime("2024-01-15T08:00:00")
  AND datetime(r.timestamp) <= datetime("2024-01-15T18:00:00")
RETURN p.name, c.hostname, r.timestamp
ORDER BY r.timestamp

Zeitleiste einer kompromittierten Sitzung:

MATCH (c:Computer {hostname: "DESKTOP-ABC123"})
MATCH (c)<-[r]-(entity)
WHERE r.timestamp IS NOT NULL
RETURN type(entity) as entity_type,
       entity.name as entity_name,
       type(r) as relationship_type,
       r.timestamp
ORDER BY r.timestamp

Level 6: Erweiterte Pfadanalysen

Seitliche Bewegung erkennen:

MATCH path = (start:Computer)<-[:LOGGED_INTO]-(p:Person)-[:LOGGED_INTO]->(end:Computer)
WHERE start <> end
  AND datetime(path.relationships[0].timestamp) < datetime(path.relationships[1].timestamp)
RETURN p.name, 
       start.hostname as from_computer,
       end.hostname as to_computer,
       length(path) as hop_count

Dateiverteilung verfolgen:

MATCH path = (origin:File)-[:COPIED_TO|:MOVED_TO*1..5]->(destination:File)
WHERE origin.hash = destination.hash
RETURN path, length(path) as propagation_steps

Level 7: Komplexe Untersuchungsmuster

Insider-Bedrohung erkennen:

MATCH (p:Person)-[:LOGGED_INTO]->(c:Computer)
MATCH (c)-[:ACCESSED]->(f:File)
WHERE f.classification = "confidential"
  AND NOT (p)-[:AUTHORIZED_FOR]->(f)
WITH p, count(f) as unauthorized_access_count
WHERE unauthorized_access_count > 5
RETURN p.name, p.department, unauthorized_access_count

Command & Control Kommunikation identifizieren:

MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
WHERE conn.dst_port IN [80, 443, 8080, 53]
WITH conn.dst_ip as suspicious_ip, count(*) as freq, collect(DISTINCT c.hostname) as computers
WHERE freq > 100 AND size(computers) > 1
MATCH (compromised:Computer)-[:CONNECTED_TO]->(cc_conn:NetworkConnection {dst_ip: suspicious_ip})
RETURN suspicious_ip, freq, computers, collect(cc_conn.timestamp) as connection_times

APOC - Erweiterte Funktionen für Forensiker

APOC (Awesome Procedures on Cypher) erweitert Neo4j um wichtige forensische Funktionen.

Zeitfunktionen

Zeitstempel-Konvertierung:

// Unix-Timestamp zu datetime
RETURN apoc.date.format(1642248600, 's', 'yyyy-MM-dd HH:mm:ss') as formatted_time

// Zeitbereich-Abfragen
MATCH (event:Event)
WHERE apoc.date.parse(event.timestamp, 's', 'yyyy-MM-dd HH:mm:ss') 
      BETWEEN apoc.date.parse('2024-01-15 08:00:00', 's', 'yyyy-MM-dd HH:mm:ss')
      AND apoc.date.parse('2024-01-15 18:00:00', 's', 'yyyy-MM-dd HH:mm:ss')
RETURN event

Datenimport für forensische Artefakte

CSV-Import von Windows Ereignisprotokollen:

CALL apoc.load.csv('file:///windows_events.csv') YIELD map
CREATE (e:Event {
    event_id: toInteger(map.EventID),
    timestamp: datetime(map.TimeGenerated),
    computer: map.ComputerName,
    user: map.User,
    description: map.Message
})

JSON-Import von Netzwerk-Protokollen:

CALL apoc.load.json('file:///network_traffic.json') YIELD value
UNWIND value.connections as conn
CREATE (nc:NetworkConnection {
    src_ip: conn.source_ip,
    dst_ip: conn.destination_ip,
    dst_port: conn.destination_port,
    protocol: conn.protocol,
    timestamp: datetime(conn.timestamp),
    bytes_transferred: conn.bytes
})

Graphalgorithmen für Untersuchungen

Kürzester Pfad zwischen verdächtigen Entitäten:

MATCH (start:Person {name: "Verdächtiger A"}), (end:File {classification: "streng_geheim"})
CALL apoc.algo.dijkstra(start, end, 'ACCESSED|SHARED|COPIED', 'weight') YIELD path, weight
RETURN path, weight

PageRank für wichtige Akteure identifizieren:

CALL apoc.algo.pageRank(['Person'], ['COMMUNICATED_WITH', 'SHARED_FILE']) 
YIELD node, score
RETURN node.name as person, score
ORDER BY score DESC
LIMIT 10

Forensische Datenmodellierung

Typische Entitätstypen

// Personen und Identitäten
CREATE (p:Person {name: "Max Mustermann", employee_id: "E001", department: "Finance"})
CREATE (u:UserAccount {username: "mmustermann", domain: "FIRMA", sid: "S-1-5-21-..."})

// Systeme und Hardware
CREATE (c:Computer {hostname: "WS001", ip: "192.168.1.100", os: "Windows 10"})
CREATE (m:Mobile {imei: "123456789012345", number: "+1234567890"})

// Dateien und Artefakte
CREATE (f:File {path: "C:\\temp\\malware.exe", hash_md5: "5d41402a...", size: 1024})
CREATE (r:RegistryKey {path: "HKLM\\Software\\Microsoft\\..."})

// Netzwerk und Kommunikation
CREATE (ip:IPAddress {address: "192.168.1.100", geolocation: "Internal"})
CREATE (e:Email {subject: "Re: Projekt X", timestamp: datetime()})

// Prozesse und Aktivitäten
CREATE (proc:Process {name: "cmd.exe", pid: 1234, command_line: "cmd.exe /c dir"})
CREATE (event:Event {event_id: 4624, source: "Security", timestamp: datetime()})

Beziehungstypen für forensische Analysen

// Benutzer-System-Interaktionen
CREATE (person)-[:OWNS]->(computer)
CREATE (user)-[:LOGGED_INTO {timestamp: datetime(), session_id: "12345"}]->(computer)

// Dateioperationen
CREATE (process)-[:CREATED {timestamp: datetime()}]->(file)
CREATE (process)-[:MODIFIED {timestamp: datetime(), action: "write"}]->(file)
CREATE (process)-[:DELETED {timestamp: datetime()}]->(file)

// Netzwerkaktivitäten
CREATE (computer)-[:CONNECTED_TO {timestamp: datetime(), protocol: "TCP"}]->(ip)
CREATE (process)-[:INITIATED_CONNECTION]->(network_connection)

// Kommunikation
CREATE (person)-[:SENT {timestamp: datetime()}]->(email)
CREATE (person)-[:RECEIVED {timestamp: datetime()}]->(email)

// Zeitliche Beziehungen
CREATE (event1)-[:FOLLOWED_BY {duration: duration("PT5M")}]->(event2)
CREATE (activity)-[:OCCURRED_DURING]->(time_period)

Zeitleisten-Analyse mit Neo4j

Zeitbasierte Ereignisrekonstruktion

Chronologische Ereignissequenz:

MATCH (e:Event)
WHERE e.computer = "DESKTOP-ABC123"
  AND e.timestamp >= datetime("2024-01-15T10:00:00")
  AND e.timestamp <= datetime("2024-01-15T12:00:00")
RETURN e.timestamp, e.event_id, e.description, e.user
ORDER BY e.timestamp

Überlappende Aktivitäten erkennen:

MATCH (e1:Event)-[:OCCURRED_ON]->(c:Computer)<-[:OCCURRED_ON]-(e2:Event)
WHERE e1.timestamp <= e2.timestamp <= e1.end_timestamp
  AND e1 <> e2
RETURN e1.description as activity1, 
       e2.description as activity2,
       c.hostname,
       e1.timestamp as start1,
       e2.timestamp as start2

Zeitliche Mustererkennung

Wiederkehrende verdächtige Aktivitäten:

MATCH (p:Process)-[:CREATED]->(f:File)
WHERE f.path CONTAINS "temp"
WITH p.name as process_name, 
     duration.between(min(f.created_time), max(f.created_time)) as time_span,
     count(f) as file_count
WHERE file_count > 10 AND time_span < duration("PT1H")
RETURN process_name, file_count, time_span

Visualisierung und Berichtserstellung

Grafische Darstellung im Neo4j Browser

Neo4j Browser bietet eingebaute Visualisierung für forensische Analysen. Hier lernen Sie, wie Sie Ihre Daten direkt grafisch darstellen.

Grundlegende Visualisierungsabfragen

Alle Beziehungen zwischen Personen und Computern anzeigen:

MATCH (p:Person)-[r:LOGGED_INTO]->(c:Computer)
RETURN p, r, c
LIMIT 50

Netzwerk um einen verdächtigen Benutzer:

MATCH (p:Person {name: "Max Mustermann"})-[r]-(connected)
RETURN p, r, connected

Dateierstellung-Ketten visualisieren:

MATCH path = (proc:Process)-[:CREATED]->(f:File)-[:COPIED_TO]->(f2:File)
RETURN path
LIMIT 25

Netzwerkverbindungen mit Zeitfilter:

MATCH (c:Computer)-[conn:CONNECTED_TO]->(target)
WHERE datetime(conn.timestamp) >= datetime("2024-01-15T10:00:00")
RETURN c, conn, target
LIMIT 100

Visualisierung anpassen

Knoten-Farben nach Typ festlegen: Im Neo4j Browser können Sie die Darstellung anpassen:

  1. Klicken Sie auf einen Knoten-Typ (z.B. :Person) in der Legende
  2. Wählen Sie Farbe und Größe
  3. Legen Sie die Beschriftung fest (z.B. name oder hostname)

Beziehungs-Labels anzeigen:

MATCH (p:Person)-[r:LOGGED_INTO {session_id: "12345"}]->(c:Computer)
RETURN p, r, c

Klicken Sie auf den Beziehungstyp in der Legende und aktivieren Sie “Caption” für Eigenschaften wie timestamp.

Graph-Layout optimieren:

  • Hierarchical Layout: Für Zeitleisten und Prozess-Bäume
  • Force Layout: Für allgemeine Netzwerk-Visualisierung
  • Grid Layout: Für strukturierte Darstellungen

Forensik-spezifische Visualisierungen

Kompromittierungsweg darstellen:

MATCH path = (attacker:Person)-[:LOGGED_INTO]->(entry:Computer)-[:CONNECTED_TO*1..3]-(target:Computer)
WHERE entry.security_level = "low" AND target.classification = "sensitive"
RETURN path

Zeitbasierte Aktivitätsketten:

MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
MATCH (c)<-[:RUNNING_ON]-(proc:Process)-[created:CREATED]->(f:File)
WHERE datetime(login.timestamp) <= datetime(created.timestamp) <= datetime(login.timestamp) + duration("PT1H")
RETURN p, login, c, proc, created, f

Anomalie-Cluster identifizieren:

MATCH (p:Person)-[:LOGGED_INTO]->(c:Computer)
WITH p, count(c) as computer_count
WHERE computer_count > 5  // Benutzer mit Zugriff auf viele Computer
MATCH (p)-[r:LOGGED_INTO]->(c:Computer)
RETURN p, r, c

Visualisierungsfilter für große Datensätze

Aktivitäten nach Zeitfenster begrenzen:

MATCH (entity)-[r]-(connected)
WHERE r.timestamp >= datetime("2024-01-15T08:00:00") 
  AND r.timestamp <= datetime("2024-01-15T18:00:00")
RETURN entity, r, connected
LIMIT 200

Nach Wichtigkeit filtern:

MATCH (p:Person)-[r]->(entity)
WHERE p.risk_level = "high" OR entity.classification = "confidential"
RETURN p, r, entity
LIMIT 100

Schrittweise Expansion:

// Schritt 1: Zentrale Person finden
MATCH (p:Person {name: "Verdächtiger"})
RETURN p

// Schritt 2: Direkte Verbindungen erweitern
MATCH (p:Person {name: "Verdächtiger"})-[r]-(level1)
RETURN p, r, level1

// Schritt 3: Zweite Ebene hinzufügen
MATCH (p:Person {name: "Verdächtiger"})-[r1]-(level1)-[r2]-(level2)
WHERE type(r2) IN ['SHARED_FILE', 'COMMUNICATED_WITH']
RETURN p, r1, level1, r2, level2
LIMIT 50

Interaktive Untersuchung

Doppelklick-Expansion verwenden:

  1. Starten Sie mit einer einfachen Abfrage
  2. Doppelklicken Sie auf interessante Knoten
  3. Neo4j erweitert automatisch die Verbindungen
  4. Verwenden Sie Rechtsklick → “Dismiss” um Knoten auszublenden

Graph-Export aus dem Browser:

  1. Klicken Sie auf das Kamera-Symbol für Screenshots
  2. Oder verwenden Sie den Export-Button für SVG/PNG
  3. Für Berichte: Speichern als PNG mit Titel und Zeitstempel

Ermittlungsberichte erstellen

Zusammenfassung verdächtiger Aktivitäten:

MATCH (p:Person)-[r:LOGGED_INTO]->(c:Computer)
WHERE datetime(r.timestamp) >= datetime("2024-01-15T00:00:00")
WITH p, c, count(r) as login_count, collect(r.timestamp) as login_times
WHERE login_count > 20  // Verdächtig viele Anmeldungen
RETURN p.name as suspect,
       p.department,
       c.hostname as accessed_computer,
       login_count,
       login_times[0] as first_login,
       login_times[-1] as last_login
ORDER BY login_count DESC

Netzwerk-Anomalien-Bericht:

MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
WHERE conn.dst_port NOT IN [80, 443, 53, 22, 23, 25, 110, 143, 993, 995]
WITH conn.dst_ip as suspicious_ip, 
     conn.dst_port as suspicious_port,
     count(*) as frequency,
     collect(DISTINCT c.hostname) as source_computers
WHERE frequency > 5
RETURN suspicious_ip, 
       suspicious_port, 
       frequency, 
       source_computers,
       size(source_computers) as affected_computers_count
ORDER BY frequency DESC

Export für externe Werkzeuge

Gephi-Format für Visualisierung:

// Knoten für Gephi
MATCH (n)
RETURN id(n) as Id, 
       labels(n)[0] as Label, 
       n.name as Name,
       CASE labels(n)[0]
         WHEN 'Person' THEN 'red'
         WHEN 'Computer' THEN 'blue'
         WHEN 'File' THEN 'green'
         ELSE 'gray'
       END as Color

UNION

// Kanten für Gephi
MATCH (a)-[r]->(b)
RETURN id(a) as Source, 
       id(b) as Target, 
       type(r) as Type,
       r.timestamp as Timestamp

Leistungsoptimierung für große Datensätze

Indizierung strategisch einsetzen

// Zeitstempel-Indizes für Zeitleisten-Analysen
CREATE INDEX FOR (e:Event) ON (e.timestamp)
CREATE INDEX FOR (l:Login) ON (l.timestamp)

// Eindeutige Bezeichner
CREATE CONSTRAINT FOR (f:File) REQUIRE f.hash IS UNIQUE
CREATE CONSTRAINT FOR (p:Person) REQUIRE p.employee_id IS UNIQUE

// Zusammengesetzte Indizes für häufige Abfragen
CREATE INDEX FOR (c:Computer) ON (c.hostname, c.ip)

Abfrageoptimierung

PROFILE verwenden für Leistungsanalyse:

PROFILE
MATCH (p:Person)-[:LOGGED_INTO]->(c:Computer)
WHERE p.department = "IT"
RETURN p.name, c.hostname

Begrenzung für große Ergebnismengen:

MATCH (f:File)
WHERE f.size > 100000000  // Dateien größer als 100MB
RETURN f.path, f.size
ORDER BY f.size DESC
LIMIT 100

Häufige Probleme und Lösungen

Speicheroptimierung

# Neo4j-Konfiguration für große Datensätze
# Speicher-Einstellungen in neo4j.conf
dbms.memory.heap.initial_size=2G
dbms.memory.heap.max_size=8G
dbms.memory.pagecache.size=4G

Datenimport-Leistung

// Stapelimport für große CSV-Dateien
:auto USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM 'file:///large_dataset.csv' AS row
CREATE (e:Event {
    timestamp: datetime(row.timestamp),
    event_id: toInteger(row.event_id),
    computer: row.computer,
    description: row.description
})

Fortgeschrittene Techniken

Maschinelles Lernen Integration

// Anomalieerkennung durch statistische Analyse
MATCH (p:Person)-[l:LOGGED_INTO]->(c:Computer)
WITH p, count(l) as login_frequency
WITH avg(login_frequency) as avg_logins, stdev(login_frequency) as std_logins
MATCH (p:Person)-[l:LOGGED_INTO]->(c:Computer)
WITH p, count(l) as user_logins, avg_logins, std_logins
WHERE user_logins > (avg_logins + 2 * std_logins)
RETURN p.name as potential_anomaly, user_logins, avg_logins

Korrelationsanalyse

// Zeitliche Korrelation zwischen Ereignissen
MATCH (e1:Event), (e2:Event)
WHERE e1.computer = e2.computer
  AND abs(duration.between(e1.timestamp, e2.timestamp).seconds) < 300  // 5 Minuten
  AND e1 <> e2
RETURN e1.event_id, e2.event_id, e1.timestamp, e2.timestamp,
       duration.between(e1.timestamp, e2.timestamp) as time_diff

Installation und Einrichtung

Docker-Installation (empfohlen)

# Neo4j mit APOC-Plugin für Forensik-Umgebungen
docker run \
    --name neo4j-forensics \
    -p7474:7474 -p7687:7687 \
    -d \
    -v $HOME/neo4j/data:/data \
    -v $HOME/neo4j/logs:/logs \
    -v $HOME/neo4j/import:/var/lib/neo4j/import \
    --env NEO4J_AUTH=neo4j/forensics123 \
    --env NEO4J_PLUGINS='["apoc"]' \
    neo4j:latest

Native Installation

# Ubuntu/Debian
wget -O - https://debian.neo4j.com/neotechnology.gpg.key | sudo apt-key add -
echo 'deb https://debian.neo4j.com stable 4.4' | sudo tee /etc/apt/sources.list.d/neo4j.list
sudo apt update
sudo apt install neo4j

# APOC Plugin herunterladen
wget https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/4.4.0.18/apoc-4.4.0.18-all.jar
sudo mv apoc-4.4.0.18-all.jar /var/lib/neo4j/plugins/

# Neo4j starten
sudo systemctl enable neo4j
sudo systemctl start neo4j

Erste Konfiguration

# Standardpasswort ändern
cypher-shell -u neo4j -p neo4j
ALTER USER neo4j SET PASSWORD 'IhrSicheresPasswort';

# Speicher für forensische Arbeitslasten konfigurieren
sudo nano /etc/neo4j/neo4j.conf

Wichtige Konfigurationseinstellungen:

# Speicherzuweisung
dbms.memory.heap.initial_size=4G
dbms.memory.heap.max_size=8G
dbms.memory.pagecache.size=4G

# APOC-Konfiguration
dbms.security.procedures.unrestricted=apoc.*
dbms.security.procedures.allowlist=apoc.*

# Import-Verzeichnis
dbms.directories.import=/var/lib/neo4j/import

Zugriff:

  • Web-Schnittstelle: http://localhost:7474
  • Bolt-Protokoll: bolt://localhost:7687

Zusammenfassung und bewährte Praktiken

Empfohlener Arbeitsablauf

  1. Datenmodell entwerfen - Entitäten und Beziehungen für Ihren Fall definieren
  2. Daten importieren - Strukturiert mit APOC oder LOAD CSV
  3. Indizes erstellen - Leistung für häufige Abfragen optimieren
  4. Explorative Analyse - Von einfachen zu komplexen Abfragen voranschreiten
  5. Muster identifizieren - Anomalien und verdächtige Strukturen erkennen
  6. Zeitleiste rekonstruieren - Chronologische Ereignissequenzen erstellen
  7. Berichte generieren - Ergebnisse strukturiert dokumentieren
  8. Visualisieren - Komplexe Beziehungen grafisch darstellen

Wichtige Erkenntnisse

  • Graphdatenbanken eignen sich hervorragend für forensische Analysen aufgrund ihrer natürlichen Beziehungsmodellierung
  • Cypher ermöglicht intuitive Abfragen die komplexe ermittlerische Fragen beantworten
  • APOC erweitert die Funktionalität um zeitbasierte Analysen und Datenimport
  • Leistungsoptimierung ist kritisch bei großen forensischen Datensätzen
  • Strukturierte Herangehensweise führt zu effektiven Untersuchungsergebnissen

Neo4j bietet forensischen Analysten ein mächtiges Werkzeug zur Untersuchung komplexer digitaler Spuren und zur Aufdeckung versteckter Zusammenhänge in ihren Fällen.

Neo4j für Forensische Analysen - Von Grundlagen bis zur Berichtserstellung

Neo4j ist eine leistungsstarke Graphdatenbank, die sich hervorragend für forensische Analysen eignet, bei denen Beziehungen zwischen Entitäten (Personen, Dateien, Prozesse, Netzwerkverbindungen) im Mittelpunkt stehen. Dieser umfassende Leitfaden führt Sie von den Grundlagen bis zu komplexen forensischen Analyseszenarien.

Warum Neo4j für Forensik?

Graphdatenbanken sind ideal für forensische Untersuchungen, da sie:

  • Beziehungen natürlich modellieren - Verbindungen zwischen Verdächtigen, Geräten und Aktivitäten
  • Mustererkennung ermöglichen - Anomalien und verdächtige Strukturen in komplexen Datenstrukturen
  • Zeitbasierte Analysen unterstützen - Zeitleisten-Rekonstruktion und temporale Beziehungen
  • Skalierbare Abfragen bieten - Millionen von Verbindungen effizient durchsuchen
  • Visualisierung vereinfachen - Komplexe Zusammenhänge grafisch darstellen

Grundlegende Konzepte für Forensiker

Knoten (Nodes)

Repräsentieren Entitäten in Ihrer Untersuchung:

// Verschiedene forensische Entitäten
(:Person {name: "John Doe", id: "emp001"})
(:Computer {hostname: "DESKTOP-ABC123", ip: "192.168.1.100"})
(:File {path: "C:\\Users\\john\\suspicious.exe", hash: "a1b2c3..."})
(:Process {name: "cmd.exe", pid: 1234})
(:NetworkConnection {src_ip: "192.168.1.100", dst_ip: "185.199.108.153"})

Beziehungen (Relationships)

Verbindungen zwischen Entitäten mit Eigenschaften:

// Forensische Beziehungen
(:Person)-[:LOGGED_INTO {timestamp: "2024-01-15T10:30:00"}]->(:Computer)
(:Process)-[:CREATED {timestamp: "2024-01-15T10:31:15"}]->(:File)
(:Computer)-[:CONNECTED_TO {port: 443, protocol: "HTTPS"}]->(:NetworkConnection)

Grundoperationen - Knoten und Beziehungen verwalten

Knoten erstellen (CREATE)

Einzelne Knoten erstellen:

// Person erstellen
CREATE (p:Person {
    name: "Max Mustermann", 
    employee_id: "E001", 
    department: "IT",
    email: "max.mustermann@firma.de"
})

// Computer erstellen
CREATE (c:Computer {
    hostname: "DESKTOP-IT001",
    ip: "192.168.1.50",
    os: "Windows 10",
    mac_address: "00:11:22:33:44:55"
})

// Datei erstellen
CREATE (f:File {
    path: "C:\\temp\\verdaechtig.exe",
    hash_md5: "5d41402abc4b2a76b9719d911017c592",
    size: 2048,
    created_time: datetime("2024-01-15T14:30:00")
})

Mehrere Knoten gleichzeitig:

CREATE 
  (p:Person {name: "Anna Schmidt", id: "E002"}),
  (c:Computer {hostname: "LAPTOP-HR001", ip: "192.168.1.75"}),
  (f:File {path: "C:\\Users\\anna\\dokument.pdf", size: 1024})

Beziehungen erstellen

Einfache Beziehung:

MATCH (p:Person {name: "Max Mustermann"})
MATCH (c:Computer {hostname: "DESKTOP-IT001"})
CREATE (p)-[:LOGGED_INTO {timestamp: datetime(), session_id: "12345"}]->(c)

Knoten und Beziehung in einem Schritt:

CREATE (p:Person {name: "Thomas Weber"})-[:OWNS]->(c:Computer {hostname: "LAPTOP-TW001"})

Mehrere Beziehungen:

MATCH (p:Person {name: "Max Mustermann"})
MATCH (c1:Computer {hostname: "DESKTOP-IT001"})
MATCH (c2:Computer {hostname: "SERVER-001"})
CREATE 
  (p)-[:ACCESSED {timestamp: datetime(), access_type: "remote"}]->(c1),
  (p)-[:ADMINISTERED {timestamp: datetime(), privileges: "full"}]->(c2)

Eigenschaften aktualisieren (SET)

Einzelne Eigenschaft ändern:

MATCH (p:Person {name: "Max Mustermann"})
SET p.status = "under_investigation"

Mehrere Eigenschaften aktualisieren:

MATCH (c:Computer {hostname: "DESKTOP-IT001"})
SET c.status = "compromised",
    c.last_scan = datetime(),
    c.security_level = "high_risk"

Eigenschaft zu Beziehung hinzufügen:

MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
WHERE p.name = "Max Mustermann" AND c.hostname = "DESKTOP-IT001"
SET login.logout_time = datetime(),
    login.session_duration = duration("PT2H30M")

Knoten und Beziehungen zusammenführen (MERGE)

Knoten zusammenführen (erstellt nur wenn nicht vorhanden):

MERGE (p:Person {employee_id: "E001"})
ON CREATE SET p.name = "Max Mustermann", p.created = datetime()
ON MATCH SET p.last_seen = datetime()

Beziehung zusammenführen:

MATCH (p:Person {name: "Max Mustermann"})
MATCH (c:Computer {hostname: "DESKTOP-IT001"})
MERGE (p)-[login:LOGGED_INTO {date: date()}]->(c)
ON CREATE SET login.first_login = datetime()
ON MATCH SET login.last_login = datetime()

Löschen (DELETE)

Beziehung löschen:

MATCH (p:Person {name: "Max Mustermann"})-[login:LOGGED_INTO]->(c:Computer)
DELETE login

Knoten löschen (nur wenn keine Beziehungen):

MATCH (f:File {path: "C:\\temp\\test.txt"})
DELETE f

Knoten mit allen Beziehungen löschen:

MATCH (p:Person {name: "Test User"})
DETACH DELETE p

Mehrere Knoten nach Kriterium:

MATCH (f:File)
WHERE f.size < 1024 AND f.temporary = true
DETACH DELETE f

Eigenschaften entfernen (REMOVE)

Einzelne Eigenschaft entfernen:

MATCH (p:Person {name: "Max Mustermann"})
REMOVE p.temp_status

Label entfernen:

MATCH (n:TempNode)
REMOVE n:TempNode

Von einfachen zu komplexen Abfragen

Level 1: Grundlegende MATCH-Abfragen

Alle Computer in der Untersuchung finden:

MATCH (c:Computer)
RETURN c.hostname, c.ip, c.os

Bestimmte Datei suchen:

MATCH (f:File {name: "suspicious.exe"})
RETURN f.path, f.hash, f.size

Personen mit bestimmten Eigenschaften:

MATCH (p:Person)
WHERE p.department = "IT"
RETURN p.name, p.email

Level 2: Einfache Beziehungsabfragen

Wer hat sich an welchen Computern angemeldet?

MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
RETURN p.name, c.hostname, login.timestamp
ORDER BY login.timestamp DESC

Welche Dateien wurden von einem bestimmten Prozess erstellt?

MATCH (proc:Process {name: "powershell.exe"})-[:CREATED]->(f:File)
RETURN proc.pid, f.path, f.created_time

Level 3: Mustersuche und Pfade

Verdächtige Dateierstellung-Ketten finden:

MATCH path = (p:Person)-[:LOGGED_INTO]->(c:Computer)<-[:RUNNING_ON]-(proc:Process)-[:CREATED]->(f:File)
WHERE f.path CONTAINS ".exe"
RETURN path

Netzwerkverbindungen von kompromittierten Systemen:

MATCH (c:Computer {status: "compromised"})-[:INITIATED]->(conn:NetworkConnection)
WHERE conn.dst_port IN [80, 443, 8080]
RETURN c.hostname, conn.dst_ip, conn.dst_port, conn.timestamp

Level 4: Aggregation und Statistiken

Anmeldestatistiken pro Benutzer:

MATCH (p:Person)-[login:LOGGED_INTO]->(c:Computer)
RETURN p.name, 
       count(login) as login_count,
       collect(DISTINCT c.hostname) as computers_accessed,
       min(login.timestamp) as first_login,
       max(login.timestamp) as last_login
ORDER BY login_count DESC

Häufigste Netzwerkziele identifizieren:

MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
RETURN conn.dst_ip, 
       count(*) as connection_count,
       collect(DISTINCT c.hostname) as source_computers
ORDER BY connection_count DESC
LIMIT 10

Level 5: Zeitbasierte Analysen

Aktivitäten in einem bestimmten Zeitfenster:

MATCH (p:Person)-[r:LOGGED_INTO]->(c:Computer)
WHERE datetime(r.timestamp) >= datetime("2024-01-15T08:00:00")
  AND datetime(r.timestamp) <= datetime("2024-01-15T18:00:00")
RETURN p.name, c.hostname, r.timestamp
ORDER BY r.timestamp

Zeitleiste einer kompromittierten Sitzung:

MATCH (c:Computer {hostname: "DESKTOP-ABC123"})
MATCH (c)<-[r]-(entity)
WHERE r.timestamp IS NOT NULL
RETURN type(entity) as entity_type,
       entity.name as entity_name,
       type(r) as relationship_type,
       r.timestamp
ORDER BY r.timestamp

Level 6: Erweiterte Pfadanalysen

Seitliche Bewegung erkennen:

MATCH path = (start:Computer)<-[:LOGGED_INTO]-(p:Person)-[:LOGGED_INTO]->(end:Computer)
WHERE start <> end
  AND datetime(path.relationships[0].timestamp) < datetime(path.relationships[1].timestamp)
RETURN p.name, 
       start.hostname as from_computer,
       end.hostname as to_computer,
       length(path) as hop_count

Dateiverteilung verfolgen:

MATCH path = (origin:File)-[:COPIED_TO|:MOVED_TO*1..5]->(destination:File)
WHERE origin.hash = destination.hash
RETURN path, length(path) as propagation_steps

Level 7: Komplexe Untersuchungsmuster

Insider-Bedrohung erkennen:

MATCH (p:Person)-[:LOGGED_INTO]->(c:Computer)
MATCH (c)-[:ACCESSED]->(f:File)
WHERE f.classification = "confidential"
  AND NOT (p)-[:AUTHORIZED_FOR]->(f)
WITH p, count(f) as unauthorized_access_count
WHERE unauthorized_access_count > 5
RETURN p.name, p.department, unauthorized_access_count

Command & Control Kommunikation identifizieren:

MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
WHERE conn.dst_port IN [80, 443, 8080, 53]
WITH conn.dst_ip as suspicious_ip, count(*) as freq, collect(DISTINCT c.hostname) as computers
WHERE freq > 100 AND size(computers) > 1
MATCH (compromised:Computer)-[:CONNECTED_TO]->(cc_conn:NetworkConnection {dst_ip: suspicious_ip})
RETURN suspicious_ip, freq, computers, collect(cc_conn.timestamp) as connection_times

APOC - Erweiterte Funktionen für Forensiker

APOC (Awesome Procedures on Cypher) erweitert Neo4j um wichtige forensische Funktionen.

Zeitfunktionen

Zeitstempel-Konvertierung:

// Unix-Timestamp zu datetime
RETURN apoc.date.format(1642248600, 's', 'yyyy-MM-dd HH:mm:ss') as formatted_time

// Zeitbereich-Abfragen
MATCH (event:Event)
WHERE apoc.date.parse(event.timestamp, 's', 'yyyy-MM-dd HH:mm:ss') 
      BETWEEN apoc.date.parse('2024-01-15 08:00:00', 's', 'yyyy-MM-dd HH:mm:ss')
      AND apoc.date.parse('2024-01-15 18:00:00', 's', 'yyyy-MM-dd HH:mm:ss')
RETURN event

Datenimport für forensische Artefakte

CSV-Import von Windows Ereignisprotokollen:

CALL apoc.load.csv('file:///windows_events.csv') YIELD map
CREATE (e:Event {
    event_id: toInteger(map.EventID),
    timestamp: datetime(map.TimeGenerated),
    computer: map.ComputerName,
    user: map.User,
    description: map.Message
})

JSON-Import von Netzwerk-Protokollen:

CALL apoc.load.json('file:///network_traffic.json') YIELD value
UNWIND value.connections as conn
CREATE (nc:NetworkConnection {
    src_ip: conn.source_ip,
    dst_ip: conn.destination_ip,
    dst_port: conn.destination_port,
    protocol: conn.protocol,
    timestamp: datetime(conn.timestamp),
    bytes_transferred: conn.bytes
})

Graphalgorithmen für Untersuchungen

Kürzester Pfad zwischen verdächtigen Entitäten:

MATCH (start:Person {name: "Verdächtiger A"}), (end:File {classification: "streng_geheim"})
CALL apoc.algo.dijkstra(start, end, 'ACCESSED|SHARED|COPIED', 'weight') YIELD path, weight
RETURN path, weight

PageRank für wichtige Akteure identifizieren:

CALL apoc.algo.pageRank(['Person'], ['COMMUNICATED_WITH', 'SHARED_FILE']) 
YIELD node, score
RETURN node.name as person, score
ORDER BY score DESC
LIMIT 10

Forensische Datenmodellierung

Typische Entitätstypen

// Personen und Identitäten
CREATE (p:Person {name: "Max Mustermann", employee_id: "E001", department: "Finance"})
CREATE (u:UserAccount {username: "mmustermann", domain: "FIRMA", sid: "S-1-5-21-..."})

// Systeme und Hardware
CREATE (c:Computer {hostname: "WS001", ip: "192.168.1.100", os: "Windows 10"})
CREATE (m:Mobile {imei: "123456789012345", number: "+1234567890"})

// Dateien und Artefakte
CREATE (f:File {path: "C:\\temp\\malware.exe", hash_md5: "5d41402a...", size: 1024})
CREATE (r:RegistryKey {path: "HKLM\\Software\\Microsoft\\..."})

// Netzwerk und Kommunikation
CREATE (ip:IPAddress {address: "192.168.1.100", geolocation: "Internal"})
CREATE (e:Email {subject: "Re: Projekt X", timestamp: datetime()})

// Prozesse und Aktivitäten
CREATE (proc:Process {name: "cmd.exe", pid: 1234, command_line: "cmd.exe /c dir"})
CREATE (event:Event {event_id: 4624, source: "Security", timestamp: datetime()})

Beziehungstypen für forensische Analysen

// Benutzer-System-Interaktionen
CREATE (person)-[:OWNS]->(computer)
CREATE (user)-[:LOGGED_INTO {timestamp: datetime(), session_id: "12345"}]->(computer)

// Dateioperationen
CREATE (process)-[:CREATED {timestamp: datetime()}]->(file)
CREATE (process)-[:MODIFIED {timestamp: datetime(), action: "write"}]->(file)
CREATE (process)-[:DELETED {timestamp: datetime()}]->(file)

// Netzwerkaktivitäten
CREATE (computer)-[:CONNECTED_TO {timestamp: datetime(), protocol: "TCP"}]->(ip)
CREATE (process)-[:INITIATED_CONNECTION]->(network_connection)

// Kommunikation
CREATE (person)-[:SENT {timestamp: datetime()}]->(email)
CREATE (person)-[:RECEIVED {timestamp: datetime()}]->(email)

// Zeitliche Beziehungen
CREATE (event1)-[:FOLLOWED_BY {duration: duration("PT5M")}]->(event2)
CREATE (activity)-[:OCCURRED_DURING]->(time_period)

Zeitleisten-Analyse mit Neo4j

Zeitbasierte Ereignisrekonstruktion

Chronologische Ereignissequenz:

MATCH (e:Event)
WHERE e.computer = "DESKTOP-ABC123"
  AND e.timestamp >= datetime("2024-01-15T10:00:00")
  AND e.timestamp <= datetime("2024-01-15T12:00:00")
RETURN e.timestamp, e.event_id, e.description, e.user
ORDER BY e.timestamp

Überlappende Aktivitäten erkennen:

MATCH (e1:Event)-[:OCCURRED_ON]->(c:Computer)<-[:OCCURRED_ON]-(e2:Event)
WHERE e1.timestamp <= e2.timestamp <= e1.end_timestamp
  AND e1 <> e2
RETURN e1.description as activity1, 
       e2.description as activity2,
       c.hostname,
       e1.timestamp as start1,
       e2.timestamp as start2

Zeitliche Mustererkennung

Wiederkehrende verdächtige Aktivitäten:

MATCH (p:Process)-[:CREATED]->(f:File)
WHERE f.path CONTAINS "temp"
WITH p.name as process_name, 
     duration.between(min(f.created_time), max(f.created_time)) as time_span,
     count(f) as file_count
WHERE file_count > 10 AND time_span < duration("PT1H")
RETURN process_name, file_count, time_span

Berichtserstellung und Visualisierung

Ermittlungsberichte erstellen

Zusammenfassung verdächtiger Aktivitäten:

MATCH (p:Person)-[r:LOGGED_INTO]->(c:Computer)
WHERE datetime(r.timestamp) >= datetime("2024-01-15T00:00:00")
WITH p, c, count(r) as login_count, collect(r.timestamp) as login_times
WHERE login_count > 20  // Verdächtig viele Anmeldungen
RETURN p.name as suspect,
       p.department,
       c.hostname as accessed_computer,
       login_count,
       login_times[0] as first_login,
       login_times[-1] as last_login
ORDER BY login_count DESC

Netzwerk-Anomalien-Bericht:

MATCH (c:Computer)-[:CONNECTED_TO]->(conn:NetworkConnection)
WHERE conn.dst_port NOT IN [80, 443, 53, 22, 23, 25, 110, 143, 993, 995]
WITH conn.dst_ip as suspicious_ip, 
     conn.dst_port as suspicious_port,
     count(*) as frequency,
     collect(DISTINCT c.hostname) as source_computers
WHERE frequency > 5
RETURN suspicious_ip, 
       suspicious_port, 
       frequency, 
       source_computers,
       size(source_computers) as affected_computers_count
ORDER BY frequency DESC

Export für externe Werkzeuge

Gephi-Format für Visualisierung:

// Knoten für Gephi
MATCH (n)
RETURN id(n) as Id, 
       labels(n)[0] as Label, 
       n.name as Name,
       CASE labels(n)[0]
         WHEN 'Person' THEN 'red'
         WHEN 'Computer' THEN 'blue'
         WHEN 'File' THEN 'green'
         ELSE 'gray'
       END as Color

UNION

// Kanten für Gephi
MATCH (a)-[r]->(b)
RETURN id(a) as Source, 
       id(b) as Target, 
       type(r) as Type,
       r.timestamp as Timestamp

Leistungsoptimierung für große Datensätze

Indizierung strategisch einsetzen

// Zeitstempel-Indizes für Zeitleisten-Analysen
CREATE INDEX FOR (e:Event) ON (e.timestamp)
CREATE INDEX FOR (l:Login) ON (l.timestamp)

// Eindeutige Bezeichner
CREATE CONSTRAINT FOR (f:File) REQUIRE f.hash IS UNIQUE
CREATE CONSTRAINT FOR (p:Person) REQUIRE p.employee_id IS UNIQUE

// Zusammengesetzte Indizes für häufige Abfragen
CREATE INDEX FOR (c:Computer) ON (c.hostname, c.ip)

Abfrageoptimierung

PROFILE verwenden für Leistungsanalyse:

PROFILE
MATCH (p:Person)-[:LOGGED_INTO]->(c:Computer)
WHERE p.department = "IT"
RETURN p.name, c.hostname

Begrenzung für große Ergebnismengen:

MATCH (f:File)
WHERE f.size > 100000000  // Dateien größer als 100MB
RETURN f.path, f.size
ORDER BY f.size DESC
LIMIT 100

Häufige Probleme und Lösungen

Speicheroptimierung

# Neo4j-Konfiguration für große Datensätze
# Speicher-Einstellungen in neo4j.conf
dbms.memory.heap.initial_size=2G
dbms.memory.heap.max_size=8G
dbms.memory.pagecache.size=4G

Datenimport-Leistung

// Stapelimport für große CSV-Dateien
:auto USING PERIODIC COMMIT 1000
LOAD CSV WITH HEADERS FROM 'file:///large_dataset.csv' AS row
CREATE (e:Event {
    timestamp: datetime(row.timestamp),
    event_id: toInteger(row.event_id),
    computer: row.computer,
    description: row.description
})

Fortgeschrittene Techniken

Maschinelles Lernen Integration

// Anomalieerkennung durch statistische Analyse
MATCH (p:Person)-[l:LOGGED_INTO]->(c:Computer)
WITH p, count(l) as login_frequency
WITH avg(login_frequency) as avg_logins, stdev(login_frequency) as std_logins
MATCH (p:Person)-[l:LOGGED_INTO]->(c:Computer)
WITH p, count(l) as user_logins, avg_logins, std_logins
WHERE user_logins > (avg_logins + 2 * std_logins)
RETURN p.name as potential_anomaly, user_logins, avg_logins

Korrelationsanalyse

// Zeitliche Korrelation zwischen Ereignissen
MATCH (e1:Event), (e2:Event)
WHERE e1.computer = e2.computer
  AND abs(duration.between(e1.timestamp, e2.timestamp).seconds) < 300  // 5 Minuten
  AND e1 <> e2
RETURN e1.event_id, e2.event_id, e1.timestamp, e2.timestamp,
       duration.between(e1.timestamp, e2.timestamp) as time_diff

Installation und Einrichtung

Docker-Installation (empfohlen)

# Neo4j mit APOC-Plugin für Forensik-Umgebungen
docker run \
    --name neo4j-forensics \
    -p7474:7474 -p7687:7687 \
    -d \
    -v $HOME/neo4j/data:/data \
    -v $HOME/neo4j/logs:/logs \
    -v $HOME/neo4j/import:/var/lib/neo4j/import \
    --env NEO4J_AUTH=neo4j/forensics123 \
    --env NEO4J_PLUGINS='["apoc"]' \
    neo4j:latest

Native Installation

# Ubuntu/Debian
wget -O - https://debian.neo4j.com/neotechnology.gpg.key | sudo apt-key add -
echo 'deb https://debian.neo4j.com stable 4.4' | sudo tee /etc/apt/sources.list.d/neo4j.list
sudo apt update
sudo apt install neo4j

# APOC Plugin herunterladen
wget https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/download/4.4.0.18/apoc-4.4.0.18-all.jar
sudo mv apoc-4.4.0.18-all.jar /var/lib/neo4j/plugins/

# Neo4j starten
sudo systemctl enable neo4j
sudo systemctl start neo4j

Erste Konfiguration

# Standardpasswort ändern
cypher-shell -u neo4j -p neo4j
ALTER USER neo4j SET PASSWORD 'IhrSicheresPasswort';

# Speicher für forensische Arbeitslasten konfigurieren
sudo nano /etc/neo4j/neo4j.conf

Wichtige Konfigurationseinstellungen:

# Speicherzuweisung
dbms.memory.heap.initial_size=4G
dbms.memory.heap.max_size=8G
dbms.memory.pagecache.size=4G

# APOC-Konfiguration
dbms.security.procedures.unrestricted=apoc.*
dbms.security.procedures.allowlist=apoc.*

# Import-Verzeichnis
dbms.directories.import=/var/lib/neo4j/import

Zugriff:

  • Web-Schnittstelle: http://localhost:7474
  • Bolt-Protokoll: bolt://localhost:7687

Zusammenfassung und bewährte Praktiken

Empfohlener Arbeitsablauf

  1. Datenmodell entwerfen - Entitäten und Beziehungen für Ihren Fall definieren
  2. Daten importieren - Strukturiert mit APOC oder LOAD CSV
  3. Indizes erstellen - Leistung für häufige Abfragen optimieren
  4. Explorative Analyse - Von einfachen zu komplexen Abfragen voranschreiten
  5. Muster identifizieren - Anomalien und verdächtige Strukturen erkennen
  6. Zeitleiste rekonstruieren - Chronologische Ereignissequenzen erstellen
  7. Berichte generieren - Ergebnisse strukturiert dokumentieren
  8. Visualisieren - Komplexe Beziehungen grafisch darstellen

Wichtige Erkenntnisse

  • Graphdatenbanken eignen sich hervorragend für forensische Analysen aufgrund ihrer natürlichen Beziehungsmodellierung
  • Cypher ermöglicht intuitive Abfragen die komplexe ermittlerische Fragen beantworten
  • APOC erweitert die Funktionalität um zeitbasierte Analysen und Datenimport
  • Leistungsoptimierung ist kritisch bei großen forensischen Datensätzen
  • Strukturierte Herangehensweise führt zu effektiven Untersuchungsergebnissen

Neo4j bietet forensischen Analysten ein mächtiges Werkzeug zur Untersuchung komplexer digitaler Spuren und zur Aufdeckung versteckter Zusammenhänge in ihren Fällen.