Bug 2415: Fixed intermittent RpcRegistry unit test failures 51/13151/1
authortpantelis <tpanteli@brocade.com>
Wed, 19 Nov 2014 13:30:20 +0000 (08:30 -0500)
committertpantelis <tpanteli@brocade.com>
Wed, 19 Nov 2014 13:30:20 +0000 (08:30 -0500)
The tests inject a ConditionalProbe into the BucketStore actor via a
message so it can verify the BuckStore received expected messages from
the RpcRegistry actor. The problem was that the BucketStore actor may
not have been created yet, as this is done async by akka, by the time
the test sends the ConditionalProbe message, in which case the message
went to dead letters resulting in a test failure.

To fix it, I modified the BucketStore to return a response on receiving
the ConditionalProbe so the tests can know deterministically that it
received it. I did this in a retry loop in the tests as I still saw
sporadic failures if just trying once.

After the fix, the RpcRegistry tests ran 900 times successfully.

Change-Id: I7094954c4a38493cc910723a5a6950c073e2f03a
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/BucketStore.java
opendaylight/md-sal/sal-remoterpc-connector/src/test/java/org/opendaylight/controller/remote/rpc/registry/RpcRegistryTest.java

index b50dfb1..6ffe147 100644 (file)
@@ -15,26 +15,24 @@ import akka.actor.Props;
 import akka.cluster.ClusterActorRefProvider;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
