Ergaenze zweiten GUI-Tab fuer Verarbeitungslauf mit Live-Fortschritt
- Fuehrt neuen Inbound-Adapter-Subpfad batchrun/ mit Tab, Koordinator, Launcher-Port und Ergebniszeilen-Model ein; der Batch-Lauf laeuft auf einem Hintergrund-Worker, UI-Updates ausschliesslich via FX-Dispatcher. - Ergaenzt application.port.in um BatchRunProgressObserver, BatchRunCancellationToken, DocumentCompletionEvent/-Status und RunSummary; DefaultBatchRunProcessingUseCase und DocumentProcessingCoordinator melden Lauf-/Dokument-Ereignisse an den Beobachter und unterstuetzen Soft-Stop zwischen Kandidaten. - Verdrahtet BootstrapRunner so, dass die GUI den vollstaendigen Headless-Pipelinepfad (Migration, Validierung, Schema-Init, Lock, Use-Case) mit Observer und Cancellation ausfuehrt; headless-Verhalten bleibt unveraendert. - Editor-Workspace bettet den zweiten Tab ein, sperrt Tab 1 mit Hinweisbanner waehrend eines Laufs und fragt den Benutzer beim Schliessen waehrend eines laufenden Batches. - Fuegt Tests fuer Observer-Wiring, Koordinator-Lebenszyklus und Tab-Smoke-Verhalten ein; aktualisiert die GUI-Bedienanleitung und docs/betrieb.md auf den neuen Tab. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+40
@@ -0,0 +1,40 @@
|
||||
package de.gecheckt.pdf.umbenenner.application.port.in;
|
||||
|
||||
/**
|
||||
* Inbound cooperative cancellation token for a running batch.
|
||||
* <p>
|
||||
* The application layer consults the token at safe points between candidates to decide
|
||||
* whether the run should stop before starting the next candidate. The current candidate
|
||||
* is always processed to completion before the token is honoured (soft-stop semantics).
|
||||
* <p>
|
||||
* Implementations are typically shared between an inbound adapter (which sets the
|
||||
* cancellation request) and the use case (which polls it). They must be safe to read from
|
||||
* the batch thread while being written concurrently by the adapter thread.
|
||||
*
|
||||
* <h2>Default implementation</h2>
|
||||
* <p>
|
||||
* Callers that do not need cancellation (e.g. the headless batch entry point) supply
|
||||
* {@link #neverCancelled()} as the token.
|
||||
*/
|
||||
public interface BatchRunCancellationToken {
|
||||
|
||||
/**
|
||||
* Returns {@code true} if a cancellation has been requested and the batch should
|
||||
* stop before starting the next candidate.
|
||||
* <p>
|
||||
* Must be cheap to call; may be polled repeatedly during a run.
|
||||
*
|
||||
* @return {@code true} if the run should stop as soon as practical, {@code false}
|
||||
* otherwise
|
||||
*/
|
||||
boolean isCancellationRequested();
|
||||
|
||||
/**
|
||||
* Returns a singleton token that never reports a cancellation request.
|
||||
*
|
||||
* @return a non-null token that always returns {@code false}
|
||||
*/
|
||||
static BatchRunCancellationToken neverCancelled() {
|
||||
return NeverCancelledBatchRunCancellationToken.INSTANCE;
|
||||
}
|
||||
}
|
||||
+80
@@ -0,0 +1,80 @@
|
||||
package de.gecheckt.pdf.umbenenner.application.port.in;
|
||||
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.RunId;
|
||||
|
||||
/**
|
||||
* Inbound observer port that receives progress callbacks over the life of a single
|
||||
* batch run.
|
||||
* <p>
|
||||
* The observer is an optional collaborator that an inbound adapter (e.g. a GUI) may
|
||||
* supply to follow a batch run in near real time. The callbacks never carry persistence
|
||||
* details; they only describe observable events at the use-case boundary.
|
||||
*
|
||||
* <h2>Invocation order</h2>
|
||||
* <p>
|
||||
* For a single run the observer is invoked in this order:
|
||||
* <ol>
|
||||
* <li>{@link #onRunStarted(RunId, int)} exactly once, once the total candidate count
|
||||
* is known (i.e. after the source folder scan succeeded and before the first
|
||||
* candidate is processed).</li>
|
||||
* <li>{@link #onDocumentCompleted(DocumentCompletionEvent)} once per candidate whose
|
||||
* processing reached a terminal resolution.</li>
|
||||
* <li>{@link #onRunEnded(RunSummary)} exactly once after the processing loop has
|
||||
* finished (normally, after a cancellation, or after a hard run-level error).</li>
|
||||
* </ol>
|
||||
*
|
||||
* <h2>Threading</h2>
|
||||
* <p>
|
||||
* Callbacks are invoked on the thread executing the batch run. Inbound adapters that
|
||||
* drive a UI must themselves dispatch any UI updates onto the appropriate UI thread
|
||||
* and must not block the reporting thread.
|
||||
*
|
||||
* <h2>Exception handling</h2>
|
||||
* <p>
|
||||
* Implementations must not throw checked exceptions. Runtime exceptions thrown by an
|
||||
* observer are caught by the application layer and logged; they never affect the batch
|
||||
* run outcome or alter persistence behaviour.
|
||||
*/
|
||||
public interface BatchRunProgressObserver {
|
||||
|
||||
/**
|
||||
* Invoked once when the run has determined how many candidates will be processed.
|
||||
*
|
||||
* @param runId identifier of the run; never {@code null}
|
||||
* @param totalCandidates total number of candidates detected in the source folder
|
||||
* at scan time; never negative
|
||||
*/
|
||||
void onRunStarted(RunId runId, int totalCandidates);
|
||||
|
||||
/**
|
||||
* Invoked once per candidate whose processing reached a terminal resolution.
|
||||
* <p>
|
||||
* The event is emitted after persistence has been attempted for the candidate, so
|
||||
* observers may rely on the reported status matching the persisted attempt status
|
||||
* for that candidate.
|
||||
*
|
||||
* @param event description of the candidate result; never {@code null}
|
||||
*/
|
||||
void onDocumentCompleted(DocumentCompletionEvent event);
|
||||
|
||||
/**
|
||||
* Invoked once after the processing loop has finished, regardless of whether the
|
||||
* run completed normally, was cancelled via a {@link BatchRunCancellationToken},
|
||||
* or aborted due to a hard run-level error after the start callback fired.
|
||||
*
|
||||
* @param summary aggregated outcome counts; never {@code null}
|
||||
*/
|
||||
void onRunEnded(RunSummary summary);
|
||||
|
||||
/**
|
||||
* Returns a singleton observer that silently ignores all callbacks.
|
||||
* <p>
|
||||
* Used as the default observer for callers that do not need progress notifications
|
||||
* (e.g. the headless batch entry point).
|
||||
*
|
||||
* @return a non-null no-op observer
|
||||
*/
|
||||
static BatchRunProgressObserver noOp() {
|
||||
return NoOpBatchRunProgressObserver.INSTANCE;
|
||||
}
|
||||
}
|
||||
+60
@@ -0,0 +1,60 @@
|
||||
package de.gecheckt.pdf.umbenenner.application.port.in;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDate;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Immutable event describing the outcome of processing exactly one candidate document.
|
||||
* <p>
|
||||
* Emitted by the application layer at every terminal resolution point of a candidate
|
||||
* (success, retryable failure, permanent failure, skip). Observers may use this event
|
||||
* to update a live progress view, write an audit record, or drive a UI list.
|
||||
* <p>
|
||||
* The event is deliberately decoupled from persistence types: it carries only what an
|
||||
* external observer needs to display or correlate a single candidate result.
|
||||
*
|
||||
* @param originalFileName the source candidate's unique identifier (typically the source
|
||||
* filename); never {@code null} or blank
|
||||
* @param status the aggregated outcome status; never {@code null}
|
||||
* @param finalFileName the final target filename, including any duplicate suffix;
|
||||
* never {@code null} for {@link DocumentCompletionStatus#SUCCESS},
|
||||
* always {@code null} for all other statuses
|
||||
* @param resolvedDate the resolved date of the naming proposal; never {@code null}
|
||||
* for {@link DocumentCompletionStatus#SUCCESS}, always {@code null}
|
||||
* for skip events. May be {@code null} for failure events.
|
||||
* @param aiReasoning the AI reasoning text associated with the naming proposal, if
|
||||
* any is available for this candidate (may be present on success
|
||||
* and on some failure paths where an AI call had previously
|
||||
* produced a reasoning); {@code null} when no reasoning exists
|
||||
* @param processingDuration the wall-clock duration spent on this candidate in the current
|
||||
* run; never {@code null} and never negative
|
||||
*/
|
||||
public record DocumentCompletionEvent(
|
||||
String originalFileName,
|
||||
DocumentCompletionStatus status,
|
||||
String finalFileName,
|
||||
LocalDate resolvedDate,
|
||||
String aiReasoning,
|
||||
Duration processingDuration) {
|
||||
|
||||
/**
|
||||
* Compact constructor validating mandatory fields.
|
||||
*
|
||||
* @throws NullPointerException if {@code originalFileName}, {@code status} or
|
||||
* {@code processingDuration} is {@code null}
|
||||
* @throws IllegalArgumentException if {@code originalFileName} is blank or
|
||||
* {@code processingDuration} is negative
|
||||
*/
|
||||
public DocumentCompletionEvent {
|
||||
Objects.requireNonNull(originalFileName, "originalFileName must not be null");
|
||||
if (originalFileName.isBlank()) {
|
||||
throw new IllegalArgumentException("originalFileName must not be blank");
|
||||
}
|
||||
Objects.requireNonNull(status, "status must not be null");
|
||||
Objects.requireNonNull(processingDuration, "processingDuration must not be null");
|
||||
if (processingDuration.isNegative()) {
|
||||
throw new IllegalArgumentException("processingDuration must not be negative");
|
||||
}
|
||||
}
|
||||
}
|
||||
+43
@@ -0,0 +1,43 @@
|
||||
package de.gecheckt.pdf.umbenenner.application.port.in;
|
||||
|
||||
/**
|
||||
* Aggregated status classification reported to
|
||||
* {@link BatchRunProgressObserver#onDocumentCompleted(DocumentCompletionEvent)}
|
||||
* for one processed candidate.
|
||||
* <p>
|
||||
* This enum collapses the finer-grained internal processing status into the four
|
||||
* buckets that an observer (e.g. a GUI progress view) needs to distinguish:
|
||||
* successful completion, retryable failure, permanent failure, and an explicit
|
||||
* skip.
|
||||
* <p>
|
||||
* This classification is purely an observability concern — persistence,
|
||||
* retry decisions, and all other processing rules continue to work against the
|
||||
* detailed internal status.
|
||||
*/
|
||||
public enum DocumentCompletionStatus {
|
||||
|
||||
/**
|
||||
* The candidate was successfully renamed; the target copy is in place and the
|
||||
* persistence is consistent.
|
||||
*/
|
||||
SUCCESS,
|
||||
|
||||
/**
|
||||
* The candidate failed in the current run but will be retried in a later run
|
||||
* (transient technical error, not yet at the retry limit, or a first deterministic
|
||||
* content error).
|
||||
*/
|
||||
FAILED_RETRYABLE,
|
||||
|
||||
/**
|
||||
* The candidate failed permanently and will not be retried in later runs
|
||||
* (content error recorded twice, or transient retry budget exhausted).
|
||||
*/
|
||||
FAILED_PERMANENT,
|
||||
|
||||
/**
|
||||
* The candidate was skipped because it was already in a terminal state (either
|
||||
* previously successful or previously finally failed).
|
||||
*/
|
||||
SKIPPED
|
||||
}
|
||||
+21
@@ -0,0 +1,21 @@
|
||||
package de.gecheckt.pdf.umbenenner.application.port.in;
|
||||
|
||||
/**
|
||||
* Shared singleton token returned by
|
||||
* {@link BatchRunCancellationToken#neverCancelled()}.
|
||||
* <p>
|
||||
* Not intended for direct instantiation by callers.
|
||||
*/
|
||||
final class NeverCancelledBatchRunCancellationToken implements BatchRunCancellationToken {
|
||||
|
||||
static final NeverCancelledBatchRunCancellationToken INSTANCE =
|
||||
new NeverCancelledBatchRunCancellationToken();
|
||||
|
||||
private NeverCancelledBatchRunCancellationToken() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCancellationRequested() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
+32
@@ -0,0 +1,32 @@
|
||||
package de.gecheckt.pdf.umbenenner.application.port.in;
|
||||
|
||||
import de.gecheckt.pdf.umbenenner.domain.model.RunId;
|
||||
|
||||
/**
|
||||
* Shared singleton no-op implementation of {@link BatchRunProgressObserver}.
|
||||
* <p>
|
||||
* Returned by {@link BatchRunProgressObserver#noOp()}; not intended for direct
|
||||
* instantiation by callers.
|
||||
*/
|
||||
final class NoOpBatchRunProgressObserver implements BatchRunProgressObserver {
|
||||
|
||||
static final NoOpBatchRunProgressObserver INSTANCE = new NoOpBatchRunProgressObserver();
|
||||
|
||||
private NoOpBatchRunProgressObserver() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRunStarted(RunId runId, int totalCandidates) {
|
||||
// intentionally empty
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDocumentCompleted(DocumentCompletionEvent event) {
|
||||
// intentionally empty
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRunEnded(RunSummary summary) {
|
||||
// intentionally empty
|
||||
}
|
||||
}
|
||||
+42
@@ -0,0 +1,42 @@
|
||||
package de.gecheckt.pdf.umbenenner.application.port.in;
|
||||
|
||||
/**
|
||||
* Aggregated outcome counts of a complete batch run, reported once at the end of the run
|
||||
* to {@link BatchRunProgressObserver#onRunEnded(RunSummary)}.
|
||||
* <p>
|
||||
* The three counts are independent, non-negative and sum up to the total number of
|
||||
* candidates that were processed in the run (possibly fewer than the originally detected
|
||||
* candidate count if the run was cancelled mid-way).
|
||||
*
|
||||
* @param successCount number of candidates that completed with
|
||||
* {@link DocumentCompletionStatus#SUCCESS}; must be ≥ 0
|
||||
* @param failedCount number of candidates that completed with either
|
||||
* {@link DocumentCompletionStatus#FAILED_RETRYABLE} or
|
||||
* {@link DocumentCompletionStatus#FAILED_PERMANENT}; must be ≥ 0
|
||||
* @param skippedCount number of candidates that completed with
|
||||
* {@link DocumentCompletionStatus#SKIPPED}; must be ≥ 0
|
||||
*/
|
||||
public record RunSummary(int successCount, int failedCount, int skippedCount) {
|
||||
|
||||
/**
|
||||
* Compact constructor enforcing non-negative counts.
|
||||
*
|
||||
* @throws IllegalArgumentException if any count is negative
|
||||
*/
|
||||
public RunSummary {
|
||||
if (successCount < 0 || failedCount < 0 || skippedCount < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"RunSummary counts must not be negative; was: "
|
||||
+ successCount + "/" + failedCount + "/" + skippedCount);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the total number of candidates reflected in this summary.
|
||||
*
|
||||
* @return {@code successCount + failedCount + skippedCount}; never negative
|
||||
*/
|
||||
public int totalProcessed() {
|
||||
return successCount + failedCount + skippedCount;
|
||||
}
|
||||
}
|
||||
+12
@@ -16,6 +16,18 @@
|
||||
* — Structured result of a batch run, designed for exit code mapping</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* Progress observation (for interactive inbound adapters):
|
||||
* <ul>
|
||||
* <li>{@link de.gecheckt.pdf.umbenenner.application.port.in.BatchRunProgressObserver}
|
||||
* — Optional observer that receives per-run and per-candidate callbacks during a run</li>
|
||||
* <li>{@link de.gecheckt.pdf.umbenenner.application.port.in.BatchRunCancellationToken}
|
||||
* — Optional cooperative cancellation token polled between candidates</li>
|
||||
* <li>{@link de.gecheckt.pdf.umbenenner.application.port.in.DocumentCompletionEvent},
|
||||
* {@link de.gecheckt.pdf.umbenenner.application.port.in.DocumentCompletionStatus},
|
||||
* {@link de.gecheckt.pdf.umbenenner.application.port.in.RunSummary}
|
||||
* — Event and summary value types carried to the observer</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* Architecture Rule: Inbound ports are independent of implementation and contain no business logic.
|
||||
* They define "what can be done to the application". All dependencies point inward;
|
||||
* adapters depend on ports, not vice versa.
|
||||
|
||||
+159
-4
@@ -1,10 +1,14 @@
|
||||
package de.gecheckt.pdf.umbenenner.application.service;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.DocumentCompletionEvent;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.DocumentCompletionStatus;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentKnownProcessable;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentPersistenceException;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.DocumentRecord;
|
||||
@@ -158,6 +162,20 @@ public class DocumentProcessingCoordinator {
|
||||
private final int maxTitleLength;
|
||||
private final String activeProviderIdentifier;
|
||||
|
||||
/**
|
||||
* Optional per-run completion forwarder that is consulted by
|
||||
* {@link #publishCompletion(SourceDocumentCandidate, DocumentCompletionStatus, String,
|
||||
* LocalDate, String, Instant, Instant)} whenever a terminal candidate outcome is reached.
|
||||
* <p>
|
||||
* Assigned by the inbound use case for the duration of a single run and cleared before the
|
||||
* use case returns. A {@code null} value means no external observer is attached and the
|
||||
* completion event is dropped silently — the default for headless callers.
|
||||
* <p>
|
||||
* Accessed from the batch thread only. Not volatile because installation and read occur
|
||||
* on the same thread (the one executing the batch).
|
||||
*/
|
||||
private Consumer<DocumentCompletionEvent> completionForwarder;
|
||||
|
||||
/**
|
||||
* Creates the document processing coordinator with all required ports, logger,
|
||||
* the transient retry limit, the configured maximum base title length, and the
|
||||
@@ -235,6 +253,25 @@ public class DocumentProcessingCoordinator {
|
||||
this.maxRetriesTransient = maxRetriesTransient;
|
||||
this.maxTitleLength = maxTitleLength;
|
||||
this.activeProviderIdentifier = activeProviderIdentifier;
|
||||
this.completionForwarder = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Installs or removes a per-run completion forwarder.
|
||||
* <p>
|
||||
* When non-null, the forwarder is consulted at every terminal candidate resolution and
|
||||
* receives a {@link DocumentCompletionEvent} describing the outcome. A {@code null} value
|
||||
* detaches any previously installed forwarder.
|
||||
* <p>
|
||||
* This method is the single seam by which inbound adapters (e.g. the JavaFX GUI) attach a
|
||||
* live-progress observer to the document coordinator without widening the coordinator's
|
||||
* constructor surface. It must only be called from the thread that will drive the batch
|
||||
* run.
|
||||
*
|
||||
* @param forwarder the new forwarder, or {@code null} to detach the current one
|
||||
*/
|
||||
public void installCompletionForwarder(Consumer<DocumentCompletionEvent> forwarder) {
|
||||
this.completionForwarder = forwarder;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -509,7 +546,7 @@ public class DocumentProcessingCoordinator {
|
||||
|
||||
return persistTargetCopySuccess(
|
||||
candidate, fingerprint, existingRecord, context, attemptStart, now,
|
||||
resolvedFilename, targetFolderLocator);
|
||||
resolvedFilename, targetFolderLocator, proposalAttempt);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -518,7 +555,16 @@ public class DocumentProcessingCoordinator {
|
||||
* If the atomic persistence fails after the copy has already been written, a
|
||||
* best-effort rollback of the target file is attempted and
|
||||
* {@link ProcessingStatus#FAILED_RETRYABLE} is persisted instead.
|
||||
* <p>
|
||||
* On successful persistence, a terminal completion event is published to the attached
|
||||
* {@link BatchRunProgressObserver}; the event carries the resolved final filename,
|
||||
* the date and reasoning taken from the authoritative {@code PROPOSAL_READY} attempt.
|
||||
* On persistence failure the completion event is published by
|
||||
* {@link #persistTransientErrorAfterPersistenceFailure}.
|
||||
*
|
||||
* @param proposalAttempt the authoritative naming-proposal attempt used to populate
|
||||
* the observer event's date and reasoning fields; must not be
|
||||
* {@code null}
|
||||
* @return true if SUCCESS was persisted; false if persistence itself failed
|
||||
*/
|
||||
private boolean persistTargetCopySuccess(
|
||||
@@ -529,7 +575,8 @@ public class DocumentProcessingCoordinator {
|
||||
Instant attemptStart,
|
||||
Instant now,
|
||||
String resolvedFilename,
|
||||
String targetFolderLocator) {
|
||||
String targetFolderLocator,
|
||||
ProcessingAttempt proposalAttempt) {
|
||||
|
||||
try {
|
||||
int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint);
|
||||
@@ -550,6 +597,11 @@ public class DocumentProcessingCoordinator {
|
||||
|
||||
logger.info("Document '{}' successfully processed. Target: '{}'.",
|
||||
candidate.uniqueIdentifier(), resolvedFilename);
|
||||
publishCompletion(candidate, DocumentCompletionStatus.SUCCESS,
|
||||
resolvedFilename,
|
||||
proposalAttempt.resolvedDate(),
|
||||
proposalAttempt.aiReasoning(),
|
||||
attemptStart, now);
|
||||
return true;
|
||||
|
||||
} catch (DocumentPersistenceException e) {
|
||||
@@ -564,7 +616,8 @@ public class DocumentProcessingCoordinator {
|
||||
candidate, fingerprint, existingRecord, context, attemptStart,
|
||||
Instant.now(),
|
||||
"Persistence failed after successful target copy (best-effort rollback attempted): "
|
||||
+ e.getMessage());
|
||||
+ e.getMessage(),
|
||||
proposalAttempt);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -628,6 +681,10 @@ public class DocumentProcessingCoordinator {
|
||||
candidate.uniqueIdentifier(), fingerprint.sha256Hex(),
|
||||
updatedCounters.transientErrorCount(), maxRetriesTransient);
|
||||
}
|
||||
publishCompletion(candidate,
|
||||
retryable ? DocumentCompletionStatus.FAILED_RETRYABLE
|
||||
: DocumentCompletionStatus.FAILED_PERMANENT,
|
||||
null, null, null, attemptStart, now);
|
||||
return true;
|
||||
|
||||
} catch (DocumentPersistenceException persistEx) {
|
||||
@@ -654,7 +711,8 @@ public class DocumentProcessingCoordinator {
|
||||
BatchRunContext context,
|
||||
Instant attemptStart,
|
||||
Instant now,
|
||||
String errorMessage) {
|
||||
String errorMessage,
|
||||
ProcessingAttempt proposalAttempt) {
|
||||
|
||||
ProcessingOutcomeTransition.ProcessingOutcome transition =
|
||||
ProcessingOutcomeTransition.forKnownDocument(
|
||||
@@ -664,6 +722,7 @@ public class DocumentProcessingCoordinator {
|
||||
FailureCounters updatedCounters = transition.counters();
|
||||
ProcessingStatus errorStatus = transition.overallStatus();
|
||||
|
||||
boolean secondaryPersisted = false;
|
||||
try {
|
||||
int attemptNumber = processingAttemptRepository.loadNextAttemptNumber(fingerprint);
|
||||
ProcessingAttempt errorAttempt = ProcessingAttempt.withoutAiFields(
|
||||
@@ -679,11 +738,28 @@ public class DocumentProcessingCoordinator {
|
||||
txOps.saveProcessingAttempt(errorAttempt);
|
||||
txOps.updateDocumentRecord(errorRecord);
|
||||
});
|
||||
secondaryPersisted = true;
|
||||
|
||||
} catch (DocumentPersistenceException secondaryEx) {
|
||||
logger.error("Secondary persistence failure for '{}' after target copy rollback: {}",
|
||||
candidate.uniqueIdentifier(), secondaryEx.getMessage(), secondaryEx);
|
||||
}
|
||||
|
||||
// Observer notification: even when secondary persistence itself failed the candidate's
|
||||
// terminal resolution in this run is still a copy/persistence failure. Emitting a single
|
||||
// completion event keeps the observer in sync with the user-visible state even though
|
||||
// nothing new was persisted.
|
||||
String reasoning = proposalAttempt != null ? proposalAttempt.aiReasoning() : null;
|
||||
publishCompletion(candidate,
|
||||
transition.retryable()
|
||||
? DocumentCompletionStatus.FAILED_RETRYABLE
|
||||
: DocumentCompletionStatus.FAILED_PERMANENT,
|
||||
null, null, reasoning, attemptStart, now);
|
||||
|
||||
if (!secondaryPersisted) {
|
||||
logger.debug("Completion for '{}' reported without secondary persistence record.",
|
||||
candidate.uniqueIdentifier());
|
||||
}
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
@@ -721,6 +797,8 @@ public class DocumentProcessingCoordinator {
|
||||
|
||||
logger.debug("Skip attempt #{} persisted for '{}' with status {}.",
|
||||
attemptNumber, candidate.uniqueIdentifier(), skipStatus);
|
||||
publishCompletion(candidate, DocumentCompletionStatus.SKIPPED,
|
||||
null, null, null, attemptStart, now);
|
||||
return true;
|
||||
|
||||
} catch (DocumentPersistenceException e) {
|
||||
@@ -985,6 +1063,13 @@ public class DocumentProcessingCoordinator {
|
||||
outcome.counters().contentErrorCount(),
|
||||
outcome.counters().transientErrorCount());
|
||||
}
|
||||
// Pipeline-path terminal resolutions are reported to the progress observer.
|
||||
// PROPOSAL_READY is an intermediate state; the subsequent finalisation publishes
|
||||
// the actual completion event (SUCCESS or transient-error failure).
|
||||
if (outcome.overallStatus() != ProcessingStatus.PROPOSAL_READY) {
|
||||
publishCompletion(candidate, toCompletionStatus(outcome),
|
||||
null, null, null, attemptStart, now);
|
||||
}
|
||||
return true;
|
||||
|
||||
} catch (DocumentPersistenceException e) {
|
||||
@@ -1097,4 +1182,74 @@ public class DocumentProcessingCoordinator {
|
||||
|
||||
return base + detail;
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Progress observer dispatch
|
||||
// =========================================================================
|
||||
|
||||
/**
|
||||
* Publishes a single terminal completion event for the candidate to the attached
|
||||
* {@link BatchRunProgressObserver}.
|
||||
* <p>
|
||||
* Must be called exactly once per terminal resolution of a candidate (success, retryable
|
||||
* failure, permanent failure, skip). Intermediate states such as
|
||||
* {@link de.gecheckt.pdf.umbenenner.domain.model.ProcessingStatus#PROPOSAL_READY} must not
|
||||
* produce a completion event.
|
||||
* <p>
|
||||
* Any runtime exception thrown by the observer is caught and logged at warn level and must
|
||||
* not affect persistence or batch flow.
|
||||
*
|
||||
* @param candidate the candidate being reported; must not be null
|
||||
* @param status the aggregated completion status; must not be null
|
||||
* @param finalFileName the final target filename on success; {@code null} otherwise
|
||||
* @param resolvedDate the resolved date on success; may be {@code null} otherwise
|
||||
* @param aiReasoning the AI reasoning when one is available for this result;
|
||||
* {@code null} otherwise
|
||||
* @param startInstant the moment processing of the candidate began in this run
|
||||
* @param endInstant the moment the terminal resolution was reached
|
||||
*/
|
||||
private void publishCompletion(
|
||||
SourceDocumentCandidate candidate,
|
||||
DocumentCompletionStatus status,
|
||||
String finalFileName,
|
||||
LocalDate resolvedDate,
|
||||
String aiReasoning,
|
||||
Instant startInstant,
|
||||
Instant endInstant) {
|
||||
Consumer<DocumentCompletionEvent> forwarder = completionForwarder;
|
||||
if (forwarder == null) {
|
||||
return;
|
||||
}
|
||||
Duration duration = Duration.between(startInstant, endInstant);
|
||||
if (duration.isNegative()) {
|
||||
duration = Duration.ZERO;
|
||||
}
|
||||
try {
|
||||
forwarder.accept(new DocumentCompletionEvent(
|
||||
candidate.uniqueIdentifier(),
|
||||
status,
|
||||
finalFileName,
|
||||
resolvedDate,
|
||||
aiReasoning,
|
||||
duration));
|
||||
} catch (RuntimeException forwarderFailure) {
|
||||
logger.warn("Progress forwarder threw while reporting completion for '{}': {}",
|
||||
candidate.uniqueIdentifier(), forwarderFailure.getMessage(), forwarderFailure);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps the aggregated retryable/terminal semantics of a pipeline-path persistence outcome
|
||||
* to the observer-level {@link DocumentCompletionStatus}.
|
||||
* <p>
|
||||
* Callers guarantee that the outcome does not represent
|
||||
* {@link de.gecheckt.pdf.umbenenner.domain.model.ProcessingStatus#PROPOSAL_READY} — that
|
||||
* intermediate state is never reported as a completion.
|
||||
*/
|
||||
private static DocumentCompletionStatus toCompletionStatus(
|
||||
ProcessingOutcomeTransition.ProcessingOutcome outcome) {
|
||||
return outcome.retryable()
|
||||
? DocumentCompletionStatus.FAILED_RETRYABLE
|
||||
: DocumentCompletionStatus.FAILED_PERMANENT;
|
||||
}
|
||||
}
|
||||
|
||||
+87
@@ -0,0 +1,87 @@
|
||||
package de.gecheckt.pdf.umbenenner.application.usecase;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.BatchRunProgressObserver;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.DocumentCompletionEvent;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.DocumentCompletionStatus;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.RunSummary;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.ProcessingLogger;
|
||||
|
||||
/**
|
||||
* Internal per-run adapter that forwards every
|
||||
* {@link DocumentCompletionEvent} emitted by the
|
||||
* {@link de.gecheckt.pdf.umbenenner.application.service.DocumentProcessingCoordinator}
|
||||
* to the configured {@link BatchRunProgressObserver}, while accumulating outcome counts
|
||||
* for the run's final {@link RunSummary}.
|
||||
* <p>
|
||||
* Used only by {@link DefaultBatchRunProcessingUseCase} for the lifetime of a single run.
|
||||
* Not thread-safe: all invocations must occur on the batch thread.
|
||||
*/
|
||||
final class CountingCompletionObserver implements Consumer<DocumentCompletionEvent> {
|
||||
|
||||
private final BatchRunProgressObserver observer;
|
||||
private final ProcessingLogger logger;
|
||||
private int successCount;
|
||||
private int failedCount;
|
||||
private int skippedCount;
|
||||
|
||||
CountingCompletionObserver(BatchRunProgressObserver observer, ProcessingLogger logger) {
|
||||
this.observer = Objects.requireNonNull(observer, "observer must not be null");
|
||||
this.logger = Objects.requireNonNull(logger, "logger must not be null");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(DocumentCompletionEvent event) {
|
||||
Objects.requireNonNull(event, "event must not be null");
|
||||
switch (event.status()) {
|
||||
case SUCCESS -> successCount++;
|
||||
case FAILED_RETRYABLE, FAILED_PERMANENT -> failedCount++;
|
||||
case SKIPPED -> skippedCount++;
|
||||
default -> {
|
||||
// Defensive — new status values would be a programming error.
|
||||
throw new IllegalStateException(
|
||||
"Unexpected DocumentCompletionStatus: " + event.status());
|
||||
}
|
||||
}
|
||||
try {
|
||||
observer.onDocumentCompleted(event);
|
||||
} catch (RuntimeException e) {
|
||||
logger.warn("Progress observer threw on onDocumentCompleted for '{}': {}",
|
||||
event.originalFileName(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
RunSummary summary() {
|
||||
return new RunSummary(successCount, failedCount, skippedCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the completion status counts collected so far, including the terminal
|
||||
* contribution of the candidate currently being reported.
|
||||
*/
|
||||
int successCount() {
|
||||
return successCount;
|
||||
}
|
||||
|
||||
int failedCount() {
|
||||
return failedCount;
|
||||
}
|
||||
|
||||
int skippedCount() {
|
||||
return skippedCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Visible for tests that verify the mapping of completion statuses to summary buckets.
|
||||
*/
|
||||
static RunSummary summaryOf(int successCount, int failedCount, int skippedCount) {
|
||||
return new RunSummary(successCount, failedCount, skippedCount);
|
||||
}
|
||||
|
||||
/** Test hook to confirm the status classification. */
|
||||
static DocumentCompletionStatus classify(DocumentCompletionStatus status) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
+97
-4
@@ -5,8 +5,13 @@ import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import de.gecheckt.pdf.umbenenner.application.config.RuntimeConfiguration;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.BatchRunCancellationToken;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.BatchRunOutcome;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.BatchRunProcessingUseCase;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.BatchRunProgressObserver;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.DocumentCompletionEvent;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.DocumentCompletionStatus;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.in.RunSummary;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.FingerprintPort;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.FingerprintResult;
|
||||
import de.gecheckt.pdf.umbenenner.application.port.out.FingerprintSuccess;
|
||||
@@ -80,6 +85,8 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
private final DocumentProcessingCoordinator documentProcessingCoordinator;
|
||||
private final AiNamingService aiNamingService;
|
||||
private final ProcessingLogger logger;
|
||||
private final BatchRunProgressObserver progressObserver;
|
||||
private final BatchRunCancellationToken cancellationToken;
|
||||
|
||||
/**
|
||||
* Creates the batch use case with the runtime configuration and all required ports for the flow.
|
||||
@@ -112,6 +119,46 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
DocumentProcessingCoordinator documentProcessingCoordinator,
|
||||
AiNamingService aiNamingService,
|
||||
ProcessingLogger logger) {
|
||||
this(runtimeConfiguration, runLockPort, sourceDocumentCandidatesPort, pdfTextExtractionPort,
|
||||
fingerprintPort, documentProcessingCoordinator, aiNamingService, logger,
|
||||
BatchRunProgressObserver.noOp(), BatchRunCancellationToken.neverCancelled());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates the batch use case with a progress observer and cancellation token attached.
|
||||
* <p>
|
||||
* The observer is invoked on the batch thread for run start, per-candidate completion,
|
||||
* and run end. The cancellation token is polled between candidates; a requested
|
||||
* cancellation is honoured before starting the next candidate and never interrupts a
|
||||
* candidate that is already being processed (soft-stop).
|
||||
*
|
||||
* @param runtimeConfiguration the runtime configuration; must not be null
|
||||
* @param runLockPort run-lock port; must not be null
|
||||
* @param sourceDocumentCandidatesPort candidate source port; must not be null
|
||||
* @param pdfTextExtractionPort PDF text extraction port; must not be null
|
||||
* @param fingerprintPort fingerprint port; must not be null
|
||||
* @param documentProcessingCoordinator per-document coordinator; must not be null
|
||||
* @param aiNamingService AI naming service; must not be null
|
||||
* @param logger logger; must not be null
|
||||
* @param progressObserver progress observer; must not be null, use
|
||||
* {@link BatchRunProgressObserver#noOp()} when none is
|
||||
* needed
|
||||
* @param cancellationToken cancellation token; must not be null, use
|
||||
* {@link BatchRunCancellationToken#neverCancelled()}
|
||||
* when cancellation is not needed
|
||||
* @throws NullPointerException if any parameter is null
|
||||
*/
|
||||
public DefaultBatchRunProcessingUseCase(
|
||||
RuntimeConfiguration runtimeConfiguration,
|
||||
RunLockPort runLockPort,
|
||||
SourceDocumentCandidatesPort sourceDocumentCandidatesPort,
|
||||
PdfTextExtractionPort pdfTextExtractionPort,
|
||||
FingerprintPort fingerprintPort,
|
||||
DocumentProcessingCoordinator documentProcessingCoordinator,
|
||||
AiNamingService aiNamingService,
|
||||
ProcessingLogger logger,
|
||||
BatchRunProgressObserver progressObserver,
|
||||
BatchRunCancellationToken cancellationToken) {
|
||||
this.runtimeConfiguration = Objects.requireNonNull(runtimeConfiguration, "runtimeConfiguration must not be null");
|
||||
this.runLockPort = Objects.requireNonNull(runLockPort, "runLockPort must not be null");
|
||||
this.sourceDocumentCandidatesPort = Objects.requireNonNull(
|
||||
@@ -123,6 +170,8 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
documentProcessingCoordinator, "documentProcessingCoordinator must not be null");
|
||||
this.aiNamingService = Objects.requireNonNull(aiNamingService, "aiNamingService must not be null");
|
||||
this.logger = Objects.requireNonNull(logger, "logger must not be null");
|
||||
this.progressObserver = Objects.requireNonNull(progressObserver, "progressObserver must not be null");
|
||||
this.cancellationToken = Objects.requireNonNull(cancellationToken, "cancellationToken must not be null");
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -183,16 +232,60 @@ public class DefaultBatchRunProcessingUseCase implements BatchRunProcessingUseCa
|
||||
}
|
||||
logger.info("Found {} PDF candidate(s) in source folder.", candidates.size());
|
||||
|
||||
for (SourceDocumentCandidate candidate : candidates) {
|
||||
processCandidate(candidate, context);
|
||||
// Notify observer of the known candidate count up-front so observers can size their
|
||||
// progress bars. The count reflects the source folder at scan time and remains fixed
|
||||
// for the remainder of the run (also when the run is cancelled early).
|
||||
try {
|
||||
progressObserver.onRunStarted(context.runId(), candidates.size());
|
||||
} catch (RuntimeException e) {
|
||||
logger.warn("Progress observer threw on onRunStarted: {}", e.getMessage(), e);
|
||||
}
|
||||
|
||||
logger.info("Batch run completed. Processed {} candidate(s). RunId: {}",
|
||||
candidates.size(), context.runId());
|
||||
// Wrap the user-supplied observer so the per-run summary can be computed by counting
|
||||
// forwarded completion events.
|
||||
CountingCompletionObserver forwardingObserver =
|
||||
new CountingCompletionObserver(progressObserver, logger);
|
||||
documentProcessingCoordinator.installCompletionForwarder(forwardingObserver);
|
||||
try {
|
||||
int processedCount = 0;
|
||||
boolean cancelled = false;
|
||||
for (SourceDocumentCandidate candidate : candidates) {
|
||||
if (cancellationTokenRequested()) {
|
||||
cancelled = true;
|
||||
logger.info("Cancellation requested before processing next candidate. "
|
||||
+ "Stopping batch run. RunId: {}, processed {}/{} candidate(s).",
|
||||
context.runId(), processedCount, candidates.size());
|
||||
break;
|
||||
}
|
||||
processCandidate(candidate, context);
|
||||
processedCount++;
|
||||
}
|
||||
|
||||
logger.info("Batch run {}. Processed {} candidate(s). RunId: {}",
|
||||
cancelled ? "cancelled" : "completed",
|
||||
processedCount, context.runId());
|
||||
} finally {
|
||||
documentProcessingCoordinator.installCompletionForwarder(null);
|
||||
try {
|
||||
progressObserver.onRunEnded(forwardingObserver.summary());
|
||||
} catch (RuntimeException e) {
|
||||
logger.warn("Progress observer threw on onRunEnded: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
return BatchRunOutcome.SUCCESS;
|
||||
}
|
||||
|
||||
private boolean cancellationTokenRequested() {
|
||||
try {
|
||||
return cancellationToken.isCancellationRequested();
|
||||
} catch (RuntimeException e) {
|
||||
logger.warn("Cancellation token threw while being polled; treating as not cancelled: {}",
|
||||
e.getMessage(), e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the run lock if it was previously acquired.
|
||||
* <p>
|
||||
|
||||
Reference in New Issue
Block a user