1 package org.opendaylight.controller.remote.rpc.registry;
3 import akka.actor.ActorPath;
4 import akka.actor.ActorRef;
5 import akka.actor.ActorSelection;
6 import akka.actor.ActorSystem;
7 import akka.actor.ChildActorPath;
8 import akka.actor.Props;
9 import akka.pattern.Patterns;
10 import akka.testkit.JavaTestKit;
11 import akka.util.Timeout;
12 import com.google.common.base.Predicate;
14 import java.net.URISyntaxException;
15 import java.util.ArrayList;
16 import java.util.List;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.TimeoutException;
19 import javax.annotation.Nullable;
20 import org.junit.After;
21 import org.junit.AfterClass;
22 import org.junit.Before;
23 import org.junit.BeforeClass;
24 import org.junit.Test;
25 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
26 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
29 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
31 import org.opendaylight.controller.sal.connector.api.RpcRouter;
32 import org.opendaylight.controller.utils.ConditionalProbe;
33 import org.opendaylight.yangtools.yang.common.QName;
34 import scala.concurrent.Await;
35 import scala.concurrent.Future;
36 import scala.concurrent.duration.Duration;
37 import scala.concurrent.duration.FiniteDuration;
39 public class RpcRegistryTest {
// Actor systems for the three nodes used by the tests; created in staticSetup() and shut down in staticTeardown().
41 private static ActorSystem node1;
42 private static ActorSystem node2;
43 private static ActorSystem node3;
// One RpcRegistry actor per node; teardown() stops each reference that is non-null.
45 private ActorRef registry1;
46 private ActorRef registry2;
47 private ActorRef registry3;
50 public static void staticSetup() throws InterruptedException {
51 RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
52 RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
53 RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
54 node1 = ActorSystem.create("opendaylight-rpc", config1.get());
55 node2 = ActorSystem.create("opendaylight-rpc", config2.get());
56 node3 = ActorSystem.create("opendaylight-rpc", config3.get());
60 public static void staticTeardown() {
61 JavaTestKit.shutdownActorSystem(node1);
62 JavaTestKit.shutdownActorSystem(node2);
63 JavaTestKit.shutdownActorSystem(node3);
// Start a fresh RpcRegistry actor on each of the three nodes and keep the refs in the
// instance fields so that teardown() can stop them.
68 registry1 = node1.actorOf(Props.create(RpcRegistry.class));
69 registry2 = node2.actorOf(Props.create(RpcRegistry.class));
70 registry3 = node3.actorOf(Props.create(RpcRegistry.class));
74 public void teardown() {
75 if (registry1 != null) {
76 node1.stop(registry1);
78 if (registry2 != null) {
79 node2.stop(registry2);
81 if (registry3 != null) {
82 node3.stop(registry3);
87 * One node cluster. 1. Register rpc, ensure router can be found 2. Then remove rpc, ensure its
90 * @throws URISyntaxException
91 * @throws InterruptedException
94 public void testAddRemoveRpcOnSameNode() throws Exception {
96 System.out.println("testAddRemoveRpcOnSameNode starting");
98 final JavaTestKit mockBroker = new JavaTestKit(node1);
100 final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
103 registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
106 final JavaTestKit probe1 = createProbeForMessage(node1, bucketStorePath,
107 Messages.BucketStoreMessages.UpdateBucket.class);
109 registry1.tell(getAddRouteMessage(), mockBroker.getRef());
111 // Bucket store should get an update bucket message. Updated bucket contains added rpc.
112 probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
113 Messages.BucketStoreMessages.UpdateBucket.class);
116 registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
118 // Bucket store should get an update bucket message. Rpc is removed in the updated bucket
119 probe1.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
120 Messages.BucketStoreMessages.UpdateBucket.class);
122 System.out.println("testAddRemoveRpcOnSameNode ending");
127 * Three node cluster. 1. Register rpc on 1 node, ensure 2nd node gets updated 2. Remove rpc on
128 * 1 node, ensure 2nd node gets updated
130 * @throws URISyntaxException
131 * @throws InterruptedException
134 public void testRpcAddRemoveInCluster() throws Exception {
136 System.out.println("testRpcAddRemoveInCluster starting");
138 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
140 // install probe on node2's bucket store
141 final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
142 final JavaTestKit probe2 = createProbeForMessage(node2, bucketStorePath,
143 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
146 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
147 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
149 // Bucket store on node2 should get a message to update its local copy of remote buckets
150 probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
151 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
154 registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
156 // Bucket store on node2 should get a message to update its local copy of remote buckets
157 probe2.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
158 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
160 System.out.println("testRpcAddRemoveInCluster ending");
164 * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
169 public void testRpcAddedOnMultiNodes() throws Exception {
171 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
172 final JavaTestKit mockBroker2 = new JavaTestKit(node2);
173 final JavaTestKit mockBroker3 = new JavaTestKit(node3);
175 registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
177 // install probe on node 3
178 final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
179 final JavaTestKit probe3 = createProbeForMessage(node3, bucketStorePath,
180 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
183 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
184 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
186 probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
187 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
189 // Add same rpc on node 2
190 registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
191 registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
193 probe3.expectMsgClass(FiniteDuration.apply(10, TimeUnit.SECONDS),
194 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
// Creates a JavaTestKit probe on `node`, wraps its ref together with a class-matching predicate
// in a ConditionalProbe, and sends that ConditionalProbe (via ask) to the actor selected by
// `subjectPath`.
//   node        - actor system on which the probe is created and the subject is looked up
//   subjectPath - path of the actor that is asked to accept the ConditionalProbe
//   clazz       - message class the predicate compares against
// Presumably ConditionalProbe makes the subject forward matching messages to the probe -
// TODO confirm against ConditionalProbe.
197 private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz)
199 final JavaTestKit probe = new JavaTestKit(node);
// The predicate accepts an object only when its runtime class is exactly `clazz` (equals, not instanceof).
201 ConditionalProbe conditionalProbe = new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
203 public boolean apply(@Nullable Object input) {
// NOTE(review): input is declared @Nullable and is dereferenced here - confirm a null guard protects this call.
205 return clazz.equals(input.getClass());
// The same 3 second duration serves as the ask timeout and as the limit passed to Await.ready.
212 FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
213 Timeout timeout = new Timeout(duration);
// Resolve the subject by path and ask it with the ConditionalProbe as the message.
217 ActorSelection subject = node.actorSelection(subjectPath);
218 Future<Object> future = Patterns.ask(subject, conditionalProbe, timeout);
// Block until the ask future completes or `duration` elapses.
221 Await.ready(future, duration);
// NOTE(review): if InterruptedException is handled here without being rethrown, confirm the
// thread's interrupt status is restored.
223 } catch (TimeoutException | InterruptedException e) {
234 private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
235 return new AddOrUpdateRoutes(createRouteIds());
238 private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
239 return new RemoveRoutes(createRouteIds());
242 private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
243 QName type = new QName(new URI("/mockrpc"), "mockrpc");
244 List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
245 routeIds.add(new RouteIdentifierImpl(null, type, null));