import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotMetadata;
import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
+import org.opendaylight.controller.cluster.raft.ServerConfigurationPayload.ServerInfo;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
import org.opendaylight.controller.cluster.raft.base.messages.DeleteEntries;
+import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.base.messages.UpdateElectionTerm;
factory = new TestActorFactory(getSystem());
}
+ @SuppressWarnings("unchecked")
+ private static DataPersistenceProvider mockPersistenceProvider() {
+ final DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ doReturn(false).when(dataPersistenceProvider).isRecoveryApplicable();
+ doReturn(0L).when(dataPersistenceProvider).getLastSequenceNumber();
+ doNothing().when(dataPersistenceProvider).saveSnapshot(any(Object.class));
+ doNothing().when(dataPersistenceProvider).persist(any(Object.class), any(Procedure.class));
+ doNothing().when(dataPersistenceProvider).deleteSnapshots(any(SnapshotSelectionCriteria.class));
+ doNothing().when(dataPersistenceProvider).deleteMessages(0L);
+
+ return dataPersistenceProvider;
+ }
+
@After
public void tearDown() throws Exception {
factory.close();
mockRaftActor.waitForRecoveryComplete();
RaftActorRecoverySupport mockSupport = mock(RaftActorRecoverySupport.class);
- mockRaftActor.setRaftActorRecoverySupport(mockSupport );
+ doReturn(false).when(mockSupport).handleRecoveryMessage(any(Object.class), any(PersistentDataProvider.class));
+ mockRaftActor.setRaftActorRecoverySupport(mockSupport);
Snapshot snapshot = Snapshot.create(new byte[]{1}, Collections.<ReplicatedLogEntry>emptyList(), 3, 1, 3, 1);
SnapshotOffer snapshotOffer = new SnapshotOffer(new SnapshotMetadata("test", 6, 12345), snapshot);
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
TestActorRef<MockRaftActor> mockActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), config, dataPersistenceProvider), persistenceId);
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(follower1Id, followerActor1.path().toString());
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(leaderId, leaderActor1.path().toString());
config.setHeartBeatInterval(new FiniteDuration(1, TimeUnit.DAYS));
config.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
Map<String, String> peerAddresses = new HashMap<>();
peerAddresses.put(follower1Id, followerActor1.path().toString());
String persistenceId = factory.generateActorId("follower-");
ImmutableMap<String, String> peerAddresses =
ImmutableMap.<String, String>builder().put("member1", "address").build();
- DataPersistenceProvider dataPersistenceProvider = mock(DataPersistenceProvider.class);
+ DataPersistenceProvider dataPersistenceProvider = mockPersistenceProvider();
TestActorRef<MockRaftActor> actorRef = factory.createTestActor(
MockRaftActor.props(persistenceId, peerAddresses, emptyConfig, dataPersistenceProvider), persistenceId);
verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(any(ActorRef.class));
mockRaftActor.snapshotCohortDelegate = mock(RaftActorSnapshotCohort.class);
+ doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
mockRaftActor.getSnapshotMessageSupport().setSnapshotReplyActorTimeout(Duration.create(200, TimeUnit.MILLISECONDS));
reset(mockRaftActor.snapshotCohortDelegate);
+ doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
Failure failure = kit.expectMsgClass(akka.actor.Status.Failure.class);
mockRaftActor.setPersistence(false);
reset(mockRaftActor.snapshotCohortDelegate);
+ doNothing().when(mockRaftActor.snapshotCohortDelegate).createSnapshot(any(ActorRef.class));
raftActorRef.tell(GetSnapshot.INSTANCE, kit.getRef());
reply = kit.expectMsgClass(GetSnapshotReply.class);
TEST_LOG.info("testRestoreFromSnapshotWithRecoveredData ending");
}
+
+ @Test
+ public void testNonVotingOnRecovery() throws Exception {
+ TEST_LOG.info("testNonVotingOnRecovery starting");
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setElectionTimeoutFactor(1);
+ config.setHeartBeatInterval(FiniteDuration.create(1, TimeUnit.MILLISECONDS));
+
+ String persistenceId = factory.generateActorId("test-actor-");
+ InMemoryJournal.addEntry(persistenceId, 1, new MockRaftActorContext.MockReplicatedLogEntry(1, 0,
+ new ServerConfigurationPayload(Arrays.asList(new ServerInfo(persistenceId, false)))));
+
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+ config(config).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
+ // Sleep a bit and verify it didn't get an election timeout and schedule an election.
+
+ Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
+ assertEquals("getRaftState", RaftState.Follower, mockRaftActor.getRaftState());
+
+ TEST_LOG.info("testNonVotingOnRecovery ending");
+ }
+
+ @Test
+ public void testLeaderTransitioning() throws Exception {
+ TEST_LOG.info("testLeaderTransitioning starting");
+
+ TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+ Props.create(MessageCollectorActor.class));
+
+ DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
+ config.setCustomRaftPolicyImplementationClass(DisableElectionsRaftPolicy.class.getName());
+
+ String persistenceId = factory.generateActorId("test-actor-");
+
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.builder().id(persistenceId).
+ config(config).roleChangeNotifier(notifierActor).props().withDispatcher(Dispatchers.DefaultDispatcherId()), persistenceId);
+ MockRaftActor mockRaftActor = raftActorRef.underlyingActor();
+
+ mockRaftActor.waitForInitializeBehaviorComplete();
+
+ raftActorRef.tell(new AppendEntries(1L, "leader", 0L, 1L, Collections.<ReplicatedLogEntry>emptyList(),
+ 0L, -1L, (short)1), ActorRef.noSender());
+ LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+ notifierActor, LeaderStateChanged.class);
+ assertEquals("getLeaderId", "leader", leaderStateChange.getLeaderId());
+
+ MessageCollectorActor.clearMessages(notifierActor);
+
+ raftActorRef.tell(new LeaderTransitioning(), ActorRef.noSender());
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals("getMemberId", persistenceId, leaderStateChange.getMemberId());
+ assertEquals("getLeaderId", null, leaderStateChange.getLeaderId());
+
+ TEST_LOG.info("testLeaderTransitioning ending");
+ }
}