1 package org.opendaylight.controller.remote.rpc.registry;
4 import akka.actor.ActorPath;
5 import akka.actor.ActorRef;
6 import akka.actor.ActorSelection;
7 import akka.actor.ActorSystem;
8 import akka.actor.ChildActorPath;
9 import akka.actor.Props;
10 import akka.testkit.JavaTestKit;
11 import com.google.common.base.Predicate;
12 import com.typesafe.config.ConfigFactory;
13 import org.junit.After;
14 import org.junit.AfterClass;
15 import org.junit.Before;
16 import org.junit.BeforeClass;
17 import org.junit.Test;
18 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
19 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
20 import org.opendaylight.controller.sal.connector.api.RpcRouter;
21 import org.opendaylight.controller.utils.ConditionalProbe;
22 import org.opendaylight.yangtools.yang.common.QName;
23 import scala.concurrent.duration.FiniteDuration;
25 import javax.annotation.Nullable;
27 import java.net.URISyntaxException;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.TimeUnit;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
36 public class RpcRegistryTest {
38 private static ActorSystem node1;
39 private static ActorSystem node2;
40 private static ActorSystem node3;
42 private ActorRef registry1;
43 private ActorRef registry2;
44 private ActorRef registry3;
47 public static void setup() throws InterruptedException {
48 node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
49 node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
50 node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
54 public static void teardown() {
55 JavaTestKit.shutdownActorSystem(node1);
56 JavaTestKit.shutdownActorSystem(node2);
57 JavaTestKit.shutdownActorSystem(node3);
68 public void createRpcRegistry() throws InterruptedException {
69 registry1 = node1.actorOf(Props.create(RpcRegistry.class));
70 registry2 = node2.actorOf(Props.create(RpcRegistry.class));
71 registry3 = node3.actorOf(Props.create(RpcRegistry.class));
75 public void stopRpcRegistry() throws InterruptedException {
76 if (registry1 != null)
77 node1.stop(registry1);
78 if (registry2 != null)
79 node2.stop(registry2);
80 if (registry3 != null)
81 node3.stop(registry3);
86 * 1. Register rpc, ensure router can be found
87 * 2. Then remove rpc, ensure its deleted
89 * @throws URISyntaxException
90 * @throws InterruptedException
93 public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
95 final JavaTestKit mockBroker = new JavaTestKit(node1);
97 final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
100 final JavaTestKit probe1 = createProbeForMessage(
101 node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class);
104 registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
105 registry1.tell(getAddRouteMessage(), mockBroker.getRef());
107 //Bucket store should get an update bucket message. Updated bucket contains added rpc.
108 probe1.expectMsgClass(
109 FiniteDuration.apply(10, TimeUnit.SECONDS),
110 Messages.BucketStoreMessages.UpdateBucket.class);
113 registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
115 //Bucket store should get an update bucket message. Rpc is removed in the updated bucket
116 probe1.expectMsgClass(
117 FiniteDuration.apply(10, TimeUnit.SECONDS),
118 Messages.BucketStoreMessages.UpdateBucket.class);
125 * Three node cluster.
126 * 1. Register rpc on 1 node, ensure 2nd node gets updated
127 * 2. Remove rpc on 1 node, ensure 2nd node gets updated
129 * @throws URISyntaxException
130 * @throws InterruptedException
133 public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
135 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
137 //install probe on node2's bucket store
138 final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
139 final JavaTestKit probe2 = createProbeForMessage(
140 node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
143 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
144 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
146 //Bucket store on node2 should get a message to update its local copy of remote buckets
147 probe2.expectMsgClass(
148 FiniteDuration.apply(10, TimeUnit.SECONDS),
149 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
152 registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
154 //Bucket store on node2 should get a message to update its local copy of remote buckets
155 probe2.expectMsgClass(
156 FiniteDuration.apply(10, TimeUnit.SECONDS),
157 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
162 * Three node cluster.
163 * Register rpc on 2 nodes. Ensure 3rd gets updated.
168 public void testRpcAddedOnMultiNodes() throws Exception {
170 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
171 final JavaTestKit mockBroker2 = new JavaTestKit(node2);
172 final JavaTestKit mockBroker3 = new JavaTestKit(node3);
174 registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
176 //install probe on node 3
177 final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
178 final JavaTestKit probe3 = createProbeForMessage(
179 node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
183 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
184 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
186 probe3.expectMsgClass(
187 FiniteDuration.apply(10, TimeUnit.SECONDS),
188 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
191 //Add same rpc on node 2
192 registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
193 registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
195 probe3.expectMsgClass(
196 FiniteDuration.apply(10, TimeUnit.SECONDS),
197 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
200 private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) {
201 final JavaTestKit probe = new JavaTestKit(node);
203 ConditionalProbe conditionalProbe =
204 new ConditionalProbe(probe.getRef(), new Predicate() {
206 public boolean apply(@Nullable Object input) {
207 return clazz.equals(input.getClass());
211 ActorSelection subject = node.actorSelection(subjectPath);
212 subject.tell(conditionalProbe, ActorRef.noSender());
218 private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
219 return new AddOrUpdateRoutes(createRouteIds());
222 private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
223 return new RemoveRoutes(createRouteIds());
226 private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
227 QName type = new QName(new URI("/mockrpc"), "mockrpc");
228 List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
229 routeIds.add(new RouteIdentifierImpl(null, type, null));