import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
-import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
-import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
-import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshotReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
+import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
+import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
+import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.util.StringIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
ApplyJournalEntries applyJournalEntries = new ApplyJournalEntries(2);
mockRaftActor.handleRecover(applyJournalEntries);
- ApplyLogEntries applyLogEntries = new ApplyLogEntries(0);
- mockRaftActor.handleRecover(applyLogEntries);
-
DeleteEntries deleteEntries = new DeleteEntries(1);
mockRaftActor.handleRecover(deleteEntries);
- org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries deprecatedDeleteEntries =
- new org.opendaylight.controller.cluster.raft.RaftActor.DeleteEntries(1);
- mockRaftActor.handleRecover(deprecatedDeleteEntries);
-
UpdateElectionTerm updateElectionTerm = new UpdateElectionTerm(5, "member2");
mockRaftActor.handleRecover(updateElectionTerm);
- org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm deprecatedUpdateElectionTerm =
- new org.opendaylight.controller.cluster.raft.RaftActor.UpdateElectionTerm(6, "member3");
- mockRaftActor.handleRecover(deprecatedUpdateElectionTerm);
-
verify(mockSupport).handleRecoveryMessage(same(snapshotOffer), any(PersistentDataProvider.class));
verify(mockSupport).handleRecoveryMessage(same(logEntry), any(PersistentDataProvider.class));
verify(mockSupport).handleRecoveryMessage(same(applyJournalEntries), any(PersistentDataProvider.class));
- verify(mockSupport).handleRecoveryMessage(same(applyLogEntries), any(PersistentDataProvider.class));
verify(mockSupport).handleRecoveryMessage(same(deleteEntries), any(PersistentDataProvider.class));
- verify(mockSupport).handleRecoveryMessage(same(deprecatedDeleteEntries), any(PersistentDataProvider.class));
verify(mockSupport).handleRecoveryMessage(same(updateElectionTerm), any(PersistentDataProvider.class));
- verify(mockSupport).handleRecoveryMessage(same(deprecatedUpdateElectionTerm), any(PersistentDataProvider.class));
}
@Test
ReplicatedLogEntry entry = new MockRaftActorContext.MockReplicatedLogEntry(1, 5,
new MockRaftActorContext.MockPayload("F"));
- final Identifier id = new StringIdentifier("apply-state");
+ final Identifier id = new MockIdentifier("apply-state");
mockRaftActor.onReceiveCommand(new ApplyState(mockActorRef, id, entry));
verify(mockRaftActor.actorDelegate).applyState(eq(mockActorRef), eq(id), anyObject());
assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- leaderActor.getRaftActorContext().getSnapshotManager().commit(-1);
+ leaderActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
// capture snapshot reply should remove the snapshotted entries only
assertEquals(3, leaderActor.getReplicatedLog().size());
assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
- followerActor.getRaftActorContext().getSnapshotManager().commit(-1);
+ followerActor.getRaftActorContext().getSnapshotManager().commit(-1, -1);
// capture snapshot reply should remove the snapshotted entries only till replicatedToAllIndex
assertEquals(3, followerActor.getReplicatedLog().size()); //indexes 5,6,7 left in the log
assertEquals(RaftState.Leader, leaderActor.getCurrentBehavior().state());
// Persist another entry (this will cause a CaptureSnapshot to be triggered
- leaderActor.persistData(mockActorRef, new StringIdentifier("x"),
+ leaderActor.persistData(mockActorRef, new MockIdentifier("x"),
new MockRaftActorContext.MockPayload("duh"));
// Now send a CaptureSnapshotReply
}};
}
- @Test
- public void testRaftActorOnRecoverySnapshot() throws Exception {
- TEST_LOG.info("testRaftActorOnRecoverySnapshot");
-
- new JavaTestKit(getSystem()) {{
- String persistenceId = factory.generateActorId("follower-");
-
- DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
-
- // Set the heartbeat interval high to essentially disable election otherwise the test
- // may fail if the actor is switched to Leader
- config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
-
- ImmutableMap<String, String> peerAddresses = ImmutableMap.<String, String>builder().put("member1", "address").build();
-
- // Create mock ReplicatedLogEntry
- ReplicatedLogEntry replLogEntry = new MockRaftActorContext.MockReplicatedLogEntry(1,1,
- new MockRaftActorContext.MockPayload("F", 1));
-
- InMemoryJournal.addEntry(persistenceId, 1, replLogEntry);
-
- TestActorRef<MockRaftActor> ref = factory.createTestActor(
- MockRaftActor.props(persistenceId, peerAddresses, config));
-
- MockRaftActor mockRaftActor = ref.underlyingActor();
-
- mockRaftActor.waitForRecoveryComplete();
-
- mockRaftActor.waitForInitializeBehaviorComplete();
-
- verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
- }};
- }
-
@Test
public void testSwitchBehavior(){
String persistenceId = factory.generateActorId("leader-");
mockRaftActor.waitForRecoveryComplete();
- // Wait for snapshot after recovery
- verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
-
mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
MessageCollectorActor.clearMessages(notifierActor);
- raftActorRef.tell(LeaderTransitioning.INSTANCE, ActorRef.noSender());
+ raftActorRef.tell(new LeaderTransitioning("leader"), ActorRef.noSender());
leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());