1 package org.opendaylight.controller.remote.rpc.registry;
4 import akka.actor.ActorPath;
5 import akka.actor.ActorRef;
6 import akka.actor.ActorSelection;
7 import akka.actor.ActorSystem;
8 import akka.actor.ChildActorPath;
9 import akka.actor.Props;
10 import akka.testkit.JavaTestKit;
11 import com.google.common.base.Predicate;
12 import com.typesafe.config.ConfigFactory;
14 import org.junit.After;
15 import org.junit.AfterClass;
16 import org.junit.Assert;
17 import org.junit.Before;
18 import org.junit.BeforeClass;
19 import org.junit.Test;
20 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
22 import org.opendaylight.controller.sal.connector.api.RpcRouter;
23 import org.opendaylight.controller.utils.ConditionalProbe;
24 import org.opendaylight.yangtools.yang.common.QName;
25 import scala.concurrent.Await;
26 import scala.concurrent.Future;
27 import scala.concurrent.duration.FiniteDuration;
29 import javax.annotation.Nullable;
31 import java.net.URISyntaxException;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.concurrent.TimeUnit;
36 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
37 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
38 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
40 public class RpcRegistryTest {
42 private static ActorSystem node1;
43 private static ActorSystem node2;
44 private static ActorSystem node3;
46 private ActorRef registry1;
47 private ActorRef registry2;
48 private ActorRef registry3;
51 public static void setup() throws InterruptedException {
52 node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
53 node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
54 node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
58 public static void teardown() {
59 JavaTestKit.shutdownActorSystem(node1);
60 JavaTestKit.shutdownActorSystem(node2);
61 JavaTestKit.shutdownActorSystem(node3);
72 public void createRpcRegistry() throws InterruptedException {
73 registry1 = node1.actorOf(Props.create(RpcRegistry.class));
74 registry2 = node2.actorOf(Props.create(RpcRegistry.class));
75 registry3 = node3.actorOf(Props.create(RpcRegistry.class));
79 public void stopRpcRegistry() throws InterruptedException {
80 if (registry1 != null)
81 node1.stop(registry1);
82 if (registry2 != null)
83 node2.stop(registry2);
84 if (registry3 != null)
85 node3.stop(registry3);
90 * 1. Register rpc, ensure router can be found
91 * 2. Then remove rpc, ensure its deleted
93 * @throws URISyntaxException
94 * @throws InterruptedException
97 public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
98 validateSystemStartup();
100 final JavaTestKit mockBroker = new JavaTestKit(node1);
102 final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
105 final JavaTestKit probe1 = createProbeForMessage(
106 node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class);
109 registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
110 registry1.tell(getAddRouteMessage(), mockBroker.getRef());
112 //Bucket store should get an update bucket message. Updated bucket contains added rpc.
113 probe1.expectMsgClass(
114 FiniteDuration.apply(10, TimeUnit.SECONDS),
115 Messages.BucketStoreMessages.UpdateBucket.class);
118 registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
120 //Bucket store should get an update bucket message. Rpc is removed in the updated bucket
121 probe1.expectMsgClass(
122 FiniteDuration.apply(10, TimeUnit.SECONDS),
123 Messages.BucketStoreMessages.UpdateBucket.class);
130 * Three node cluster.
131 * 1. Register rpc on 1 node, ensure 2nd node gets updated
132 * 2. Remove rpc on 1 node, ensure 2nd node gets updated
134 * @throws URISyntaxException
135 * @throws InterruptedException
138 public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
140 validateSystemStartup();
142 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
144 //install probe on node2's bucket store
145 final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
146 final JavaTestKit probe2 = createProbeForMessage(
147 node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
151 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
152 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
154 //Bucket store on node2 should get a message to update its local copy of remote buckets
155 probe2.expectMsgClass(
156 FiniteDuration.apply(10, TimeUnit.SECONDS),
157 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
160 registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
162 //Bucket store on node2 should get a message to update its local copy of remote buckets
163 probe2.expectMsgClass(
164 FiniteDuration.apply(10, TimeUnit.SECONDS),
165 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
170 * Three node cluster.
171 * Register rpc on 2 nodes. Ensure 3rd gets updated.
176 public void testRpcAddedOnMultiNodes() throws Exception {
178 validateSystemStartup();
180 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
181 final JavaTestKit mockBroker2 = new JavaTestKit(node2);
182 final JavaTestKit mockBroker3 = new JavaTestKit(node3);
184 registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
186 //install probe on node 3
187 final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
188 final JavaTestKit probe3 = createProbeForMessage(
189 node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
193 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
194 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
196 probe3.expectMsgClass(
197 FiniteDuration.apply(10, TimeUnit.SECONDS),
198 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
201 //Add same rpc on node 2
202 registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
203 registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
205 probe3.expectMsgClass(
206 FiniteDuration.apply(10, TimeUnit.SECONDS),
207 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
210 private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) {
211 final JavaTestKit probe = new JavaTestKit(node);
213 ConditionalProbe conditionalProbe =
214 new ConditionalProbe(probe.getRef(), new Predicate() {
216 public boolean apply(@Nullable Object input) {
217 return clazz.equals(input.getClass());
221 ActorSelection subject = node.actorSelection(subjectPath);
222 subject.tell(conditionalProbe, ActorRef.noSender());
228 private void validateSystemStartup() throws InterruptedException {
230 ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper");
231 ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper");
232 ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper");
234 ActorSelection gossiper1 = node1.actorSelection(gossiper1Path);
235 ActorSelection gossiper2 = node2.actorSelection(gossiper2Path);
236 ActorSelection gossiper3 = node3.actorSelection(gossiper3Path);
239 if (!resolveReference(gossiper1, gossiper2, gossiper3))
240 Assert.fail("Could not find gossipers");
243 private Boolean resolveReference(ActorSelection... gossipers) {
245 Boolean resolved = true;
246 for (int i = 0; i < 5; i++) {
249 System.out.println(System.currentTimeMillis() + " Resolving gossipers; trial #" + i);
251 for (ActorSelection gossiper : gossipers) {
255 Future<ActorRef> future = gossiper.resolveOne(new FiniteDuration(15000, TimeUnit.MILLISECONDS));
256 ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS));
257 } catch (Exception e) {
258 System.out.println("Could not find gossiper in attempt#" + i + ". Got exception " + e.getMessage());
271 private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
272 return new AddOrUpdateRoutes(createRouteIds());
275 private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
276 return new RemoveRoutes(createRouteIds());
279 private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
280 QName type = new QName(new URI("/mockrpc"), "mockrpc");
281 List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
282 routeIds.add(new RouteIdentifierImpl(null, type, null));