import org.junit.Test;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RoleChanged;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.behaviors.Follower;
import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
-import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
@After
public void tearDown() throws Exception {
factory.close();
- MockAkkaJournal.clearJournal();
- MockSnapshotStore.setMockSnapshot(null);
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
}
public static class MockRaftActor extends RaftActor {
- private final DataPersistenceProvider dataPersistenceProvider;
+ protected DataPersistenceProvider dataPersistenceProvider;
private final RaftActor delegate;
private final CountDownLatch recoveryComplete = new CountDownLatch(1);
private final List<Object> state;
@Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
delegate.applyState(clientActor, identifier, data);
- LOG.info("applyState called");
+ LOG.info("{}: applyState called", persistenceId());
}
@Override
}
@Override protected void createSnapshot() {
+ LOG.info("{}: createSnapshot called", persistenceId());
delegate.createSnapshot();
}
@Override protected void applySnapshot(byte [] snapshot) {
+ LOG.info("{}: applySnapshot called", persistenceId());
delegate.applySnapshot(snapshot);
}
}
- private static class RaftActorTestKit extends JavaTestKit {
+ public static class RaftActorTestKit extends JavaTestKit {
private final ActorRef raftActor;
public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
waitUntilLeader(raftActor);
}
- protected void waitUntilLeader(ActorRef actorRef) {
+ public static void waitUntilLeader(ActorRef actorRef) {
FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
for(int i = 0; i < 20 * 5; i++) {
Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
lastAppliedDuringSnapshotCapture, 1);
- MockSnapshotStore.setMockSnapshot(snapshot);
- MockSnapshotStore.setPersistenceId(persistenceId);
+ InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
// add more entries after snapshot is taken
List<ReplicatedLogEntry> entries = new ArrayList<>();
int lastAppliedToState = 5;
int lastIndex = 7;
- MockAkkaJournal.addToJournal(5, entry2);
+ InMemoryJournal.addEntry(persistenceId, 5, entry2);
// 2 entries are applied to state besides the 4 entries in snapshot
- MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState));
- MockAkkaJournal.addToJournal(7, entry3);
- MockAkkaJournal.addToJournal(8, entry4);
-
+ InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
+ InMemoryJournal.addEntry(persistenceId, 7, entry3);
+ InMemoryJournal.addEntry(persistenceId, 8, entry4);
// kill the actor
followerActor.tell(PoisonPill.getInstance(), null);
new MockRaftActorContext.MockPayload("two"));
long seqNr = 1;
- MockAkkaJournal.addToJournal(seqNr++, entry0);
- MockAkkaJournal.addToJournal(seqNr++, entry1);
- MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1));
- MockAkkaJournal.addToJournal(seqNr++, entry2);
+ InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
+ InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
+ InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
+ InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
int lastAppliedToState = 1;
int lastIndex = 2;
@Test
public void testRaftRoleChangeNotifier() throws Exception {
new JavaTestKit(getSystem()) {{
- ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+ TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+ Props.create(MessageCollectorActor.class));
MessageCollectorActor.waitUntilReady(notifierActor);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
String persistenceId = factory.generateActorId("notifier-");
- factory.createTestActor(MockRaftActor.props(persistenceId,
+ TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
- List<RoleChanged> matches = null;
- for(int i = 0; i < 5000 / heartBeatInterval; i++) {
- matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
- assertNotNull(matches);
- if(matches.size() == 3) {
- break;
- }
- Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
- }
-
- assertEquals(3, matches.size());
+ List<RoleChanged> matches = MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
// check if the notifier got a role change from null to Follower
RoleChanged raftRoleChanged = matches.get(0);
assertEquals(persistenceId, raftRoleChanged.getMemberId());
assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+
+ LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+ notifierActor, LeaderStateChanged.class);
+
+ assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+
+ notifierActor.underlyingActor().clear();
+
+ MockRaftActor raftActor = raftActorRef.underlyingActor();
+ final String newLeaderId = "new-leader";
+ Follower follower = new Follower(raftActor.getRaftActorContext()) {
+ @Override
+ public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ leaderId = newLeaderId;
+ return this;
+ }
+ };
+
+ raftActor.changeCurrentBehavior(follower);
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(null, leaderStateChange.getLeaderId());
+
+ raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
+ assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
+ assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+ notifierActor.underlyingActor().clear();
+
+ raftActor.handleCommand("any");
+
+ leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+ assertEquals(persistenceId, leaderStateChange.getMemberId());
+ assertEquals(newLeaderId, leaderStateChange.getLeaderId());
}};
}