Fix #49: Flyway-Integration mit V1-Basisskript und 3-Fall-Strategie

Ersetzt die manuelle evolveTableColumns()-Schema-Evolution durch Flyway 10.20.1.
Die Initialisierung unterscheidet drei Faelle: leere DB (Flyway-Migration),
Bestandsschema ohne Flyway-History (Baseline nach Schema-Pruefung) und
Folgestart mit Flyway-History (idempotent). Smoke-Test-Deadlock auf Windows
durch paralleles Ausgabe-Draining des Subprozesses behoben.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-30 11:44:28 +02:00
parent 500a8c5340
commit 732d00c4ad
9 changed files with 1145 additions and 740 deletions
+4
View File
@@ -31,6 +31,10 @@
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
@@ -1,337 +1,577 @@
package de.gecheckt.pdf.umbenenner.adapter.out.sqlite;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import javax.sql.DataSource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.flywaydb.core.Flyway;
import org.sqlite.SQLiteConfig;
import org.sqlite.SQLiteDataSource;
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentPersistenceException;
import de.gecheckt.pdf.umbenenner.application.port.out.PersistenceSchemaInitializationPort;
/**
* SQLite implementation of {@link PersistenceSchemaInitializationPort}.
* <p>
* Creates or verifies the two-level persistence schema in the configured SQLite
* database file, and performs a controlled schema evolution from an earlier schema
* version to the current one.
* Flyway-basierte Implementierung von {@link PersistenceSchemaInitializationPort}.
*
* <h2>Two-level schema</h2>
* <p>The schema consists of exactly two tables:
* <ol>
* <li><strong>{@code document_record}</strong> — the document master record
* (Dokument-Stammsatz). One row per unique SHA-256 fingerprint.</li>
* <li><strong>{@code processing_attempt}</strong> — the processing attempt history
* (Versuchshistorie). One row per historised processing attempt, referencing
* the master record via fingerprint.</li>
* </ol>
* <p>Erstellt oder verifiziert das Zwei-Ebenen-Persistenzschema in der konfigurierten
* SQLite-Datenbank und führt dabei eine differenzierte Startstrategie durch,
* die drei Fälle unterscheidet:
*
* <h2>Schema evolution</h2>
* <p>
* When upgrading from an earlier schema, this adapter uses idempotent
* {@code ALTER TABLE ... ADD COLUMN} statements for both tables. Columns that already
* exist are silently skipped, making the evolution safe to run on both fresh and existing
* databases. The current evolution adds:
* <ul>
* <li>AI-traceability columns to {@code processing_attempt}</li>
* <li>Target-copy columns ({@code last_target_path}, {@code last_target_file_name}) to
* {@code document_record}</li>
* <li>Target-copy column ({@code final_target_file_name}) to {@code processing_attempt}</li>
* <li>Provider-identifier column ({@code ai_provider}) to {@code processing_attempt};
* existing rows receive {@code NULL} as the default, which is the correct value for
* attempts recorded before provider tracking was introduced.</li>
* </ul>
* <h2>Fall 1 Leere Datenbank</h2>
* <p>Keine fachlichen Tabellen und keine Flyway-History-Tabelle vorhanden
* (bzw. Datei existiert noch nicht). Flyway führt {@code V1__initial_schema.sql}
* vollständig aus und legt das komplette Schema an.
*
* <h2>Legacy-state migration</h2>
* <p>
* Documents in an earlier positive intermediate state ({@code SUCCESS} recorded without
* a validated naming proposal) are idempotently migrated to {@code READY_FOR_AI} so that
* the AI naming pipeline processes them in the next run. Terminal negative states
* ({@code FAILED_RETRYABLE}, {@code FAILED_FINAL}, skip states) are left unchanged.
* <h2>Fall 2 Bestehende Datenbank ohne Flyway-History</h2>
* <p>Fachliche Tabellen sind vorhanden, aber die Flyway-History-Tabelle fehlt.
* Vor der Baseline-Eintragung wird eine vollständige Schema-Prüfung gegen das
* V1-Zielschema durchgeführt. Bei konformem Schema wird ein datiertes Backup der
* SQLite-Datei erstellt, und Flyway trägt nur eine Baseline ein (Skript wird
* <em>nicht</em> ausgeführt). Bei fehlendem Schema-Element bricht der Start mit
* einer klaren Fehlermeldung ab.
*
* <h2>Initialisation timing</h2>
* <p>This adapter must be invoked <em>once</em> at program startup, before the batch
* document processing loop begins.
* <h2>Fall 3 Folgestart mit Flyway-History</h2>
* <p>Flyway-History-Tabelle ist vorhanden. Flyway läuft idempotent und
* führt nur noch fehlende Migrationen aus.
*
* <h2>Architecture boundary</h2>
* <p>All JDBC connections, SQL DDL, and SQLite-specific behaviour are strictly confined
* to this class. No JDBC or SQLite types appear in the port interface or in any
* application/domain type.
* <h2>Fremdschlüssel</h2>
* <p>Foreign-Key-Durchsetzung wird über {@code SQLiteConfig.enforceForeignKeys(true)}
* auf DataSource-Ebene aktiviert, sodass jede neue Verbindung automatisch
* {@code PRAGMA foreign_keys = ON} erhält.
*
* <h2>Architekturgrenze</h2>
* <p>Alle JDBC-Verbindungen, SQL-DDL und SQLite-spezifisches Verhalten sind
* ausschließlich in dieser Klasse gekapselt. Im Port-Interface und in den
* Domain-/Application-Typen erscheinen keine JDBC- oder SQLite-Typen.
*/
public class SqliteSchemaInitializationAdapter implements PersistenceSchemaInitializationPort {
private static final Logger logger = LogManager.getLogger(SqliteSchemaInitializationAdapter.class);
// -------------------------------------------------------------------------
// DDL — document_record table
// Erwartete Tabellen und Spalten gemäß V1-Zielschema
// -------------------------------------------------------------------------
/**
* DDL for the document master record table.
* <p>
* Columns: id (PK), fingerprint (unique), last_known_source_locator,
* last_known_source_file_name, overall_status, content_error_count,
* transient_error_count, last_failure_instant, last_success_instant,
* created_at, updated_at.
*/
private static final String DDL_CREATE_DOCUMENT_RECORD = """
CREATE TABLE IF NOT EXISTS document_record (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fingerprint TEXT NOT NULL,
last_known_source_locator TEXT NOT NULL,
last_known_source_file_name TEXT NOT NULL,
overall_status TEXT NOT NULL,
content_error_count INTEGER NOT NULL DEFAULT 0,
transient_error_count INTEGER NOT NULL DEFAULT 0,
last_failure_instant TEXT,
last_success_instant TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
CONSTRAINT uq_document_record_fingerprint UNIQUE (fingerprint)
)
""";
/**
 * Column names that the {@code document_record} table must expose.
 * All entries are lower-case because the schema check compares them against
 * lower-cased metadata column names.
 */
private static final Set<String> EXPECTED_COLUMNS_DOCUMENT_RECORD = Set.of(
        "id",
        "fingerprint",
        "last_known_source_locator",
        "last_known_source_file_name",
        "overall_status",
        "content_error_count",
        "transient_error_count",
        "last_failure_instant",
        "last_success_instant",
        "created_at",
        "updated_at",
        "last_target_path",
        "last_target_file_name");

/**
 * Column names that the {@code processing_attempt} table must expose
 * (lower-case, see {@link #EXPECTED_COLUMNS_DOCUMENT_RECORD}).
 */
private static final Set<String> EXPECTED_COLUMNS_PROCESSING_ATTEMPT = Set.of(
        "id",
        "fingerprint",
        "run_id",
        "attempt_number",
        "started_at",
        "ended_at",
        "status",
        "failure_class",
        "failure_message",
        "retryable",
        "model_name",
        "prompt_identifier",
        "processed_page_count",
        "sent_character_count",
        "ai_raw_response",
        "ai_reasoning",
        "resolved_date",
        "date_source",
        "validated_title",
        "final_target_file_name",
        "ai_provider");

/** Index names the schema check looks for across both business tables. */
private static final Set<String> EXPECTED_INDEXES = Set.of(
        "idx_processing_attempt_fingerprint",
        "idx_processing_attempt_run_id",
        "idx_document_record_overall_status");

/** Table name whose presence marks the database as already managed by Flyway. */
private static final String FLYWAY_HISTORY_TABLE = "flyway_schema_history";
// -------------------------------------------------------------------------
// DDL — processing_attempt table (base schema, without AI traceability cols)
// Felder
// -------------------------------------------------------------------------
/**
* DDL for the base processing attempt history table.
* <p>
* Base columns (present in all schema versions): id, fingerprint, run_id,
* attempt_number, started_at, ended_at, status, failure_class, failure_message, retryable.
* <p>
* AI traceability columns are added separately via {@code ALTER TABLE} to support
* idempotent evolution from earlier schemas.
*/
private static final String DDL_CREATE_PROCESSING_ATTEMPT = """
CREATE TABLE IF NOT EXISTS processing_attempt (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fingerprint TEXT NOT NULL,
run_id TEXT NOT NULL,
attempt_number INTEGER NOT NULL,
started_at TEXT NOT NULL,
ended_at TEXT NOT NULL,
status TEXT NOT NULL,
failure_class TEXT,
failure_message TEXT,
retryable INTEGER NOT NULL DEFAULT 0,
CONSTRAINT fk_processing_attempt_fingerprint
FOREIGN KEY (fingerprint) REFERENCES document_record (fingerprint),
CONSTRAINT uq_processing_attempt_fingerprint_number
UNIQUE (fingerprint, attempt_number)
)
""";
// -------------------------------------------------------------------------
// DDL — indexes
// -------------------------------------------------------------------------
/** Index on {@code processing_attempt.fingerprint} for fast per-document lookups. */
private static final String DDL_IDX_ATTEMPT_FINGERPRINT =
"CREATE INDEX IF NOT EXISTS idx_processing_attempt_fingerprint "
+ "ON processing_attempt (fingerprint)";
/** Index on {@code processing_attempt.run_id} for fast per-run lookups. */
private static final String DDL_IDX_ATTEMPT_RUN_ID =
"CREATE INDEX IF NOT EXISTS idx_processing_attempt_run_id "
+ "ON processing_attempt (run_id)";
/** Index on {@code document_record.overall_status} for fast status-based filtering. */
private static final String DDL_IDX_RECORD_STATUS =
"CREATE INDEX IF NOT EXISTS idx_document_record_overall_status "
+ "ON document_record (overall_status)";
// -------------------------------------------------------------------------
// DDL — columns added to processing_attempt via schema evolution
// -------------------------------------------------------------------------
/**
* Columns to add idempotently to {@code processing_attempt}.
* Each entry is {@code [column_name, column_type]}.
* <p>
* {@code ai_provider} is nullable; existing rows receive {@code NULL}, which is the
* correct sentinel for attempts recorded before provider tracking was introduced.
*/
private static final String[][] EVOLUTION_ATTEMPT_COLUMNS = {
{"model_name", "TEXT"},
{"prompt_identifier", "TEXT"},
{"processed_page_count", "INTEGER"},
{"sent_character_count", "INTEGER"},
{"ai_raw_response", "TEXT"},
{"ai_reasoning", "TEXT"},
{"resolved_date", "TEXT"},
{"date_source", "TEXT"},
{"validated_title", "TEXT"},
{"final_target_file_name", "TEXT"},
{"ai_provider", "TEXT"},
};
// -------------------------------------------------------------------------
// DDL — columns added to document_record via schema evolution
// -------------------------------------------------------------------------
/**
* Columns to add idempotently to {@code document_record}.
* Each entry is {@code [column_name, column_type]}.
*/
private static final String[][] EVOLUTION_RECORD_COLUMNS = {
{"last_target_path", "TEXT"},
{"last_target_file_name", "TEXT"},
};
// -------------------------------------------------------------------------
// Legacy-state status migration
// -------------------------------------------------------------------------
/**
* Migrates earlier positive intermediate states in {@code document_record} that were
* recorded as {@code SUCCESS} without a validated naming proposal to {@code READY_FOR_AI},
* so the AI naming pipeline processes them in the next run.
* <p>
* Only rows with {@code overall_status = 'SUCCESS'} that have no corresponding
* {@code processing_attempt} with {@code status = 'PROPOSAL_READY'} are updated.
* This migration is idempotent.
*/
private static final String SQL_MIGRATE_LEGACY_SUCCESS_TO_READY_FOR_AI = """
UPDATE document_record
SET overall_status = 'READY_FOR_AI',
updated_at = datetime('now')
WHERE overall_status = 'SUCCESS'
AND NOT EXISTS (
SELECT 1 FROM processing_attempt pa
WHERE pa.fingerprint = document_record.fingerprint
AND pa.status = 'PROPOSAL_READY'
)
""";
private final String jdbcUrl;
/**
* Constructs the adapter with the JDBC URL of the SQLite database file.
* Erstellt den Adapter mit der JDBC-URL der SQLite-Datenbankdatei.
*
* @param jdbcUrl the JDBC URL of the SQLite database; must not be null or blank
* @throws NullPointerException if {@code jdbcUrl} is null
* @throws IllegalArgumentException if {@code jdbcUrl} is blank
* @param jdbcUrl die JDBC-URL der SQLite-Datenbank; darf nicht {@code null} oder leer sein
* @throws NullPointerException wenn {@code jdbcUrl} {@code null} ist
* @throws IllegalArgumentException wenn {@code jdbcUrl} leer ist
*/
public SqliteSchemaInitializationAdapter(String jdbcUrl) {
Objects.requireNonNull(jdbcUrl, "jdbcUrl must not be null");
Objects.requireNonNull(jdbcUrl, "jdbcUrl darf nicht null sein");
if (jdbcUrl.isBlank()) {
throw new IllegalArgumentException("jdbcUrl must not be blank");
throw new IllegalArgumentException("jdbcUrl darf nicht leer sein");
}
this.jdbcUrl = jdbcUrl;
}
/**
* Creates or verifies the persistence schema and performs schema evolution and
* status migration.
* <p>
* Execution order:
* <ol>
* <li>Enable foreign key enforcement.</li>
* <li>Create {@code document_record} table (if not exists).</li>
* <li>Create {@code processing_attempt} table (if not exists).</li>
* <li>Create all indexes (if not exist).</li>
* <li>Add AI-traceability and provider-identifier columns to {@code processing_attempt}
* (idempotent evolution).</li>
* <li>Migrate earlier positive intermediate state to {@code READY_FOR_AI} (idempotent).</li>
* </ol>
* <p>
* All steps are safe to run on both fresh and existing databases.
* Erstellt oder verifiziert das Persistenzschema per Flyway.
*
* @throws DocumentPersistenceException if any DDL or migration step fails
* <p>Erkennt anhand des Datenbankzustands automatisch einen der drei Fälle
* (leere DB, bestehende DB ohne Flyway-History, Folgestart mit Flyway-History)
* und wählt die passende Flyway-Konfiguration.
*
* @throws DocumentPersistenceException wenn das Schema nicht erstellt oder verifiziert
* werden kann, oder wenn die Schema-Prüfung bei
* einer bestehenden Datenbank fehlschlägt
*/
@Override
public void initializeSchema() {
logger.info("Initialising SQLite persistence schema at: {}", jdbcUrl);
try (Connection connection = DriverManager.getConnection(jdbcUrl);
Statement statement = connection.createStatement()) {
logger.info("Schema-Initialisierung gestartet für: {}", jdbcUrl);
try {
DataSource dataSource = createDataSource();
DbState state = determineDbState(dataSource);
logger.info("Erkannter Datenbankzustand: {}", state);
// Enable foreign key enforcement (SQLite disables it by default)
statement.execute("PRAGMA foreign_keys = ON");
// Level 1: document master record
statement.execute(DDL_CREATE_DOCUMENT_RECORD);
logger.debug("Table 'document_record' created or already present.");
// Level 2: processing attempt history (base columns only)
statement.execute(DDL_CREATE_PROCESSING_ATTEMPT);
logger.debug("Table 'processing_attempt' created or already present.");
// Indexes for efficient per-document, per-run, and per-status access
statement.execute(DDL_IDX_ATTEMPT_FINGERPRINT);
statement.execute(DDL_IDX_ATTEMPT_RUN_ID);
statement.execute(DDL_IDX_RECORD_STATUS);
logger.debug("Indexes created or already present.");
// Schema evolution: add AI-traceability + target-copy columns (idempotent)
evolveTableColumns(connection, "processing_attempt", EVOLUTION_ATTEMPT_COLUMNS);
evolveTableColumns(connection, "document_record", EVOLUTION_RECORD_COLUMNS);
// Status migration: earlier positive intermediate state → READY_FOR_AI
int migrated = statement.executeUpdate(SQL_MIGRATE_LEGACY_SUCCESS_TO_READY_FOR_AI);
if (migrated > 0) {
logger.info("Status migration: {} document(s) migrated from legacy SUCCESS state to READY_FOR_AI.",
migrated);
} else {
logger.debug("Status migration: no documents required migration.");
switch (state) {
case EMPTY -> runFall1NewDb(dataSource);
case EXISTING_WITHOUT_FLYWAY -> runFall2BaselineExistingDb(dataSource);
case FLYWAY_MANAGED -> runFall3FollowUpStart(dataSource);
}
logger.info("SQLite schema initialisation and migration completed successfully.");
} catch (SQLException e) {
String message = "Failed to initialise SQLite persistence schema at '" + jdbcUrl + "': " + e.getMessage();
logger.error(message, e);
throw new DocumentPersistenceException(message, e);
logger.info("Schema-Initialisierung erfolgreich abgeschlossen.");
} catch (DocumentPersistenceException e) {
throw e;
} catch (Exception e) {
String msg = "Schema-Initialisierung fehlgeschlagen für '" + jdbcUrl + "': " + e.getMessage();
logger.error(msg, e);
throw new DocumentPersistenceException(msg, e);
}
}
/**
* Idempotently adds the given columns to the specified table.
* <p>
* For each column that does not yet exist, an {@code ALTER TABLE ... ADD COLUMN}
* statement is executed. Columns that already exist are silently skipped.
* Gibt die JDBC-URL zurück, die dieser Adapter verwendet.
*
* @param connection an open JDBC connection to the database
* @param tableName the name of the table to evolve
* @param columns array of {@code [column_name, column_type]} pairs to add
* @throws SQLException if a column addition fails for a reason other than duplicate column
*/
private void evolveTableColumns(Connection connection, String tableName, String[][] columns)
throws SQLException {
java.util.Set<String> existingColumns = new java.util.HashSet<>();
try (ResultSet rs = connection.getMetaData().getColumns(null, null, tableName, null)) {
while (rs.next()) {
existingColumns.add(rs.getString("COLUMN_NAME").toLowerCase());
}
}
for (String[] col : columns) {
String columnName = col[0];
String columnType = col[1];
if (!existingColumns.contains(columnName.toLowerCase())) {
String alterSql = "ALTER TABLE " + tableName + " ADD COLUMN " + columnName + " " + columnType;
try (Statement stmt = connection.createStatement()) {
stmt.execute(alterSql);
}
logger.debug("Schema evolution: added column '{}' to '{}'.", columnName, tableName);
} else {
logger.debug("Schema evolution: column '{}' in '{}' already present, skipped.",
columnName, tableName);
}
}
}
/**
 * Returns the JDBC URL this adapter uses to connect to the SQLite database.
 *
 * @return the JDBC URL handed to the constructor; never {@code null} or blank,
 *         because the constructor rejects both
 */
public String getJdbcUrl() {
return jdbcUrl;
}
// -------------------------------------------------------------------------
// Fallbehandlung
// -------------------------------------------------------------------------
/**
 * Case 1 (empty database): runs a plain Flyway migration without baselining,
 * so every migration script found in the configured location is applied.
 *
 * @param dataSource the configured data source Flyway connects through
 */
private void runFall1NewDb(DataSource dataSource) {
    logger.info("Fall 1: Leere Datenbank Flyway legt vollständiges Schema an.");
    buildFlyway(dataSource, false).migrate();
    logger.info("Fall 1: Schema vollständig erstellt.");
}
/**
 * Case 2 (business tables exist, no Flyway history): verifies the existing schema,
 * backs up the database file, and then lets Flyway record a baseline.
 *
 * <p>The steps run strictly in this order: schema verification on a short-lived
 * connection, dated file backup, Flyway migrate with {@code baselineOnMigrate}.
 * The verification connection is closed before the backup copy is taken.
 *
 * @param dataSource the configured data source
 * @throws DocumentPersistenceException if the verification connection cannot be
 *         obtained or used, if the schema check reports findings, or if the
 *         backup cannot be written
 */
private void runFall2BaselineExistingDb(DataSource dataSource) {
    logger.info("Fall 2: Bestehende Datenbank ohne Flyway-History Schema-Prüfung läuft.");

    // Step 1: full schema check; a non-conforming schema aborts via DocumentPersistenceException.
    try (Connection verificationConnection = dataSource.getConnection()) {
        verifyExistingSchemaMatches(verificationConnection);
    } catch (SQLException sqlFailure) {
        final String failureText =
                "Datenbankverbindung für Schema-Prüfung fehlgeschlagen: " + sqlFailure.getMessage();
        logger.error(failureText, sqlFailure);
        throw new DocumentPersistenceException(failureText, sqlFailure);
    }
    logger.info("Fall 2: Schema-Prüfung bestanden.");

    // Step 2: safety copy of the database file before Flyway touches it.
    createDatedBackup();

    // Step 3: record the baseline; presumably Flyway then skips V1 itself — confirm against Flyway docs.
    buildFlyway(dataSource, true).migrate();
    logger.info("Fall 2: Flyway-Baseline erfolgreich eingetragen.");
}
/**
 * Case 3 (Flyway history table already present): runs a regular Flyway migration
 * without baselining.
 *
 * @param dataSource the configured data source Flyway connects through
 */
private void runFall3FollowUpStart(DataSource dataSource) {
    logger.info("Fall 3: Folgestart mit Flyway-History idempotente Migration.");
    buildFlyway(dataSource, false).migrate();
    logger.info("Fall 3: Migration abgeschlossen (idempotent).");
}
/**
 * Builds the {@link Flyway} instance shared by all three startup cases.
 *
 * <p>Configuration applied here:
 * <ul>
 *   <li>migration location fixed to {@code classpath:db/migration};</li>
 *   <li>{@code connectRetries} set to {@code 0};</li>
 *   <li>{@code baselineOnMigrate} set from the parameter; when it is {@code true},
 *       baseline version {@code "1"} and a baseline description are set as well.</li>
 * </ul>
 * No call that enables environment-variable configuration is made.
 *
 * @param dataSource        the data source Flyway should use
 * @param baselineOnMigrate whether migrate should record a baseline (used for case 2 only)
 * @return the loaded {@link Flyway} instance
 */
private Flyway buildFlyway(DataSource dataSource, boolean baselineOnMigrate) {
    var configuration = Flyway.configure();
    configuration.dataSource(dataSource);
    configuration.locations("classpath:db/migration");
    configuration.connectRetries(0);
    configuration.baselineOnMigrate(baselineOnMigrate);

    if (baselineOnMigrate) {
        configuration.baselineVersion("1");
        configuration.baselineDescription("Bestehende Datenbank baselined");
    }
    return configuration.load();
}
// -------------------------------------------------------------------------
// Datenbankzustand erkennen
// -------------------------------------------------------------------------
/**
 * Startup state of the SQLite database, as classified by {@code determineDbState}.
 */
enum DbState {
/** Neither the Flyway history table nor one of the two business tables was found. */
EMPTY,
/** At least one business table was found, but the Flyway history table is missing. */
EXISTING_WITHOUT_FLYWAY,
/** The Flyway history table was found; the database is treated as Flyway-managed. */
FLYWAY_MANAGED
}
/**
 * Classifies the database into one of the three {@link DbState} values.
 *
 * <p>The decision is based on table names only, never on file size:
 * the Flyway history table wins first; otherwise the presence of
 * {@code document_record} or {@code processing_attempt} decides between
 * {@link DbState#EXISTING_WITHOUT_FLYWAY} and {@link DbState#EMPTY}.
 * Tables other than these three do not influence the result.
 *
 * @param dataSource the data source to inspect
 * @return the detected {@link DbState}
 * @throws DocumentPersistenceException if the connection or metadata access fails
 */
private DbState determineDbState(DataSource dataSource) {
    try (Connection connection = dataSource.getConnection()) {
        final Set<String> tableNames = readTableNames(connection.getMetaData());

        if (tableNames.contains(FLYWAY_HISTORY_TABLE)) {
            return DbState.FLYWAY_MANAGED;
        }

        final boolean noBusinessTables = !tableNames.contains("document_record")
                && !tableNames.contains("processing_attempt");
        return noBusinessTables ? DbState.EMPTY : DbState.EXISTING_WITHOUT_FLYWAY;
    } catch (SQLException sqlFailure) {
        final String failureText =
                "Datenbankzustand konnte nicht ermittelt werden: " + sqlFailure.getMessage();
        logger.error(failureText, sqlFailure);
        throw new DocumentPersistenceException(failureText, sqlFailure);
    }
}
// -------------------------------------------------------------------------
// Schema-Prüfcheckliste (Fall 2)
// -------------------------------------------------------------------------
/**
 * Checks an existing, not yet Flyway-managed schema against the expected V1 layout.
 *
 * <p>Findings are collected rather than reported one by one, in this order:
 * missing tables, missing columns per table, missing indexes (only when both
 * tables exist), the unique constraint on {@code document_record.fingerprint},
 * and the foreign key on {@code processing_attempt.fingerprint}. If any finding
 * was recorded, a single exception listing all of them is thrown; nothing is repaired.
 *
 * @param conn an open JDBC connection to the database
 * @throws DocumentPersistenceException if at least one expected schema element is missing
 * @throws SQLException on technical database errors while reading metadata
 */
private void verifyExistingSchemaMatches(Connection conn) throws SQLException {
    final DatabaseMetaData metaData = conn.getMetaData();
    final List<String> findings = new ArrayList<>();

    final Set<String> tableNames = readTableNames(metaData);
    final boolean recordTablePresent = tableNames.contains("document_record");
    final boolean attemptTablePresent = tableNames.contains("processing_attempt");

    // Tables
    if (!recordTablePresent) {
        findings.add("Tabelle 'document_record' fehlt");
    }
    if (!attemptTablePresent) {
        findings.add("Tabelle 'processing_attempt' fehlt");
    }

    // Columns: only meaningful for tables that exist
    if (recordTablePresent) {
        pruefeSpaltenvollstaendigkeit(metaData, "document_record",
                EXPECTED_COLUMNS_DOCUMENT_RECORD, findings);
    }
    if (attemptTablePresent) {
        pruefeSpaltenvollstaendigkeit(metaData, "processing_attempt",
                EXPECTED_COLUMNS_PROCESSING_ATTEMPT, findings);
    }

    // Indexes: looked up across both tables, hence both must exist
    if (recordTablePresent && attemptTablePresent) {
        final Set<String> presentIndexes = readIndexNames(metaData);
        for (String expectedIndex : EXPECTED_INDEXES) {
            if (presentIndexes.contains(expectedIndex)) {
                continue;
            }
            findings.add("Index '" + expectedIndex + "' fehlt");
        }
    }

    // Constraints, as far as JDBC metadata exposes them
    if (recordTablePresent) {
        pruefeUniqueConstraintAufFingerprint(conn, findings);
    }
    if (attemptTablePresent) {
        pruefeForeignKeyAufDocumentRecord(conn, findings);
    }

    if (findings.isEmpty()) {
        return;
    }
    final String failureText =
            "Schema-Prüfung fehlgeschlagen folgende Elemente fehlen oder sind nicht konform: "
            + String.join("; ", findings);
    logger.error(failureText);
    throw new DocumentPersistenceException(failureText);
}
/**
 * Records a finding for every expected column that the given table does not have.
 *
 * <p>Column names reported by the metadata are lower-cased with {@code Locale.ROOT}
 * before comparison. The default-locale variant would map an upper-case {@code I}
 * to a dotless {@code ı} under e.g. a Turkish locale, so ASCII names such as
 * {@code ID} would no longer match the expected lower-case names.
 *
 * @param meta             database metadata to query
 * @param tabellenname     name of the table to inspect
 * @param erwarteteSpalten expected column names, all lower-case
 * @param fehler           list that receives one entry per missing column
 * @throws SQLException on technical database errors
 */
private void pruefeSpaltenvollstaendigkeit(DatabaseMetaData meta, String tabellenname,
        Set<String> erwarteteSpalten, List<String> fehler) throws SQLException {
    Set<String> vorhandeneSpalten = new HashSet<>();
    try (ResultSet rs = meta.getColumns(null, null, tabellenname, null)) {
        while (rs.next()) {
            String spaltenname = rs.getString("COLUMN_NAME");
            // Defensive: skip rows without a name instead of failing with an NPE.
            if (spaltenname != null) {
                vorhandeneSpalten.add(spaltenname.toLowerCase(java.util.Locale.ROOT));
            }
        }
    }
    for (String erwartet : erwarteteSpalten) {
        if (!vorhandeneSpalten.contains(erwartet)) {
            fehler.add("Spalte '" + tabellenname + "." + erwartet + "' fehlt");
        }
    }
}
/**
 * Records a finding unless {@code document_record} has a unique index that
 * covers exactly the column {@code fingerprint}.
 *
 * <p>Index rows are grouped by index name so that a composite unique index such as
 * {@code (fingerprint, other)} is not mistaken for uniqueness of {@code fingerprint}
 * alone. {@code NON_UNIQUE} is re-checked per row instead of relying solely on the
 * driver honouring the unique-only filter of {@code getIndexInfo}.
 *
 * @param conn   an open JDBC connection
 * @param fehler list that receives the finding if the constraint is missing
 * @throws SQLException on technical database errors
 */
private void pruefeUniqueConstraintAufFingerprint(Connection conn,
        List<String> fehler) throws SQLException {
    // unique index name -> lower-cased columns it covers
    java.util.Map<String, Set<String>> spaltenJeIndex = new java.util.HashMap<>();
    try (ResultSet rs = conn.getMetaData().getIndexInfo(null, null, "document_record", true, false)) {
        while (rs.next()) {
            String indexName = rs.getString("INDEX_NAME");
            String spalte = rs.getString("COLUMN_NAME");
            if (indexName == null || spalte == null || rs.getBoolean("NON_UNIQUE")) {
                continue;
            }
            spaltenJeIndex
                    .computeIfAbsent(indexName, key -> new HashSet<>())
                    .add(spalte.toLowerCase(java.util.Locale.ROOT));
        }
    }

    boolean uniqueGefunden = false;
    for (Set<String> spalten : spaltenJeIndex.values()) {
        if (spalten.size() == 1 && spalten.contains("fingerprint")) {
            uniqueGefunden = true;
            break;
        }
    }
    if (!uniqueGefunden) {
        fehler.add("UNIQUE-Constraint auf 'document_record.fingerprint' fehlt");
    }
}
/**
 * Records a finding unless {@code processing_attempt} imports a key whose child
 * column is {@code fingerprint} and whose parent table is {@code document_record}.
 *
 * <p>Only the parent table name and the child column name are compared
 * (case-insensitively); the parent column is not inspected.
 *
 * @param conn   an open JDBC connection
 * @param fehler list that receives the finding if no matching foreign key exists
 * @throws SQLException on technical database errors
 */
private void pruefeForeignKeyAufDocumentRecord(Connection conn,
        List<String> fehler) throws SQLException {
    boolean matched = false;
    try (ResultSet importedKeys = conn.getMetaData().getImportedKeys(null, null, "processing_attempt")) {
        // Stop advancing the cursor as soon as one matching key has been seen.
        while (!matched && importedKeys.next()) {
            final String parentTable = importedKeys.getString("PKTABLE_NAME");
            final String childColumn = importedKeys.getString("FKCOLUMN_NAME");
            matched = "document_record".equalsIgnoreCase(parentTable)
                    && "fingerprint".equalsIgnoreCase(childColumn);
        }
    }
    if (matched) {
        return;
    }
    fehler.add("Foreign Key von 'processing_attempt.fingerprint' auf 'document_record.fingerprint' fehlt");
}
// -------------------------------------------------------------------------
// Backup-Erstellung (Fall 2)
// -------------------------------------------------------------------------
/**
 * Copies the SQLite database file to a timestamped sibling file.
 *
 * <p>The backup is named {@code <original>.<yyyyMMdd'T'HHmmss'Z'>.bak} using the
 * current UTC time, e.g. {@code data.db.20260430T120000Z.bak}. If that name is
 * taken, a counter starting at 1 is inserted before {@code .bak} until a free
 * name is found. The method returns without copying when no file path can be
 * derived from the JDBC URL or when the file does not exist.
 *
 * <p>NOTE(review): only the main database file is copied; any SQLite side files
 * (journal/WAL) next to it are not part of the backup — confirm this is sufficient.
 *
 * @throws DocumentPersistenceException if the copy fails with an I/O error
 */
private void createDatedBackup() {
    final Path source = extractDbPath();
    if (source == null) {
        logger.warn("Kein lokaler Dateipfad aus JDBC-URL ableitbar Backup übersprungen: {}", jdbcUrl);
        return;
    }
    if (!Files.exists(source)) {
        logger.debug("Datenbankdatei existiert noch nicht kein Backup nötig.");
        return;
    }

    final String stamp = java.time.ZonedDateTime.now(java.time.ZoneOffset.UTC)
            .format(DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'"));
    final String stampedName = source.getFileName() + "." + stamp;

    // Pick the first candidate name that is not taken yet.
    Path target = source.resolveSibling(stampedName + ".bak");
    for (int suffix = 1; Files.exists(target); suffix++) {
        target = source.resolveSibling(stampedName + "." + suffix + ".bak");
    }

    try {
        Files.copy(source, target, StandardCopyOption.COPY_ATTRIBUTES);
        logger.info("Backup der Datenbankdatei erstellt: {}", target);
    } catch (IOException ioFailure) {
        final String failureText =
                "Backup der Datenbankdatei konnte nicht erstellt werden: " + ioFailure.getMessage();
        logger.error(failureText, ioFailure);
        throw new DocumentPersistenceException(failureText, ioFailure);
    }
}
/**
 * Derives the file-system path of the database file from the JDBC URL.
 *
 * <p>Expects URLs of the form {@code jdbc:sqlite:/path/to/file.db} or
 * {@code jdbc:sqlite:C:/path/file.db}. A trailing query string
 * ({@code ?key=value...}) is cut off before the path is built, so that connection
 * parameters do not end up in the file name and make the existing file look absent.
 * The special name {@code :memory:} is reported as "no path" on every platform.
 *
 * <p>NOTE(review): {@code file:} URI-style locations are passed to
 * {@code Paths.get} unchanged and will normally not resolve to the real file —
 * confirm whether such URLs are used.
 *
 * @return the derived {@link Path}, or {@code null} if the URL does not start with
 *         {@code jdbc:sqlite:}, names no file, names an in-memory database, or
 *         cannot be parsed as a path
 */
private Path extractDbPath() {
    final String prefix = "jdbc:sqlite:";
    if (!jdbcUrl.startsWith(prefix)) {
        return null;
    }
    String pfad = jdbcUrl.substring(prefix.length());

    // Connection parameters are not part of the file name.
    int queryStart = pfad.indexOf('?');
    if (queryStart >= 0) {
        pfad = pfad.substring(0, queryStart);
    }

    if (pfad.isBlank() || ":memory:".equals(pfad)) {
        return null;
    }
    try {
        return Paths.get(pfad);
    } catch (java.nio.file.InvalidPathException e) {
        logger.warn("Pfad aus JDBC-URL konnte nicht geparst werden: {}", pfad);
        return null;
    }
}
// -------------------------------------------------------------------------
// DataSource-Erstellung
// -------------------------------------------------------------------------
/**
 * Creates the {@link SQLiteDataSource} used for state detection, schema
 * verification and Flyway.
 *
 * <p>Foreign-key enforcement is switched on in the {@link SQLiteConfig} the data
 * source is built from, rather than by a statement on a single connection;
 * presumably every connection handed out then has it enabled — confirm against
 * the sqlite-jdbc documentation.
 *
 * @return the configured {@link DataSource}; never {@code null}
 */
private DataSource createDataSource() {
    final SQLiteConfig sqliteConfig = new SQLiteConfig();
    sqliteConfig.enforceForeignKeys(true);

    final SQLiteDataSource sqliteDataSource = new SQLiteDataSource(sqliteConfig);
    sqliteDataSource.setUrl(jdbcUrl);
    return sqliteDataSource;
}
// -------------------------------------------------------------------------
// Hilfsmethoden
// -------------------------------------------------------------------------
/**
 * Reads the names of all tables reported by the database metadata.
 *
 * <p>Names are lower-cased with {@link java.util.Locale#ROOT} so the result does
 * not depend on the JVM's default locale (under a Turkish locale, for example,
 * {@code "I"} would otherwise become a dotless {@code "ı"} and no longer compare
 * equal to the expected ASCII names).
 *
 * @param meta database metadata to query
 * @return mutable set of all table names (type {@code TABLE}) in lower case
 * @throws SQLException on technical database errors
 */
private static Set<String> readTableNames(DatabaseMetaData meta) throws SQLException {
    Set<String> names = new HashSet<>();
    // "%" matches every table name; the type filter restricts the result to "TABLE" entries.
    try (ResultSet rs = meta.getTables(null, null, "%", new String[]{"TABLE"})) {
        while (rs.next()) {
            names.add(rs.getString("TABLE_NAME").toLowerCase(java.util.Locale.ROOT));
        }
    }
    return names;
}
/**
 * Reads the names of all indexes reported by the database metadata for the two
 * tables {@code document_record} and {@code processing_attempt}.
 *
 * <p>Names are lower-cased with {@link java.util.Locale#ROOT} so the result does
 * not depend on the JVM's default locale (under a Turkish locale an upper-case
 * {@code "IDX_..."} would otherwise turn into {@code "ıdx_..."}). Metadata rows
 * without an index name are skipped.
 *
 * @param meta database metadata to query
 * @return mutable set of all index names of both tables in lower case
 * @throws SQLException on technical database errors
 */
private static Set<String> readIndexNames(DatabaseMetaData meta) throws SQLException {
    Set<String> names = new HashSet<>();
    for (String tabelle : new String[]{"document_record", "processing_attempt"}) {
        // unique = false: list all indexes, not only unique ones; approximate = false: exact results.
        try (ResultSet rs = meta.getIndexInfo(null, null, tabelle, false, false)) {
            while (rs.next()) {
                String indexName = rs.getString("INDEX_NAME");
                if (indexName != null) {
                    names.add(indexName.toLowerCase(java.util.Locale.ROOT));
                }
            }
        }
    }
    return names;
}
}
@@ -1,35 +1,43 @@
/**
* SQLite persistence adapter for the two-level persistence model.
* SQLite-Persistenz-Adapter für das Zwei-Ebenen-Persistenzmodell.
*
* <h2>Purpose</h2>
* <p>This package contains the technical SQLite infrastructure for the persistence
* layer. It is the only place in the entire application where JDBC connections, SQL DDL,
* and SQLite-specific types are used. No JDBC or SQLite types leak into the
* {@code application} or {@code domain} modules.
* <h2>Zweck</h2>
* <p>Dieses Paket enthält die technische SQLite-Infrastruktur der Persistenzschicht.
* Es ist die einzige Stelle in der gesamten Anwendung, an der JDBC-Verbindungen,
* SQL-DDL und SQLite-spezifische Typen verwendet werden. Keine JDBC- oder
* SQLite-Typen verlassen dieses Paket in Richtung der {@code application}-
* oder {@code domain}-Module.
*
* <h2>Two-level persistence model</h2>
* <p>Persistence is structured in exactly two levels:
* <h2>Zwei-Ebenen-Persistenzmodell</h2>
* <p>Die Persistenz ist in genau zwei Ebenen strukturiert:
* <ol>
* <li><strong>Document master record</strong> ({@code document_record} table)
* one row per unique SHA-256 fingerprint; carries the current overall status,
* failure counters, and the most recently known source location.</li>
* <li><strong>Processing attempt history</strong> ({@code processing_attempt} table)
* one row per historised processing attempt; references the master record via
* fingerprint; attempt numbers are monotonically increasing per fingerprint.</li>
* <li><strong>Dokument-Stammsatz</strong> ({@code document_record}-Tabelle) –
* eine Zeile pro eindeutigem SHA-256-Fingerprint; trägt den aktuellen
* Gesamtstatus, Fehlerzähler und den zuletzt bekannten Quellort.</li>
* <li><strong>Versuchshistorie</strong> ({@code processing_attempt}-Tabelle) –
* eine Zeile pro historisiertem Verarbeitungsversuch; referenziert den
* Stammsatz über den Fingerprint; Versuchsnummern sind pro Fingerprint
* monoton steigend.</li>
* </ol>
*
* <h2>Schema initialisation timing</h2>
* <p>The {@link de.gecheckt.pdf.umbenenner.adapter.out.sqlite.SqliteSchemaInitializationAdapter}
* implements the
* <h2>Schema-Initialisierung mit Flyway</h2>
* <p>Der {@link de.gecheckt.pdf.umbenenner.adapter.out.sqlite.SqliteSchemaInitializationAdapter}
* implementiert den
* {@link de.gecheckt.pdf.umbenenner.application.port.out.PersistenceSchemaInitializationPort}
* and must be called <em>once</em> at program startup, before the batch document
* processing loop begins. There is no lazy or hidden initialisation during document
* processing.
* und muss <em>einmal</em> beim Programmstart aufgerufen werden, bevor die
* Verarbeitungsschleife beginnt. Die Initialisierung unterscheidet drei Fälle:
* leere Datenbank, bestehende Datenbank ohne Flyway-History (Baseline-Eintragung
* nach vollständiger Schema-Prüfung) und Folgestart mit Flyway-History (idempotent).
*
* <h2>Architecture boundary</h2>
* <p>All JDBC connections, SQL statements, and SQLite-specific behaviour are strictly
* confined to this package. The application layer interacts exclusively through the
* port interfaces defined in
* <h2>Fremdschlüssel</h2>
* <p>Foreign-Key-Durchsetzung wird über {@code SQLiteConfig.enforceForeignKeys(true)}
* auf DataSource-Ebene aktiviert, sodass jede neue Verbindung automatisch
* {@code PRAGMA foreign_keys = ON} erhält.
*
* <h2>Architekturgrenze</h2>
* <p>Alle JDBC-Verbindungen, SQL-Anweisungen und SQLite-spezifisches Verhalten sind
* ausschließlich in diesem Paket gekapselt. Die Application-Schicht interagiert
* ausschließlich über die Port-Interfaces in
* {@code de.gecheckt.pdf.umbenenner.application.port.out}.
*/
package de.gecheckt.pdf.umbenenner.adapter.out.sqlite;
@@ -0,0 +1,58 @@
-- Complete base schema: document master record and processing-attempt history.
-- This script is executed for new databases (case 1).
-- For existing databases with a conforming schema only a Flyway baseline is
-- recorded; the script is NOT executed in that case (case 2).
--
-- document_record: master table with one row per fingerprint (enforced by the
-- UNIQUE constraint at the end of the definition).
CREATE TABLE document_record (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fingerprint TEXT NOT NULL,
last_known_source_locator TEXT NOT NULL,
last_known_source_file_name TEXT NOT NULL,
overall_status TEXT NOT NULL,
-- Both error counters are mandatory and start at 0 when not supplied.
content_error_count INTEGER NOT NULL DEFAULT 0,
transient_error_count INTEGER NOT NULL DEFAULT 0,
-- Nullable instants, stored as TEXT.
last_failure_instant TEXT,
last_success_instant TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
-- Nullable target information.
last_target_path TEXT,
last_target_file_name TEXT,
-- fingerprint is the key that processing_attempt references, so it must be unique.
CONSTRAINT uq_document_record_fingerprint UNIQUE (fingerprint)
);
-- processing_attempt: history table with one row per processing attempt.
-- Each row points to its master record via fingerprint, and the pair
-- (fingerprint, attempt_number) is unique.
CREATE TABLE processing_attempt (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fingerprint TEXT NOT NULL,
run_id TEXT NOT NULL,
attempt_number INTEGER NOT NULL,
started_at TEXT NOT NULL,
ended_at TEXT NOT NULL,
status TEXT NOT NULL,
-- Failure details are nullable; retryable is an integer flag defaulting to 0.
failure_class TEXT,
failure_message TEXT,
retryable INTEGER NOT NULL DEFAULT 0,
-- All remaining columns are nullable.
model_name TEXT,
prompt_identifier TEXT,
processed_page_count INTEGER,
sent_character_count INTEGER,
ai_raw_response TEXT,
ai_reasoning TEXT,
resolved_date TEXT,
date_source TEXT,
validated_title TEXT,
final_target_file_name TEXT,
ai_provider TEXT,
-- References the unique fingerprint column of document_record.
-- NOTE(review): SQLite only enforces this when foreign keys are enabled on the
-- connection (PRAGMA foreign_keys = ON) - confirm that all writers do so.
CONSTRAINT fk_processing_attempt_fingerprint
FOREIGN KEY (fingerprint) REFERENCES document_record (fingerprint),
CONSTRAINT uq_processing_attempt_fingerprint_number
UNIQUE (fingerprint, attempt_number)
);
-- Secondary (non-unique) indexes on processing_attempt.fingerprint,
-- processing_attempt.run_id and document_record.overall_status.
-- NOTE(review): the index names presumably belong to the schema checklist that
-- is verified for existing databases - confirm before renaming or dropping one.
CREATE INDEX idx_processing_attempt_fingerprint
ON processing_attempt (fingerprint);
CREATE INDEX idx_processing_attempt_run_id
ON processing_attempt (run_id);
CREATE INDEX idx_document_record_overall_status
ON document_record (overall_status);
@@ -24,11 +24,11 @@ import de.gecheckt.pdf.umbenenner.domain.model.ProcessingStatus;
import de.gecheckt.pdf.umbenenner.domain.model.RunId;
/**
* Tests for the additive {@code ai_provider} column in {@code processing_attempt}.
* <p>
* Covers schema migration (idempotency, nullable default for existing rows),
* write/read round-trips for both supported provider identifiers, and
* backward compatibility with databases created before provider tracking was introduced.
* Tests für {@code ai_provider} in {@code processing_attempt}.
*
* <p>Prüft Schreib-/Lese-Roundtrips für beide Provider-Identifikatoren,
* Idempotenz der Initialisierung sowie das Verhalten bei Schemata,
* die nicht dem Zielschema entsprechen (harter Abbruch per Fall-2-Strategie).
*/
class SqliteAttemptProviderPersistenceTest {
@@ -64,25 +64,24 @@ class SqliteAttemptProviderPersistenceTest {
}
/**
* A database that already has the {@code processing_attempt} table without
* {@code ai_provider} (simulating an existing installation before this column was added)
* must receive the column via the idempotent schema evolution.
* Eine bestehende Datenbank ohne {@code ai_provider}-Spalte in {@code processing_attempt}
* entspricht nicht dem vollständigen Zielschema. Die Initialisierung muss mit einem
* klaren Fehler abbrechen, da kein stilles Heilen stattfindet.
*/
@Test
void addsProviderColumnOnExistingDbWithoutColumn() throws SQLException {
// Bootstrap schema without the ai_provider column (simulate legacy DB)
void existingDbOhneAiProviderSpalte_brichtAb() throws SQLException {
// Schema ohne ai_provider anlegen
createLegacySchema();
assertThat(columnExists("processing_attempt", "ai_provider"))
.as("ai_provider must not be present before evolution")
.as("ai_provider darf im Legacy-Schema noch nicht vorhanden sein")
.isFalse();
// Running initializeSchema must add the column
schemaAdapter.initializeSchema();
assertThat(columnExists("processing_attempt", "ai_provider"))
.as("ai_provider column must be added by schema evolution")
.isTrue();
// Initialisierung muss mit Fehler abbrechen (nicht konformes Schema)
org.junit.jupiter.api.Assertions.assertThrows(
de.gecheckt.pdf.umbenenner.application.port.out.DocumentPersistenceException.class,
() -> schemaAdapter.initializeSchema(),
"Erwarte Fehler bei nicht konformem Schema (fehlende ai_provider-Spalte)");
}
/**
@@ -101,25 +100,28 @@ class SqliteAttemptProviderPersistenceTest {
}
/**
* Rows that existed before the {@code ai_provider} column was added must have
* {@code NULL} as the column value, not a non-null default.
* Neue Versuche die ohne Provider-Information gespeichert werden (z. B. über
* {@code ProcessingAttempt.withoutAiFields}), müssen {@code null} als
* {@code ai_provider} zurückliefern.
*/
@Test
void existingRowsKeepNullProvider() throws SQLException {
// Create legacy schema and insert a row without ai_provider
createLegacySchema();
DocumentFingerprint fp = fingerprint("aa");
insertLegacyDocumentRecord(fp);
insertLegacyAttemptRow(fp, "READY_FOR_AI");
// Now evolve the schema
void neuerVersuchOhneProvider_haeltNullProviderNachSchreibenUndLesen() {
schemaAdapter.initializeSchema();
DocumentFingerprint fp = fingerprint("aa");
insertDocumentRecord(fp);
// Read the existing row — ai_provider must be NULL
List<ProcessingAttempt> attempts = repository.findAllByFingerprint(fp);
assertThat(attempts).hasSize(1);
assertThat(attempts.get(0).aiProvider())
.as("Existing rows must have NULL ai_provider after schema evolution")
java.time.Instant now = java.time.Instant.now().truncatedTo(java.time.temporal.ChronoUnit.MICROS);
ProcessingAttempt attemptOhneProvider = ProcessingAttempt.withoutAiFields(
fp, new RunId("run-null"), 1,
now, now.plusSeconds(1),
ProcessingStatus.FAILED_RETRYABLE,
"Err", "msg", true);
repository.save(attemptOhneProvider);
List<ProcessingAttempt> gelesen = repository.findAllByFingerprint(fp);
assertThat(gelesen).hasSize(1);
assertThat(gelesen.get(0).aiProvider())
.as("Versuche ohne Provider müssen null zurückgeben")
.isNull();
}
@@ -213,29 +215,24 @@ class SqliteAttemptProviderPersistenceTest {
}
/**
* Reading a database that was created without the {@code ai_provider} column
* (a pre-extension database) must succeed; the new field must be empty/null
* for historical attempts.
* Eine Datenbank mit nicht konformem Schema (fehlende Spalten, fehlende Indizes)
* wird von der Initialisierung mit einem klaren Fehler abgebrochen.
* Es findet kein stilles Heilen statt.
*/
@Test
void legacyDataReadingDoesNotFail() throws SQLException {
// Set up legacy schema with a row that has no ai_provider column
void nichtKonformesSchema_brichtMitAussagekraeftigemFehlerAb() throws SQLException {
// Legacy-Schema anlegen (fehlt: ai_provider, last_target_path, last_target_file_name,
// Indizes fehlen ebenfalls)
createLegacySchema();
DocumentFingerprint fp = fingerprint("ee");
insertLegacyDocumentRecord(fp);
insertLegacyAttemptRow(fp, "FAILED_RETRYABLE");
// Evolve schema — now ai_provider column exists but legacy rows have NULL
schemaAdapter.initializeSchema();
// Reading must not throw and must return null for ai_provider
List<ProcessingAttempt> attempts = repository.findAllByFingerprint(fp);
assertThat(attempts).hasSize(1);
assertThat(attempts.get(0).aiProvider())
.as("Legacy attempt (from before provider tracking) must have null aiProvider")
.isNull();
// Other fields must still be readable
assertThat(attempts.get(0).status()).isEqualTo(ProcessingStatus.FAILED_RETRYABLE);
// Initialisierung muss abbrechen
org.junit.jupiter.api.Assertions.assertThrows(
de.gecheckt.pdf.umbenenner.application.port.out.DocumentPersistenceException.class,
() -> schemaAdapter.initializeSchema(),
"Erwarte Fehler bei nicht konformem Bestands-Schema");
}
/**
@@ -3,6 +3,7 @@ package de.gecheckt.pdf.umbenenner.adapter.out.sqlite;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -14,38 +15,34 @@ import java.util.Set;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.sqlite.SQLiteConfig;
import org.sqlite.SQLiteDataSource;
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentPersistenceException;
/**
* Tests for {@link SqliteSchemaInitializationAdapter}.
* <p>
* Verifies that the two-level schema is created correctly, that schema evolution
* (idempotent addition of AI traceability columns) works, that the idempotent
* status migration of earlier positive intermediate states to {@code READY_FOR_AI}
* is correct, and that invalid configuration is rejected.
* Tests für {@link SqliteSchemaInitializationAdapter}.
*
* <p>Prüft die differenzierte 3-Fall-Strategie (leere DB, bestehende DB ohne
* Flyway-History, Folgestart), die vollständige Schema-Prüfcheckliste für Fall 2,
* die Foreign-Key-Aktivierung via DataSource sowie den Konstruktor.
*/
class SqliteSchemaInitializationAdapterTest {
@TempDir
Path tempDir;
// -------------------------------------------------------------------------
// Construction
// Konstruktor
// -------------------------------------------------------------------------
@Test
void constructor_rejectsNullJdbcUrl() {
assertThatThrownBy(() -> new SqliteSchemaInitializationAdapter(null))
.isInstanceOf(NullPointerException.class)
.hasMessageContaining("jdbcUrl");
.isInstanceOf(NullPointerException.class);
}
@Test
void constructor_rejectsBlankJdbcUrl() {
assertThatThrownBy(() -> new SqliteSchemaInitializationAdapter(" "))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("jdbcUrl");
.isInstanceOf(IllegalArgumentException.class);
}
@Test
@@ -56,215 +53,341 @@ class SqliteSchemaInitializationAdapterTest {
}
// -------------------------------------------------------------------------
// Schema creation tables present
// Fall 1: Leere Datenbank vollständiges Schema anlegen
// -------------------------------------------------------------------------
@Test
void initializeSchema_createsBothTables(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "schema_test.db");
SqliteSchemaInitializationAdapter adapter = new SqliteSchemaInitializationAdapter(jdbcUrl);
void fall1_leereDb_laegtVollstaendigesSchemaAn(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "fall1.db");
new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
adapter.initializeSchema();
Set<String> tables = readTableNames(jdbcUrl);
assertThat(tables).contains("document_record", "processing_attempt");
Set<String> tabellen = readTableNames(jdbcUrl);
assertThat(tabellen).contains("document_record", "processing_attempt");
}
@Test
void initializeSchema_documentRecordHasAllMandatoryColumns(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "columns_test.db");
void fall1_leereDb_documentRecordHatAlleErwartetenSpalten(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "fall1_columns_dr.db");
new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
Set<String> columns = readColumnNames(jdbcUrl, "document_record");
assertThat(columns).containsExactlyInAnyOrder(
"id",
"fingerprint",
"last_known_source_locator",
"last_known_source_file_name",
"overall_status",
"content_error_count",
"transient_error_count",
"last_failure_instant",
"last_success_instant",
"created_at",
"updated_at",
"last_target_path",
"last_target_file_name"
Set<String> spalten = readColumnNames(jdbcUrl, "document_record");
assertThat(spalten).containsExactlyInAnyOrder(
"id", "fingerprint", "last_known_source_locator", "last_known_source_file_name",
"overall_status", "content_error_count", "transient_error_count",
"last_failure_instant", "last_success_instant", "created_at", "updated_at",
"last_target_path", "last_target_file_name"
);
}
@Test
void initializeSchema_processingAttemptHasAllMandatoryColumns(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "attempt_columns_test.db");
void fall1_leereDb_processingAttemptHatAlleErwartetenSpalten(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "fall1_columns_pa.db");
new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
Set<String> columns = readColumnNames(jdbcUrl, "processing_attempt");
assertThat(columns).containsExactlyInAnyOrder(
"id",
"fingerprint",
"run_id",
"attempt_number",
"started_at",
"ended_at",
"status",
"failure_class",
"failure_message",
"retryable",
"model_name",
"prompt_identifier",
"processed_page_count",
"sent_character_count",
"ai_raw_response",
"ai_reasoning",
"resolved_date",
"date_source",
"validated_title",
"final_target_file_name",
"ai_provider"
Set<String> spalten = readColumnNames(jdbcUrl, "processing_attempt");
assertThat(spalten).containsExactlyInAnyOrder(
"id", "fingerprint", "run_id", "attempt_number", "started_at", "ended_at",
"status", "failure_class", "failure_message", "retryable",
"model_name", "prompt_identifier", "processed_page_count", "sent_character_count",
"ai_raw_response", "ai_reasoning", "resolved_date", "date_source",
"validated_title", "final_target_file_name", "ai_provider"
);
}
// -------------------------------------------------------------------------
// Idempotency
// -------------------------------------------------------------------------
@Test
void initializeSchema_isIdempotent_calledTwice(@TempDir Path dir) {
String jdbcUrl = jdbcUrl(dir, "idempotent_test.db");
SqliteSchemaInitializationAdapter adapter = new SqliteSchemaInitializationAdapter(jdbcUrl);
void fall1_leereDb_indizesVorhanden(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "fall1_indexes.db");
new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
// Must not throw on second call
adapter.initializeSchema();
adapter.initializeSchema();
Set<String> indizes = readIndexNames(jdbcUrl);
assertThat(indizes).contains(
"idx_processing_attempt_fingerprint",
"idx_processing_attempt_run_id",
"idx_document_record_overall_status"
);
}
/**
 * "Empty" means that no tables are present, not merely that the file has a size
 * of 0 bytes. A pre-existing, zero-byte database file must therefore be treated
 * as an empty database (case 1) and end up with both tables created.
 */
@Test
void fall1_erkenntLeereDbAuchBeiDateiOhneInhalt(@TempDir Path dir) throws Exception {
    final String dateiName = "empty.db";

    // Create the zero-byte file up front and make sure it really exists.
    Path leereDatei = Files.createFile(dir.resolve(dateiName));
    assertThat(leereDatei).exists();

    // Initialisation must run through successfully on that file.
    String url = jdbcUrl(dir, dateiName);
    SqliteSchemaInitializationAdapter adapter = new SqliteSchemaInitializationAdapter(url);
    adapter.initializeSchema();

    assertThat(readTableNames(url)).contains("document_record", "processing_attempt");
}
// -------------------------------------------------------------------------
// Unique constraint: fingerprint in document_record
// Fall 2: Bestehende DB ohne Flyway-History Baseline eintragen
// -------------------------------------------------------------------------
@Test
void documentRecord_fingerprintUniqueConstraintIsEnforced(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "unique_test.db");
void fall2_bestehendeDbOhneHistory_traegtBaseline_einUndLaeuftErfolgreich(@TempDir Path dir)
throws SQLException {
String jdbcUrl = jdbcUrl(dir, "fall2.db");
// Vollständiges konformes Schema anlegen (wie eine bestehende Produktions-DB)
erstelleKonformesSchema(jdbcUrl);
// Adapter muss als Fall 2 erkennen und Baseline eintragen
new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
String insertSql = """
INSERT INTO document_record
(fingerprint, last_known_source_locator, last_known_source_file_name,
overall_status, created_at, updated_at)
VALUES (?, 'locator', 'file.pdf', 'SUCCESS', '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z')
""";
// Flyway-History-Tabelle muss jetzt vorhanden sein
Set<String> tabellen = readTableNames(jdbcUrl);
assertThat(tabellen).contains("flyway_schema_history");
// Fachliche Daten müssen erhalten bleiben
assertThat(tabellen).contains("document_record", "processing_attempt");
}
/**
 * Case 2: initialising an existing, conforming database without Flyway history
 * must leave exactly one backup file next to the database, named
 * {@code fall2_backup.db.<something>.bak}.
 */
@Test
void fall2_bestehendeDbOhneHistory_erstelltDatiertesBackup(@TempDir Path dir)
        throws Exception {
    Path dbPath = dir.resolve("fall2_backup.db");
    String jdbcUrl = "jdbc:sqlite:" + dbPath.toAbsolutePath().toString().replace('\\', '/');
    erstelleKonformesSchema(jdbcUrl);
    new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
    // Files.list holds an open directory handle; close it deterministically so the
    // temporary directory can be cleaned up afterwards.
    long backupAnzahl;
    try (var eintraege = Files.list(dir)) {
        backupAnzahl = eintraege
                .map(p -> p.getFileName().toString())
                .filter(name -> name.startsWith("fall2_backup.db.") && name.endsWith(".bak"))
                .count();
    }
    assertThat(backupAnzahl).isEqualTo(1);
}
/**
 * Case 2 with a non-conforming schema: a database whose {@code processing_attempt}
 * table lacks the {@code ai_provider} column must make the initialisation fail
 * with a {@link DocumentPersistenceException} whose message names the column.
 */
@Test
void fall2_bestehendeDbMitFehlendemElement_brichtMitFehlerAb(@TempDir Path dir) {
    String url = jdbcUrl(dir, "fall2_broken.db");
    erstelleSchemaOhneAiProvider(url);

    assertThatThrownBy(() -> {
        SqliteSchemaInitializationAdapter adapter = new SqliteSchemaInitializationAdapter(url);
        adapter.initializeSchema();
    })
            .isInstanceOf(DocumentPersistenceException.class)
            .hasMessageContaining("ai_provider");
}
/**
 * Case 2 with a missing table: a database that only contains
 * {@code document_record} must make the initialisation fail with a
 * {@link DocumentPersistenceException} whose message names
 * {@code processing_attempt}.
 */
@Test
void fall2_bestehendeDbOhneProcessingAttemptTabelle_brichtAb(@TempDir Path dir) {
    String url = jdbcUrl(dir, "fall2_no_attempt.db");
    erstelleNurDocumentRecord(url);

    assertThatThrownBy(() -> {
        SqliteSchemaInitializationAdapter adapter = new SqliteSchemaInitializationAdapter(url);
        adapter.initializeSchema();
    })
            .isInstanceOf(DocumentPersistenceException.class)
            .hasMessageContaining("processing_attempt");
}
// -------------------------------------------------------------------------
// Fall 3: Folgestart mit Flyway-History idempotent
// -------------------------------------------------------------------------
/**
 * Case 3: repeated initialisation of the same database must not throw. The first
 * run starts from an empty database (case 1); the second and third run hit a
 * database that has already been initialised.
 */
@Test
void fall3_folgestart_laeuftIdempotentOhneException(@TempDir Path dir) {
    SqliteSchemaInitializationAdapter adapter =
            new SqliteSchemaInitializationAdapter(jdbcUrl(dir, "fall3.db"));

    // Run 1 = initial creation, runs 2 and 3 = follow-up starts.
    for (int lauf = 1; lauf <= 3; lauf++) {
        adapter.initializeSchema();
    }
}
@Test
void fall3_folgestart_fachlicheDatenBleiben(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "fall3_data.db");
SqliteSchemaInitializationAdapter adapter = new SqliteSchemaInitializationAdapter(jdbcUrl);
adapter.initializeSchema();
// Testdatensatz einfügen
String fp = "a".repeat(64);
insertiereDocumentRecord(jdbcUrl, fp, "SUCCESS");
try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
try (var ps = conn.prepareStatement(insertSql)) {
ps.setString(1, fp);
ps.executeUpdate();
}
// Second insert with same fingerprint must fail
try (var ps = conn.prepareStatement(insertSql)) {
ps.setString(1, fp);
org.junit.jupiter.api.Assertions.assertThrows(
SQLException.class, ps::executeUpdate,
"Expected UNIQUE constraint violation on document_record.fingerprint");
}
// Folgestart
adapter.initializeSchema();
// Daten müssen erhalten bleiben
assertThat(leseStatus(jdbcUrl, fp)).isEqualTo("SUCCESS");
}
// -------------------------------------------------------------------------
// PRAGMA foreign_keys Foreign-Key-Aktivierung via DataSource
// -------------------------------------------------------------------------
/**
 * After schema initialisation, a connection obtained from a data source that is
 * configured with {@code enforceForeignKeys(true)} (the same way the adapter
 * configures its own) must report {@code PRAGMA foreign_keys} as {@code 1}.
 */
@Test
void foreignKeys_sindNachSchemaInitAktiv(@TempDir Path dir) throws Exception {
    String jdbcUrl = jdbcUrl(dir, "fk_test.db");
    new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
    // Build a fresh connection through SQLiteConfig, mirroring the adapter's setup.
    org.sqlite.SQLiteConfig config = new org.sqlite.SQLiteConfig();
    config.enforceForeignKeys(true);
    org.sqlite.SQLiteDataSource ds = new org.sqlite.SQLiteDataSource(config);
    ds.setUrl(jdbcUrl);
    // All three JDBC resources are managed by try-with-resources and closed in
    // reverse order, even when an assertion fails.
    try (Connection conn = ds.getConnection();
            var stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery("PRAGMA foreign_keys")) {
        // The pragma must return a single row with the value 1.
        assertThat(rs.next()).isTrue();
        assertThat(rs.getInt(1)).isEqualTo(1);
    }
}
/**
 * With foreign keys enforced on the connection, inserting a
 * {@code processing_attempt} row whose fingerprint has no matching
 * {@code document_record} must fail with an {@link SQLException}.
 */
@Test
void foreignKeys_verletzungWirdDurchgesetzt(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "fk_enforced.db");
new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
// Attempt to insert a processing_attempt without a matching document_record,
// using a data source that has foreign-key enforcement switched on.
org.sqlite.SQLiteConfig config = new org.sqlite.SQLiteConfig();
config.enforceForeignKeys(true);
org.sqlite.SQLiteDataSource ds = new org.sqlite.SQLiteDataSource(config);
ds.setUrl(jdbcUrl);
try (Connection conn = ds.getConnection()) {
// The statement is prepared and executed inside the lambda so that a failure
// in either step is what the assertion observes.
assertThatThrownBy(() -> {
try (var ps = conn.prepareStatement("""
INSERT INTO processing_attempt
(fingerprint, run_id, attempt_number, started_at, ended_at, status, retryable)
VALUES ('nichtvorhanden', 'run-1', 1, '2026-01-01T00:00:00Z',
'2026-01-01T00:01:00Z', 'FAILED_RETRYABLE', 1)
""")) {
ps.executeUpdate();
}
}).isInstanceOf(SQLException.class);
}
}
// -------------------------------------------------------------------------
// Unique constraint: (fingerprint, attempt_number) in processing_attempt
// Eindeutigkeits-Constraints
// -------------------------------------------------------------------------
@Test
void processingAttempt_fingerprintAttemptNumberUniqueConstraintIsEnforced(@TempDir Path dir)
void documentRecord_fingerprintUniqueConstraintWirdDurchgesetzt(@TempDir Path dir)
throws SQLException {
String jdbcUrl = jdbcUrl(dir, "attempt_unique_test.db");
String jdbcUrl = jdbcUrl(dir, "unique_dr.db");
new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
String fp = "b".repeat(64);
insertiereDocumentRecord(jdbcUrl, fp, "SUCCESS");
// Insert master record first (FK)
try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
try (var ps = conn.prepareStatement("""
INSERT INTO document_record
(fingerprint, last_known_source_locator, last_known_source_file_name,
overall_status, created_at, updated_at)
VALUES (?, 'loc', 'f.pdf', 'FAILED_RETRYABLE', '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z')
""")) {
ps.setString(1, fp);
ps.executeUpdate();
}
String attemptSql = """
INSERT INTO processing_attempt
(fingerprint, run_id, attempt_number, started_at, ended_at, status, retryable)
VALUES (?, 'run-1', 1, '2026-01-01T00:00:00Z', '2026-01-01T00:01:00Z', 'FAILED_RETRYABLE', 1)
""";
try (var ps = conn.prepareStatement(attemptSql)) {
ps.setString(1, fp);
ps.executeUpdate();
}
// Duplicate (fingerprint, attempt_number) must fail
try (var ps = conn.prepareStatement(attemptSql)) {
ps.setString(1, fp);
org.junit.jupiter.api.Assertions.assertThrows(
SQLException.class, ps::executeUpdate,
"Expected UNIQUE constraint violation on (fingerprint, attempt_number)");
}
}
// Zweiter Insert mit gleichem Fingerprint muss fehlschlagen
assertThatThrownBy(() -> insertiereDocumentRecord(jdbcUrl, fp, "SUCCESS"))
.isInstanceOf(SQLException.class);
}
// -------------------------------------------------------------------------
// Skip attempts are storable
// -------------------------------------------------------------------------
@Test
void processingAttempt_skipStatusIsStorable(@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "skip_test.db");
void processingAttempt_fingerprintUndAttemptNumberUniqueConstraintWirdDurchgesetzt(
@TempDir Path dir) throws SQLException {
String jdbcUrl = jdbcUrl(dir, "unique_pa.db");
new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
String fp = "c".repeat(64);
insertiereDocumentRecord(jdbcUrl, fp, "FAILED_RETRYABLE");
insertiereProcessingAttempt(jdbcUrl, fp, 1);
try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
// Insert master record
try (var ps = conn.prepareStatement("""
INSERT INTO document_record
(fingerprint, last_known_source_locator, last_known_source_file_name,
overall_status, created_at, updated_at)
VALUES (?, 'loc', 'f.pdf', 'SUCCESS', '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z')
""")) {
ps.setString(1, fp);
ps.executeUpdate();
}
// Insert a SKIPPED_ALREADY_PROCESSED attempt (null failure fields, retryable=0)
try (var ps = conn.prepareStatement("""
INSERT INTO processing_attempt
(fingerprint, run_id, attempt_number, started_at, ended_at,
status, failure_class, failure_message, retryable)
VALUES (?, 'run-2', 2, '2026-01-02T00:00:00Z', '2026-01-02T00:00:01Z',
'SKIPPED_ALREADY_PROCESSED', NULL, NULL, 0)
""")) {
ps.setString(1, fp);
int rows = ps.executeUpdate();
assertThat(rows).isEqualTo(1);
}
}
// Zweiter Insert mit gleicher (fingerprint, attempt_number) muss fehlschlagen
assertThatThrownBy(() -> insertiereProcessingAttempt(jdbcUrl, fp, 1))
.isInstanceOf(SQLException.class);
}
// -------------------------------------------------------------------------
// Schema evolution — AI traceability columns
// Fehlerfall: ungültige URL
// -------------------------------------------------------------------------
@Test
void initializeSchema_addsAiTraceabilityColumnsToExistingSchema(@TempDir Path dir)
throws SQLException {
// Simulate a pre-evolution schema: create the base tables without AI columns
String jdbcUrl = jdbcUrl(dir, "evolution_test.db");
void initializeSchema_wirftDocumentPersistenceException_beiUngueltigerUrl() {
SqliteSchemaInitializationAdapter adapter =
new SqliteSchemaInitializationAdapter("keine-jdbc-url");
assertThatThrownBy(adapter::initializeSchema)
.isInstanceOf(DocumentPersistenceException.class);
}
// -------------------------------------------------------------------------
// Hilfsmethoden Schema-Erstellung für Tests
// -------------------------------------------------------------------------
/**
 * Test helper: creates a fully conforming schema (both tables and the three
 * indexes, matching the V1 target schema) directly via JDBC, i.e. without any
 * Flyway history table.
 *
 * <p>Every statement uses {@code IF NOT EXISTS}, so objects that are already
 * present are left untouched.
 *
 * @param jdbcUrl JDBC URL of the database to prepare
 * @throws RuntimeException wrapping the {@link SQLException} if the schema cannot
 *         be created
 */
private static void erstelleKonformesSchema(String jdbcUrl) {
try (Connection conn = DriverManager.getConnection(jdbcUrl);
var stmt = conn.createStatement()) {
// Switch on foreign-key enforcement for this setup connection.
stmt.execute("PRAGMA foreign_keys = ON");
// Master table: one row per fingerprint.
stmt.execute("""
CREATE TABLE IF NOT EXISTS document_record (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fingerprint TEXT NOT NULL,
last_known_source_locator TEXT NOT NULL,
last_known_source_file_name TEXT NOT NULL,
overall_status TEXT NOT NULL,
content_error_count INTEGER NOT NULL DEFAULT 0,
transient_error_count INTEGER NOT NULL DEFAULT 0,
last_failure_instant TEXT,
last_success_instant TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
last_target_path TEXT,
last_target_file_name TEXT,
CONSTRAINT uq_document_record_fingerprint UNIQUE (fingerprint)
)
""");
// Attempt history, including the ai_provider column and both constraints.
stmt.execute("""
CREATE TABLE IF NOT EXISTS processing_attempt (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fingerprint TEXT NOT NULL,
run_id TEXT NOT NULL,
attempt_number INTEGER NOT NULL,
started_at TEXT NOT NULL,
ended_at TEXT NOT NULL,
status TEXT NOT NULL,
failure_class TEXT,
failure_message TEXT,
retryable INTEGER NOT NULL DEFAULT 0,
model_name TEXT,
prompt_identifier TEXT,
processed_page_count INTEGER,
sent_character_count INTEGER,
ai_raw_response TEXT,
ai_reasoning TEXT,
resolved_date TEXT,
date_source TEXT,
validated_title TEXT,
final_target_file_name TEXT,
ai_provider TEXT,
CONSTRAINT fk_processing_attempt_fingerprint
FOREIGN KEY (fingerprint) REFERENCES document_record (fingerprint),
CONSTRAINT uq_processing_attempt_fingerprint_number
UNIQUE (fingerprint, attempt_number)
)
""");
// The three secondary indexes of the target schema.
stmt.execute("CREATE INDEX IF NOT EXISTS idx_processing_attempt_fingerprint ON processing_attempt (fingerprint)");
stmt.execute("CREATE INDEX IF NOT EXISTS idx_processing_attempt_run_id ON processing_attempt (run_id)");
stmt.execute("CREATE INDEX IF NOT EXISTS idx_document_record_overall_status ON document_record (overall_status)");
} catch (SQLException e) {
// Setup failures are rethrown unchecked so calling tests need no extra throws clause.
throw new RuntimeException("Testvorbereitungsfehler: Schema konnte nicht erstellt werden", e);
}
}
/**
 * Creates a schema whose {@code processing_attempt} table has no {@code ai_provider} column.
 * <p>
 * Opens a connection for the given JDBC URL and issues one {@code CREATE TABLE} statement
 * for {@code document_record} and one for {@code processing_attempt}. Any
 * {@link SQLException} is rethrown as a {@link RuntimeException} with the message
 * {@code "Testvorbereitungsfehler"} and the original exception as cause.
 * <p>
 * NOTE(review): the statement texts below still carry duplicated {@code CREATE TABLE}
 * header lines, a duplicated {@code retryable} line and diff hunk markers; confirm the
 * exact column lists against the committed file before relying on this helper.
 *
 * @param jdbcUrl JDBC URL of the database in which the tables are created
 */
private static void erstelleSchemaOhneAiProvider(String jdbcUrl) {
try (Connection conn = DriverManager.getConnection(jdbcUrl);
var stmt = conn.createStatement()) {
stmt.execute("""
CREATE TABLE IF NOT EXISTS document_record (
CREATE TABLE document_record (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fingerprint TEXT NOT NULL,
last_known_source_locator TEXT NOT NULL,
@@ -276,11 +399,14 @@ class SqliteSchemaInitializationAdapterTest {
last_success_instant TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
last_target_path TEXT,
last_target_file_name TEXT,
CONSTRAINT uq_document_record_fingerprint UNIQUE (fingerprint)
)
""");
// processing_attempt WITHOUT ai_provider
stmt.execute("""
CREATE TABLE IF NOT EXISTS processing_attempt (
CREATE TABLE processing_attempt (
id INTEGER PRIMARY KEY AUTOINCREMENT,
fingerprint TEXT NOT NULL,
run_id TEXT NOT NULL,
@@ -290,112 +416,56 @@ class SqliteSchemaInitializationAdapterTest {
status TEXT NOT NULL,
failure_class TEXT,
failure_message TEXT,
retryable INTEGER NOT NULL DEFAULT 0
retryable INTEGER NOT NULL DEFAULT 0,
model_name TEXT,
prompt_identifier TEXT,
processed_page_count INTEGER,
sent_character_count INTEGER,
ai_raw_response TEXT,
ai_reasoning TEXT,
resolved_date TEXT,
date_source TEXT,
validated_title TEXT,
final_target_file_name TEXT
)
""");
} catch (SQLException e) {
throw new RuntimeException("Testvorbereitungsfehler", e);
}
}
// Running initializeSchema on the existing base schema must succeed (evolution)
new SqliteSchemaInitializationAdapter(jdbcUrl).initializeSchema();
Set<String> columns = readColumnNames(jdbcUrl, "processing_attempt");
assertThat(columns).contains(
"model_name", "prompt_identifier", "processed_page_count",
"sent_character_count", "ai_raw_response", "ai_reasoning",
"resolved_date", "date_source", "validated_title");
/**
 * Creates only the {@code document_record} table; {@code processing_attempt} is not created.
 * <p>
 * A {@link SQLException} raised while connecting or executing the statement is wrapped in
 * a {@link RuntimeException} so that callers do not have to declare it.
 *
 * @param jdbcUrl JDBC URL of the database in which the table is created
 */
private static void erstelleNurDocumentRecord(String jdbcUrl) {
    // The DDL is kept in a local so the try block contains nothing but the JDBC calls.
    final String documentRecordDdl = """
            CREATE TABLE document_record (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            fingerprint TEXT NOT NULL,
            last_known_source_locator TEXT NOT NULL,
            last_known_source_file_name TEXT NOT NULL,
            overall_status TEXT NOT NULL,
            content_error_count INTEGER NOT NULL DEFAULT 0,
            transient_error_count INTEGER NOT NULL DEFAULT 0,
            last_failure_instant TEXT,
            last_success_instant TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
            )
            """;
    try (Connection connection = DriverManager.getConnection(jdbcUrl);
            var statement = connection.createStatement()) {
        statement.execute(documentRecordDdl);
    } catch (SQLException sqlFailure) {
        throw new RuntimeException("Testvorbereitungsfehler", sqlFailure);
    }
}
// -------------------------------------------------------------------------
// Status migration — earlier positive intermediate state → READY_FOR_AI
// -------------------------------------------------------------------------
/**
 * Expects a document stored as {@code SUCCESS} without any {@code PROPOSAL_READY}
 * attempt to be reported as {@code READY_FOR_AI} after schema initialisation runs again.
 */
@Test
void initializeSchema_migrates_legacySuccessWithoutProposal_toReadyForAi(@TempDir Path dir)
        throws SQLException {
    final String dbUrl = jdbcUrl(dir, "migration_test.db");
    final String fingerprint = "d".repeat(64);

    // The schema has to exist before the record can be inserted.
    new SqliteSchemaInitializationAdapter(dbUrl).initializeSchema();
    insertDocumentRecordWithStatus(dbUrl, fingerprint, "SUCCESS");

    // A second initialisation pass is the one expected to rewrite the status.
    new SqliteSchemaInitializationAdapter(dbUrl).initializeSchema();

    assertThat(readOverallStatus(dbUrl, fingerprint)).isEqualTo("READY_FOR_AI");
}
/**
 * Expects repeated schema initialisation to leave a migrated record at
 * {@code READY_FOR_AI} without throwing.
 */
@Test
void initializeSchema_migration_isIdempotent(@TempDir Path dir) throws SQLException {
    final String dbUrl = jdbcUrl(dir, "migration_idempotent_test.db");
    final String fingerprint = "e".repeat(64);

    new SqliteSchemaInitializationAdapter(dbUrl).initializeSchema();
    insertDocumentRecordWithStatus(dbUrl, fingerprint, "SUCCESS");

    // Two further passes, each with a fresh adapter instance.
    for (int pass = 0; pass < 2; pass++) {
        new SqliteSchemaInitializationAdapter(dbUrl).initializeSchema();
    }

    assertThat(readOverallStatus(dbUrl, fingerprint)).isEqualTo("READY_FOR_AI");
}
/**
 * Expects a {@code SUCCESS} document that has a {@code PROPOSAL_READY} attempt to keep
 * its status when schema initialisation runs again.
 */
@Test
void initializeSchema_doesNotMigrate_successWithProposalReadyAttempt(@TempDir Path dir)
        throws SQLException {
    final String dbUrl = jdbcUrl(dir, "migration_proposal_test.db");
    final String fingerprint = "f".repeat(64);

    new SqliteSchemaInitializationAdapter(dbUrl).initializeSchema();

    // Record plus a PROPOSAL_READY attempt: this combination must be left untouched.
    insertDocumentRecordWithStatus(dbUrl, fingerprint, "SUCCESS");
    insertAttemptWithStatus(dbUrl, fingerprint, "PROPOSAL_READY");

    new SqliteSchemaInitializationAdapter(dbUrl).initializeSchema();

    final String statusAfterRerun = readOverallStatus(dbUrl, fingerprint);
    assertThat(statusAfterRerun).isEqualTo("SUCCESS");
}
/**
 * Expects records in {@code FAILED_RETRYABLE} and {@code FAILED_FINAL} to keep their
 * status when schema initialisation runs again.
 */
@Test
void initializeSchema_doesNotMigrate_terminalFailureStates(@TempDir Path dir)
        throws SQLException {
    final String dbUrl = jdbcUrl(dir, "migration_failure_test.db");
    final String retryableFingerprint = "1".repeat(64);
    final String finalFingerprint = "2".repeat(64);

    new SqliteSchemaInitializationAdapter(dbUrl).initializeSchema();

    // One record per failure state.
    insertDocumentRecordWithStatus(dbUrl, retryableFingerprint, "FAILED_RETRYABLE");
    insertDocumentRecordWithStatus(dbUrl, finalFingerprint, "FAILED_FINAL");

    new SqliteSchemaInitializationAdapter(dbUrl).initializeSchema();

    final String retryableStatus = readOverallStatus(dbUrl, retryableFingerprint);
    assertThat(retryableStatus).isEqualTo("FAILED_RETRYABLE");
    final String finalStatus = readOverallStatus(dbUrl, finalFingerprint);
    assertThat(finalStatus).isEqualTo("FAILED_FINAL");
}
// -------------------------------------------------------------------------
// Error handling
// -------------------------------------------------------------------------
/**
 * Expects {@code initializeSchema()} to fail with a {@code DocumentPersistenceException}
 * when the adapter was built from a string that is not a JDBC URL.
 */
@Test
void initializeSchema_throwsDocumentPersistenceException_onInvalidUrl() {
    // A malformed URL is used on purpose: SQLite accepts almost any file path.
    final String malformedUrl = "not-a-jdbc-url-at-all";
    SqliteSchemaInitializationAdapter adapter =
            new SqliteSchemaInitializationAdapter(malformedUrl);

    assertThatThrownBy(() -> adapter.initializeSchema())
            .isInstanceOf(DocumentPersistenceException.class);
}
// -------------------------------------------------------------------------
// JDBC helpers
// -------------------------------------------------------------------------
/**
 * Builds the SQLite JDBC URL for a database file inside the given directory.
 * <p>
 * Backslashes in the absolute path are replaced by forward slashes, so the resulting URL
 * has the same shape regardless of the platform's path separator.
 *
 * @param dir      directory against which {@code filename} is resolved
 * @param filename name of the database file
 * @return the absolute file path, slash-separated and prefixed with {@code jdbc:sqlite:}
 */
private static String jdbcUrl(Path dir, String filename) {
    // Exactly one return: a second, unreachable return statement would not compile.
    String absolutePath = dir.resolve(filename).toAbsolutePath().toString();
    return "jdbc:sqlite:" + absolutePath.replace('\\', '/');
}
private static Set<String> readTableNames(String jdbcUrl) throws SQLException {
@@ -411,7 +481,8 @@ class SqliteSchemaInitializationAdapterTest {
return tables;
}
private static Set<String> readColumnNames(String jdbcUrl, String tableName) throws SQLException {
private static Set<String> readColumnNames(String jdbcUrl, String tableName)
throws SQLException {
Set<String> columns = new HashSet<>();
try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
DatabaseMetaData meta = conn.getMetaData();
@@ -424,7 +495,25 @@ class SqliteSchemaInitializationAdapterTest {
return columns;
}
private static void insertDocumentRecordWithStatus(String jdbcUrl, String fingerprint,
/**
 * Collects the lower-cased names of all indexes reported for the tables
 * {@code document_record} and {@code processing_attempt}.
 * <p>
 * Names are folded with {@link java.util.Locale#ROOT} so the result does not depend on
 * the JVM's default locale (for example the Turkish dotless-i mapping).
 *
 * @param jdbcUrl JDBC URL of the database to inspect
 * @return the set of index names in lower case; rows without an index name are skipped
 * @throws SQLException if the connection cannot be opened or the metadata cannot be read
 */
private static Set<String> readIndexNames(String jdbcUrl) throws SQLException {
    Set<String> indexes = new HashSet<>();
    try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
        DatabaseMetaData meta = conn.getMetaData();
        for (String table : new String[]{"document_record", "processing_attempt"}) {
            // unique=false: report all indexes, not only unique ones; approximate=false.
            try (ResultSet rs = meta.getIndexInfo(null, null, table, false, false)) {
                while (rs.next()) {
                    String name = rs.getString("INDEX_NAME");
                    // JDBC allows INDEX_NAME to be null for statistics rows.
                    if (name != null) {
                        indexes.add(name.toLowerCase(java.util.Locale.ROOT));
                    }
                }
            }
        }
    }
    return indexes;
}
private static void insertiereDocumentRecord(String jdbcUrl, String fingerprint,
String status) throws SQLException {
try (Connection conn = DriverManager.getConnection(jdbcUrl);
var ps = conn.prepareStatement("""
@@ -439,21 +528,22 @@ class SqliteSchemaInitializationAdapterTest {
}
}
private static void insertAttemptWithStatus(String jdbcUrl, String fingerprint,
String status) throws SQLException {
private static void insertiereProcessingAttempt(String jdbcUrl, String fingerprint,
int attemptNumber) throws SQLException {
try (Connection conn = DriverManager.getConnection(jdbcUrl);
var ps = conn.prepareStatement("""
INSERT INTO processing_attempt
(fingerprint, run_id, attempt_number, started_at, ended_at, status, retryable)
VALUES (?, 'run-1', 1, '2026-01-01T00:00:00Z', '2026-01-01T00:01:00Z', ?, 0)
VALUES (?, 'run-1', ?, '2026-01-01T00:00:00Z', '2026-01-01T00:01:00Z',
'FAILED_RETRYABLE', 1)
""")) {
ps.setString(1, fingerprint);
ps.setString(2, status);
ps.setInt(2, attemptNumber);
ps.executeUpdate();
}
}
private static String readOverallStatus(String jdbcUrl, String fingerprint) throws SQLException {
private static String leseStatus(String jdbcUrl, String fingerprint) throws SQLException {
try (Connection conn = DriverManager.getConnection(jdbcUrl);
var ps = conn.prepareStatement(
"SELECT overall_status FROM document_record WHERE fingerprint = ?")) {
@@ -462,7 +552,7 @@ class SqliteSchemaInitializationAdapterTest {
if (rs.next()) {
return rs.getString("overall_status");
}
throw new IllegalStateException("No document record found for fingerprint: " + fingerprint);
throw new IllegalStateException("Kein Eintrag für Fingerprint: " + fingerprint);
}
}
}