-import org.opendaylight.controller.utils.ConditionalProbe;
-
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
+import org.opendaylight.controller.utils.ConditionalProbe;
 
 /**
  * A store that syncs its data across nodes in the cluster.
@@ -82,8 +80,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         ActorRefProvider provider = getContext().provider();
         selfAddress = provider.getDefaultAddress();
 
-        if ( provider instanceof ClusterActorRefProvider)
+        if ( provider instanceof ClusterActorRefProvider) {
             getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
+        }
     }
 
 
@@ -94,8 +93,11 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         }
 
         if (message instanceof ConditionalProbe) {
+            // The ConditionalProbe is only used for unit tests.
             log.info("Received probe {} {}", getSelf(), message);
             probe = (ConditionalProbe) message;
+            // Send back any message to tell the caller we got the probe.
+            getSender().tell("Got it", getSelf());
         } else if (message instanceof UpdateBucket) {
             receiveUpdateBucket(((UpdateBucket) message).getBucket());
         } else if (message instanceof GetAllBuckets) {
@@ -184,13 +186,15 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         Map<Address, Bucket> buckets = new HashMap<>();
 
         //first add the local bucket if asked
-        if (members.contains(selfAddress))
+        if (members.contains(selfAddress)) {
             buckets.put(selfAddress, localBucket);
+        }
 
         //then get buckets for requested remote nodes
         for (Address address : members){
-            if (remoteBuckets.containsKey(address))
+            if (remoteBuckets.containsKey(address)) {
                 buckets.put(address, remoteBuckets.get(address));
+            }
         }
 
         return buckets;
@@ -214,7 +218,9 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
 
         if (receivedBuckets == null || receivedBuckets.isEmpty())
+         {
             return; //nothing to do
+        }
 
         //Remote cant update self's bucket
         receivedBuckets.remove(selfAddress);
@@ -222,15 +228,20 @@ public class BucketStore extends AbstractUntypedActorWithMetering {
         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
 
             Long localVersion = versions.get(entry.getKey());
-            if (localVersion == null) localVersion = -1L;
+            if (localVersion == null) {
+                localVersion = -1L;
+            }
 
             Bucket receivedBucket = entry.getValue();
 
-            if (receivedBucket == null)
+            if (receivedBucket == null) {
                 continue;
+            }
 
             Long remoteVersion = receivedBucket.getVersion();
-            if (remoteVersion == null) remoteVersion = -1L;
+            if (remoteVersion == null) {
+                remoteVersion = -1L;
+            }
 
             //update only if remote version is newer
             if ( remoteVersion.longValue() > localVersion.longValue() ) {
index ee96cb8..e0d145d 100644 (file)
@@ -1,14 +1,22 @@
 package org.opendaylight.controller.remote.rpc.registry;
 
-
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.ChildActorPath;
 import akka.actor.Props;
+import akka.pattern.Patterns;
 import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
 import com.google.common.base.Predicate;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -16,224 +24,226 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
+import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.utils.ConditionalProbe;
 import org.opendaylight.yangtools.yang.common.QName;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import javax.annotation.Nullable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
-import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
-
 public class RpcRegistryTest {
 
-  private static ActorSystem node1;
-  private static ActorSystem node2;
-  private static ActorSystem node3;
-
-  private ActorRef registry1;
-  private ActorRef registry2;
-  private ActorRef registry3;
-
-  @BeforeClass
-  public static void setup() throws InterruptedException {
-    RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
-    RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
-    RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
-    node1 = ActorSystem.create("opendaylight-rpc", config1.get());
-    node2 = ActorSystem.create("opendaylight-rpc", config2.get());
-    node3 = ActorSystem.create("opendaylight-rpc", config3.get());
-  }
-
-  @AfterClass
-  public static void teardown() {
-    JavaTestKit.shutdownActorSystem(node1);
-    JavaTestKit.shutdownActorSystem(node2);
-    JavaTestKit.shutdownActorSystem(node3);
-    if (node1 != null)
-      node1.shutdown();
-    if (node2 != null)
-      node2.shutdown();
-    if (node3 != null)
-      node3.shutdown();
-
-  }
-
-  @Before
-  public void createRpcRegistry() throws InterruptedException {
-    registry1 = node1.actorOf(Props.create(RpcRegistry.class));
-    registry2 = node2.actorOf(Props.create(RpcRegistry.class));
-    registry3 = node3.actorOf(Props.create(RpcRegistry.class));
-  }
-
-  @After
-  public void stopRpcRegistry() throws InterruptedException {
-    if (registry1 != null)
-      node1.stop(registry1);
-    if (registry2 != null)
-      node2.stop(registry2);
-    if (registry3 != null)
-      node3.stop(registry3);
-  }
-
-  /**
-   * One node cluster.
-   * 1. Register rpc, ensure router can be found
-   * 2. Then remove rpc, ensure its deleted
-   *
-   * @throws URISyntaxException
-   * @throws InterruptedException
-   */
-  @Test
-  public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
-
-    final JavaTestKit mockBroker = new JavaTestKit(node1);
-
-    final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
-
-    //install probe
-    final JavaTestKit probe1 = createProbeForMessage(
-        node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class);
-
-    //Add rpc on node 1
-    registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
-    registry1.tell(getAddRouteMessage(), mockBroker.getRef());
-
-    //Bucket store should get an update bucket message. Updated bucket contains added rpc.
-    probe1.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateBucket.class);
-
-    //Now remove rpc
-    registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
-
-    //Bucket store should get an update bucket message. Rpc is removed in the updated bucket
-    probe1.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateBucket.class);
-
-
-  }
-
-
-  /**
-   * Three node cluster.
-   * 1. Register rpc on 1 node, ensure 2nd node gets updated
-   * 2. Remove rpc on 1 node, ensure 2nd node gets updated
-   *
-   * @throws URISyntaxException
-   * @throws InterruptedException
-   */
-  @Test
-  public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
-
-    final JavaTestKit mockBroker1 = new JavaTestKit(node1);
-
-    //install probe on node2's bucket store
-    final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
-    final JavaTestKit probe2 = createProbeForMessage(
-        node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
-    //Add rpc on node 1
-    registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-    registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
-
-    //Bucket store on node2 should get a message to update its local copy of remote buckets
-    probe2.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
-    //Now remove
-    registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
-
-    //Bucket store on node2 should get a message to update its local copy of remote buckets
-    probe2.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
-  }
-
-  /**
-   * Three node cluster.
-   * Register rpc on 2 nodes. Ensure 3rd gets updated.
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testRpcAddedOnMultiNodes() throws Exception {
-
-    final JavaTestKit mockBroker1 = new JavaTestKit(node1);
-    final JavaTestKit mockBroker2 = new JavaTestKit(node2);
-    final JavaTestKit mockBroker3 = new JavaTestKit(node3);
-
-    registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
-
-    //install probe on node 3
-    final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
-    final JavaTestKit probe3 = createProbeForMessage(
-        node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
-
-    //Add rpc on node 1
-    registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
-    registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
-
-    probe3.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-
+    private static ActorSystem node1;
+    private static ActorSystem node2;
+    private static ActorSystem node3;
+
+    private ActorRef registry1;
+    private ActorRef registry2;
+    private ActorRef registry3;
+
+    @BeforeClass
+    public static void staticSetup() throws InterruptedException {
+      RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
+      RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
+      RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
+      node1 = ActorSystem.create("opendaylight-rpc", config1.get());
+      node2 = ActorSystem.create("opendaylight-rpc", config2.get());
+      node3 = ActorSystem.create("opendaylight-rpc", config3.get());
+    }
+
+    @AfterClass
+    public static void staticTeardown() {
+      JavaTestKit.shutdownActorSystem(node1);
+      JavaTestKit.shutdownActorSystem(node2);
+      JavaTestKit.shutdownActorSystem(node3);
+    }
+
+    @Before
+    public void setup() {
+        registry1 = node1.actorOf(Props.create(RpcRegistry.class));
+        registry2 = node2.actorOf(Props.create(RpcRegistry.class));
+        registry3 = node3.actorOf(Props.create(RpcRegistry.class));
+    }
+
+    @After
+    public void teardown() {
+        if (registry1 != null) {
+            node1.stop(registry1);
+        }
+        if (registry2 != null) {
+            node2.stop(registry2);
+        }
+        if (registry3 != null) {
+            node3.stop(registry3);
+        }
+    }
+
+    /**
+     * One node cluster. 1. Register rpc, ensure router can be found 2. Then remove rpc, ensure its
+     * deleted
+     *
+     * @throws URISyntaxException
+     * @throws InterruptedException
+     */
+    @Test
+    public void testAddRemoveRpcOnSameNode() throws Exception {
+
+        System.out.println("testAddRemoveRpcOnSameNode starting");
+
+        final JavaTestKit mockBroker = new JavaTestKit(node1);
+
+        final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
+
+        // Add rpc on node 1
+        registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
+
+        // install probe
+        final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath,
+                Messages.BucketStoreMessages.UpdateBucket.class);
+
+        registry1.tell(getAddRouteMessage(), mockBroker.getRef());
+
+        // Bucket store should get an update bucket message. Updated bucket contains added rpc.
+        probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateBucket.class);
+
+        // Now remove rpc
+        registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
+
+        // Bucket store should get an update bucket message. Rpc is removed in the updated bucket
+        probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateBucket.class);
+
+        System.out.println("testAddRemoveRpcOnSameNode ending");
+
+    }
+
+    /**
+     * Three node cluster. 1. Register rpc on 1 node, ensure 2nd node gets updated 2. Remove rpc on
+     * 1 node, ensure 2nd node gets updated
+     *
+     * @throws URISyntaxException
+     * @throws InterruptedException
+     */
+    @Test
+    public void testRpcAddRemoveInCluster() throws Exception {
+
+        System.out.println("testRpcAddRemoveInCluster starting");
+
+        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+
+        // install probe on node2's bucket store
+        final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
+        final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath,
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+        // Add rpc on node 1
+        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+        registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+
+        // Bucket store on node2 should get a message to update its local copy of remote buckets
+        probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+        // Now remove
+        registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
+
+        // Bucket store on node2 should get a message to update its local copy of remote buckets
+        probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+        System.out.println("testRpcAddRemoveInCluster ending");
+    }
+
+    /**
+     * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testRpcAddedOnMultiNodes() throws Exception {
+
+        final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+        final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+        final JavaTestKit mockBroker3 = new JavaTestKit(node3);
+
+        registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
+
+        // install probe on node 3
+        final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
+        final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath,
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+        // Add rpc on node 1
+        registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+        registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
 
-    //Add same rpc on node 2
-    registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
-    registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+        probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
 
-    probe3.expectMsgClass(
-        FiniteDuration.apply(10, TimeUnit.SECONDS),
-        Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
-  }
-
-  private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz) {
-    final JavaTestKit probe = new JavaTestKit(node);
+        // Add same rpc on node 2
+        registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
+        registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
 
-    ConditionalProbe conditionalProbe =
-        new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
-          @Override
-          public boolean apply(@Nullable Object input) {
-              if (input != null)
-                return clazz.equals(input.getClass());
-              else
-                  return false;
-          }
-        });
-
-    ActorSelection subject = node.actorSelection(subjectPath);
-    subject.tell(conditionalProbe, ActorRef.noSender());
-
-    return probe;
+        probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
+                Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+    }
 
-  }
+    private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz)
+            throws Exception {
+        final JavaTestKit probe = new JavaTestKit(node);
 
-  private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
-    return new AddOrUpdateRoutes(createRouteIds());
-  }
-
-  private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
-    return new RemoveRoutes(createRouteIds());
-  }
+        ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
+            @Override
+            public boolean apply(@Nullable Object input) {
+                if (input != null) {
+                    return clazz.equals(input.getClass());
+                } else {
+                    return false;
+                }
+            }
+        });
 
-  private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
-    QName type = new QName(new URI("/mockrpc"), "mockrpc");
-    List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
-    routeIds.add(new RouteIdentifierImpl(null, type, null));
-    return routeIds;
-  }
+        FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
+        Timeout timeout = new Timeout(duration);
+        int maxTries = 30;
+        int i = 0;
+        while(true) {
+            ActorSelection subject = node.actorSelection(subjectPath);
+            Future<Object> future = Patterns.ask(subject, conditionalProbe, timeout);
+
+            try {
+                Await.ready(future, duration);
+                break;
+            } catch (TimeoutException | InterruptedException e) {
+                if(++i > maxTries) {
+                    throw e;
+                }
+            }
+        }
+
+        return probe;
+
+    }
+
+    private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
+        return new AddOrUpdateRoutes(createRouteIds());
+    }
+
+    private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
+        return new RemoveRoutes(createRouteIds());
+    }
+
+    private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
+        QName type = new QName(new URI("/mockrpc"), "mockrpc");
+        List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
+        routeIds.add(new RouteIdentifierImpl(null, type, null));
+        return routeIds;
+    }
 
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.