Merge "Patch to eliminate sleeps in RemoteRpc tests"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / test / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistryTest.java
1 package org.opendaylight.controller.remote.rpc.registry;
2
3
4 import akka.actor.ActorPath;
5 import akka.actor.ActorRef;
6 import akka.actor.ActorSelection;
7 import akka.actor.ActorSystem;
8 import akka.actor.ChildActorPath;
9 import akka.actor.Props;
10 import akka.testkit.JavaTestKit;
11 import com.google.common.base.Predicate;
12 import com.typesafe.config.ConfigFactory;
13
14 import org.junit.After;
15 import org.junit.AfterClass;
16 import org.junit.Assert;
17 import org.junit.Before;
18 import org.junit.BeforeClass;
19 import org.junit.Test;
20 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
22 import org.opendaylight.controller.sal.connector.api.RpcRouter;
23 import org.opendaylight.controller.utils.ConditionalProbe;
24 import org.opendaylight.yangtools.yang.common.QName;
25 import scala.concurrent.Await;
26 import scala.concurrent.Future;
27 import scala.concurrent.duration.FiniteDuration;
28
29 import javax.annotation.Nullable;
30 import java.net.URI;
31 import java.net.URISyntaxException;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.concurrent.TimeUnit;
35
36 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
37 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
38 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
39
40 public class RpcRegistryTest {
41
42   private static ActorSystem node1;
43   private static ActorSystem node2;
44   private static ActorSystem node3;
45
46   private ActorRef registry1;
47   private ActorRef registry2;
48   private ActorRef registry3;
49
50   @BeforeClass
51   public static void setup() throws InterruptedException {
52     node1 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberA"));
53     node2 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberB"));
54     node3 = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("memberC"));
55   }
56
57   @AfterClass
58   public static void teardown() {
59     JavaTestKit.shutdownActorSystem(node1);
60     JavaTestKit.shutdownActorSystem(node2);
61     JavaTestKit.shutdownActorSystem(node3);
62     if (node1 != null)
63       node1.shutdown();
64     if (node2 != null)
65       node2.shutdown();
66     if (node3 != null)
67       node3.shutdown();
68
69   }
70
71   @Before
72   public void createRpcRegistry() throws InterruptedException {
73     registry1 = node1.actorOf(Props.create(RpcRegistry.class));
74     registry2 = node2.actorOf(Props.create(RpcRegistry.class));
75     registry3 = node3.actorOf(Props.create(RpcRegistry.class));
76   }
77
78   @After
79   public void stopRpcRegistry() throws InterruptedException {
80     if (registry1 != null)
81       node1.stop(registry1);
82     if (registry2 != null)
83       node2.stop(registry2);
84     if (registry3 != null)
85       node3.stop(registry3);
86   }
87
88   /**
89    * One node cluster.
90    * 1. Register rpc, ensure router can be found
91    * 2. Then remove rpc, ensure its deleted
92    *
93    * @throws URISyntaxException
94    * @throws InterruptedException
95    */
96   @Test
97   public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
98     validateSystemStartup();
99
100     final JavaTestKit mockBroker = new JavaTestKit(node1);
101
102     final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
103
104     //install probe
105     final JavaTestKit probe1 = createProbeForMessage(
106         node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class);
107
108     //Add rpc on node 1
109     registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
110     registry1.tell(getAddRouteMessage(), mockBroker.getRef());
111
112     //Bucket store should get an update bucket message. Updated bucket contains added rpc.
113     probe1.expectMsgClass(
114         FiniteDuration.apply(10, TimeUnit.SECONDS),
115         Messages.BucketStoreMessages.UpdateBucket.class);
116
117     //Now remove rpc
118     registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
119
120     //Bucket store should get an update bucket message. Rpc is removed in the updated bucket
121     probe1.expectMsgClass(
122         FiniteDuration.apply(10, TimeUnit.SECONDS),
123         Messages.BucketStoreMessages.UpdateBucket.class);
124
125
126   }
127
128
129   /**
130    * Three node cluster.
131    * 1. Register rpc on 1 node, ensure 2nd node gets updated
132    * 2. Remove rpc on 1 node, ensure 2nd node gets updated
133    *
134    * @throws URISyntaxException
135    * @throws InterruptedException
136    */
137   @Test
138   public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
139
140     validateSystemStartup();
141
142     final JavaTestKit mockBroker1 = new JavaTestKit(node1);
143
144     //install probe on node2's bucket store
145     final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
146     final JavaTestKit probe2 = createProbeForMessage(
147         node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
148
149
150     //Add rpc on node 1
151     registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
152     registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
153
154     //Bucket store on node2 should get a message to update its local copy of remote buckets
155     probe2.expectMsgClass(
156         FiniteDuration.apply(10, TimeUnit.SECONDS),
157         Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
158
159     //Now remove
160     registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
161
162     //Bucket store on node2 should get a message to update its local copy of remote buckets
163     probe2.expectMsgClass(
164         FiniteDuration.apply(10, TimeUnit.SECONDS),
165         Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
166
167   }
168
169   /**
170    * Three node cluster.
171    * Register rpc on 2 nodes. Ensure 3rd gets updated.
172    *
173    * @throws Exception
174    */
175   @Test
176   public void testRpcAddedOnMultiNodes() throws Exception {
177
178     validateSystemStartup();
179
180     final JavaTestKit mockBroker1 = new JavaTestKit(node1);
181     final JavaTestKit mockBroker2 = new JavaTestKit(node2);
182     final JavaTestKit mockBroker3 = new JavaTestKit(node3);
183
184     registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
185
186     //install probe on node 3
187     final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
188     final JavaTestKit probe3 = createProbeForMessage(
189         node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
190
191
192     //Add rpc on node 1
193     registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
194     registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
195
196     probe3.expectMsgClass(
197         FiniteDuration.apply(10, TimeUnit.SECONDS),
198         Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
199
200
201     //Add same rpc on node 2
202     registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
203     registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
204
205     probe3.expectMsgClass(
206         FiniteDuration.apply(10, TimeUnit.SECONDS),
207         Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
208   }
209
210   private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class clazz) {
211     final JavaTestKit probe = new JavaTestKit(node);
212
213     ConditionalProbe conditionalProbe =
214         new ConditionalProbe(probe.getRef(), new Predicate() {
215           @Override
216           public boolean apply(@Nullable Object input) {
217             return clazz.equals(input.getClass());
218           }
219         });
220
221     ActorSelection subject = node.actorSelection(subjectPath);
222     subject.tell(conditionalProbe, ActorRef.noSender());
223
224     return probe;
225
226   }
227
228   private void validateSystemStartup() throws InterruptedException {
229
230     ActorPath gossiper1Path = new ChildActorPath(new ChildActorPath(registry1.path(), "store"), "gossiper");
231     ActorPath gossiper2Path = new ChildActorPath(new ChildActorPath(registry2.path(), "store"), "gossiper");
232     ActorPath gossiper3Path = new ChildActorPath(new ChildActorPath(registry3.path(), "store"), "gossiper");
233
234     ActorSelection gossiper1 = node1.actorSelection(gossiper1Path);
235     ActorSelection gossiper2 = node2.actorSelection(gossiper2Path);
236     ActorSelection gossiper3 = node3.actorSelection(gossiper3Path);
237
238
239     if (!resolveReference(gossiper1, gossiper2, gossiper3))
240       Assert.fail("Could not find gossipers");
241   }
242
243   private Boolean resolveReference(ActorSelection... gossipers) {
244
245     Boolean resolved = true;
246     for (int i = 0; i < 5; i++) {
247
248       resolved = true;
249       System.out.println(System.currentTimeMillis() + " Resolving gossipers; trial #" + i);
250
251       for (ActorSelection gossiper : gossipers) {
252         ActorRef ref = null;
253
254         try {
255           Future<ActorRef> future = gossiper.resolveOne(new FiniteDuration(15000, TimeUnit.MILLISECONDS));
256           ref = Await.result(future, new FiniteDuration(10000, TimeUnit.MILLISECONDS));
257         } catch (Exception e) {
258           System.out.println("Could not find gossiper in attempt#" + i + ". Got exception " + e.getMessage());
259         }
260
261         if (ref == null)
262           resolved = false;
263       }
264
265       if (resolved) break;
266
267     }
268     return resolved;
269   }
270
271   private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
272     return new AddOrUpdateRoutes(createRouteIds());
273   }
274
275   private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
276     return new RemoveRoutes(createRouteIds());
277   }
278
279   private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
280     QName type = new QName(new URI("/mockrpc"), "mockrpc");
281     List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
282     routeIds.add(new RouteIdentifierImpl(null, type, null));
283     return routeIds;
284   }
285
286 }