Merge "Fix compilation warnings: replace deprecated junit.framework.Assert"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / test / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistryTest.java
1 package org.opendaylight.controller.remote.rpc.registry;
2
3 import akka.actor.ActorRef;
4 import akka.actor.ActorSystem;
5 import akka.actor.Address;
6 import akka.actor.Props;
7 import akka.japi.Pair;
8 import akka.testkit.JavaTestKit;
9 import com.google.common.util.concurrent.Uninterruptibles;
10 import java.net.URI;
11 import java.net.URISyntaxException;
12 import java.util.ArrayList;
13 import java.util.Arrays;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.concurrent.TimeUnit;
18 import org.junit.After;
19 import org.junit.AfterClass;
20 import org.junit.Assert;
21 import org.junit.Before;
22 import org.junit.BeforeClass;
23 import org.junit.Test;
24 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
25 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
26 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
27 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRouters;
28 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.FindRoutersReply;
29 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
30 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
33 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
36 import org.opendaylight.controller.sal.connector.api.RpcRouter;
37 import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
38 import org.opendaylight.yangtools.yang.common.QName;
39 import scala.concurrent.duration.Duration;
40 import scala.concurrent.duration.FiniteDuration;
41
42 public class RpcRegistryTest {
43
44     private static ActorSystem node1;
45     private static ActorSystem node2;
46     private static ActorSystem node3;
47
48     private ActorRef registry1;
49     private ActorRef registry2;
50     private ActorRef registry3;
51
52     private int routeIdCounter = 1;
53
54     @BeforeClass
55     public static void staticSetup() throws InterruptedException {
56       RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
57       RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
58       RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
59       node1 = ActorSystem.create("opendaylight-rpc", config1.get());
60       node2 = ActorSystem.create("opendaylight-rpc", config2.get());
61       node3 = ActorSystem.create("opendaylight-rpc", config3.get());
62     }
63
64     @AfterClass
65     public static void staticTeardown() {
66       JavaTestKit.shutdownActorSystem(node1);
67       JavaTestKit.shutdownActorSystem(node2);
68       JavaTestKit.shutdownActorSystem(node3);
69     }
70
71     @Before
72     public void setup() {
73         registry1 = node1.actorOf(Props.create(RpcRegistry.class));
74         registry2 = node2.actorOf(Props.create(RpcRegistry.class));
75         registry3 = node3.actorOf(Props.create(RpcRegistry.class));
76     }
77
78     @After
79     public void teardown() {
80         if (registry1 != null) {
81             node1.stop(registry1);
82         }
83         if (registry2 != null) {
84             node2.stop(registry2);
85         }
86         if (registry3 != null) {
87             node3.stop(registry3);
88         }
89     }
90
91     /**
92      * One node cluster. 1. Register rpc, ensure router can be found 2. Then remove rpc, ensure its
93      * deleted
94      *
95      * @throws URISyntaxException
96      * @throws InterruptedException
97      */
98     @Test
99     public void testAddRemoveRpcOnSameNode() throws Exception {
100
101         System.out.println("testAddRemoveRpcOnSameNode starting");
102
103         final JavaTestKit mockBroker = new JavaTestKit(node1);
104
105         Address nodeAddress = node1.provider().getDefaultAddress();
106
107         // Add rpc on node 1
108         registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
109
110         List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
111
112         registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker.getRef());
113
114         // Bucket store should get an update bucket message. Updated bucket contains added rpc.
115
116         Map<Address, Bucket> buckets = retrieveBuckets(registry1, mockBroker, nodeAddress);
117         verifyBucket(buckets.get(nodeAddress), addedRouteIds);
118
119         Map<Address, Long> versions = retrieveVersions(registry1, mockBroker);
120         Assert.assertEquals("Version for bucket " + nodeAddress, buckets.get(nodeAddress).getVersion(),
121                 versions.get(nodeAddress));
122
123         // Now remove rpc
124         registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker.getRef());
125
126         // Bucket store should get an update bucket message. Rpc is removed in the updated bucket
127
128         verifyEmptyBucket(mockBroker, registry1, nodeAddress);
129
130         System.out.println("testAddRemoveRpcOnSameNode ending");
131
132     }
133
134     /**
135      * Three node cluster. 1. Register rpc on 1 node, ensure 2nd node gets updated 2. Remove rpc on
136      * 1 node, ensure 2nd node gets updated
137      *
138      * @throws URISyntaxException
139      * @throws InterruptedException
140      */
141     @Test
142     public void testRpcAddRemoveInCluster() throws Exception {
143
144         System.out.println("testRpcAddRemoveInCluster starting");
145
146         final JavaTestKit mockBroker1 = new JavaTestKit(node1);
147         final JavaTestKit mockBroker2 = new JavaTestKit(node2);
148
149         List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds = createRouteIds();
150
151         Address node1Address = node1.provider().getDefaultAddress();
152
153         // Add rpc on node 1
154         registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
155         registry1.tell(new AddOrUpdateRoutes(addedRouteIds), mockBroker1.getRef());
156
157         // Bucket store on node2 should get a message to update its local copy of remote buckets
158
159         Map<Address, Bucket> buckets = retrieveBuckets(registry2, mockBroker2, node1Address);
160         verifyBucket(buckets.get(node1Address), addedRouteIds);
161
162         // Now remove
163         registry1.tell(new RemoveRoutes(addedRouteIds), mockBroker1.getRef());
164
165         // Bucket store on node2 should get a message to update its local copy of remote buckets.
166         // Wait for the bucket for node1 to be empty.
167
168         verifyEmptyBucket(mockBroker2, registry2, node1Address);
169
170         System.out.println("testRpcAddRemoveInCluster ending");
171     }
172
173     private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address)
174             throws AssertionError {
175         Map<Address, Bucket> buckets;
176         int nTries = 0;
177         while(true) {
178             buckets = retrieveBuckets(registry1, testKit, address);
179
180             try {
181                 verifyBucket(buckets.get(address), Collections.<RouteIdentifier<?, ?, ?>>emptyList());
182                 break;
183             } catch (AssertionError e) {
184                 if(++nTries >= 50) {
185                     throw e;
186                 }
187             }
188
189             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
190         }
191     }
192
193     /**
194      * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
195      *
196      * @throws Exception
197      */
198     @Test
199     public void testRpcAddedOnMultiNodes() throws Exception {
200
201         final JavaTestKit mockBroker1 = new JavaTestKit(node1);
202         final JavaTestKit mockBroker2 = new JavaTestKit(node2);
203         final JavaTestKit mockBroker3 = new JavaTestKit(node3);
204
205         registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
206
207         // Add rpc on node 1
208         List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds1 = createRouteIds();
209         registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
210         registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), mockBroker1.getRef());
211
212         // Add rpc on node 2
213         List<RpcRouter.RouteIdentifier<?, ?, ?>> addedRouteIds2 = createRouteIds();
214         registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
215         registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), mockBroker2.getRef());
216
217         Address node1Address = node1.provider().getDefaultAddress();
218         Address node2Address = node2.provider().getDefaultAddress();
219
220         Map<Address, Bucket> buckets = retrieveBuckets(registry3, mockBroker3, node1Address,
221                 node2Address);
222
223         verifyBucket(buckets.get(node1Address), addedRouteIds1);
224         verifyBucket(buckets.get(node2Address), addedRouteIds2);
225
226         Map<Address, Long> versions = retrieveVersions(registry3, mockBroker3);
227         Assert.assertEquals("Version for bucket " + node1Address, buckets.get(node1Address).getVersion(),
228                 versions.get(node1Address));
229         Assert.assertEquals("Version for bucket " + node2Address, buckets.get(node2Address).getVersion(),
230                 versions.get(node2Address));
231
232         RouteIdentifier<?, ?, ?> routeID = addedRouteIds1.get(0);
233         registry3.tell(new FindRouters(routeID), mockBroker3.getRef());
234
235         FindRoutersReply reply = mockBroker3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
236                 FindRoutersReply.class);
237
238         List<Pair<ActorRef, Long>> respList = reply.getRouterWithUpdateTime();
239         Assert.assertEquals("getRouterWithUpdateTime size", 1, respList.size());
240
241         respList.get(0).first().tell("hello", ActorRef.noSender());
242         mockBroker1.expectMsgEquals(Duration.create(3, TimeUnit.SECONDS), "hello");
243     }
244
245     private Map<Address, Long> retrieveVersions(ActorRef bucketStore, JavaTestKit testKit) {
246         bucketStore.tell(new GetBucketVersions(), testKit.getRef());
247         GetBucketVersionsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
248                 GetBucketVersionsReply.class);
249         return reply.getVersions();
250     }
251
252     private void verifyBucket(Bucket<RoutingTable> bucket, List<RouteIdentifier<?, ?, ?>> expRouteIds) {
253         RoutingTable table = bucket.getData();
254         Assert.assertNotNull("Bucket RoutingTable is null", table);
255         for(RouteIdentifier<?, ?, ?> r: expRouteIds) {
256             if(!table.contains(r)) {
257                 Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table);
258             }
259         }
260
261         Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size());
262     }
263
264     private Map<Address, Bucket> retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit,
265             Address... addresses) {
266         int nTries = 0;
267         while(true) {
268             bucketStore.tell(new GetAllBuckets(), testKit.getRef());
269             GetAllBucketsReply reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
270                     GetAllBucketsReply.class);
271
272             Map<Address, Bucket> buckets = reply.getBuckets();
273             boolean foundAll = true;
274             for(Address addr: addresses) {
275                 Bucket bucket = buckets.get(addr);
276                 if(bucket  == null) {
277                     foundAll = false;
278                     break;
279                 }
280             }
281
282             if(foundAll) {
283                 return buckets;
284             }
285
286             if(++nTries >= 50) {
287                 Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
288                         + ", Actual: " + buckets);
289             }
290
291             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
292         }
293     }
294
295     @SuppressWarnings("unchecked")
296     @Test
297     public void testAddRoutesConcurrency() throws Exception {
298         final JavaTestKit testKit = new JavaTestKit(node1);
299
300         registry1.tell(new SetLocalRouter(testKit.getRef()), ActorRef.noSender());
301
302         final int nRoutes = 500;
303         final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
304         for(int i = 0; i < nRoutes; i++) {
305             final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null,
306                     new QName(new URI("/mockrpc"), "type" + i), null);
307             added[i] = routeId;
308
309             //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
310             registry1.tell(new AddOrUpdateRoutes(Arrays.<RouteIdentifier<?, ?, ?>>asList(routeId)),
311                     ActorRef.noSender());
312         }
313
314         GetAllBuckets getAllBuckets = new GetAllBuckets();
315         FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
316         int nTries = 0;
317         while(true) {
318             registry1.tell(getAllBuckets, testKit.getRef());
319             GetAllBucketsReply reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
320
321             Bucket<RoutingTable> localBucket = reply.getBuckets().values().iterator().next();
322             RoutingTable table = localBucket.getData();
323             if(table != null && table.size() == nRoutes) {
324                 for(RouteIdentifier<?, ?, ?> r: added) {
325                     Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r));
326                 }
327
328                 break;
329             }
330
331             if(++nTries >= 50) {
332                 Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
333             }
334
335             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
336         }
337     }
338
339     private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
340         QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++);
341         List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
342         routeIds.add(new RouteIdentifierImpl(null, type, null));
343         return routeIds;
344     }
345 }