Merge "BUG-2627: do not duplicate descriptions"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 package org.opendaylight.controller.cluster.datastore.utils;
2
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertTrue;
5 import static org.mockito.Mockito.doReturn;
6 import static org.mockito.Mockito.mock;
7 import akka.actor.ActorRef;
8 import akka.actor.ActorSelection;
9 import akka.actor.Address;
10 import akka.actor.Props;
11 import akka.actor.UntypedActor;
12 import akka.japi.Creator;
13 import akka.testkit.JavaTestKit;
14 import com.google.common.base.Optional;
15 import java.util.concurrent.TimeUnit;
16 import org.apache.commons.lang.time.StopWatch;
17 import org.junit.Test;
18 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
19 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
20 import org.opendaylight.controller.cluster.datastore.Configuration;
21 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
22 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
23 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
25 import scala.concurrent.Await;
26 import scala.concurrent.Future;
27 import scala.concurrent.duration.Duration;
28
29 public class ActorContextTest extends AbstractActorTest{
30
31     private static class MockShardManager extends UntypedActor {
32
33         private final boolean found;
34         private final ActorRef actorRef;
35
36         private MockShardManager(boolean found, ActorRef actorRef){
37
38             this.found = found;
39             this.actorRef = actorRef;
40         }
41
42         @Override public void onReceive(Object message) throws Exception {
43             if(found){
44                 getSender().tell(new LocalShardFound(actorRef), getSelf());
45             } else {
46                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
47             }
48         }
49
50         private static Props props(final boolean found, final ActorRef actorRef){
51             return Props.create(new MockShardManagerCreator(found, actorRef) );
52         }
53
54         @SuppressWarnings("serial")
55         private static class MockShardManagerCreator implements Creator<MockShardManager> {
56             final boolean found;
57             final ActorRef actorRef;
58
59             MockShardManagerCreator(boolean found, ActorRef actorRef) {
60                 this.found = found;
61                 this.actorRef = actorRef;
62             }
63
64             @Override
65             public MockShardManager create() throws Exception {
66                 return new MockShardManager(found, actorRef);
67             }
68         }
69     }
70
71     @Test
72     public void testFindLocalShardWithShardFound(){
73         new JavaTestKit(getSystem()) {{
74
75             new Within(duration("1 seconds")) {
76                 @Override
77                 protected void run() {
78
79                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
80
81                     ActorRef shardManagerActorRef = getSystem()
82                         .actorOf(MockShardManager.props(true, shardActorRef));
83
84                     ActorContext actorContext =
85                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
86                             mock(Configuration.class));
87
88                     Optional<ActorRef> out = actorContext.findLocalShard("default");
89
90                     assertEquals(shardActorRef, out.get());
91
92
93                     expectNoMsg();
94                 }
95             };
96         }};
97
98     }
99
100     @Test
101     public void testFindLocalShardWithShardNotFound(){
102         new JavaTestKit(getSystem()) {{
103             ActorRef shardManagerActorRef = getSystem()
104                     .actorOf(MockShardManager.props(false, null));
105
106             ActorContext actorContext =
107                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
108                             mock(Configuration.class));
109
110             Optional<ActorRef> out = actorContext.findLocalShard("default");
111             assertTrue(!out.isPresent());
112         }};
113
114     }
115
116     @Test
117     public void testExecuteRemoteOperation() {
118         new JavaTestKit(getSystem()) {{
119             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
120
121             ActorRef shardManagerActorRef = getSystem()
122                     .actorOf(MockShardManager.props(true, shardActorRef));
123
124             ActorContext actorContext =
125                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
126                             mock(Configuration.class));
127
128             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
129
130             Object out = actorContext.executeOperation(actor, "hello");
131
132             assertEquals("hello", out);
133         }};
134     }
135
136     @Test
137     public void testExecuteRemoteOperationAsync() {
138         new JavaTestKit(getSystem()) {{
139             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
140
141             ActorRef shardManagerActorRef = getSystem()
142                     .actorOf(MockShardManager.props(true, shardActorRef));
143
144             ActorContext actorContext =
145                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
146                             mock(Configuration.class));
147
148             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
149
150             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
151
152             try {
153                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
154                 assertEquals("Result", "hello", result);
155             } catch(Exception e) {
156                 throw new AssertionError(e);
157             }
158         }};
159     }
160
161     @Test
162     public void testIsPathLocal() {
163         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
164         ActorContext actorContext = null;
165
166         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
167         assertEquals(false, actorContext.isPathLocal(null));
168         assertEquals(false, actorContext.isPathLocal(""));
169
170         clusterWrapper.setSelfAddress(null);
171         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
172         assertEquals(false, actorContext.isPathLocal(""));
173
174         // even if the path is in local format, match the primary path (first 3 elements) and return true
175         clusterWrapper.setSelfAddress(new Address("akka", "test"));
176         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
177         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
178
179         clusterWrapper.setSelfAddress(new Address("akka", "test"));
180         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
181         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
182
183         clusterWrapper.setSelfAddress(new Address("akka", "test"));
184         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
185         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
186
187         // self address of remote format,but Tx path local format.
188         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
189         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
190         assertEquals(true, actorContext.isPathLocal(
191             "akka://system/user/shardmanager/shard/transaction"));
192
193         // self address of local format,but Tx path remote format.
194         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
195         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
196         assertEquals(false, actorContext.isPathLocal(
197             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
198
199         //local path but not same
200         clusterWrapper.setSelfAddress(new Address("akka", "test"));
201         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
202         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
203
204         //ip and port same
205         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
206         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
207         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
208
209         // forward-slash missing in address
210         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
211         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
212         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
213
214         //ips differ
215         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
216         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
217         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
218
219         //ports differ
220         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
221         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
222         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
223     }
224
225     @Test
226     public void testResolvePathForRemoteActor() {
227         ActorContext actorContext =
228                 new ActorContext(getSystem(), mock(ActorRef.class), mock(
229                         ClusterWrapper.class),
230                         mock(Configuration.class));
231
232         String actual = actorContext.resolvePath(
233                 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
234                 "akka://system/user/shardmanager/shard/transaction");
235
236         String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
237
238         assertEquals(expected, actual);
239     }
240
241     @Test
242     public void testResolvePathForLocalActor() {
243         ActorContext actorContext =
244                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
245                         mock(Configuration.class));
246
247         String actual = actorContext.resolvePath(
248                 "akka://system/user/shardmanager/shard",
249                 "akka://system/user/shardmanager/shard/transaction");
250
251         String expected = "akka://system/user/shardmanager/shard/transaction";
252
253         assertEquals(expected, actual);
254     }
255
256     @Test
257     public void testResolvePathForRemoteActorWithProperRemoteAddress() {
258         ActorContext actorContext =
259                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
260                         mock(Configuration.class));
261
262         String actual = actorContext.resolvePath(
263                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
264                 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
265
266         String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
267
268         assertEquals(expected, actual);
269     }
270
271     @Test
272     public void testRateLimiting(){
273         DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
274
275         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
276         doReturn("config").when(mockDataStoreContext).getDataStoreType();
277
278         ActorContext actorContext =
279                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
280                         mock(Configuration.class), mockDataStoreContext);
281
282         // Check that the initial value is being picked up from DataStoreContext
283         assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
284
285         actorContext.setTxCreationLimit(1.0);
286
287         assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
288
289
290         StopWatch watch = new StopWatch();
291
292         watch.start();
293
294         actorContext.acquireTxCreationPermit();
295         actorContext.acquireTxCreationPermit();
296         actorContext.acquireTxCreationPermit();
297
298         watch.stop();
299
300         assertTrue("did not take as much time as expected", watch.getTime() > 1000);
301     }
302 }