Split out EntryStore.markLastApplied() 66/116266/12
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 15 Apr 2025 00:02:32 +0000 (02:02 +0200)
committerRobert Varga <nite@hq.sk>
Wed, 16 Apr 2025 22:54:23 +0000 (22:54 +0000)
Rather than going to through an async message, mark the last applied
index through persistence.

This eliminate one of the possibilities for persistAsync() and also
reduces latency and improves accuracy.

Since we are no longer sending ApplyJournalEntries to self, we also need
to switch tests to synchronize on applyIndex instead.

JIRA: CONTROLLER-2137
Change-Id: Id5aa178aa721b5086163d7b721b9447b80724d7e
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
22 files changed:
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/PekkoRaftStorage.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/RaftActorBehavior.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/AJE.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/persisted/ApplyJournalEntries.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/EntryStore.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/ForwardingDataPersistenceProvider.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/ImmediateDataPersistenceProvider.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractActorTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractRaftActorIntegrationTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/IsolationScenarioTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockReplicatedLog.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorServerConfigurationSupportTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RaftActorTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationSingleNodeTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/RecoveryIntegrationTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsIntegrationTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/behaviors/LeaderTest.java

index 03c6f941eac13d1587003ddd2318986af779050d..4e3a0a7ba750601f987988d8813d841f6190961a 100644 (file)
@@ -199,6 +199,11 @@ final class PekkoRaftStorage extends EnabledRaftStorage {
         actor.persistAsync(entry, callback);
     }
 
+    @Override
+    public void markLastApplied(final long lastApplied) {
+        actor.markLastApplied(lastApplied);
+    }
+
     @Override
     public void saveSnapshot(final Snapshot snapshot) {
         actor.saveSnapshot(snapshot);
index 8378884acef5362c844db68c09799bd13bbc3eb2..e750d310c4b1c4ffa49f57334f0b6391950ce7d5 100644 (file)
@@ -272,9 +272,6 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             }
 
             possiblyHandleBehaviorMessage(applyState);
-        } else if (message instanceof ApplyJournalEntries applyEntries) {
-            LOG.debug("{}: Persisting ApplyJournalEntries with index={}", memberId(), applyEntries.getToIndex());
-            persistence().persistAsync(applyEntries, unused -> { });
         } else if (message instanceof FindLeader) {
             getSender().tell(new FindLeaderReply(getLeaderAddress()), self());
         } else if (message instanceof GetOnDemandRaftState) {
@@ -715,10 +712,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 // Apply the state immediately.
                 applyCommand(identifier, persistedEntry);
 
-                // Send a ApplyJournalEntries message so that we write the fact that we applied
-                // the state to durable storage
-                self().tell(new ApplyJournalEntries(persistedEntry.index()), self());
-
+                // We have finished applying the command, tell ReplicatedLog about that
+                currentLog.markLastApplied();
             } else {
                 currentLog.captureSnapshotIfReady(persistedEntry);
 
@@ -1034,6 +1029,13 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
         super.persistAsync(entry, callback::accept);
     }
 
