+ @Test
+ public void testIncrementalRecovery() {
+ int recoverySnapshotInterval = 3;
+ int numberOfEntries = 5;
+ configParams.setRecoverySnapshotIntervalSeconds(recoverySnapshotInterval);
+ Consumer<Optional<OutputStream>> mockSnapshotConsumer = mock(Consumer.class);
+ context.getSnapshotManager().setCreateSnapshotConsumer(mockSnapshotConsumer);
+
+ ScheduledExecutorService applyEntriesExecutor = Executors.newSingleThreadScheduledExecutor();
+ ReplicatedLog replicatedLog = context.getReplicatedLog();
+
+ for (int i = 0; i <= numberOfEntries; i++) {
+ replicatedLog.append(new SimpleReplicatedLogEntry(i, 1,
+ new MockRaftActorContext.MockPayload(String.valueOf(i))));
+ }
+
+ AtomicInteger entryCount = new AtomicInteger();
+ ScheduledFuture<?> applyEntriesFuture = applyEntriesExecutor.scheduleAtFixedRate(() -> {
+ int run = entryCount.getAndIncrement();
+ LOG.info("Sending entry number {}", run);
+ sendMessageToSupport(new ApplyJournalEntries(run));
+ }, 0, 1, TimeUnit.SECONDS);
+
+ ScheduledFuture<Boolean> canceller = applyEntriesExecutor.schedule(() -> applyEntriesFuture.cancel(false),
+ numberOfEntries, TimeUnit.SECONDS);
+ try {
+ canceller.get();
+ verify(mockSnapshotConsumer, times(1)).accept(any());
+ applyEntriesExecutor.shutdown();
+ } catch (InterruptedException | ExecutionException e) {
+ Assert.fail();
+ }
+ }
+