Implementiere ScheduledExecutorServiceSchedulerAdapter für SchedulerPort
Single-Thread-Executor mit scheduleWithFixedDelay (Initial Delay 0). Thread-Name: pdf-umbenenner-scheduler, Non-Daemon, UncaughtExceptionHandler auf ERROR. Alle Ausnahmen in onTick() werden abgefangen, damit der Tick-Zyklus nicht still abbricht. currentTrigger und onTick() sind package-private für direkte Testbarkeit. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+162
@@ -0,0 +1,162 @@
|
|||||||
|
package de.gecheckt.pdf.umbenenner.adapter.in.scheduler;
|
||||||
|
|
||||||
|
import de.gecheckt.pdf.umbenenner.application.port.out.BatchRunTrigger;
|
||||||
|
import de.gecheckt.pdf.umbenenner.application.port.out.BatchRunTriggerResult;
|
||||||
|
import de.gecheckt.pdf.umbenenner.application.port.out.SchedulerConfig;
|
||||||
|
import de.gecheckt.pdf.umbenenner.application.port.out.SchedulerPort;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implementiert {@link SchedulerPort} auf Basis eines
|
||||||
|
* {@link ScheduledExecutorService} mit
|
||||||
|
* {@link ScheduledExecutorService#scheduleWithFixedDelay}.
|
||||||
|
* <p>
|
||||||
|
* Der erste Tick startet sofort (Initial Delay 0). Nachfolgende Ticks starten
|
||||||
|
* {@link SchedulerConfig#intervalSeconds()} Sekunden nach dem Ende des
|
||||||
|
* vorherigen Ticks. Der Verarbeitungsaufruf erfolgt synchron im
|
||||||
|
* Scheduler-Thread; der aufrufende Tick-Zyklus wartet also auf den Abschluss
|
||||||
|
* des Laufs, bevor der nächste Tick geplant wird.
|
||||||
|
* <p>
|
||||||
|
* Der Adapter delegiert ausschließlich an den injizierten {@link BatchRunTrigger}
|
||||||
|
* und trifft keine eigenen fachlichen Entscheidungen. Ergebnisse werden über
|
||||||
|
* den injizierten {@code Consumer<BatchRunTriggerResult>} zurückgemeldet.
|
||||||
|
* <p>
|
||||||
|
* Alle Ausnahmen innerhalb eines Ticks werden abgefangen und geloggt, damit
|
||||||
|
* der {@link ScheduledExecutorService} den Tick-Zyklus nicht still abbricht.
|
||||||
|
* <p>
|
||||||
|
* Instanzen dieser Klasse sind für den Einsatz in einem einzigen Steuerungs-Thread
|
||||||
|
* ausgelegt. {@link #startScheduler} und {@link #stopScheduler} müssen serialisiert
|
||||||
|
* aufgerufen werden.
|
||||||
|
*/
|
||||||
|
public class ScheduledExecutorServiceSchedulerAdapter implements SchedulerPort {
|
||||||
|
|
||||||
|
private static final Logger logger =
|
||||||
|
LogManager.getLogger(ScheduledExecutorServiceSchedulerAdapter.class);
|
||||||
|
|
||||||
|
private static final String SCHEDULER_THREAD_NAME = "pdf-umbenenner-scheduler";
|
||||||
|
|
||||||
|
private final Consumer<BatchRunTriggerResult> resultConsumer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hält den aktuell aktiven {@link BatchRunTrigger}. Package-private,
|
||||||
|
* damit Tests {@code onTick()} isoliert prüfen können, ohne den
|
||||||
|
* gesamten Lifecycle zu durchlaufen.
|
||||||
|
*/
|
||||||
|
final AtomicReference<BatchRunTrigger> currentTrigger = new AtomicReference<>();
|
||||||
|
|
||||||
|
private volatile ScheduledExecutorService executor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Erstellt einen neuen Adapter.
|
||||||
|
*
|
||||||
|
* @param resultConsumer Empfänger für Tick-Ergebnisse; darf nicht {@code null} sein
|
||||||
|
*/
|
||||||
|
public ScheduledExecutorServiceSchedulerAdapter(Consumer<BatchRunTriggerResult> resultConsumer) {
|
||||||
|
this.resultConsumer = Objects.requireNonNull(resultConsumer,
|
||||||
|
"resultConsumer darf nicht null sein");
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// SchedulerPort
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Startet den periodischen Scheduler-Mechanismus.
|
||||||
|
* <p>
|
||||||
|
* Ist der Scheduler bereits aktiv, hat dieser Aufruf keine Wirkung (idempotent).
|
||||||
|
* Andernfalls wird ein Single-Thread-{@link ScheduledExecutorService} angelegt
|
||||||
|
* und mit {@code scheduleWithFixedDelay} und Initial-Delay 0 gestartet.
|
||||||
|
* Der erzeugte Thread heißt {@value SCHEDULER_THREAD_NAME} und ist kein Daemon-Thread.
|
||||||
|
*
|
||||||
|
* @param config Betriebskonfiguration; insbesondere das Intervall zwischen den Ticks
|
||||||
|
* @param trigger Auslöser, der bei jedem Tick synchron aufgerufen wird
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void startScheduler(SchedulerConfig config, BatchRunTrigger trigger) {
|
||||||
|
Objects.requireNonNull(config, "config darf nicht null sein");
|
||||||
|
Objects.requireNonNull(trigger, "trigger darf nicht null sein");
|
||||||
|
if (executor != null) {
|
||||||
|
logger.debug("Scheduler ist bereits aktiv – Start-Aufruf wird ignoriert.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
currentTrigger.set(trigger);
|
||||||
|
ThreadFactory threadFactory = runnable -> {
|
||||||
|
Thread t = new Thread(runnable, SCHEDULER_THREAD_NAME);
|
||||||
|
t.setDaemon(false);
|
||||||
|
t.setUncaughtExceptionHandler((thread, ex) ->
|
||||||
|
logger.error("Unbehandelte Ausnahme im Scheduler-Thread '{}'.",
|
||||||
|
thread.getName(), ex));
|
||||||
|
return t;
|
||||||
|
};
|
||||||
|
ScheduledExecutorService newExecutor =
|
||||||
|
Executors.newSingleThreadScheduledExecutor(threadFactory);
|
||||||
|
newExecutor.scheduleWithFixedDelay(
|
||||||
|
this::onTick,
|
||||||
|
0L,
|
||||||
|
config.intervalSeconds(),
|
||||||
|
TimeUnit.SECONDS);
|
||||||
|
executor = newExecutor;
|
||||||
|
logger.info("Scheduler gestartet. Intervall: {} Sekunden.", config.intervalSeconds());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stoppt den periodischen Scheduler-Mechanismus.
|
||||||
|
* <p>
|
||||||
|
* Laufende Ticks werden nicht abgebrochen; es werden lediglich keine weiteren
|
||||||
|
* Ticks geplant. Ist der Scheduler bereits gestoppt, hat dieser Aufruf keine
|
||||||
|
* Wirkung (idempotent).
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void stopScheduler() {
|
||||||
|
ScheduledExecutorService localExecutor = executor;
|
||||||
|
if (localExecutor == null) {
|
||||||
|
logger.debug("Scheduler ist bereits gestoppt – Stop-Aufruf wird ignoriert.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
executor = null;
|
||||||
|
currentTrigger.set(null);
|
||||||
|
localExecutor.shutdown();
|
||||||
|
logger.info("Scheduler angehalten.");
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Tick-Logik (package-private für Testbarkeit)
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Führt einen Verarbeitungstick aus.
|
||||||
|
* <p>
|
||||||
|
* Holt den aktuellen {@link BatchRunTrigger}, ruft ihn synchron auf und
|
||||||
|
* leitet das Ergebnis an den {@link Consumer} weiter. Ist kein Trigger
|
||||||
|
* gesetzt, wird der Tick übersprungen. Alle {@link Exception}en werden
|
||||||
|
* abgefangen und auf ERROR geloggt, damit der
|
||||||
|
* {@link ScheduledExecutorService} den Tick-Zyklus nicht still abbricht.
|
||||||
|
* <p>
|
||||||
|
* Package-private, damit Unit-Tests diese Methode direkt aufrufen können.
|
||||||
|
*/
|
||||||
|
void onTick() {
|
||||||
|
BatchRunTrigger trigger = currentTrigger.get();
|
||||||
|
if (trigger == null) {
|
||||||
|
logger.warn("Scheduler-Tick ausgelöst, aber kein aktiver Trigger vorhanden. "
|
||||||
|
+ "Tick wird übersprungen.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
BatchRunTriggerResult result = trigger.triggerRun();
|
||||||
|
resultConsumer.accept(result);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Unbehandelte Ausnahme während des Scheduler-Ticks. "
|
||||||
|
+ "Der nächste Tick wird planmäßig ausgelöst.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
+244
@@ -0,0 +1,244 @@
|
|||||||
|
package de.gecheckt.pdf.umbenenner.adapter.in.scheduler;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThatCode;
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import de.gecheckt.pdf.umbenenner.application.port.out.BatchRunTriggerResult;
|
||||||
|
import de.gecheckt.pdf.umbenenner.application.port.out.RunSummary;
|
||||||
|
import de.gecheckt.pdf.umbenenner.application.port.out.SchedulerConfig;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit- und Integrationstests für {@link ScheduledExecutorServiceSchedulerAdapter}.
|
||||||
|
* <p>
|
||||||
|
* Teststrategien:
|
||||||
|
* <ul>
|
||||||
|
* <li>Lifecycle-Tests (Start, Stop, Idempotenz) nutzen {@link CountDownLatch}
|
||||||
|
* für deterministische Synchronisation ohne {@code Thread.sleep}.</li>
|
||||||
|
* <li>Tick-Logik-Tests ({@code onTick}) rufen die package-private Methode
|
||||||
|
* direkt auf und setzen {@code currentTrigger} ohne Executor.</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
class ScheduledExecutorServiceSchedulerAdapterTest {
|
||||||
|
|
||||||
|
// =========================================================================
|
||||||
|
// Lifecycle: startScheduler
|
||||||
|
// =========================================================================
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void startScheduler_triggersFirstTickImmediately() throws Exception {
|
||||||
|
List<BatchRunTriggerResult> results = new CopyOnWriteArrayList<>();
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(result -> {
|
||||||
|
results.add(result);
|
||||||
|
latch.countDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
SchedulerConfig config = new SchedulerConfig(3600);
|
||||||
|
adapter.startScheduler(config, () -> new BatchRunTriggerResult.SkippedBusy());
|
||||||
|
try {
|
||||||
|
assertThat(latch.await(5, TimeUnit.SECONDS))
|
||||||
|
.as("Erster Tick muss innerhalb von 5 Sekunden ausgelöst werden")
|
||||||
|
.isTrue();
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0)).isInstanceOf(BatchRunTriggerResult.SkippedBusy.class);
|
||||||
|
} finally {
|
||||||
|
adapter.stopScheduler();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void startScheduler_isIdempotent_secondCallDoesNotCreateSecondExecutor() throws Exception {
|
||||||
|
List<BatchRunTriggerResult> results = new CopyOnWriteArrayList<>();
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(result -> {
|
||||||
|
results.add(result);
|
||||||
|
latch.countDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
SchedulerConfig config = new SchedulerConfig(3600);
|
||||||
|
adapter.startScheduler(config, () -> new BatchRunTriggerResult.SkippedBusy());
|
||||||
|
adapter.startScheduler(config, () -> new BatchRunTriggerResult.SkippedBusy()); // no-op
|
||||||
|
|
||||||
|
try {
|
||||||
|
latch.await(5, TimeUnit.SECONDS);
|
||||||
|
// Kurze Wartezeit: ein zweiter Executor würde sofort einen zweiten Tick feuern
|
||||||
|
Thread.sleep(100);
|
||||||
|
assertThat(results)
|
||||||
|
.as("Nur ein Executor → genau ein sofortiger Tick mit Intervall 3600s")
|
||||||
|
.hasSize(1);
|
||||||
|
} finally {
|
||||||
|
adapter.stopScheduler();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void startScheduler_afterStop_canBeRestartedWithNewTrigger() throws Exception {
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(result -> {});
|
||||||
|
|
||||||
|
CountDownLatch firstLatch = new CountDownLatch(1);
|
||||||
|
CountDownLatch secondLatch = new CountDownLatch(1);
|
||||||
|
SchedulerConfig config = new SchedulerConfig(3600);
|
||||||
|
|
||||||
|
adapter.startScheduler(config, () -> {
|
||||||
|
firstLatch.countDown();
|
||||||
|
return new BatchRunTriggerResult.SkippedBusy();
|
||||||
|
});
|
||||||
|
firstLatch.await(5, TimeUnit.SECONDS);
|
||||||
|
adapter.stopScheduler();
|
||||||
|
|
||||||
|
List<BatchRunTriggerResult> secondResults = new CopyOnWriteArrayList<>();
|
||||||
|
adapter.startScheduler(config, () -> {
|
||||||
|
BatchRunTriggerResult r =
|
||||||
|
new BatchRunTriggerResult.Started(Instant.now(), RunSummary.noOp());
|
||||||
|
secondResults.add(r);
|
||||||
|
secondLatch.countDown();
|
||||||
|
return r;
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
assertThat(secondLatch.await(5, TimeUnit.SECONDS))
|
||||||
|
.as("Zweiter Start muss einen Tick auslösen")
|
||||||
|
.isTrue();
|
||||||
|
assertThat(secondResults.get(0)).isInstanceOf(BatchRunTriggerResult.Started.class);
|
||||||
|
} finally {
|
||||||
|
adapter.stopScheduler();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// =========================================================================
|
||||||
|
// Lifecycle: stopScheduler
|
||||||
|
// =========================================================================
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void stopScheduler_withoutPriorStart_doesNotThrow() {
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(result -> {});
|
||||||
|
|
||||||
|
assertThatCode(adapter::stopScheduler).doesNotThrowAnyException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void stopScheduler_calledTwice_isIdempotent() throws Exception {
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(result -> {});
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
adapter.startScheduler(new SchedulerConfig(3600), () -> {
|
||||||
|
latch.countDown();
|
||||||
|
return new BatchRunTriggerResult.SkippedBusy();
|
||||||
|
});
|
||||||
|
latch.await(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
adapter.stopScheduler();
|
||||||
|
assertThatCode(adapter::stopScheduler)
|
||||||
|
.as("Zweiter Stop-Aufruf darf keine Ausnahme werfen")
|
||||||
|
.doesNotThrowAnyException();
|
||||||
|
}
|
||||||
|
|
||||||
|
// =========================================================================
|
||||||
|
// Tick-Logik: onTick (direkte Aufrufe, kein Executor)
|
||||||
|
// =========================================================================
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void onTick_whenTriggerIsNull_doesNotCallConsumer() {
|
||||||
|
List<BatchRunTriggerResult> results = new ArrayList<>();
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(results::add);
|
||||||
|
|
||||||
|
// Kein startScheduler → currentTrigger ist null
|
||||||
|
adapter.onTick();
|
||||||
|
|
||||||
|
assertThat(results).isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void onTick_whenTriggerReturnsSkippedBusy_passesResultToConsumer() {
|
||||||
|
List<BatchRunTriggerResult> results = new ArrayList<>();
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(results::add);
|
||||||
|
|
||||||
|
adapter.currentTrigger.set(() -> new BatchRunTriggerResult.SkippedBusy());
|
||||||
|
adapter.onTick();
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
assertThat(results.get(0)).isInstanceOf(BatchRunTriggerResult.SkippedBusy.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void onTick_whenTriggerReturnsStarted_passesResultToConsumer() {
|
||||||
|
List<BatchRunTriggerResult> results = new ArrayList<>();
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(results::add);
|
||||||
|
|
||||||
|
Instant now = Instant.now();
|
||||||
|
RunSummary summary = new RunSummary(2, 1, 0);
|
||||||
|
adapter.currentTrigger.set(() -> new BatchRunTriggerResult.Started(now, summary));
|
||||||
|
adapter.onTick();
|
||||||
|
|
||||||
|
assertThat(results).hasSize(1);
|
||||||
|
BatchRunTriggerResult.Started started =
|
||||||
|
(BatchRunTriggerResult.Started) results.get(0);
|
||||||
|
assertThat(started.endedAt()).isEqualTo(now);
|
||||||
|
assertThat(started.summary()).isEqualTo(summary);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void onTick_whenTriggerThrowsException_exceptionIsSwallowed() {
|
||||||
|
List<BatchRunTriggerResult> results = new ArrayList<>();
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(results::add);
|
||||||
|
|
||||||
|
adapter.currentTrigger.set(() -> {
|
||||||
|
throw new RuntimeException("Simulierter Trigger-Fehler");
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThatCode(adapter::onTick)
|
||||||
|
.as("Ausnahme im Trigger darf nicht aus onTick propagieren")
|
||||||
|
.doesNotThrowAnyException();
|
||||||
|
assertThat(results)
|
||||||
|
.as("Consumer darf nicht aufgerufen werden, wenn der Trigger wirft")
|
||||||
|
.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void onTick_whenConsumerThrowsException_exceptionIsSwallowed() {
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(result -> {
|
||||||
|
throw new RuntimeException("Simulierter Consumer-Fehler");
|
||||||
|
});
|
||||||
|
|
||||||
|
adapter.currentTrigger.set(() -> new BatchRunTriggerResult.SkippedBusy());
|
||||||
|
|
||||||
|
assertThatCode(adapter::onTick)
|
||||||
|
.as("Ausnahme im Consumer darf nicht aus onTick propagieren")
|
||||||
|
.doesNotThrowAnyException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void onTick_calledMultipleTimes_passesEachResultToConsumer() {
|
||||||
|
List<BatchRunTriggerResult> results = new ArrayList<>();
|
||||||
|
ScheduledExecutorServiceSchedulerAdapter adapter =
|
||||||
|
new ScheduledExecutorServiceSchedulerAdapter(results::add);
|
||||||
|
|
||||||
|
adapter.currentTrigger.set(() -> new BatchRunTriggerResult.SkippedBusy());
|
||||||
|
adapter.onTick();
|
||||||
|
adapter.onTick();
|
||||||
|
adapter.onTick();
|
||||||
|
|
||||||
|
assertThat(results).hasSize(3);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user