Bug 6540: Fix journal issues on leader changes
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / AbstractRaftActorIntegrationTest.java
index 5d3b2aace82d254f74aa478d5c9921c0abf4d9a1..6e2f25b6892d600f086fa1eccb35c80fc5f03f78 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
+import static akka.pattern.Patterns.ask;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import akka.actor.ActorRef;
@@ -16,6 +17,8 @@ import akka.actor.Terminated;
 import akka.dispatch.Dispatchers;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+import akka.util.Timeout;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
@@ -24,12 +27,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 import org.junit.After;
 import org.opendaylight.controller.cluster.raft.MockRaftActorContext.MockPayload;
 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
 import org.opendaylight.controller.cluster.raft.base.messages.CaptureSnapshotReply;
 import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
 import org.opendaylight.controller.cluster.raft.behaviors.RaftActorBehavior;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
 import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@@ -40,6 +47,7 @@ import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.util.AbstractStringIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -78,7 +86,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     public static class TestRaftActor extends MockRaftActor {
 
         private final TestActorRef<MessageCollectorActor> collectorActor;
-        private final Map<Class<?>, Boolean> dropMessages = new ConcurrentHashMap<>();
+        private final Map<Class<?>, Predicate<?>> dropMessages = new ConcurrentHashMap<>();
 
         private TestRaftActor(Builder builder) {
             super(builder);
@@ -86,7 +94,11 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         }
 
         public void startDropMessages(Class<?> msgClass) {
-            dropMessages.put(msgClass, Boolean.TRUE);
+            dropMessages.put(msgClass, msg -> true);
+        }
+
+        <T> void startDropMessages(Class<T> msgClass, Predicate<T> filter) {
+            dropMessages.put(msgClass, filter);
         }
 
         public void stopDropMessages(Class<?> msgClass) {
@@ -97,6 +109,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             getRaftActorContext().setTotalMemoryRetriever(mockTotalMemory > 0 ? () -> mockTotalMemory : null);
         }
 
+        @SuppressWarnings({ "rawtypes", "unchecked" })
         @Override
         public void handleCommand(Object message) {
             if(message instanceof MockPayload) {
@@ -117,7 +130,8 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
             }
 
             try {
-                if(!dropMessages.containsKey(message.getClass())) {
+                Predicate drop = dropMessages.get(message.getClass());
+                if(drop == null || !drop.test(message)) {
                     super.handleCommand(message);
                 }
             } finally {
@@ -206,7 +220,7 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
     protected DefaultConfigParamsImpl newLeaderConfigParams() {
         DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
         configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
-        configParams.setElectionTimeoutFactor(1);
+        configParams.setElectionTimeoutFactor(4);
         configParams.setSnapshotBatchCount(snapshotBatchCount);
         configParams.setSnapshotDataThresholdPercentage(70);
         configParams.setIsolatedLeaderCheckInterval(new FiniteDuration(1, TimeUnit.DAYS));
@@ -355,4 +369,26 @@ public abstract class AbstractRaftActorIntegrationTest extends AbstractActorTest
         assertEquals(name + " replicatedToAllIndex", replicatedToAllIndex,
                 actor.getCurrentBehavior().getReplicatedToAllIndex());
     }
+
+    static void verifyRaftState(ActorRef raftActor, Consumer<OnDemandRaftState> verifier) {
+        Timeout timeout = new Timeout(500, TimeUnit.MILLISECONDS);
+        AssertionError lastError = null;
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+            try {
+                OnDemandRaftState raftState = (OnDemandRaftState)Await.result(ask(raftActor,
+                        GetOnDemandRaftState.INSTANCE, timeout), timeout.duration());
+                verifier.accept(raftState);
+                return;
+            } catch (AssertionError e) {
+                lastError = e;
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            } catch (Exception e) {
+                lastError = new AssertionError("OnDemandRaftState failed", e);
+                Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        throw lastError;
+    }
 }