1 package org.opendaylight.controller.remote.rpc.registry;
3 import akka.actor.ActorPath;
4 import akka.actor.ActorRef;
5 import akka.actor.ActorSelection;
6 import akka.actor.ActorSystem;
7 import akka.actor.ChildActorPath;
8 import akka.actor.Props;
10 import akka.testkit.JavaTestKit;
11 import com.typesafe.config.ConfigFactory;
12 import org.junit.After;
13 import org.junit.AfterClass;
14 import org.junit.Assert;
15 import org.junit.Before;
16 import org.junit.BeforeClass;
17 import org.junit.Test;
18 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.connector.api.RpcRouter;
20 import org.opendaylight.yangtools.yang.common.QName;
21 import scala.concurrent.Await;
22 import scala.concurrent.Future;
23 import scala.concurrent.duration.FiniteDuration;
26 import java.net.URISyntaxException;
27 import java.util.List;
28 import java.util.concurrent.TimeUnit;
30 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoute;
31 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
35 public class RpcRegistryTest {
37 private static ActorSystem node1;
38 private static ActorSystem node2;
39 private static ActorSystem node3;
41 private ActorRef registry1;
42 private ActorRef registry2;
43 private ActorRef registry3;
46 public static void setup() throws InterruptedException {
47 Thread.sleep(1000); //give some time for previous test to close netty ports
48 node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
49 node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
50 node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
54 public static void teardown(){
55 JavaTestKit.shutdownActorSystem(node1);
56 JavaTestKit.shutdownActorSystem(node2);
57 JavaTestKit.shutdownActorSystem(node3);
68 public void createRpcRegistry() throws InterruptedException {
69 registry1 = node1.actorOf(Props.create(RpcRegistry.class));
70 registry2 = node2.actorOf(Props.create(RpcRegistry.class));
71 registry3 = node3.actorOf(Props.create(RpcRegistry.class));
75 public void stopRpcRegistry() throws InterruptedException {
76 if (registry1 != null)
77 node1.stop(registry1);
78 if (registry2 != null)
79 node2.stop(registry2);
80 if (registry3 != null)
81 node3.stop(registry3);
86 * Register rpc. Ensure router can be found
88 * @throws URISyntaxException
89 * @throws InterruptedException
92 public void testWhenRpcAddedOneNodeShouldAppearOnSameNode() throws URISyntaxException, InterruptedException {
94 final JavaTestKit mockBroker = new JavaTestKit(node1);
97 registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
98 registry1.tell(getAddRouteMessage(), mockBroker.getRef());
100 Thread.sleep(1000);//
102 //find the route on node 1's registry
103 registry1.tell(new FindRouters(createRouteId()), mockBroker.getRef());
104 FindRoutersReply message = mockBroker.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
105 List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
107 validateRouterReceived(pairs, mockBroker.getRef());
111 * Three node cluster.
112 * Register rpc on 1 node. Ensure its router can be found on other 2.
114 * @throws URISyntaxException
115 * @throws InterruptedException
118 public void testWhenRpcAddedOneNodeShouldAppearOnAnother() throws URISyntaxException, InterruptedException {
120 validateSystemStartup();
122 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
123 final JavaTestKit mockBroker2 = new JavaTestKit(node2);
124 final JavaTestKit mockBroker3 = new JavaTestKit(node3);
127 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
128 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
130 Thread.sleep(5000);// give some time for bucket store data sync
132 //find the route in node 2's registry
133 registry2.tell(new FindRouters(createRouteId()), mockBroker2.getRef());
134 FindRoutersReply message = mockBroker2.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
135 List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
137 validateRouterReceived(pairs, mockBroker1.getRef());
139 //find the route in node 3's registry
140 registry3.tell(new FindRouters(createRouteId()), mockBroker3.getRef());
141 message = mockBroker3.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
142 pairs = message.getRouterWithUpdateTime();
144 validateRouterReceived(pairs, mockBroker1.getRef());
149 * Three node cluster.
150 * Register rpc on 2 nodes. Ensure 2 routers are found on 3rd.
155 public void testAnRpcAddedOnMultiNodesShouldReturnMultiRouter() throws Exception {
157 validateSystemStartup();
159 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
160 final JavaTestKit mockBroker2 = new JavaTestKit(node2);
161 final JavaTestKit mockBroker3 = new JavaTestKit(node3);
163 //Thread.sleep(5000);//let system come up
166 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
167 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
169 //Add same rpc on node 2
170 registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
171 registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
173 registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
174 Thread.sleep(5000);// give some time for bucket store data sync
176 //find the route in node 3's registry
177 registry3.tell(new FindRouters(createRouteId()), mockBroker3.getRef());
178 FindRoutersReply message = mockBroker3.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
179 List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
181 validateMultiRouterReceived(pairs, mockBroker1.getRef(), mockBroker2.getRef());
185 private void validateMultiRouterReceived(List<Pair<ActorRef, Long>> actual, ActorRef... expected) {
186 Assert.assertTrue(actual != null);
187 Assert.assertTrue(actual.size() == expected.length);
190 private void validateRouterReceived(List<Pair<ActorRef, Long>> actual, ActorRef expected){
191 Assert.assertTrue(actual != null);
192 Assert.assertTrue(actual.size() == 1);
194 for (Pair<ActorRef, Long> pair : actual){
195 Assert.assertTrue(expected.path().uid() == pair.first().path().uid());
199 private void validateSystemStartup() throws InterruptedException {
202 ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper");
203 ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper");
204 ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper");
206 ActorSelection gossiper1 = node1.actorSelection(gossiper1Path);
207 ActorSelection gossiper2 = node2.actorSelection(gossiper2Path);
208 ActorSelection gossiper3 = node3.actorSelection(gossiper3Path);
211 if (!resolveReference(gossiper1, gossiper2, gossiper3))
212 Assert.fail("Could not find gossipers");
// Tries to resolve each of the given gossiper selections to a concrete ActorRef,
// looping at most five times over the whole set.
// NOTE(review): this listing is incomplete. The embedded numbering jumps from 222 to 226
// and from 227 to 240, so the declaration of `ref`, the opening `try`, the catch body,
// any retry/exit logic and the return statement are not visible here. Confirm against
// the full file before changing this method.
215 private Boolean resolveReference(ActorSelection... gossipers) throws InterruptedException {
// Starts out true; presumably cleared in the lines not shown when a lookup fails. TODO confirm.
217 Boolean resolved = true;
219 for (int i=0; i< 5; i++) {
221 for (ActorSelection gossiper : gossipers) {
// Ask the selection for a single matching actor, with a 5000 ms resolve timeout.
222 Future<ActorRef> future = gossiper.resolveOne(new FiniteDuration(5000, TimeUnit.MILLISECONDS));
// Block on the future for up to 10000 ms.
226 ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS));
// Any exception from the await is caught here; the handling itself is not part of this listing.
227 } catch (Exception e) {
240 private AddOrUpdateRoute getAddRouteMessage() throws URISyntaxException {
241 return new AddOrUpdateRoute(createRouteId());
244 private RpcRouter.RouteIdentifier<?,?,?> createRouteId() throws URISyntaxException {
245 QName type = new QName(new URI("/mockrpc"), "mockrpc");
246 return new RouteIdentifierImpl(null, type, null);