diff --git a/pdf-umbenenner-adapter-in-scheduler/src/main/java/de/gecheckt/pdf/umbenenner/adapter/in/scheduler/ScheduledExecutorServiceSchedulerAdapter.java b/pdf-umbenenner-adapter-in-scheduler/src/main/java/de/gecheckt/pdf/umbenenner/adapter/in/scheduler/ScheduledExecutorServiceSchedulerAdapter.java new file mode 100644 index 0000000..7a40ff4 --- /dev/null +++ b/pdf-umbenenner-adapter-in-scheduler/src/main/java/de/gecheckt/pdf/umbenenner/adapter/in/scheduler/ScheduledExecutorServiceSchedulerAdapter.java @@ -0,0 +1,162 @@ +package de.gecheckt.pdf.umbenenner.adapter.in.scheduler; + +import de.gecheckt.pdf.umbenenner.application.port.out.BatchRunTrigger; +import de.gecheckt.pdf.umbenenner.application.port.out.BatchRunTriggerResult; +import de.gecheckt.pdf.umbenenner.application.port.out.SchedulerConfig; +import de.gecheckt.pdf.umbenenner.application.port.out.SchedulerPort; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** + * Implementiert {@link SchedulerPort} auf Basis eines + * {@link ScheduledExecutorService} mit + * {@link ScheduledExecutorService#scheduleWithFixedDelay}. + *

+ * Der erste Tick startet sofort (Initial Delay 0). Nachfolgende Ticks starten + * {@link SchedulerConfig#intervalSeconds()} Sekunden nach dem Ende des + * vorherigen Ticks. Der Verarbeitungsaufruf erfolgt synchron im + * Scheduler-Thread; der aufrufende Tick-Zyklus wartet also auf den Abschluss + * des Laufs, bevor der nächste Tick geplant wird. + *

+ * Der Adapter delegiert ausschließlich an den injizierten {@link BatchRunTrigger} + * und trifft keine eigenen fachlichen Entscheidungen. Ergebnisse werden über + * den injizierten {@code Consumer} zurückgemeldet. + *

+ * Alle Ausnahmen innerhalb eines Ticks werden abgefangen und geloggt, damit + * der {@link ScheduledExecutorService} den Tick-Zyklus nicht still abbricht. + *

+ * Instanzen dieser Klasse sind für den Einsatz in einem einzigen Steuerungs-Thread + * ausgelegt. {@link #startScheduler} und {@link #stopScheduler} müssen serialisiert + * aufgerufen werden. + */ +public class ScheduledExecutorServiceSchedulerAdapter implements SchedulerPort { + + private static final Logger logger = + LogManager.getLogger(ScheduledExecutorServiceSchedulerAdapter.class); + + private static final String SCHEDULER_THREAD_NAME = "pdf-umbenenner-scheduler"; + + private final Consumer resultConsumer; + + /** + * Hält den aktuell aktiven {@link BatchRunTrigger}. Package-private, + * damit Tests {@code onTick()} isoliert prüfen können, ohne den + * gesamten Lifecycle zu durchlaufen. + */ + final AtomicReference currentTrigger = new AtomicReference<>(); + + private volatile ScheduledExecutorService executor; + + /** + * Erstellt einen neuen Adapter. + * + * @param resultConsumer Empfänger für Tick-Ergebnisse; darf nicht {@code null} sein + */ + public ScheduledExecutorServiceSchedulerAdapter(Consumer resultConsumer) { + this.resultConsumer = Objects.requireNonNull(resultConsumer, + "resultConsumer darf nicht null sein"); + } + + // ------------------------------------------------------------------------- + // SchedulerPort + // ------------------------------------------------------------------------- + + /** + * Startet den periodischen Scheduler-Mechanismus. + *

+ * Ist der Scheduler bereits aktiv, hat dieser Aufruf keine Wirkung (idempotent). + * Andernfalls wird ein Single-Thread-{@link ScheduledExecutorService} angelegt + * und mit {@code scheduleWithFixedDelay} und Initial-Delay 0 gestartet. + * Der erzeugte Thread heißt {@value SCHEDULER_THREAD_NAME} und ist kein Daemon-Thread. + * + * @param config Betriebskonfiguration; insbesondere das Intervall zwischen den Ticks + * @param trigger Auslöser, der bei jedem Tick synchron aufgerufen wird + */ + @Override + public void startScheduler(SchedulerConfig config, BatchRunTrigger trigger) { + Objects.requireNonNull(config, "config darf nicht null sein"); + Objects.requireNonNull(trigger, "trigger darf nicht null sein"); + if (executor != null) { + logger.debug("Scheduler ist bereits aktiv – Start-Aufruf wird ignoriert."); + return; + } + currentTrigger.set(trigger); + ThreadFactory threadFactory = runnable -> { + Thread t = new Thread(runnable, SCHEDULER_THREAD_NAME); + t.setDaemon(false); + t.setUncaughtExceptionHandler((thread, ex) -> + logger.error("Unbehandelte Ausnahme im Scheduler-Thread '{}'.", + thread.getName(), ex)); + return t; + }; + ScheduledExecutorService newExecutor = + Executors.newSingleThreadScheduledExecutor(threadFactory); + newExecutor.scheduleWithFixedDelay( + this::onTick, + 0L, + config.intervalSeconds(), + TimeUnit.SECONDS); + executor = newExecutor; + logger.info("Scheduler gestartet. Intervall: {} Sekunden.", config.intervalSeconds()); + } + + /** + * Stoppt den periodischen Scheduler-Mechanismus. + *

+ * Laufende Ticks werden nicht abgebrochen; es werden lediglich keine weiteren + * Ticks geplant. Ist der Scheduler bereits gestoppt, hat dieser Aufruf keine + * Wirkung (idempotent). + */ + @Override + public void stopScheduler() { + ScheduledExecutorService localExecutor = executor; + if (localExecutor == null) { + logger.debug("Scheduler ist bereits gestoppt – Stop-Aufruf wird ignoriert."); + return; + } + executor = null; + currentTrigger.set(null); + localExecutor.shutdown(); + logger.info("Scheduler angehalten."); + } + + // ------------------------------------------------------------------------- + // Tick-Logik (package-private für Testbarkeit) + // ------------------------------------------------------------------------- + + /** + * Führt einen Verarbeitungstick aus. + *

+ * Holt den aktuellen {@link BatchRunTrigger}, ruft ihn synchron auf und + * leitet das Ergebnis an den {@link Consumer} weiter. Ist kein Trigger + * gesetzt, wird der Tick übersprungen. Alle {@link Exception}en werden + * abgefangen und auf ERROR geloggt, damit der + * {@link ScheduledExecutorService} den Tick-Zyklus nicht still abbricht. + *

+ * Package-private, damit Unit-Tests diese Methode direkt aufrufen können. + */ + void onTick() { + BatchRunTrigger trigger = currentTrigger.get(); + if (trigger == null) { + logger.warn("Scheduler-Tick ausgelöst, aber kein aktiver Trigger vorhanden. " + + "Tick wird übersprungen."); + return; + } + try { + BatchRunTriggerResult result = trigger.triggerRun(); + resultConsumer.accept(result); + } catch (Exception e) { + logger.error("Unbehandelte Ausnahme während des Scheduler-Ticks. " + + "Der nächste Tick wird planmäßig ausgelöst.", e); + } + } +} diff --git a/pdf-umbenenner-adapter-in-scheduler/src/test/java/de/gecheckt/pdf/umbenenner/adapter/in/scheduler/ScheduledExecutorServiceSchedulerAdapterTest.java b/pdf-umbenenner-adapter-in-scheduler/src/test/java/de/gecheckt/pdf/umbenenner/adapter/in/scheduler/ScheduledExecutorServiceSchedulerAdapterTest.java new file mode 100644 index 0000000..64cb8a6 --- /dev/null +++ b/pdf-umbenenner-adapter-in-scheduler/src/test/java/de/gecheckt/pdf/umbenenner/adapter/in/scheduler/ScheduledExecutorServiceSchedulerAdapterTest.java @@ -0,0 +1,244 @@ +package de.gecheckt.pdf.umbenenner.adapter.in.scheduler; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import de.gecheckt.pdf.umbenenner.application.port.out.BatchRunTriggerResult; +import de.gecheckt.pdf.umbenenner.application.port.out.RunSummary; +import de.gecheckt.pdf.umbenenner.application.port.out.SchedulerConfig; + +/** + * Unit- und Integrationstests für {@link ScheduledExecutorServiceSchedulerAdapter}. + *

+ * Teststrategien: + *

+ */ +class ScheduledExecutorServiceSchedulerAdapterTest { + + // ========================================================================= + // Lifecycle: startScheduler + // ========================================================================= + + @Test + void startScheduler_triggersFirstTickImmediately() throws Exception { + List results = new CopyOnWriteArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(result -> { + results.add(result); + latch.countDown(); + }); + + SchedulerConfig config = new SchedulerConfig(3600); + adapter.startScheduler(config, () -> new BatchRunTriggerResult.SkippedBusy()); + try { + assertThat(latch.await(5, TimeUnit.SECONDS)) + .as("Erster Tick muss innerhalb von 5 Sekunden ausgelöst werden") + .isTrue(); + assertThat(results).hasSize(1); + assertThat(results.get(0)).isInstanceOf(BatchRunTriggerResult.SkippedBusy.class); + } finally { + adapter.stopScheduler(); + } + } + + @Test + void startScheduler_isIdempotent_secondCallDoesNotCreateSecondExecutor() throws Exception { + List results = new CopyOnWriteArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(result -> { + results.add(result); + latch.countDown(); + }); + + SchedulerConfig config = new SchedulerConfig(3600); + adapter.startScheduler(config, () -> new BatchRunTriggerResult.SkippedBusy()); + adapter.startScheduler(config, () -> new BatchRunTriggerResult.SkippedBusy()); // no-op + + try { + latch.await(5, TimeUnit.SECONDS); + // Kurze Wartezeit: ein zweiter Executor würde sofort einen zweiten Tick feuern + Thread.sleep(100); + assertThat(results) + .as("Nur ein Executor → genau ein sofortiger Tick mit Intervall 3600s") + .hasSize(1); + } finally { + adapter.stopScheduler(); + } + } + + @Test + void startScheduler_afterStop_canBeRestartedWithNewTrigger() throws Exception { + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(result -> {}); + + CountDownLatch firstLatch = new CountDownLatch(1); + CountDownLatch secondLatch = new CountDownLatch(1); + SchedulerConfig config = new SchedulerConfig(3600); + + adapter.startScheduler(config, () -> { + firstLatch.countDown(); + return new BatchRunTriggerResult.SkippedBusy(); + }); + firstLatch.await(5, TimeUnit.SECONDS); + adapter.stopScheduler(); + + List secondResults = new CopyOnWriteArrayList<>(); + adapter.startScheduler(config, () -> { + BatchRunTriggerResult r = + new BatchRunTriggerResult.Started(Instant.now(), RunSummary.noOp()); + secondResults.add(r); + secondLatch.countDown(); + return r; + }); + + try { + assertThat(secondLatch.await(5, TimeUnit.SECONDS)) + .as("Zweiter Start muss einen Tick auslösen") + .isTrue(); + assertThat(secondResults.get(0)).isInstanceOf(BatchRunTriggerResult.Started.class); + } finally { + adapter.stopScheduler(); + } + } + + // ========================================================================= + // Lifecycle: stopScheduler + // ========================================================================= + + @Test + void stopScheduler_withoutPriorStart_doesNotThrow() { + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(result -> {}); + + assertThatCode(adapter::stopScheduler).doesNotThrowAnyException(); + } + + @Test + void stopScheduler_calledTwice_isIdempotent() throws Exception { + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(result -> {}); + + CountDownLatch latch = new CountDownLatch(1); + adapter.startScheduler(new SchedulerConfig(3600), () -> { + latch.countDown(); + return new BatchRunTriggerResult.SkippedBusy(); + }); + latch.await(5, TimeUnit.SECONDS); + + adapter.stopScheduler(); + assertThatCode(adapter::stopScheduler) + .as("Zweiter Stop-Aufruf darf keine Ausnahme werfen") + .doesNotThrowAnyException(); + } + + // ========================================================================= + // Tick-Logik: onTick (direkte Aufrufe, kein Executor) + // ========================================================================= + + @Test + void onTick_whenTriggerIsNull_doesNotCallConsumer() { + List results = new ArrayList<>(); + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(results::add); + + // Kein startScheduler → currentTrigger ist null + adapter.onTick(); + + assertThat(results).isEmpty(); + } + + @Test + void onTick_whenTriggerReturnsSkippedBusy_passesResultToConsumer() { + List results = new ArrayList<>(); + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(results::add); + + adapter.currentTrigger.set(() -> new BatchRunTriggerResult.SkippedBusy()); + adapter.onTick(); + + assertThat(results).hasSize(1); + assertThat(results.get(0)).isInstanceOf(BatchRunTriggerResult.SkippedBusy.class); + } + + @Test + void onTick_whenTriggerReturnsStarted_passesResultToConsumer() { + List results = new ArrayList<>(); + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(results::add); + + Instant now = Instant.now(); + RunSummary summary = new RunSummary(2, 1, 0); + adapter.currentTrigger.set(() -> new BatchRunTriggerResult.Started(now, summary)); + adapter.onTick(); + + assertThat(results).hasSize(1); + BatchRunTriggerResult.Started started = + (BatchRunTriggerResult.Started) results.get(0); + assertThat(started.endedAt()).isEqualTo(now); + assertThat(started.summary()).isEqualTo(summary); + } + + @Test + void onTick_whenTriggerThrowsException_exceptionIsSwallowed() { + List results = new ArrayList<>(); + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(results::add); + + adapter.currentTrigger.set(() -> { + throw new RuntimeException("Simulierter Trigger-Fehler"); + }); + + assertThatCode(adapter::onTick) + .as("Ausnahme im Trigger darf nicht aus onTick propagieren") + .doesNotThrowAnyException(); + assertThat(results) + .as("Consumer darf nicht aufgerufen werden, wenn der Trigger wirft") + .isEmpty(); + } + + @Test + void onTick_whenConsumerThrowsException_exceptionIsSwallowed() { + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(result -> { + throw new RuntimeException("Simulierter Consumer-Fehler"); + }); + + adapter.currentTrigger.set(() -> new BatchRunTriggerResult.SkippedBusy()); + + assertThatCode(adapter::onTick) + .as("Ausnahme im Consumer darf nicht aus onTick propagieren") + .doesNotThrowAnyException(); + } + + @Test + void onTick_calledMultipleTimes_passesEachResultToConsumer() { + List results = new ArrayList<>(); + ScheduledExecutorServiceSchedulerAdapter adapter = + new ScheduledExecutorServiceSchedulerAdapter(results::add); + + adapter.currentTrigger.set(() -> new BatchRunTriggerResult.SkippedBusy()); + adapter.onTick(); + adapter.onTick(); + adapter.onTick(); + + assertThat(results).hasSize(3); + } +}