1 package org.opendaylight.controller.remote.rpc.registry;
4 import akka.actor.ActorPath;
5 import akka.actor.ActorRef;
6 import akka.actor.ActorSelection;
7 import akka.actor.ActorSystem;
8 import akka.actor.ChildActorPath;
9 import akka.actor.Props;
10 import akka.testkit.JavaTestKit;
11 import com.google.common.base.Predicate;
12 import org.junit.After;
13 import org.junit.AfterClass;
14 import org.junit.Before;
15 import org.junit.BeforeClass;
16 import org.junit.Test;
17 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
18 import org.opendaylight.controller.remote.rpc.RouteIdentifierImpl;
19 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages;
20 import org.opendaylight.controller.sal.connector.api.RpcRouter;
21 import org.opendaylight.controller.utils.ConditionalProbe;
22 import org.opendaylight.yangtools.yang.common.QName;
23 import scala.concurrent.duration.FiniteDuration;
25 import javax.annotation.Nullable;
27 import java.net.URISyntaxException;
28 import java.util.ArrayList;
29 import java.util.List;
30 import java.util.concurrent.TimeUnit;
32 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.AddOrUpdateRoutes;
33 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.RemoveRoutes;
34 import static org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
36 public class RpcRegistryTest {
38 private static ActorSystem node1;
39 private static ActorSystem node2;
40 private static ActorSystem node3;
42 private ActorRef registry1;
43 private ActorRef registry2;
44 private ActorRef registry3;
47 public static void setup() throws InterruptedException {
48 RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").build();
49 RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").build();
50 RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").build();
51 node1 = ActorSystem.create("opendaylight-rpc", config1.get());
52 node2 = ActorSystem.create("opendaylight-rpc", config2.get());
53 node3 = ActorSystem.create("opendaylight-rpc", config3.get());
57 public static void teardown() {
58 JavaTestKit.shutdownActorSystem(node1);
59 JavaTestKit.shutdownActorSystem(node2);
60 JavaTestKit.shutdownActorSystem(node3);
71 public void createRpcRegistry() throws InterruptedException {
72 registry1 = node1.actorOf(Props.create(RpcRegistry.class));
73 registry2 = node2.actorOf(Props.create(RpcRegistry.class));
74 registry3 = node3.actorOf(Props.create(RpcRegistry.class));
78 public void stopRpcRegistry() throws InterruptedException {
79 if (registry1 != null)
80 node1.stop(registry1);
81 if (registry2 != null)
82 node2.stop(registry2);
83 if (registry3 != null)
84 node3.stop(registry3);
89 * 1. Register rpc, ensure router can be found
90 * 2. Then remove rpc, ensure its deleted
92 * @throws URISyntaxException
93 * @throws InterruptedException
96 public void testAddRemoveRpcOnSameNode() throws URISyntaxException, InterruptedException {
98 final JavaTestKit mockBroker = new JavaTestKit(node1);
100 final ActorPath bucketStorePath = new ChildActorPath(registry1.path(), "store");
103 final JavaTestKit probe1 = createProbeForMessage(
104 node1, bucketStorePath, Messages.BucketStoreMessages.UpdateBucket.class);
107 registry1.tell(new SetLocalRouter(mockBroker.getRef()), mockBroker.getRef());
108 registry1.tell(getAddRouteMessage(), mockBroker.getRef());
110 //Bucket store should get an update bucket message. Updated bucket contains added rpc.
111 probe1.expectMsgClass(
112 FiniteDuration.apply(10, TimeUnit.SECONDS),
113 Messages.BucketStoreMessages.UpdateBucket.class);
116 registry1.tell(getRemoveRouteMessage(), mockBroker.getRef());
118 //Bucket store should get an update bucket message. Rpc is removed in the updated bucket
119 probe1.expectMsgClass(
120 FiniteDuration.apply(10, TimeUnit.SECONDS),
121 Messages.BucketStoreMessages.UpdateBucket.class);
128 * Three node cluster.
129 * 1. Register rpc on 1 node, ensure 2nd node gets updated
130 * 2. Remove rpc on 1 node, ensure 2nd node gets updated
132 * @throws URISyntaxException
133 * @throws InterruptedException
136 public void testRpcAddRemoveInCluster() throws URISyntaxException, InterruptedException {
138 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
140 //install probe on node2's bucket store
141 final ActorPath bucketStorePath = new ChildActorPath(registry2.path(), "store");
142 final JavaTestKit probe2 = createProbeForMessage(
143 node2, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
146 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
147 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
149 //Bucket store on node2 should get a message to update its local copy of remote buckets
150 probe2.expectMsgClass(
151 FiniteDuration.apply(10, TimeUnit.SECONDS),
152 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
155 registry1.tell(getRemoveRouteMessage(), mockBroker1.getRef());
157 //Bucket store on node2 should get a message to update its local copy of remote buckets
158 probe2.expectMsgClass(
159 FiniteDuration.apply(10, TimeUnit.SECONDS),
160 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
165 * Three node cluster.
166 * Register rpc on 2 nodes. Ensure 3rd gets updated.
171 public void testRpcAddedOnMultiNodes() throws Exception {
173 final JavaTestKit mockBroker1 = new JavaTestKit(node1);
174 final JavaTestKit mockBroker2 = new JavaTestKit(node2);
175 final JavaTestKit mockBroker3 = new JavaTestKit(node3);
177 registry3.tell(new SetLocalRouter(mockBroker3.getRef()), mockBroker3.getRef());
179 //install probe on node 3
180 final ActorPath bucketStorePath = new ChildActorPath(registry3.path(), "store");
181 final JavaTestKit probe3 = createProbeForMessage(
182 node3, bucketStorePath, Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
186 registry1.tell(new SetLocalRouter(mockBroker1.getRef()), mockBroker1.getRef());
187 registry1.tell(getAddRouteMessage(), mockBroker1.getRef());
189 probe3.expectMsgClass(
190 FiniteDuration.apply(10, TimeUnit.SECONDS),
191 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
194 //Add same rpc on node 2
195 registry2.tell(new SetLocalRouter(mockBroker2.getRef()), mockBroker2.getRef());
196 registry2.tell(getAddRouteMessage(), mockBroker2.getRef());
198 probe3.expectMsgClass(
199 FiniteDuration.apply(10, TimeUnit.SECONDS),
200 Messages.BucketStoreMessages.UpdateRemoteBuckets.class);
203 private JavaTestKit createProbeForMessage(ActorSystem node, ActorPath subjectPath, final Class<?> clazz) {
204 final JavaTestKit probe = new JavaTestKit(node);
206 ConditionalProbe conditionalProbe =
207 new ConditionalProbe(probe.getRef(), new Predicate<Object>() {
209 public boolean apply(@Nullable Object input) {
211 return clazz.equals(input.getClass());
217 ActorSelection subject = node.actorSelection(subjectPath);
218 subject.tell(conditionalProbe, ActorRef.noSender());
224 private AddOrUpdateRoutes getAddRouteMessage() throws URISyntaxException {
225 return new AddOrUpdateRoutes(createRouteIds());
228 private RemoveRoutes getRemoveRouteMessage() throws URISyntaxException {
229 return new RemoveRoutes(createRouteIds());
232 private List<RpcRouter.RouteIdentifier<?, ?, ?>> createRouteIds() throws URISyntaxException {
233 QName type = new QName(new URI("/mockrpc"), "mockrpc");
234 List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = new ArrayList<>();
235 routeIds.add(new RouteIdentifierImpl(null, type, null));