+ //Now remove
+ registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
+
+ //Bucket store on node2 should get a message to update its local copy of remote buckets
+ probe2.expectMsgClass(
+ FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+ }
+
+ /**
+ * Three node cluster.
+ * Register rpc on 2 nodes. Ensure 3rd gets updated.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRpcAddedOnMultiNodes() throws Exception {
+
+ final JavaTestKit mockBroker1 = new JavaTestKit(node1);
+ final JavaTestKit mockBroker2 = new JavaTestKit(node2);
+ final JavaTestKit mockBroker3 = new JavaTestKit(node3);
+
+ registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
+
+ //install probe on node 3
+ final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
+ final JavaTestKit probe3 = createProbeForMessage(
+ node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+
+ //Add rpc on node 1
+ registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
+ registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
+
+ probe3.expectMsgClass(
+ FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+
+
+ //Add same rpc on node 2
+ registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
+ registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
+
+ probe3.expectMsgClass(
+ FiniteDuration.apply(10, TimeUnit.SECONDS),
+ Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
+ }
+
+ private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) {
+ final JavaTestKit probe = new JavaTestKit(node);
+
+ ConditionalProbe conditionalProbe =
+ new ConditionalProbe(probe.getRef(), new Predicate() {
+ @Override
+ public boolean apply(@Nullable Object input) {
+ if (input != null)
+ return clazz.equals(input.getClass());
+ else
+ return false;