Bug 6540: EOS - handle edge case with pruning pending owner change commits
[controller.git] / opendaylight / md-sal / sal-akka-raft / src / test / java / org / opendaylight / controller / cluster / raft / utils / MessageCollectorActor.java
index 6ea7a20924d4480b0497c02e715eefddee81556c..ea54146dc33744c97ad20f3b78376cf6cac62e29 100644 (file)
@@ -14,6 +14,8 @@ import akka.actor.UntypedActor;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
@@ -22,7 +24,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.junit.Assert;
-import org.opendaylight.controller.cluster.raft.SerializationUtils;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -31,7 +32,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 public class MessageCollectorActor extends UntypedActor {
     private static final String ARE_YOU_READY = "ARE_YOU_READY";
-    private static final String GET_ALL_MESSAGES = "get-all-messages";
+    public static final String GET_ALL_MESSAGES = "messages";
     private static final String CLEAR_MESSAGES = "clear-messages";
 
     private final List<Object> messages = new ArrayList<>();
@@ -45,9 +46,9 @@ public class MessageCollectorActor extends UntypedActor {
         if(GET_ALL_MESSAGES.equals(message)) {
             getSender().tell(new ArrayList<>(messages), getSelf());
         } else if(CLEAR_MESSAGES.equals(message)) {
-            messages.clear();
+            clear();
         } else if(message != null) {
-            messages.add(SerializationUtils.fromSerializable(message));
+            messages.add(message);
         }
     }
 
@@ -55,7 +56,7 @@ public class MessageCollectorActor extends UntypedActor {
         messages.clear();
     }
 
-    public static List<Object> getAllMessages(ActorRef actor) throws Exception {
+    private static List<Object> getAllMessages(ActorRef actor) throws Exception {
         FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
         Timeout operationTimeout = new Timeout(operationDuration);
         Future<Object> future = Patterns.ask(actor, GET_ALL_MESSAGES, operationTimeout);
@@ -78,7 +79,7 @@ public class MessageCollectorActor extends UntypedActor {
 
         for(Object message : allMessages){
             if(message.getClass().equals(clazz)){
-                return (T) message;
+                return clazz.cast(message);
             }
         }
 
@@ -86,11 +87,17 @@ public class MessageCollectorActor extends UntypedActor {
     }
 
     public static <T> List<T> expectMatching(ActorRef actor, Class<T> clazz, int count) {
+        return expectMatching(actor, clazz, count, msg -> true);
+    }
+
+    public static <T> List<T> expectMatching(ActorRef actor, Class<T> clazz, int count,
+            Predicate<T> matcher) {
         int timeout = 5000;
         List<T> messages = Collections.emptyList();
         for(int i = 0; i < timeout / 50; i++) {
             try {
                 messages = getAllMatching(actor, clazz);
+                Iterables.removeIf(messages, Predicates.not(matcher));
                 if(messages.size() >= count) {
                     return messages;
                 }
@@ -108,6 +115,7 @@ public class MessageCollectorActor extends UntypedActor {
         return expectFirstMatching(actor, clazz, 5000);
     }
 
+
     public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz, long timeout) {
         int count = (int) (timeout / 50);
         for(int i = 0; i < count; i++) {
@@ -147,6 +155,28 @@ public class MessageCollectorActor extends UntypedActor {
         return null;
     }
 
+    public static <T> void assertNoneMatching(ActorRef actor, Class<T> clazz) {
+        assertNoneMatching(actor, clazz, 5000);
+    }
+
+    public static <T> void assertNoneMatching(ActorRef actor, Class<T> clazz, long timeout) {
+        int count = (int) (timeout / 50);
+        for(int i = 0; i < count; i++) {
+            try {
+                T message = getFirstMatching(actor, clazz);
+                if(message != null) {
+                    Assert.fail("Unexpected message received" +  message.toString());
+                    return;
+                }
+            } catch (Exception e) {}
+
+            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+        }
+
+        return;
+    }
+
+
     public static <T> List<T> getAllMatching(ActorRef actor, Class<T> clazz) throws Exception {
         List<Object> allMessages = getAllMessages(actor);
 
@@ -154,7 +184,7 @@ public class MessageCollectorActor extends UntypedActor {
 
         for(Object message : allMessages){
             if(message.getClass().equals(clazz)){
-                output.add((T) message);
+                output.add(clazz.cast(message));
             }
         }