Merge "Small fix to xsql dependencies"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / test / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistryTest.java
1 package org.opendaylight.controller.remote.rpc.registry;
2
3
4 import akka.actor.ActorPath;
5 import akka.actor.ActorRef;
6 import akka.actor.ActorSelection;
7 import akka.actor.ActorSystem;
8 import akka.actor.ChildActorPath;
9 import akka.actor.Props;
10 import akka.testkit.JavaTestKit;
11 import com.google.common.base.Predicate;
12 import com.typesafe.config.ConfigFactory;
13 import org.junit.After;
14 import org.junit.AfterClass;
15 import org.junit.Before;
16 import org.junit.BeforeClass;
17 import org.junit.Test;
18 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
19 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
20 import org.opendaylight.controller.sal.connector.api.RpcRouter;
21 import org.opendaylight.controller.utils.ConditionalProbe;
22 import org.opendaylight.yangtools.yang.common.QName;
23 import scala.concurrent.duration.FiniteDuration;
24
25 import javax.annotation.Nullable;
26 import java.net.URI;
27 import java.net.URISyntaxException;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.TimeUnit;
31
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
35
36 public class RpcRegistryTest {
37
38   private static ActorSystem node1;
39   private static ActorSystem node2;
40   private static ActorSystem node3;
41
42   private ActorRef registry1;
43   private ActorRef registry2;
44   private ActorRef registry3;
45
46   @BeforeClass
47   public static void setup() throws InterruptedException {
48     node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
49     node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
50     node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
51   }
52
53   @AfterClass
54   public static void teardown() {
55     JavaTestKit.shutdownActorSystem(node1);
56     JavaTestKit.shutdownActorSystem(node2);
57     JavaTestKit.shutdownActorSystem(node3);
58     if (node1 != null)
59       node1.shutdown();
60     if (node2 != null)
61       node2.shutdown();
62     if (node3 != null)
63       node3.shutdown();
64
65   }
66
67   @Before
68   public void createRpcRegistry() throws InterruptedException {
69     registry1 = node1.actorOf(Props.create(RpcRegistry.class));
70     registry2 = node2.actorOf(Props.create(RpcRegistry.class));
71     registry3 = node3.actorOf(Props.create(RpcRegistry.class));
72   }
73
74   @After
75   public void stopRpcRegistry() throws InterruptedException {
76     if (registry1 != null)
77       node1.stop(registry1);
78     if (registry2 != null)
79       node2.stop(registry2);
80     if (registry3 != null)
81       node3.stop(registry3);
82   }
83
84   /**
85    * One node cluster.
86    * 1. Register rpc, ensure router can be found
87    * 2. Then remove rpc, ensure its deleted
88    *
89    * @throws URISyntaxException
90    * @throws InterruptedException
91    */
92   @Test
93   public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
94
95     final JavaTestKit mockBroker = new JavaTestKit(node1);
96
97     final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
98
99     //install probe
100     final JavaTestKit probe1 = createProbeForMessage(
101         node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class);
102
103     //Add rpc on node 1
104     registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
105     registry1.tell(getAddRouteMessage(), mockBroker.getRef());
106
107     //Bucket store should get an update bucket message. Updated bucket contains added rpc.
108     probe1.expectMsgClass(
109         FiniteDuration.apply(10, TimeUnit.SECONDS),
110         Messages.BucketStoreMessages.UpdateBucket.class);
111
112     //Now remove rpc
113     registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
114
115     //Bucket store should get an update bucket message. Rpc is removed in the updated bucket
116     probe1.expectMsgClass(
117         FiniteDuration.apply(10, TimeUnit.SECONDS),
118         Messages.BucketStoreMessages.UpdateBucket.class);
119
120
121   }
122
123
124   /**
125    * Three node cluster.
126    * 1. Register rpc on 1 node, ensure 2nd node gets updated
127    * 2. Remove rpc on 1 node, ensure 2nd node gets updated
128    *
129    * @throws URISyntaxException
130    * @throws InterruptedException
131    */
132   @Test
133   public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
134
135     final JavaTestKit mockBroker1 = new JavaTestKit(node1);
136
137     //install probe on node2's bucket store
138     final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
139     final JavaTestKit probe2 = createProbeForMessage(
140         node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
141
142     //Add rpc on node 1
143     registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
144     registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
145
146     //Bucket store on node2 should get a message to update its local copy of remote buckets
147     probe2.expectMsgClass(
148         FiniteDuration.apply(10, TimeUnit.SECONDS),
149         Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
150
151     //Now remove
152     registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
153
154     //Bucket store on node2 should get a message to update its local copy of remote buckets
155     probe2.expectMsgClass(
156         FiniteDuration.apply(10, TimeUnit.SECONDS),
157         Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
158
159   }
160
161   /**
162    * Three node cluster.
163    * Register rpc on 2 nodes. Ensure 3rd gets updated.
164    *
165    * @throws Exception
166    */
167   @Test
168   public void testRpcAddedOnMultiNodes() throws Exception {
169
170     final JavaTestKit mockBroker1 = new JavaTestKit(node1);
171     final JavaTestKit mockBroker2 = new JavaTestKit(node2);
172     final JavaTestKit mockBroker3 = new JavaTestKit(node3);
173
174     registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
175
176     //install probe on node 3
177     final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
178     final JavaTestKit probe3 = createProbeForMessage(
179         node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
180
181
182     //Add rpc on node 1
183     registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
184     registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
185
186     probe3.expectMsgClass(
187         FiniteDuration.apply(10, TimeUnit.SECONDS),
188         Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
189
190
191     //Add same rpc on node 2
192     registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
193     registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
194
195     probe3.expectMsgClass(
196         FiniteDuration.apply(10, TimeUnit.SECONDS),
197         Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
198   }
199
200   private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) {
201     final JavaTestKit probe = new JavaTestKit(node);
202
203     ConditionalProbe conditionalProbe =
204         new ConditionalProbe(probe.getRef(), new Predicate() {
205           @Override
206           public boolean apply(@Nullable Object input) {
207             return clazz.equals(input.getClass());
208           }
209         });
210
211     ActorSelection subject = node.actorSelection(subjectPath);
212     subject.tell(conditionalProbe, ActorRef.noSender());
213
214     return probe;
215
216   }
217
218   private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
219     return new AddOrUpdateRoutes(createRouteIds());
220   }
221
222   private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
223     return new RemoveRoutes(createRouteIds());
224   }
225
226   private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
227     QName type = new QName(new URI("/mockrpc"), "mockrpc");
228     List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
229     routeIds.add(new RouteIdentifierImpl(null, type, null));
230     return routeIds;
231   }
232
233 }