Adjust Tx rate limiter for unused transactions
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / RaftActorTest.java
index c0bdc53c51f27907da2de0e2399d619cee8fdda1..34932c7249a38f753856cdb99addf72d221f0722 100644 (file)
@@ -54,6 +54,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.DataPersistenceProviderMonitor;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RoleChanged;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
@@ -64,14 +65,15 @@ import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotRep
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.Follower;
 import org.opendaylight.controller.cluster.raft.behaviors.Leader;
+import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
 import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
-import org.opendaylight.controller.cluster.raft.utils.MockAkkaJournal;
-import org.opendaylight.controller.cluster.raft.utils.MockSnapshotStore;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -89,13 +91,13 @@ public class RaftActorTest extends AbstractActorTest {
     @After
     public void tearDown() throws Exception {
         factory.close();
-        MockAkkaJournal.clearJournal();
-        MockSnapshotStore.setMockSnapshot(null);
+        InMemoryJournal.clear();
+        InMemorySnapshotStore.clear();
     }
 
     public static class MockRaftActor extends RaftActor {
 
-        private final DataPersistenceProvider dataPersistenceProvider;
+        protected DataPersistenceProvider dataPersistenceProvider;
         private final RaftActor delegate;
         private final CountDownLatch recoveryComplete = new CountDownLatch(1);
         private final List<Object> state;
@@ -178,7 +180,7 @@ public class RaftActorTest extends AbstractActorTest {
 
         @Override protected void applyState(ActorRef clientActor, String identifier, Object data) {
             delegate.applyState(clientActor, identifier, data);
-            LOG.info("applyState called");
+            LOG.info("{}: applyState called", persistenceId());
         }
 
         @Override
@@ -220,10 +222,12 @@ public class RaftActorTest extends AbstractActorTest {
         }
 
         @Override protected void createSnapshot() {
+            LOG.info("{}: createSnapshot called", persistenceId());
             delegate.createSnapshot();
         }
 
         @Override protected void applySnapshot(byte [] snapshot) {
+            LOG.info("{}: applySnapshot called", persistenceId());
             delegate.applySnapshot(snapshot);
         }
 
@@ -271,7 +275,7 @@ public class RaftActorTest extends AbstractActorTest {
     }
 
 
-    private static class RaftActorTestKit extends JavaTestKit {
+    public static class RaftActorTestKit extends JavaTestKit {
         private final ActorRef raftActor;
 
         public RaftActorTestKit(ActorSystem actorSystem, String actorName) {
@@ -307,7 +311,7 @@ public class RaftActorTest extends AbstractActorTest {
             waitUntilLeader(raftActor);
         }
 
-        protected void waitUntilLeader(ActorRef actorRef) {
+        public static void waitUntilLeader(ActorRef actorRef) {
             FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
             for(int i = 0; i < 20 * 5; i++) {
                 Future<Object> future = Patterns.ask(actorRef, new FindLeader(), new Timeout(duration));
@@ -377,8 +381,7 @@ public class RaftActorTest extends AbstractActorTest {
             Snapshot snapshot = Snapshot.create(snapshotBytes.toByteArray(),
                     snapshotUnappliedEntries, lastIndexDuringSnapshotCapture, 1,
                     lastAppliedDuringSnapshotCapture, 1);
-            MockSnapshotStore.setMockSnapshot(snapshot);
-            MockSnapshotStore.setPersistenceId(persistenceId);
+            InMemorySnapshotStore.addSnapshot(persistenceId, snapshot);
 
             // add more entries after snapshot is taken
             List<ReplicatedLogEntry> entries = new ArrayList<>();
@@ -395,12 +398,11 @@ public class RaftActorTest extends AbstractActorTest {
             int lastAppliedToState = 5;
             int lastIndex = 7;
 
-            MockAkkaJournal.addToJournal(5, entry2);
+            InMemoryJournal.addEntry(persistenceId, 5, entry2);
             // 2 entries are applied to state besides the 4 entries in snapshot
-            MockAkkaJournal.addToJournal(6, new ApplyJournalEntries(lastAppliedToState));
-            MockAkkaJournal.addToJournal(7, entry3);
-            MockAkkaJournal.addToJournal(8, entry4);
-
+            InMemoryJournal.addEntry(persistenceId, 6, new ApplyJournalEntries(lastAppliedToState));
+            InMemoryJournal.addEntry(persistenceId, 7, entry3);
+            InMemoryJournal.addEntry(persistenceId, 8, entry4);
 
             // kill the actor
             followerActor.tell(PoisonPill.getInstance(), null);
@@ -442,10 +444,10 @@ public class RaftActorTest extends AbstractActorTest {
                     new MockRaftActorContext.MockPayload("two"));
 
             long seqNr = 1;
-            MockAkkaJournal.addToJournal(seqNr++, entry0);
-            MockAkkaJournal.addToJournal(seqNr++, entry1);
-            MockAkkaJournal.addToJournal(seqNr++, new ApplyLogEntries(1));
-            MockAkkaJournal.addToJournal(seqNr++, entry2);
+            InMemoryJournal.addEntry(persistenceId, seqNr++, entry0);
+            InMemoryJournal.addEntry(persistenceId, seqNr++, entry1);
+            InMemoryJournal.addEntry(persistenceId, seqNr++, new ApplyLogEntries(1));
+            InMemoryJournal.addEntry(persistenceId, seqNr++, entry2);
 
             int lastAppliedToState = 1;
             int lastIndex = 2;
@@ -944,7 +946,8 @@ public class RaftActorTest extends AbstractActorTest {
     @Test
     public void testRaftRoleChangeNotifier() throws Exception {
         new JavaTestKit(getSystem()) {{
-            ActorRef notifierActor = factory.createActor(Props.create(MessageCollectorActor.class));
+            TestActorRef<MessageCollectorActor> notifierActor = factory.createTestActor(
+                    Props.create(MessageCollectorActor.class));
             MessageCollectorActor.waitUntilReady(notifierActor);
 
             DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
@@ -954,20 +957,10 @@ public class RaftActorTest extends AbstractActorTest {
 
             String persistenceId = factory.generateActorId("notifier-");
 
-            factory.createTestActor(MockRaftActor.props(persistenceId,
+            TestActorRef<MockRaftActor> raftActorRef = factory.createTestActor(MockRaftActor.props(persistenceId,
                     Collections.<String, String>emptyMap(), Optional.<ConfigParams>of(config), notifierActor), persistenceId);
 
-            List<RoleChanged> matches =  null;
-            for(int i = 0; i < 5000 / heartBeatInterval; i++) {
-                matches = MessageCollectorActor.getAllMatching(notifierActor, RoleChanged.class);
-                assertNotNull(matches);
-                if(matches.size() == 3) {
-                    break;
-                }
-                Uninterruptibles.sleepUninterruptibly(heartBeatInterval, TimeUnit.MILLISECONDS);
-            }
-
-            assertEquals(3, matches.size());
+            List<RoleChanged> matches =  MessageCollectorActor.expectMatching(notifierActor, RoleChanged.class, 3);
 
             // check if the notifier got a role change from null to Follower
             RoleChanged raftRoleChanged = matches.get(0);
@@ -986,6 +979,41 @@ public class RaftActorTest extends AbstractActorTest {
             assertEquals(persistenceId, raftRoleChanged.getMemberId());
             assertEquals(RaftState.Candidate.name(), raftRoleChanged.getOldRole());
             assertEquals(RaftState.Leader.name(), raftRoleChanged.getNewRole());
+
+            LeaderStateChanged leaderStateChange = MessageCollectorActor.expectFirstMatching(
+                    notifierActor, LeaderStateChanged.class);
+
+            assertEquals(raftRoleChanged.getMemberId(), leaderStateChange.getLeaderId());
+
+            notifierActor.underlyingActor().clear();
+
+            MockRaftActor raftActor = raftActorRef.underlyingActor();
+            final String newLeaderId = "new-leader";
+            Follower follower = new Follower(raftActor.getRaftActorContext()) {
+                @Override
+                public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+                    leaderId = newLeaderId;
+                    return this;
+                }
+            };
+
+            raftActor.changeCurrentBehavior(follower);
+
+            leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+            assertEquals(persistenceId, leaderStateChange.getMemberId());
+            assertEquals(null, leaderStateChange.getLeaderId());
+
+            raftRoleChanged = MessageCollectorActor.expectFirstMatching(notifierActor, RoleChanged.class);
+            assertEquals(RaftState.Leader.name(), raftRoleChanged.getOldRole());
+            assertEquals(RaftState.Follower.name(), raftRoleChanged.getNewRole());
+
+            notifierActor.underlyingActor().clear();
+
+            raftActor.handleCommand("any");
+
+            leaderStateChange = MessageCollectorActor.expectFirstMatching(notifierActor, LeaderStateChanged.class);
+            assertEquals(persistenceId, leaderStateChange.getMemberId());
+            assertEquals(newLeaderId, leaderStateChange.getLeaderId());
         }};
     }