+    final void markLastApplied(final long lastApplied) {
+        LOG.debug("{}: Persisting ApplyJournalEntries with index={}", memberId(), lastApplied);
+        super.persistAsync(new ApplyJournalEntries(lastApplied), unused -> {
+            // No-op
+        });
+    }
+
     @Override
     @Deprecated(since = "11.0.0", forRemoval = true)
     public final void loadSnapshot(final String persistenceId, final SnapshotSelectionCriteria criteria,
index 0907eaff5f8bc16b5fba6d78d89d13d697d7f7a0..a727d6f6da29e406b9f42d6cbcc4ba85eab6f6b4 100644 (file)
@@ -132,6 +132,11 @@ public interface ReplicatedLog {
      */
     void setLastApplied(long lastApplied);
 
+    /**
+     * Mark the current value {@link #getLastApplied()} for recovery purposes.
+     */
+    void markLastApplied();
+
     /**
      * Removes entries from the in-memory log starting at the given index. This method exists only to deal with the
      * effects of {@link #trimToReceive(long)} with Pekko Persistence.
index d9d0650964907e7e30707fdb48a1e5600646786e..de530bcc35bd260a942ca48eaa29dbc154a734a7 100644 (file)
@@ -123,4 +123,9 @@ final class ReplicatedLogImpl extends AbstractReplicatedLog {
             callback.accept(entry);
         }
     }
+
+    @Override
+    public void markLastApplied() {
+        context.getPersistenceProvider().markLastApplied(getLastApplied());
+    }
 }
index da414d208a0166703681acae30cade23be946e33..31b895a54f392a32948e021aadf90a951297b1bb 100644 (file)
@@ -25,7 +25,6 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.spi.LogEntry;
 import org.opendaylight.raft.api.RaftRole;
 import org.opendaylight.raft.api.TermInfo;
@@ -385,9 +384,9 @@ public abstract class RaftActorBehavior implements AutoCloseable {
 
         // send a message to persist a ApplyLogEntries marker message into akka's persistent journal
         // will be used during recovery
-        //in case if the above code throws an error and this message is not sent, it would be fine
+        // in case if the above code throws an error and this message is not sent, it would be fine
         // as the  append entries received later would initiate add this message to the journal
-        actor().tell(new ApplyJournalEntries(replLog.getLastApplied()), actor());
+        replLog.markLastApplied();
     }
 
     /**
index 4e39e9884c24bdae6b3e441a6bdd87a945c24c4b..044d24c05a953986588d1b985074e32a0cb486ee 100644 (file)
@@ -19,7 +19,8 @@ import org.opendaylight.yangtools.concepts.WritableObjects;
 /**
  * Serialization proxy for {@link ApplyJournalEntries}.
  */
-final class AJE implements Externalizable {
+@Deprecated(since = "11.0.0", forRemoval = true)
+final class AJE implements PekkoPersistenceContract, Externalizable {
     @java.io.Serial
     private static final long serialVersionUID = 1L;
 
index e7ba99776b4155d30ed7dfc44f73104dba36f9f8..b9877933459256d58f5a52a86eee1d6d17bb9de0 100644 (file)
@@ -8,17 +8,16 @@
 package org.opendaylight.controller.cluster.raft.persisted;
 
 import java.io.Serializable;
-import org.apache.pekko.dispatch.ControlMessage;
 
 /**
  * This is an internal message that is stored in the akka's persistent journal. During recovery, this
  * message is used to apply recovered journal entries to the state whose indexes range from the context's
- * current lastApplied index to "toIndex" contained in the message. This message is sent internally from a
- * behavior to the RaftActor to persist.
+ * current lastApplied index to "toIndex" contained in the message.
  *
  * @author Thomas Pantelis
  */
-public final class ApplyJournalEntries implements Serializable, ControlMessage {
+@Deprecated(since = "11.0.0", forRemoval = true)
+public final class ApplyJournalEntries implements PekkoPersistenceContract, Serializable {
     @java.io.Serial
     private static final long serialVersionUID = 1L;
 
index ed9fa91a66e036319769bb5b6840278e12910358..5fb1dc9a4da747fe165d21f93ed832207fd018a7 100644 (file)
@@ -62,6 +62,21 @@ public interface EntryStore {
      */
     long lastSequenceNumber();
 
+    /**
+     * Record a known value of {@code lastApplied} as a recovery optimization. If we can recover this information,
+     * recovery can re-apply these entries before we attempt to talk to other members. It is okay to lose this marker,
+     * as in that case we will just apply those entries as part of being a follower or becoming a leader.
+     *
+     * <p>This amounts to persisting a lower bound on {@code commitIndex}, which is explicitly volatile state. We could
+     * remember that instead (or perhaps as well) -- but now we just derive it.
+     *
+     * <p>If we later discover that this index lies beyond current leader's {@code commitIndex}, we will ask for
+     * a complete snapshot -- which is not particularly nice, but should happen seldom enough for it not to matter much.
+     *
+     * @param lastApplied lastApplied index to remember
+     */
+    void markLastApplied(long lastApplied);
+
     /**
      * Receive and potentially handle a {@link JournalProtocol} response.
      *
index 1c83a74c1b9a9667eff203d8d8e4f6f71dc265e1..50ad9a7507a0c4f08db52b1a58e4e4563e2e293b 100644 (file)
@@ -49,6 +49,11 @@ public abstract class ForwardingDataPersistenceProvider implements DataPersisten
         delegate().persistAsync(entry, callback);
     }
 
+    @Override
+    public void markLastApplied(final long lastApplied) {
+        delegate().markLastApplied(lastApplied);
+    }
+
     @Override
     public void saveSnapshot(final Snapshot entry) {
         delegate().saveSnapshot(entry);
index 62ac8c1c4875a7d21b8b57d14a5f3ca1f28ea845..6fe0e0ce754b97307b2bf007bf78f77d79613197 100644 (file)
@@ -54,6 +54,11 @@ public interface ImmediateDataPersistenceProvider extends DataPersistenceProvide
         actor().executeInSelf(() -> callback.accept(entry));
     }
 
+    @Override
+    default void markLastApplied(final long lastApplied) {
+        // No-op
+    }
+
     @Override
     default void deleteSnapshots(final long maxTimestamp) {
         // no-op
index 9c01bb5331cb40eacec5feb4bfbf591554aca4b0..0b76ffe4eab6db08a1c0208f58766fa769a55e8f 100644 (file)
@@ -8,14 +8,18 @@
 package org.opendaylight.controller.cluster.raft;
 
 import static java.util.Objects.requireNonNull;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Comparator;
 import org.apache.commons.io.FileUtils;
 import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.testkit.TestActorRef;
 import org.apache.pekko.testkit.javadsl.TestKit;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -75,4 +79,10 @@ public abstract class AbstractActorTest {
     protected static void deleteJournal() throws IOException {
         FileUtils.deleteDirectory(Path.of("journal").toFile());
     }
+
+    protected static final void verifyApplyIndex(final TestActorRef<? extends RaftActor> actor, final long expIndex) {
+        await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
+            assertEquals(expIndex, actor.underlyingActor().getRaftActorContext().getReplicatedLog().getLastApplied());
+        });
+    }
 }
index 188511516ca339b3d1c378150cdfcd5a77913c99..3bb81ddde92c9c09944ca64505b4e714fc1172b3 100644 (file)
@@ -40,7 +40,6 @@ import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.messages.Payload;
-import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ClusterConfig;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.spi.LogEntry;
@@ -304,11 +303,6 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         testkit.unwatch(actor);
     }
 
-    protected void verifyApplyJournalEntries(final ActorRef actor, final long expIndex) {
-        MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class,
-            msg -> msg.getToIndex() == expIndex);
-    }
-
     protected void verifySnapshot(final String prefix, final Snapshot snapshot, final long lastAppliedTerm,
             final long lastAppliedIndex, final long lastTerm, final long lastIndex) {
         assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
index 4847766f07bb26c023df6be4801a06a0f2bfcaa3..b383e963f3f0f2b09004782baa2611f2c986a462 100644 (file)
@@ -52,11 +52,11 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
 
         // Send an initial payloads and verify replication.
 
-        final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
-        final MockCommand payload1 = sendPayloadData(leaderActor, "one");
-        verifyApplyJournalEntries(leaderCollectorActor, 1);
-        verifyApplyJournalEntries(follower1CollectorActor, 1);
-        verifyApplyJournalEntries(follower2CollectorActor, 1);
+        final var payload0 = sendPayloadData(leaderActor, "zero");
+        final var payload1 = sendPayloadData(leaderActor, "one");
+        verifyApplyIndex(leaderActor, 1);
+        verifyApplyIndex(follower1Actor, 1);
+        verifyApplyIndex(follower2Actor, 1);
 
         isolateLeader();
 
@@ -86,9 +86,9 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
 
         testLog.info("Sending payload to new leader");
 
-        final MockCommand newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
-        verifyApplyJournalEntries(follower1CollectorActor, 2);
-        verifyApplyJournalEntries(follower2CollectorActor, 2);
+        final var newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+        verifyApplyIndex(follower1Actor, 2);
+        verifyApplyIndex(follower2Actor, 2);
 
         final var follower1log = follower1Context.getReplicatedLog();
         assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
@@ -109,7 +109,7 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
         // The previous leader has a conflicting log entry at index 2 with a different term which should get
         // replaced by the new leader's index 1 entry.
 
-        verifyApplyJournalEntries(leaderCollectorActor, 2);
+        verifyApplyIndex(leaderActor, 2);
 
         final var leaderLog = leaderContext.getReplicatedLog();
         assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
@@ -138,9 +138,9 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
         // Submit an initial payload that is committed/applied on all nodes.
 
         final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
-        verifyApplyJournalEntries(leaderCollectorActor, 0);
-        verifyApplyJournalEntries(follower1CollectorActor, 0);
-        verifyApplyJournalEntries(follower2CollectorActor, 0);
+        verifyApplyIndex(leaderActor, 0);
+        verifyApplyIndex(follower1Actor, 0);
+        verifyApplyIndex(follower2Actor, 0);
 
         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
@@ -162,7 +162,7 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
                 ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
                         && ae.getEntries().getFirst().command().equals(payload1));
 
-        verifyApplyJournalEntries(leaderCollectorActor, 1);
+        verifyApplyIndex(leaderActor, 1);
 
         isolateLeader();
 
@@ -193,9 +193,9 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
 
         testLog.info("Sending payload to new leader");
 
-        final MockCommand newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
-        verifyApplyJournalEntries(follower1CollectorActor, 3);
-        verifyApplyJournalEntries(follower2CollectorActor, 3);
+        final var newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+        verifyApplyIndex(follower1Actor, 3);
+        verifyApplyIndex(follower2Actor, 3);
 
         final var follower1log = follower1Context.getReplicatedLog();
         assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
@@ -216,7 +216,7 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
         // The previous leader has a conflicting log entry at index 2 with a different term which should get
         // replaced by the new leader's entry.
 
-        verifyApplyJournalEntries(leaderCollectorActor, 3);
+        verifyApplyIndex(leaderActor, 3);
 
         verifyRaftState(leaderActor, raftState -> {
             final var leaderLog = leaderContext.getReplicatedLog();
@@ -259,9 +259,9 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
         // Submit an initial payload that is committed/applied on all nodes.
 
         final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
-        verifyApplyJournalEntries(leaderCollectorActor, 0);
-        verifyApplyJournalEntries(follower1CollectorActor, 0);
-        verifyApplyJournalEntries(follower2CollectorActor, 0);
+        verifyApplyIndex(leaderActor, 0);
+        verifyApplyIndex(follower1Actor, 0);
+        verifyApplyIndex(follower2Actor, 0);
 
         // Submit another payload that is replicated to all followers and committed on the leader but the leader is
         // isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
@@ -283,7 +283,7 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
                 ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
                         && ae.getEntries().getFirst().command().equals(payload1));
 
-        verifyApplyJournalEntries(leaderCollectorActor, 1);
+        verifyApplyIndex(leaderActor, 1);
 
         isolateLeader();
 
@@ -322,8 +322,8 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
         final var newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
         final var newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
         final var newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
-        verifyApplyJournalEntries(follower1CollectorActor, 5);
-        verifyApplyJournalEntries(follower2CollectorActor, 5);
+        verifyApplyIndex(follower1Actor, 5);
+        verifyApplyIndex(follower2Actor, 5);
 
         final var follower1log = follower1Context.getReplicatedLog();
         assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
@@ -344,7 +344,7 @@ public class IsolationScenarioTest extends AbstractRaftActorIntegrationTest {
         // The previous leader has conflicting log entries starting at index 2 with different terms which should get
         // replaced by the new leader's entries.
 
-        verifyApplyJournalEntries(leaderCollectorActor, 5);
+        verifyApplyIndex(leaderActor, 5);
 
         verifyRaftState(leaderActor, raftState -> {
             final var leaderLog = leaderContext.getReplicatedLog();
index d80a36e3b8b3549e253d4abc84004dad7d8fd963..4b1c09f4e2893b64b26441ae79769b2757a71dee 100644 (file)
@@ -143,6 +143,11 @@ public class MockRaftActorContext extends RaftActorContextImpl {
             }
             return true;
         }
+
+        @Override
+        public void markLastApplied() {
+            // No-op
+        }
     }
 
     public static final class Builder {
index c4571c9ca2113cbaa3625c313f68834720cd1085..5808916618c83271001b3dfb4f02a7c60e2684d9 100644 (file)
@@ -37,6 +37,11 @@ final class MockReplicatedLog extends AbstractReplicatedLog {
         return true;
     }
 
+    @Override
+    public void markLastApplied() {
+        // No-op
+    }
+
     @Override
     public void captureSnapshotIfReady(final EntryMeta replicatedLogEntry) {
         // No-op
index d1d9b7142338dc90d794b78a0003fc0796a2dbc5..05597e447220b373443861ab1dd81a7d3f8df979 100644 (file)
@@ -1212,17 +1212,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
 
-        ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
-                ApplyJournalEntries.class);
-        assertEquals("getToIndex", 1, apply.getToIndex());
+        verifyApplyIndex(node1RaftActorRef, 1);
         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
                 nonVotingServer("downNode2"));
         assertTrue("isVotingMember", node1RaftActor.getRaftActorContext().isVotingMember());
         assertEquals("getRaftState", RaftRole.Leader, node1RaftActor.getRaftState());
 
-        apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
-        assertEquals("getToIndex", 1, apply.getToIndex());
+        verifyApplyIndex(node2RaftActorRef, 1);
         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
                 votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
                 nonVotingServer("downNode2"));
@@ -1339,18 +1336,17 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         // is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
         // forward the request to node2.
 
-        ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
-                Map.of(node1ID, true, node2ID, true));
+        final var changeServers = new ChangeServersVotingStatus(Map.of(node1ID, true, node2ID, true));
         node1RaftActorRef.tell(changeServers, testKit.getRef());
-        ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
+        final var reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
 
-        MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+        verifyApplyIndex(node2RaftActorRef, 2);
         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
                 votingServer(node1ID), votingServer(node2ID));
         assertEquals("getRaftState", RaftRole.Leader, node2RaftActor.getRaftState());
 
-        MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+        verifyApplyIndex(node1RaftActorRef, 2);
         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
                 votingServer(node1ID), votingServer(node2ID));
         assertTrue("isVotingMember", node1RaftActor.getRaftActorContext().isVotingMember());
@@ -1414,13 +1410,14 @@ public class RaftActorServerConfigurationSupportTest extends AbstractActorTest {
         ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
         assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
 
-        MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+        verifyApplyIndex(node1RaftActorRef, 1);
         verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
                 votingServer(node1ID), votingServer(node2ID));
         assertTrue("isVotingMember", node1RaftActor.getRaftActorContext().isVotingMember());
         assertEquals("getRaftState", RaftRole.Follower, node1RaftActor.getRaftState());
 
-        MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+        verifyApplyIndex(node2RaftActorRef, 1);
+
         verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
                 votingServer(node1ID), votingServer(node2ID));
         assertEquals("getRaftState", RaftRole.Leader, node2RaftActor.getRaftState());
index 42e3f7fe01e7b5fa6b18a0a3c15eeff21837ffde..c85e30c66ea0ecfe4a1fcaba953f53c5c6731fe8 100644 (file)
@@ -350,26 +350,18 @@ public class RaftActorTest extends AbstractActorTest {
 
     @Test
     public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
-        String persistenceId = factory.generateActorId("leader-");
-
-        DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
+        final var persistenceId = factory.generateActorId("leader-");
+        final var config = new DefaultConfigParamsImpl();
         config.setHeartBeatInterval(ONE_DAY);
-
-        DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
-        TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
-            stateDir(), Map.of(), config, dataPersistenceProvider), persistenceId);
-
-        MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
+        final var dataPersistenceProvider = mock(DataPersistenceProvider.class);
+        final var mockRaftActor = factory.<MockRaftActor>createTestActor(MockRaftActor.props(persistenceId, stateDir(),
+            Map.of(), config, dataPersistenceProvider), persistenceId)
+            .underlyingActor();
         mockRaftActor.waitForInitializeBehaviorComplete();
-
         mockRaftActor.waitUntilLeader();
 
-        mockRaftActor.handleCommand(new ApplyJournalEntries(10));
-
-        verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Consumer.class));
+        mockRaftActor.getRaftActorContext().getReplicatedLog().markLastApplied();
+        verify(dataPersistenceProvider).markLastApplied(-1);
     }
 
     @Test
index 1049cad8605ac49ee2589b23c53e92832dad6e5d..a482e0a3396478fbf3a654997e3a8a8206773610 100644 (file)
@@ -11,7 +11,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.List;
 import java.util.Map;
-import org.apache.pekko.actor.ActorRef;
 import org.apache.pekko.persistence.SaveSnapshotSuccess;
 import org.apache.pekko.testkit.TestActorRef;
 import org.junit.Before;
@@ -37,7 +36,7 @@ public class RecoveryIntegrationSingleNodeTest extends AbstractRaftActorIntegrat
 
         waitUntilLeader(singleNodeActorRef);
 
-        ActorRef singleNodeCollectorActor = singleNodeActorRef.underlyingActor().collectorActor();
+        final var singleNodeCollectorActor = singleNodeActorRef.underlyingActor().collectorActor();
         final var singleNodeContext = singleNodeActorRef.underlyingActor().getRaftActorContext();
 
         InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 6, ApplyJournalEntries.class);
@@ -46,12 +45,12 @@ public class RecoveryIntegrationSingleNodeTest extends AbstractRaftActorIntegrat
         final MockCommand payload1 = sendPayloadData(singleNodeActorRef, "one");
         final MockCommand payload2 = sendPayloadData(singleNodeActorRef, "two");
 
-        MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 3);
+        verifyApplyIndex(singleNodeActorRef, 2);
 
         // this should trigger a snapshot
         final MockCommand payload3 = sendPayloadData(singleNodeActorRef, "three");
 
-        MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 4);
+        verifyApplyIndex(singleNodeActorRef, 3);
 
         //add 2 more
         final MockCommand payload4 = sendPayloadData(singleNodeActorRef, "four");
@@ -61,7 +60,7 @@ public class RecoveryIntegrationSingleNodeTest extends AbstractRaftActorIntegrat
         // Wait for snapshot complete.
         MessageCollectorActor.expectFirstMatching(singleNodeCollectorActor, SaveSnapshotSuccess.class);
 
-        MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 6);
+        verifyApplyIndex(singleNodeActorRef, 5);
 
         assertEquals("Last applied", 5, singleNodeContext.getReplicatedLog().getLastApplied());
 
index ec46540cd3fed1c368069ff72b381de0928e8ba6..85f2f4559683648145e1a3306e0e2038acf32b22 100644 (file)
@@ -18,7 +18,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.raft.SnapshotManager.ApplyLeaderSnapshot;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 
 /**
  * Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
@@ -66,7 +65,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         // Now deliver the AppendEntries to the follower
         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
 
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+        verifyApplyIndex(leaderActor, 4);
 
         // Now deliver the SaveSnapshotSuccess to the leader.
         final var saveSuccess = MessageCollectorActor.expectFirstMatching(
@@ -115,7 +114,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         // Now deliver the AppendEntries to the follower
         follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
 
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+        verifyApplyIndex(leaderActor, 4);
 
         reinstateLeaderActor();
 
@@ -147,9 +146,8 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         final MockCommand payload2 = sendPayloadData(leaderActor, "two");
 
         // Verify the leader applies the 3rd payload state.
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
-
-        MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
+        verifyApplyIndex(leaderActor, 2);
+        verifyApplyIndex(follower2Actor, 2);
 
         final var leaderLog = leaderContext.getReplicatedLog();
         assertEquals("Leader commit index", 2, leaderLog.getCommitIndex());
@@ -209,7 +207,7 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
         sendPayloadData(leaderActor, "three");
 
         MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+        verifyApplyIndex(leaderActor, 3);
 
         // Disconnect follower from leader
         killActor(follower1Actor);
@@ -265,12 +263,12 @@ public class RecoveryIntegrationTest extends AbstractRaftActorIntegrationTest {
 
         payload0 = sendPayloadData(leaderActor, "zero");
 
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+        verifyApplyIndex(leaderActor, 0);
 
         payload1 = sendPayloadData(leaderActor, "one");
 
         // Verify the leader applies the states.
-        MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+        verifyApplyIndex(leaderActor, 1);
 
         assertEquals("Leader last applied", 1, leaderContext.getReplicatedLog().getLastApplied());
 
index 7f8b19f18c6105bc64262df8c050a992ed748dcb..2e5d6da5e4c90b10e41f827ba667aa495e16744c 100644 (file)
@@ -110,14 +110,13 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
 
         // The followers should receive AppendEntries for each leader log entry that was recovered from
         // persistence and apply each one.
-        List<ApplyState> applyStates = MessageCollectorActor.expectMatching(
-                follower1CollectorActor, ApplyState.class, 3);
+        var applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
         verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
         verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
         verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
 
         // Verify follower 1 applies a log entry for at least the last entry index.
-        verifyApplyJournalEntries(follower1CollectorActor, 2);
+        verifyApplyIndex(follower1Actor, 2);
 
         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
         verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
@@ -125,7 +124,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
 
         // Verify follower 1]2 applies a log entry for at least the last entry index.
-        verifyApplyJournalEntries(follower2CollectorActor, 2);
+        verifyApplyIndex(follower2Actor, 2);
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
         MessageCollectorActor.clearMessages(follower1CollectorActor);
@@ -206,7 +205,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
         verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
 
-        verifyApplyJournalEntries(leaderCollectorActor, 3);
+        verifyApplyIndex(leaderActor, 3);
 
         final var leaderLog = leaderContext.getReplicatedLog();
         assertEquals("Leader commit index", 3, leaderLog.getCommitIndex());
@@ -214,12 +213,12 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
 
-        verifyApplyJournalEntries(follower1CollectorActor, 3);
+        verifyApplyIndex(follower1Actor, 3);
 
         applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
         verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
 
-        verifyApplyJournalEntries(follower2CollectorActor, 3);
+        verifyApplyIndex(follower2Actor, 3);
 
         assertEquals("Leader snapshot term", initialTerm, leaderLog.getSnapshotTerm());
         assertEquals("Leader snapshot index", 2, leaderLog.getSnapshotIndex());
@@ -264,7 +263,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
 
         // Verify the leader applies a log entry for at least the last entry index.
-        verifyApplyJournalEntries(leaderCollectorActor, 6);
+        verifyApplyIndex(leaderActor, 6);
 
         // The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
         // trimmed the in-memory log so that only the last entry remains.
@@ -284,7 +283,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
 
         // Verify follower 1 applies a log entry for at least the last entry index.
-        verifyApplyJournalEntries(follower1CollectorActor, 6);
+        verifyApplyIndex(follower1Actor, 6);
 
         // Verify follower 2 applies the states.
         applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
@@ -293,7 +292,7 @@ public class ReplicationAndSnapshotsIntegrationTest extends AbstractRaftActorInt
         verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
 
         // Verify follower 2 applies a log entry for at least the last entry index.
-        verifyApplyJournalEntries(follower2CollectorActor, 6);
+        verifyApplyIndex(follower2Actor, 6);
 
         MessageCollectorActor.clearMessages(leaderCollectorActor);
 
index 92c5d011668338f03c2d540032928ad194c03181..0a6cbf10a08923bde1cfb92824b978034066f682 100644 (file)
@@ -757,7 +757,7 @@ public class ReplicationAndSnapshotsWithLaggingFollowerIntegrationTest extends A
         verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
 
         // Verify the leader applies a log entry for at least the last entry index.
-        verifyApplyJournalEntries(leaderCollectorActor, 6);
+        verifyApplyIndex(leaderActor, 6);
 
         // Ensure there's at least 1 more heartbeat to trim the log.
         MessageCollectorActor.clearMessages(leaderCollectorActor);
index 70b52481c6e75cc38a74819cd96f9f525f7f37d2..bf4d16413248f7c8b27e4505ef9eaf43a7f60395 100644 (file)
@@ -69,7 +69,6 @@ import org.opendaylight.controller.cluster.raft.messages.InstallSnapshotReply;
 import org.opendaylight.controller.cluster.raft.messages.Payload;
 import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ByteState;
 import org.opendaylight.controller.cluster.raft.persisted.ByteStateSnapshotCohort;
 import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
@@ -1692,18 +1691,13 @@ public class LeaderTest extends AbstractLeaderTest<Leader> {
         final var leaderLog = leaderActorContext.getReplicatedLog();
         assertEquals(2, leaderLog.getCommitIndex());
 
-        final var applyJournalEntries = MessageCollectorActor.expectFirstMatching(leaderActor,
-            ApplyJournalEntries.class);
-
         assertEquals(2, leaderLog.getLastApplied());
 
-        assertEquals(2, applyJournalEntries.getToIndex());
-
         final var applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class);
 
         assertEquals(1,applyStateList.size());
 
-        ApplyState applyState = applyStateList.get(0);
+        final var applyState = applyStateList.getFirst();
 
         assertEquals(2, applyState.entry().index());