2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry;
11 import static org.junit.Assert.fail;
12 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
13 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSystem;
17 import akka.actor.Address;
18 import akka.cluster.Cluster;
19 import akka.cluster.ClusterEvent.CurrentClusterState;
20 import akka.cluster.Member;
21 import akka.cluster.MemberStatus;
22 import akka.cluster.UniqueAddress;
23 import akka.testkit.JavaTestKit;
24 import com.google.common.base.Stopwatch;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.typesafe.config.ConfigFactory;
29 import java.net.URISyntaxException;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.List;
36 import java.util.Optional;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.After;
40 import org.junit.AfterClass;
41 import org.junit.Assert;
42 import org.junit.Before;
43 import org.junit.BeforeClass;
44 import org.junit.Test;
45 import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
46 import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
47 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
48 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
49 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
50 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.UpdateRemoteEndpoints;
51 import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.RemoteRpcEndpoint;
52 import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
53 import org.opendaylight.yangtools.yang.common.QName;
54 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import scala.concurrent.duration.Duration;
58 import scala.concurrent.duration.FiniteDuration;
60 public class RpcRegistryTest {
61 private static final Logger LOG = LoggerFactory.getLogger(RpcRegistryTest.class);
63 private static ActorSystem node1;
64 private static ActorSystem node2;
65 private static ActorSystem node3;
67 private JavaTestKit invoker1;
68 private JavaTestKit invoker2;
69 private JavaTestKit invoker3;
70 private JavaTestKit registrar1;
71 private JavaTestKit registrar2;
72 private JavaTestKit registrar3;
73 private ActorRef registry1;
74 private ActorRef registry2;
75 private ActorRef registry3;
77 private int routeIdCounter = 1;
80 public static void staticSetup() throws InterruptedException {
81 AkkaConfigurationReader reader = ConfigFactory::load;
83 RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms")
84 .withConfigReader(reader).build();
85 RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").gossipTickInterval("200ms")
86 .withConfigReader(reader).build();
87 RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").gossipTickInterval("200ms")
88 .withConfigReader(reader).build();
89 node1 = ActorSystem.create("opendaylight-rpc", config1.get());
90 node2 = ActorSystem.create("opendaylight-rpc", config2.get());
91 node3 = ActorSystem.create("opendaylight-rpc", config3.get());
93 waitForMembersUp(node1, Cluster.get(node2).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
94 waitForMembersUp(node2, Cluster.get(node1).selfUniqueAddress(), Cluster.get(node3).selfUniqueAddress());
97 static void waitForMembersUp(final ActorSystem node, final UniqueAddress... addresses) {
98 Set<UniqueAddress> otherMembersSet = Sets.newHashSet(addresses);
99 Stopwatch sw = Stopwatch.createStarted();
100 while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
101 CurrentClusterState state = Cluster.get(node).state();
102 for (Member m : state.getMembers()) {
103 if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress())
104 && otherMembersSet.isEmpty()) {
109 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
112 fail("Member(s) " + otherMembersSet + " are not Up");
116 public static void staticTeardown() {
117 JavaTestKit.shutdownActorSystem(node1);
118 JavaTestKit.shutdownActorSystem(node2);
119 JavaTestKit.shutdownActorSystem(node3);
123 public void setup() {
124 invoker1 = new JavaTestKit(node1);
125 registrar1 = new JavaTestKit(node1);
126 registry1 = node1.actorOf(RpcRegistry.props(config(node1), invoker1.getRef(), registrar1.getRef()));
127 invoker2 = new JavaTestKit(node2);
128 registrar2 = new JavaTestKit(node2);
129 registry2 = node2.actorOf(RpcRegistry.props(config(node2), invoker2.getRef(), registrar2.getRef()));
130 invoker3 = new JavaTestKit(node3);
131 registrar3 = new JavaTestKit(node3);
132 registry3 = node3.actorOf(RpcRegistry.props(config(node3), invoker3.getRef(), registrar3.getRef()));
135 private static RemoteRpcProviderConfig config(final ActorSystem node) {
136 return new RemoteRpcProviderConfig(node.settings().config());
140 public void teardown() {
141 if (registry1 != null) {
142 node1.stop(registry1);
144 if (registry2 != null) {
145 node2.stop(registry2);
147 if (registry3 != null) {
148 node3.stop(registry3);
151 if (invoker1 != null) {
152 node1.stop(invoker1.getRef());
154 if (invoker2 != null) {
155 node2.stop(invoker2.getRef());
157 if (invoker3 != null) {
158 node3.stop(invoker3.getRef());
161 if (registrar1 != null) {
162 node1.stop(registrar1.getRef());
164 if (registrar2 != null) {
165 node2.stop(registrar2.getRef());
167 if (registrar3 != null) {
168 node3.stop(registrar3.getRef());
173 * One node cluster. 1. Register rpc, ensure router can be found 2. Then remove rpc, ensure its
177 public void testAddRemoveRpcOnSameNode() throws Exception {
178 LOG.info("testAddRemoveRpcOnSameNode starting");
180 Address nodeAddress = node1.provider().getDefaultAddress();
184 List<DOMRpcIdentifier> addedRouteIds = createRouteIds();
186 registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender());
188 // Bucket store should get an update bucket message. Updated bucket contains added rpc.
189 final JavaTestKit testKit = new JavaTestKit(node1);
191 Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry1, testKit, nodeAddress);
192 verifyBucket(buckets.get(nodeAddress), addedRouteIds);
194 Map<Address, Long> versions = retrieveVersions(registry1, testKit);
195 Assert.assertEquals("Version for bucket " + nodeAddress, (Long) buckets.get(nodeAddress).getVersion(),
196 versions.get(nodeAddress));
199 registry1.tell(new RemoveRoutes(addedRouteIds), ActorRef.noSender());
201 // Bucket store should get an update bucket message. Rpc is removed in the updated bucket
203 verifyEmptyBucket(testKit, registry1, nodeAddress);
205 LOG.info("testAddRemoveRpcOnSameNode ending");
210 * Three node cluster. 1. Register rpc on 1 node, ensure 2nd node gets updated 2. Remove rpc on
211 * 1 node, ensure 2nd node gets updated
214 public void testRpcAddRemoveInCluster() throws Exception {
216 LOG.info("testRpcAddRemoveInCluster starting");
218 List<DOMRpcIdentifier> addedRouteIds = createRouteIds();
220 Address node1Address = node1.provider().getDefaultAddress();
223 registry1.tell(new AddOrUpdateRoutes(addedRouteIds), ActorRef.noSender());
225 // Bucket store on node2 should get a message to update its local copy of remote buckets
226 final JavaTestKit testKit = new JavaTestKit(node2);
228 Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry2, testKit, node1Address);
229 verifyBucket(buckets.get(node1Address), addedRouteIds);
232 registry1.tell(new RemoveRoutes(addedRouteIds), ActorRef.noSender());
234 // Bucket store on node2 should get a message to update its local copy of remote buckets.
235 // Wait for the bucket for node1 to be empty.
237 verifyEmptyBucket(testKit, registry2, node1Address);
239 LOG.info("testRpcAddRemoveInCluster ending");
242 private void verifyEmptyBucket(final JavaTestKit testKit, final ActorRef registry, final Address address)
243 throws AssertionError {
244 Map<Address, Bucket<RoutingTable>> buckets;
247 buckets = retrieveBuckets(registry1, testKit, address);
250 verifyBucket(buckets.get(address), Collections.emptyList());
252 } catch (AssertionError e) {
253 if (++numTries >= 50) {
258 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
263 * Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
266 public void testRpcAddedOnMultiNodes() throws Exception {
267 final JavaTestKit testKit = new JavaTestKit(node3);
270 List<DOMRpcIdentifier> addedRouteIds1 = createRouteIds();
271 registry1.tell(new AddOrUpdateRoutes(addedRouteIds1), ActorRef.noSender());
273 final UpdateRemoteEndpoints req1 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
274 UpdateRemoteEndpoints.class);
277 List<DOMRpcIdentifier> addedRouteIds2 = createRouteIds();
278 registry2.tell(new AddOrUpdateRoutes(addedRouteIds2), ActorRef.noSender());
280 final UpdateRemoteEndpoints req2 = registrar3.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
281 UpdateRemoteEndpoints.class);
282 Address node2Address = node2.provider().getDefaultAddress();
283 Address node1Address = node1.provider().getDefaultAddress();
285 Map<Address, Bucket<RoutingTable>> buckets = retrieveBuckets(registry3, testKit, node1Address,
288 verifyBucket(buckets.get(node1Address), addedRouteIds1);
289 verifyBucket(buckets.get(node2Address), addedRouteIds2);
291 Map<Address, Long> versions = retrieveVersions(registry3, testKit);
292 Assert.assertEquals("Version for bucket " + node1Address, (Long) buckets.get(node1Address).getVersion(),
293 versions.get(node1Address));
294 Assert.assertEquals("Version for bucket " + node2Address, (Long) buckets.get(node2Address).getVersion(),
295 versions.get(node2Address));
297 assertEndpoints(req1, node1Address, invoker1);
298 assertEndpoints(req2, node2Address, invoker2);
302 private static void assertEndpoints(final UpdateRemoteEndpoints msg, final Address address,
303 final JavaTestKit invoker) {
304 final Map<Address, Optional<RemoteRpcEndpoint>> endpoints = msg.getEndpoints();
305 Assert.assertEquals(1, endpoints.size());
307 final Optional<RemoteRpcEndpoint> maybeEndpoint = endpoints.get(address);
308 Assert.assertNotNull(maybeEndpoint);
309 Assert.assertTrue(maybeEndpoint.isPresent());
311 final RemoteRpcEndpoint endpoint = maybeEndpoint.get();
312 final ActorRef router = endpoint.getRouter();
313 Assert.assertNotNull(router);
315 router.tell("hello", ActorRef.noSender());
316 final String s = invoker.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), String.class);
317 Assert.assertEquals("hello", s);
320 private static Map<Address, Long> retrieveVersions(final ActorRef bucketStore, final JavaTestKit testKit) {
321 bucketStore.tell(GET_BUCKET_VERSIONS, testKit.getRef());
322 @SuppressWarnings("unchecked")
323 final Map<Address, Long> reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS), Map.class);
327 private static void verifyBucket(final Bucket<RoutingTable> bucket, final List<DOMRpcIdentifier> expRouteIds) {
328 RoutingTable table = bucket.getData();
329 Assert.assertNotNull("Bucket RoutingTable is null", table);
330 for (DOMRpcIdentifier r : expRouteIds) {
331 if (!table.contains(r)) {
332 Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table);
336 Assert.assertEquals("RoutingTable size", expRouteIds.size(), table.size());
339 private static Map<Address, Bucket<RoutingTable>> retrieveBuckets(final ActorRef bucketStore,
340 final JavaTestKit testKit, final Address... addresses) {
343 bucketStore.tell(GET_ALL_BUCKETS, testKit.getRef());
344 @SuppressWarnings("unchecked")
345 Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
348 boolean foundAll = true;
349 for (Address addr : addresses) {
350 Bucket<RoutingTable> bucket = buckets.get(addr);
351 if (bucket == null) {
361 if (++numTries >= 50) {
362 Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
363 + ", Actual: " + buckets);
366 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
371 public void testAddRoutesConcurrency() throws Exception {
372 final JavaTestKit testKit = new JavaTestKit(node1);
374 final int nRoutes = 500;
375 final Collection<DOMRpcIdentifier> added = new ArrayList<>(nRoutes);
376 for (int i = 0; i < nRoutes; i++) {
377 final DOMRpcIdentifier routeId = DOMRpcIdentifier.create(SchemaPath.create(true,
378 new QName(new URI("/mockrpc"), "type" + i)));
381 //Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
382 registry1.tell(new AddOrUpdateRoutes(Arrays.asList(routeId)),
383 ActorRef.noSender());
386 FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
389 registry1.tell(GET_ALL_BUCKETS, testKit.getRef());
390 @SuppressWarnings("unchecked")
391 Map<Address, Bucket<RoutingTable>> buckets = testKit.expectMsgClass(duration, Map.class);
393 Bucket<RoutingTable> localBucket = buckets.values().iterator().next();
394 RoutingTable table = localBucket.getData();
395 if (table != null && table.size() == nRoutes) {
396 for (DOMRpcIdentifier r : added) {
397 Assert.assertTrue("RoutingTable contains " + r, table.contains(r));
403 if (++numTries >= 50) {
404 Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
407 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
411 private List<DOMRpcIdentifier> createRouteIds() throws URISyntaxException {
412 QName type = new QName(new URI("/mockrpc"), "mockrpc" + routeIdCounter++);
413 List<DOMRpcIdentifier> routeIds = new ArrayList<>(1);
414 routeIds.add(DOMRpcIdentifier.create(SchemaPath.create(true, type)));