actor.persistAsync(entry, callback);
}
+ @Override
+ public void markLastApplied(final long lastApplied) {
+ actor.markLastApplied(lastApplied);
+ }
+
@Override
public void saveSnapshot(final Snapshot snapshot) {
actor.saveSnapshot(snapshot);
}
possiblyHandleBehaviorMessage(applyState);
- } else if (message instanceof ApplyJournalEntries applyEntries) {
- LOG.debug("{}: Persisting ApplyJournalEntries with index={}", memberId(), applyEntries.getToIndex());
- persistence().persistAsync(applyEntries, unused -> { });
} else if (message instanceof FindLeader) {
getSender().tell(new FindLeaderReply(getLeaderAddress()), self());
} else if (message instanceof GetOnDemandRaftState) {
// Apply the state immediately.
applyCommand(identifier, persistedEntry);
- // Send a ApplyJournalEntries message so that we write the fact that we applied
- // the state to durable storage
- self().tell(new ApplyJournalEntries(persistedEntry.index()), self());
-
+ // We have finished applying the command, tell ReplicatedLog about that
+ currentLog.markLastApplied();
} else {
currentLog.captureSnapshotIfReady(persistedEntry);
super.persistAsync(entry, callback::accept);
}
+ final void markLastApplied(final long lastApplied) {
+ LOG.debug("{}: Persisting ApplyJournalEntries with index={}", memberId(), lastApplied);
+ super.persistAsync(new ApplyJournalEntries(lastApplied), unused -> {
+ // No-op
+ });
+ }
+
@Override
@Deprecated(since = "11.0.0", forRemoval = true)
public final void loadSnapshot(final String persistenceId, final SnapshotSelectionCriteria criteria,
*/
void setLastApplied(long lastApplied);
+ /**
+ * Mark the current value {@link #getLastApplied()} for recovery purposes.
+ */
+ void markLastApplied();
+
/**
* Removes entries from the in-memory log starting at the given index. This method exists only to deal with the
* effects of {@link #trimToReceive(long)} with Pekko Persistence.
callback.accept(entry);
}
}
+
+ @Override
+ public void markLastApplied() {
+ context.getPersistenceProvider().markLastApplied(getLastApplied());
+ }
}
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.spi.LogEntry;
import org.opendaylight.raft.api.RaftRole;
import org.opendaylight.raft.api.TermInfo;
// send a message to persist a ApplyLogEntries marker message into akka's persistent journal
// will be used during recovery
- //in case if the above code throws an error and this message is not sent, it would be fine
+ // in case if the above code throws an error and this message is not sent, it would be fine
// as the append entries received later would initiate add this message to the journal
- actor().tell(new ApplyJournalEntries(replLog.getLastApplied()), actor());
+ replLog.markLastApplied();
}
/**
/**
* Serialization proxy for {@link ApplyJournalEntries}.
*/
-final class AJE implements Externalizable {
+@Deprecated(since = "11.0.0", forRemoval = true)
+final class AJE implements PekkoPersistenceContract, Externalizable {
@java.io.Serial
private static final long serialVersionUID = 1L;
package org.opendaylight.controller.cluster.raft.persisted;
import java.io.Serializable;
-import org.apache.pekko.dispatch.ControlMessage;
/**
* This is an internal message that is stored in the akka's persistent journal. During recovery, this
* message is used to apply recovered journal entries to the state whose indexes range from the context's
- * current lastApplied index to "toIndex" contained in the message. This message is sent internally from a
- * behavior to the RaftActor to persist.
+ * current lastApplied index to "toIndex" contained in the message.
*
* @author Thomas Pantelis
*/
-public final class ApplyJournalEntries implements Serializable, ControlMessage {
+@Deprecated(since = "11.0.0", forRemoval = true)
+public final class ApplyJournalEntries implements PekkoPersistenceContract, Serializable {
@java.io.Serial
private static final long serialVersionUID = 1L;
*/
long lastSequenceNumber();
+ /**
+ * Record a known value of {@code lastApplied} as a recovery optimization. If we can recover this information,
+ * recovery can re-apply these entries before we attempt to talk to other members. It is okay to lose this marker,
+ * as in that case we will just apply those entries as part of being a follower or becoming a leader.
+ *
+ * <p>This amounts to persisting a lower bound on {@code commitIndex}, which is explicitly volatile state. We could
+ * remember that instead (or perhaps as well) -- but now we just derive it.
+ *
+ * <p>If we later discover that this index lies beyond current leader's {@code commitIndex}, we will ask for
+ * a complete snapshot -- which is not particularly nice, but should happen seldom enough for it not to matter much.
+ *
+ * @param lastApplied lastApplied index to remember
+ */
+ void markLastApplied(long lastApplied);
+
/**
* Receive and potentially handle a {@link JournalProtocol} response.
*
delegate().persistAsync(entry, callback);
}
+ @Override
+ public void markLastApplied(final long lastApplied) {
+ delegate().markLastApplied(lastApplied);
+ }
+
@Override
public void saveSnapshot(final Snapshot entry) {
delegate().saveSnapshot(entry);
actor().executeInSelf(() -> callback.accept(entry));
}
+ @Override
+ default void markLastApplied(final long lastApplied) {
+ // No-op
+ }
+
@Override
default void deleteSnapshots(final long maxTimestamp) {
// no-op
package org.opendaylight.controller.cluster.raft;
import static java.util.Objects.requireNonNull;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Comparator;
import org.apache.commons.io.FileUtils;
import org.apache.pekko.actor.ActorSystem;
+import org.apache.pekko.testkit.TestActorRef;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.After;
import org.junit.AfterClass;
protected static void deleteJournal() throws IOException {
FileUtils.deleteDirectory(Path.of("journal").toFile());
}
+
+ protected static final void verifyApplyIndex(final TestActorRef<? extends RaftActor> actor, final long expIndex) {
+ await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
+ assertEquals(expIndex, actor.underlyingActor().getRaftActorContext().getReplicatedLog().getLastApplied());
+ });
+ }
}
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.Payload;
-import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ClusterConfig;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.spi.LogEntry;
testkit.unwatch(actor);
}
- protected void verifyApplyJournalEntries(final ActorRef actor, final long expIndex) {
- MessageCollectorActor.expectFirstMatching(actor, ApplyJournalEntries.class,
- msg -> msg.getToIndex() == expIndex);
- }
-
protected void verifySnapshot(final String prefix, final Snapshot snapshot, final long lastAppliedTerm,
final long lastAppliedIndex, final long lastTerm, final long lastIndex) {
assertEquals(prefix + " Snapshot getLastAppliedTerm", lastAppliedTerm, snapshot.getLastAppliedTerm());
// Send an initial payloads and verify replication.
- final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
- final MockCommand payload1 = sendPayloadData(leaderActor, "one");
- verifyApplyJournalEntries(leaderCollectorActor, 1);
- verifyApplyJournalEntries(follower1CollectorActor, 1);
- verifyApplyJournalEntries(follower2CollectorActor, 1);
+ final var payload0 = sendPayloadData(leaderActor, "zero");
+ final var payload1 = sendPayloadData(leaderActor, "one");
+ verifyApplyIndex(leaderActor, 1);
+ verifyApplyIndex(follower1Actor, 1);
+ verifyApplyIndex(follower2Actor, 1);
isolateLeader();
testLog.info("Sending payload to new leader");
- final MockCommand newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
- verifyApplyJournalEntries(follower1CollectorActor, 2);
- verifyApplyJournalEntries(follower2CollectorActor, 2);
+ final var newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ verifyApplyIndex(follower1Actor, 2);
+ verifyApplyIndex(follower2Actor, 2);
final var follower1log = follower1Context.getReplicatedLog();
assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
// The previous leader has a conflicting log entry at index 2 with a different term which should get
// replaced by the new leader's index 1 entry.
- verifyApplyJournalEntries(leaderCollectorActor, 2);
+ verifyApplyIndex(leaderActor, 2);
final var leaderLog = leaderContext.getReplicatedLog();
assertEquals("Prior leader journal last term", currentTerm, leaderLog.lastTerm());
// Submit an initial payload that is committed/applied on all nodes.
final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
- verifyApplyJournalEntries(leaderCollectorActor, 0);
- verifyApplyJournalEntries(follower1CollectorActor, 0);
- verifyApplyJournalEntries(follower2CollectorActor, 0);
+ verifyApplyIndex(leaderActor, 0);
+ verifyApplyIndex(follower1Actor, 0);
+ verifyApplyIndex(follower2Actor, 0);
// Submit another payload that is replicated to all followers and committed on the leader but the leader is
// isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
&& ae.getEntries().getFirst().command().equals(payload1));
- verifyApplyJournalEntries(leaderCollectorActor, 1);
+ verifyApplyIndex(leaderActor, 1);
isolateLeader();
testLog.info("Sending payload to new leader");
- final MockCommand newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
- verifyApplyJournalEntries(follower1CollectorActor, 3);
- verifyApplyJournalEntries(follower2CollectorActor, 3);
+ final var newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
+ verifyApplyIndex(follower1Actor, 3);
+ verifyApplyIndex(follower2Actor, 3);
final var follower1log = follower1Context.getReplicatedLog();
assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
// The previous leader has a conflicting log entry at index 2 with a different term which should get
// replaced by the new leader's entry.
- verifyApplyJournalEntries(leaderCollectorActor, 3);
+ verifyApplyIndex(leaderActor, 3);
verifyRaftState(leaderActor, raftState -> {
final var leaderLog = leaderContext.getReplicatedLog();
// Submit an initial payload that is committed/applied on all nodes.
final MockCommand payload0 = sendPayloadData(leaderActor, "zero");
- verifyApplyJournalEntries(leaderCollectorActor, 0);
- verifyApplyJournalEntries(follower1CollectorActor, 0);
- verifyApplyJournalEntries(follower2CollectorActor, 0);
+ verifyApplyIndex(leaderActor, 0);
+ verifyApplyIndex(follower1Actor, 0);
+ verifyApplyIndex(follower2Actor, 0);
// Submit another payload that is replicated to all followers and committed on the leader but the leader is
// isolated before the entry is committed on the followers. To accomplish this we drop the AppendEntries
ae.getEntries().size() == 1 && ae.getEntries().getFirst().index() == 1
&& ae.getEntries().getFirst().command().equals(payload1));
- verifyApplyJournalEntries(leaderCollectorActor, 1);
+ verifyApplyIndex(leaderActor, 1);
isolateLeader();
final var newLeaderPayload2 = sendPayloadData(follower1Actor, "two-new");
final var newLeaderPayload3 = sendPayloadData(follower1Actor, "three-new");
final var newLeaderPayload4 = sendPayloadData(follower1Actor, "four-new");
- verifyApplyJournalEntries(follower1CollectorActor, 5);
- verifyApplyJournalEntries(follower2CollectorActor, 5);
+ verifyApplyIndex(follower1Actor, 5);
+ verifyApplyIndex(follower2Actor, 5);
final var follower1log = follower1Context.getReplicatedLog();
assertEquals("Follower 1 journal last term", currentTerm, follower1log.lastTerm());
// The previous leader has conflicting log entries starting at index 2 with different terms which should get
// replaced by the new leader's entries.
- verifyApplyJournalEntries(leaderCollectorActor, 5);
+ verifyApplyIndex(leaderActor, 5);
verifyRaftState(leaderActor, raftState -> {
final var leaderLog = leaderContext.getReplicatedLog();
}
return true;
}
+
+ @Override
+ public void markLastApplied() {
+ // No-op
+ }
}
public static final class Builder {
return true;
}
+ @Override
+ public void markLastApplied() {
+ // No-op
+ }
+
@Override
public void captureSnapshotIfReady(final EntryMeta replicatedLogEntry) {
// No-op
reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
- ApplyJournalEntries apply = MessageCollectorActor.expectFirstMatching(node1Collector,
- ApplyJournalEntries.class);
- assertEquals("getToIndex", 1, apply.getToIndex());
+ verifyApplyIndex(node1RaftActorRef, 1);
verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
nonVotingServer("downNode2"));
assertTrue("isVotingMember", node1RaftActor.getRaftActorContext().isVotingMember());
assertEquals("getRaftState", RaftRole.Leader, node1RaftActor.getRaftState());
- apply = MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
- assertEquals("getToIndex", 1, apply.getToIndex());
+ verifyApplyIndex(node2RaftActorRef, 1);
verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
votingServer(node1ID), votingServer(node2ID), nonVotingServer("downNode1"),
nonVotingServer("downNode2"));
// is behind node2's so node2 should not grant node1's vote. This should cause node1 to time out and
// forward the request to node2.
- ChangeServersVotingStatus changeServers = new ChangeServersVotingStatus(
- Map.of(node1ID, true, node2ID, true));
+ final var changeServers = new ChangeServersVotingStatus(Map.of(node1ID, true, node2ID, true));
node1RaftActorRef.tell(changeServers, testKit.getRef());
- ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
+ final var reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
- MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+ verifyApplyIndex(node2RaftActorRef, 2);
verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
votingServer(node1ID), votingServer(node2ID));
assertEquals("getRaftState", RaftRole.Leader, node2RaftActor.getRaftState());
- MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+ verifyApplyIndex(node1RaftActorRef, 2);
verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
votingServer(node1ID), votingServer(node2ID));
assertTrue("isVotingMember", node1RaftActor.getRaftActorContext().isVotingMember());
ServerChangeReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ServerChangeReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, reply.getStatus());
- MessageCollectorActor.expectFirstMatching(node1Collector, ApplyJournalEntries.class);
+ verifyApplyIndex(node1RaftActorRef, 1);
verifyServerConfigurationPayloadEntry(node1RaftActor.getRaftActorContext().getReplicatedLog(),
votingServer(node1ID), votingServer(node2ID));
assertTrue("isVotingMember", node1RaftActor.getRaftActorContext().isVotingMember());
assertEquals("getRaftState", RaftRole.Follower, node1RaftActor.getRaftState());
- MessageCollectorActor.expectFirstMatching(node2Collector, ApplyJournalEntries.class);
+ verifyApplyIndex(node2RaftActorRef, 1);
+
verifyServerConfigurationPayloadEntry(node2RaftActor.getRaftActorContext().getReplicatedLog(),
votingServer(node1ID), votingServer(node2ID));
assertEquals("getRaftState", RaftRole.Leader, node2RaftActor.getRaftState());
@Test
public void testApplyJournalEntriesCallsDataPersistence() throws Exception {
- String persistenceId = factory.generateActorId("leader-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
+ final var persistenceId = factory.generateActorId("leader-");
+ final var config = new DefaultConfigParamsImpl();
config.setHeartBeatInterval(ONE_DAY);
-
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
-
- TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
- stateDir(), Map.of(), config, dataPersistenceProvider), persistenceId);
-
- MockRaftActor mockRaftActor = mockActorRef.underlyingActor();
-
+ final var dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ final var mockRaftActor = factory.<MockRaftActor>createTestActor(MockRaftActor.props(persistenceId, stateDir(),
+ Map.of(), config, dataPersistenceProvider), persistenceId)
+ .underlyingActor();
mockRaftActor.waitForInitializeBehaviorComplete();
-
mockRaftActor.waitUntilLeader();
- mockRaftActor.handleCommand(new ApplyJournalEntries(10));
-
- verify(dataPersistenceProvider).persistAsync(any(ApplyJournalEntries.class), any(Consumer.class));
+ mockRaftActor.getRaftActorContext().getReplicatedLog().markLastApplied();
+ verify(dataPersistenceProvider).markLastApplied(-1);
}
@Test
import java.util.List;
import java.util.Map;
-import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.persistence.SaveSnapshotSuccess;
import org.apache.pekko.testkit.TestActorRef;
import org.junit.Before;
waitUntilLeader(singleNodeActorRef);
- ActorRef singleNodeCollectorActor = singleNodeActorRef.underlyingActor().collectorActor();
+ final var singleNodeCollectorActor = singleNodeActorRef.underlyingActor().collectorActor();
final var singleNodeContext = singleNodeActorRef.underlyingActor().getRaftActorContext();
InMemoryJournal.addWriteMessagesCompleteLatch(persistenceId, 6, ApplyJournalEntries.class);
final MockCommand payload1 = sendPayloadData(singleNodeActorRef, "one");
final MockCommand payload2 = sendPayloadData(singleNodeActorRef, "two");
- MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 3);
+ verifyApplyIndex(singleNodeActorRef, 2);
// this should trigger a snapshot
final MockCommand payload3 = sendPayloadData(singleNodeActorRef, "three");
- MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 4);
+ verifyApplyIndex(singleNodeActorRef, 3);
//add 2 more
final MockCommand payload4 = sendPayloadData(singleNodeActorRef, "four");
// Wait for snapshot complete.
MessageCollectorActor.expectFirstMatching(singleNodeCollectorActor, SaveSnapshotSuccess.class);
- MessageCollectorActor.expectMatching(singleNodeCollectorActor, ApplyJournalEntries.class, 6);
+ verifyApplyIndex(singleNodeActorRef, 5);
assertEquals("Last applied", 5, singleNodeContext.getReplicatedLog().getLastApplied());
import org.junit.Test;
import org.opendaylight.controller.cluster.raft.SnapshotManager.ApplyLeaderSnapshot;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
-import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
/**
* Tests raft actor persistence recovery end-to-end using real RaftActors and behavior communication.
// Now deliver the AppendEntries to the follower
follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+ verifyApplyIndex(leaderActor, 4);
// Now deliver the SaveSnapshotSuccess to the leader.
final var saveSuccess = MessageCollectorActor.expectFirstMatching(
// Now deliver the AppendEntries to the follower
follower1Actor.underlyingActor().stopDropMessages(AppendEntries.class);
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+ verifyApplyIndex(leaderActor, 4);
reinstateLeaderActor();
final MockCommand payload2 = sendPayloadData(leaderActor, "two");
// Verify the leader applies the 3rd payload state.
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
-
- MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyJournalEntries.class, 1);
+ verifyApplyIndex(leaderActor, 2);
+ verifyApplyIndex(follower2Actor, 2);
final var leaderLog = leaderContext.getReplicatedLog();
assertEquals("Leader commit index", 2, leaderLog.getCommitIndex());
sendPayloadData(leaderActor, "three");
MessageCollectorActor.expectFirstMatching(leaderCollectorActor, SaveSnapshotSuccess.class);
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+ verifyApplyIndex(leaderActor, 3);
// Disconnect follower from leader
killActor(follower1Actor);
payload0 = sendPayloadData(leaderActor, "zero");
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 1);
+ verifyApplyIndex(leaderActor, 0);
payload1 = sendPayloadData(leaderActor, "one");
// Verify the leader applies the states.
- MessageCollectorActor.expectMatching(leaderCollectorActor, ApplyJournalEntries.class, 2);
+ verifyApplyIndex(leaderActor, 1);
assertEquals("Leader last applied", 1, leaderContext.getReplicatedLog().getLastApplied());
// The followers should receive AppendEntries for each leader log entry that was recovered from
// persistence and apply each one.
- List<ApplyState> applyStates = MessageCollectorActor.expectMatching(
- follower1CollectorActor, ApplyState.class, 3);
+ var applyStates = MessageCollectorActor.expectMatching(follower1CollectorActor, ApplyState.class, 3);
verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
verifyApplyState(applyStates.get(1), null, null, initialTerm, 1, recoveredPayload1);
verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
// Verify follower 1 applies a log entry for at least the last entry index.
- verifyApplyJournalEntries(follower1CollectorActor, 2);
+ verifyApplyIndex(follower1Actor, 2);
applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
verifyApplyState(applyStates.get(0), null, null, initialTerm, 0, recoveredPayload0);
verifyApplyState(applyStates.get(2), null, null, initialTerm, 2, recoveredPayload2);
// Verify follower 1]2 applies a log entry for at least the last entry index.
- verifyApplyJournalEntries(follower2CollectorActor, 2);
+ verifyApplyIndex(follower2Actor, 2);
MessageCollectorActor.clearMessages(leaderCollectorActor);
MessageCollectorActor.clearMessages(follower1CollectorActor);
ApplyState applyState = MessageCollectorActor.expectFirstMatching(leaderCollectorActor, ApplyState.class);
verifyApplyState(applyState, leaderCollectorActor, payload3.toString(), currentTerm, 3, payload3);
- verifyApplyJournalEntries(leaderCollectorActor, 3);
+ verifyApplyIndex(leaderActor, 3);
final var leaderLog = leaderContext.getReplicatedLog();
assertEquals("Leader commit index", 3, leaderLog.getCommitIndex());
applyState = MessageCollectorActor.expectFirstMatching(follower1CollectorActor, ApplyState.class);
verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
- verifyApplyJournalEntries(follower1CollectorActor, 3);
+ verifyApplyIndex(follower1Actor, 3);
applyState = MessageCollectorActor.expectFirstMatching(follower2CollectorActor, ApplyState.class);
verifyApplyState(applyState, null, null, currentTerm, 3, payload3);
- verifyApplyJournalEntries(follower2CollectorActor, 3);
+ verifyApplyIndex(follower2Actor, 3);
assertEquals("Leader snapshot term", initialTerm, leaderLog.getSnapshotTerm());
assertEquals("Leader snapshot index", 2, leaderLog.getSnapshotIndex());
verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
// Verify the leader applies a log entry for at least the last entry index.
- verifyApplyJournalEntries(leaderCollectorActor, 6);
+ verifyApplyIndex(leaderActor, 6);
// The leader should have performed fake snapshots due to the follower's AppendEntriesReplies and
// trimmed the in-memory log so that only the last entry remains.
verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
// Verify follower 1 applies a log entry for at least the last entry index.
- verifyApplyJournalEntries(follower1CollectorActor, 6);
+ verifyApplyIndex(follower1Actor, 6);
// Verify follower 2 applies the states.
applyStates = MessageCollectorActor.expectMatching(follower2CollectorActor, ApplyState.class, 3);
verifyApplyState(applyStates.get(2), null, null, currentTerm, 6, payload6);
// Verify follower 2 applies a log entry for at least the last entry index.
- verifyApplyJournalEntries(follower2CollectorActor, 6);
+ verifyApplyIndex(follower2Actor, 6);
MessageCollectorActor.clearMessages(leaderCollectorActor);
verifyApplyState(applyStates.get(2), leaderCollectorActor, payload6.toString(), currentTerm, 6, payload6);
// Verify the leader applies a log entry for at least the last entry index.
- verifyApplyJournalEntries(leaderCollectorActor, 6);
+ verifyApplyIndex(leaderActor, 6);
// Ensure there's at least 1 more heartbeat to trim the log.
MessageCollectorActor.clearMessages(leaderCollectorActor);
import org.opendaylight.controller.cluster.raft.messages.Payload;
import org.opendaylight.controller.cluster.raft.messages.RaftRPC;
import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
-import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.ByteState;
import org.opendaylight.controller.cluster.raft.persisted.ByteStateSnapshotCohort;
import org.opendaylight.controller.cluster.raft.persisted.SimpleReplicatedLogEntry;
final var leaderLog = leaderActorContext.getReplicatedLog();
assertEquals(2, leaderLog.getCommitIndex());
- final var applyJournalEntries = MessageCollectorActor.expectFirstMatching(leaderActor,
- ApplyJournalEntries.class);
-
assertEquals(2, leaderLog.getLastApplied());
- assertEquals(2, applyJournalEntries.getToIndex());
-
final var applyStateList = MessageCollectorActor.getAllMatching(leaderActor, ApplyState.class);
assertEquals(1,applyStateList.size());
- ApplyState applyState = applyStateList.get(0);
+ final var applyState = applyStateList.getFirst();
assertEquals(2, applyState.entry().index());