1 package org.opendaylight.controller.remote.rpc.registry;
3 import akka.actor.ActorPath;
4 import akka.actor.ActorRef;
5 import akka.actor.ActorSelection;
6 import akka.actor.ActorSystem;
7 import akka.actor.ChildActorPath;
8 import akka.actor.Props;
10 import akka.testkit.JavaTestKit;
11 import com.typesafe.config.ConfigFactory;
12 import org.junit.After;
13 import org.junit.AfterClass;
14 import org.junit.Assert;
15 import org.junit.Before;
16 import org.junit.BeforeClass;
17 import org.junit.Test;
18 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
19 import org.opendaylight.controller.sal.connector.api.RpcRouter;
20 import org.opendaylight.yangtools.yang.common.QName;
21 import scala.concurrent.Await;
22 import scala.concurrent.Future;
23 import scala.concurrent.duration.FiniteDuration;
26 import java.net.URISyntaxException;
27 import java.util.ArrayList;
28 import java.util.List;
29 import java.util.concurrent.TimeUnit;
31 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
35 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
/**
 * Tests for {@link RpcRegistry} that run against three actor systems
 * ({@code node1}, {@code node2}, {@code node3}), each hosting its own registry actor.
 */
37 public class RpcRegistryTest {
// Actor systems shared by all tests; assigned in setup() and shut down in teardown().
39 private static ActorSystem node1;
40 private static ActorSystem node2;
41 private static ActorSystem node3;
// One RpcRegistry actor per node; assigned in createRpcRegistry() and stopped in stopRpcRegistry().
43 private ActorRef registry1;
44 private ActorRef registry2;
45 private ActorRef registry3;
48 public static void setup() throws InterruptedException {
49 Thread.sleep(1000); //give some time for previous test to close netty ports
50 node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
51 node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
52 node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
56 public static void teardown(){
57 JavaTestKit.shutdownActorSystem(node1);
58 JavaTestKit.shutdownActorSystem(node2);
59 JavaTestKit.shutdownActorSystem(node3);
70 public void createRpcRegistry() throws InterruptedException {
71 registry1 = node1.actorOf(Props.create(RpcRegistry.class));
72 registry2 = node2.actorOf(Props.create(RpcRegistry.class));
73 registry3 = node3.actorOf(Props.create(RpcRegistry.class));
77 public void stopRpcRegistry() throws InterruptedException {
78 if (registry1 != null)
79 node1.stop(registry1);
80 if (registry2 != null)
81 node2.stop(registry2);
82 if (registry3 != null)
83 node3.stop(registry3);
88 * 1. Register rpc, ensure router can be found
89 * 2. Then remove rpc, ensure its deleted
91 * @throws URISyntaxException
92 * @throws InterruptedException
95 public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
97 final JavaTestKit mockBroker = new JavaTestKit(node1);
100 registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
101 registry1.tell(getAddRouteMessage(), mockBroker.getRef());
103 Thread.sleep(1000);//
105 //find the route on node 1's registry
106 registry1.tell(new FindRouters(createRouteId()), mockBroker.getRef());
107 FindRoutersReply message = mockBroker.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
108 List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
110 validateRouterReceived(pairs, mockBroker.getRef());
113 registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
115 //find the route on node 1's registry
116 registry1.tell(new FindRouters(createRouteId()), mockBroker.getRef());
117 message = mockBroker.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
118 pairs = message.getRouterWithUpdateTime();
120 Assert.assertTrue(pairs.isEmpty());
124 * Three node cluster.
125 * 1. Register rpc on 1 node, ensure its router can be found on other 2.
126 * 2. Remove rpc on 1 node, ensure its removed on other 2.
128 * @throws URISyntaxException
129 * @throws InterruptedException
132 public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
134 validateSystemStartup();
136 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
137 final JavaTestKit mockBroker2 = new JavaTestKit(node2);
138 final JavaTestKit mockBroker3 = new JavaTestKit(node3);
141 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
142 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
144 Thread.sleep(1000);// give some time for bucket store data sync
146 //find the route in node 2's registry
147 List<Pair<ActorRef, Long>> pairs = findRouters(registry2, mockBroker2);
148 validateRouterReceived(pairs, mockBroker1.getRef());
150 //find the route in node 3's registry
151 pairs = findRouters(registry3, mockBroker3);
152 validateRouterReceived(pairs, mockBroker1.getRef());
155 registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
156 Thread.sleep(1000);// give some time for bucket store data sync
158 pairs = findRouters(registry2, mockBroker2);
159 Assert.assertTrue(pairs.isEmpty());
161 pairs = findRouters(registry3, mockBroker3);
162 Assert.assertTrue(pairs.isEmpty());
166 * Three node cluster.
167 * Register rpc on 2 nodes. Ensure 2 routers are found on 3rd.
172 public void testAnRpcAddedOnMultiNodesShouldReturnMultiRouter() throws Exception {
174 validateSystemStartup();
176 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
177 final JavaTestKit mockBroker2 = new JavaTestKit(node2);
178 final JavaTestKit mockBroker3 = new JavaTestKit(node3);
180 //Thread.sleep(5000);//let system come up
183 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
184 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
186 //Add same rpc on node 2
187 registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
188 registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
190 registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
191 Thread.sleep(1000);// give some time for bucket store data sync
193 //find the route in node 3's registry
194 registry3.tell(new FindRouters(createRouteId()), mockBroker3.getRef());
195 FindRoutersReply message = mockBroker3.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
196 List<Pair<ActorRef, Long>> pairs = message.getRouterWithUpdateTime();
198 validateMultiRouterReceived(pairs, mockBroker1.getRef(), mockBroker2.getRef());
202 private List<Pair<ActorRef, Long>> findRouters(ActorRef registry, JavaTestKit receivingActor) throws URISyntaxException {
203 registry.tell(new FindRouters(createRouteId()), receivingActor.getRef());
204 FindRoutersReply message = receivingActor.expectMsgClass(JavaTestKit.duration("10 second"), FindRoutersReply.class);
205 return message.getRouterWithUpdateTime();
208 private void validateMultiRouterReceived(List<Pair<ActorRef, Long>> actual, ActorRef... expected) {
209 Assert.assertTrue(actual != null);
210 Assert.assertTrue(actual.size() == expected.length);
213 private void validateRouterReceived(List<Pair<ActorRef, Long>> actual, ActorRef expected){
214 Assert.assertTrue(actual != null);
215 Assert.assertTrue(actual.size() == 1);
217 for (Pair<ActorRef, Long> pair : actual){
218 Assert.assertTrue(expected.path().uid() == pair.first().path().uid());
222 private void validateSystemStartup() throws InterruptedException {
225 ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper");
226 ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper");
227 ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper");
229 ActorSelection gossiper1 = node1.actorSelection(gossiper1Path);
230 ActorSelection gossiper2 = node2.actorSelection(gossiper2Path);
231 ActorSelection gossiper3 = node3.actorSelection(gossiper3Path);
234 if (!resolveReference(gossiper1, gossiper2, gossiper3))
235 Assert.fail("Could not find gossipers");
// Tries to resolve each of the given gossiper selections to an ActorRef.
// NOTE(review): parts of this method are not visible in the reviewed text (the try
// opening, the declaration of ref, the catch handling and the return); confirm the
// retry/return behaviour against the complete file before relying on these comments.
238 private Boolean resolveReference(ActorSelection... gossipers) throws InterruptedException {
// Starts out as true; presumably cleared when a selection cannot be resolved - TODO confirm.
240 Boolean resolved = true;
// At most five rounds over all selections.
242 for (int i=0; i< 5; i++) {
244 for (ActorSelection gossiper : gossipers) {
// Ask Akka to resolve the selection, with a 5000 ms resolution timeout.
245 Future<ActorRef> future = gossiper.resolveOne(new FiniteDuration(5000, TimeUnit.MILLISECONDS));
// Block for up to 10000 ms until the resolution completes.
249 ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS));
// Any failure while waiting is caught here instead of propagating.
250 } catch (Exception e) {
263 private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
264 return new AddOrUpdateRoutes(createRouteIds());
267 private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
268 return new RemoveRoutes(createRouteIds());
271 private List<RpcRouter.RouteIdentifier<?,?,?>> createRouteIds() throws URISyntaxException {
272 QName type = new QName(new URI("/mockrpc"), "mockrpc");
273 List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
274 routeIds.add(new RouteIdentifierImpl(null, type, null));
278 private RpcRouter.RouteIdentifier<?,?,?> createRouteId() throws URISyntaxException {
279 QName type = new QName(new URI("/mockrpc"), "mockrpc");
280 return new RouteIdentifierImpl(null, type, null);