Remove deprecated PreLithium Tx context classes and related code
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.typesafe.config.ConfigFactory;
35 import java.util.Arrays;
36 import java.util.Map;
37 import java.util.concurrent.TimeUnit;
38 import org.junit.Assert;
39 import org.junit.Test;
40 import org.mockito.Mockito;
41 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
42 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
43 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
57 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
58 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
59 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
60 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63 import scala.concurrent.Await;
64 import scala.concurrent.Future;
65 import scala.concurrent.duration.Duration;
66 import scala.concurrent.duration.FiniteDuration;
67
68 public class ActorContextTest extends AbstractActorTest{
69
70     static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
71
72     private static class TestMessage {
73     }
74
75     private static class MockShardManager extends UntypedActor {
76
77         private final boolean found;
78         private final ActorRef actorRef;
79         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
80
81         private MockShardManager(boolean found, ActorRef actorRef){
82
83             this.found = found;
84             this.actorRef = actorRef;
85         }
86
87         @Override public void onReceive(Object message) throws Exception {
88             if(message instanceof FindPrimary) {
89                 FindPrimary fp = (FindPrimary)message;
90                 Object resp = findPrimaryResponses.get(fp.getShardName());
91                 if(resp == null) {
92                     log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
93                 } else {
94                     getSender().tell(resp, getSelf());
95                 }
96
97                 return;
98             }
99
100             if(found){
101                 getSender().tell(new LocalShardFound(actorRef), getSelf());
102             } else {
103                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
104             }
105         }
106
107         void addFindPrimaryResp(String shardName, Object resp) {
108             findPrimaryResponses.put(shardName, resp);
109         }
110
111         private static Props props(final boolean found, final ActorRef actorRef){
112             return Props.create(new MockShardManagerCreator(found, actorRef) );
113         }
114
115         private static Props props(){
116             return Props.create(new MockShardManagerCreator() );
117         }
118
119         @SuppressWarnings("serial")
120         private static class MockShardManagerCreator implements Creator<MockShardManager> {
121             final boolean found;
122             final ActorRef actorRef;
123
124             MockShardManagerCreator() {
125                 this.found = false;
126                 this.actorRef = null;
127             }
128
129             MockShardManagerCreator(boolean found, ActorRef actorRef) {
130                 this.found = found;
131                 this.actorRef = actorRef;
132             }
133
134             @Override
135             public MockShardManager create() throws Exception {
136                 return new MockShardManager(found, actorRef);
137             }
138         }
139     }
140
141     @Test
142     public void testFindLocalShardWithShardFound(){
143         new JavaTestKit(getSystem()) {{
144
145             new Within(duration("1 seconds")) {
146                 @Override
147                 protected void run() {
148
149                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
150
151                     ActorRef shardManagerActorRef = getSystem()
152                         .actorOf(MockShardManager.props(true, shardActorRef));
153
154                     ActorContext actorContext =
155                         new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
156                             mock(Configuration.class));
157
158                     Optional<ActorRef> out = actorContext.findLocalShard("default");
159
160                     assertEquals(shardActorRef, out.get());
161
162
163                     expectNoMsg();
164                 }
165             };
166         }};
167
168     }
169
170     @Test
171     public void testFindLocalShardWithShardNotFound(){
172         new JavaTestKit(getSystem()) {{
173             ActorRef shardManagerActorRef = getSystem()
174                     .actorOf(MockShardManager.props(false, null));
175
176             ActorContext actorContext =
177                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
178                             mock(Configuration.class));
179
180             Optional<ActorRef> out = actorContext.findLocalShard("default");
181             assertTrue(!out.isPresent());
182         }};
183
184     }
185
186     @Test
187     public void testExecuteRemoteOperation() {
188         new JavaTestKit(getSystem()) {{
189             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
190
191             ActorRef shardManagerActorRef = getSystem()
192                     .actorOf(MockShardManager.props(true, shardActorRef));
193
194             ActorContext actorContext =
195                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
196                             mock(Configuration.class));
197
198             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
199
200             Object out = actorContext.executeOperation(actor, "hello");
201
202             assertEquals("hello", out);
203         }};
204     }
205
206     @Test
207     public void testExecuteRemoteOperationAsync() {
208         new JavaTestKit(getSystem()) {{
209             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
210
211             ActorRef shardManagerActorRef = getSystem()
212                     .actorOf(MockShardManager.props(true, shardActorRef));
213
214             ActorContext actorContext =
215                     new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
216                             mock(Configuration.class));
217
218             ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
219
220             Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
221
222             try {
223                 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
224                 assertEquals("Result", "hello", result);
225             } catch(Exception e) {
226                 throw new AssertionError(e);
227             }
228         }};
229     }
230
231     @Test
232     public void testIsPathLocal() {
233         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
234         ActorContext actorContext = null;
235
236         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
237         assertEquals(false, actorContext.isPathLocal(null));
238         assertEquals(false, actorContext.isPathLocal(""));
239
240         clusterWrapper.setSelfAddress(null);
241         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
242         assertEquals(false, actorContext.isPathLocal(""));
243
244         // even if the path is in local format, match the primary path (first 3 elements) and return true
245         clusterWrapper.setSelfAddress(new Address("akka", "test"));
246         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
247         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
248
249         clusterWrapper.setSelfAddress(new Address("akka", "test"));
250         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
251         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
252
253         clusterWrapper.setSelfAddress(new Address("akka", "test"));
254         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
255         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
256
257         // self address of remote format,but Tx path local format.
258         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
259         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
260         assertEquals(true, actorContext.isPathLocal(
261             "akka://system/user/shardmanager/shard/transaction"));
262
263         // self address of local format,but Tx path remote format.
264         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
265         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
266         assertEquals(false, actorContext.isPathLocal(
267             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
268
269         //local path but not same
270         clusterWrapper.setSelfAddress(new Address("akka", "test"));
271         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
272         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
273
274         //ip and port same
275         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
276         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
277         assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
278
279         // forward-slash missing in address
280         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
281         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
282         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
283
284         //ips differ
285         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
286         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
287         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
288
289         //ports differ
290         clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
291         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
292         assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
293     }
294
295     @Test
296     public void testClientDispatcherIsGlobalDispatcher(){
297         ActorContext actorContext =
298                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
299                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
300
301         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
302
303     }
304
305     @Test
306     public void testClientDispatcherIsNotGlobalDispatcher(){
307         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
308
309         ActorContext actorContext =
310                 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
311                         mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
312
313         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
314
315         actorSystem.shutdown();
316
317     }
318
319     @Test
320     public void testSetDatastoreContext() {
321         new JavaTestKit(getSystem()) {{
322             ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
323                             mock(Configuration.class), DatastoreContext.newBuilder().
324                                 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
325
326             assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
327             assertEquals("getTransactionCommitOperationTimeout", 7,
328                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
329
330             DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
331                     shardTransactionCommitTimeoutInSeconds(8).build();
332
333             DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
334             Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
335
336             actorContext.setDatastoreContext(mockContextFactory);
337
338             expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
339
340             Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
341
342             assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
343             assertEquals("getTransactionCommitOperationTimeout", 8,
344                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
345         }};
346     }
347
348     @Test
349     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
350
351             TestActorRef<MessageCollectorActor> shardManager =
352                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
353
354             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
355                     logicalStoreType(LogicalDatastoreType.CONFIGURATION).
356                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
357
358             final String expPrimaryPath = "akka://test-system/find-primary-shard";
359             final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
360             ActorContext actorContext =
361                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
362                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
363                         @Override
364                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
365                             return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
366                         }
367                     };
368
369             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
370             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
371
372             assertNotNull(actual);
373             assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
374             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
375                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
376             assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
377
378             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
379
380             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
381
382             assertEquals(cachedInfo, actual);
383
384             actorContext.getPrimaryShardInfoCache().remove("foobar");
385
386             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
387
388             assertNull(cached);
389     }
390
391     @Test
392     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
393
394             TestActorRef<MessageCollectorActor> shardManager =
395                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
396
397             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
398                     logicalStoreType(LogicalDatastoreType.CONFIGURATION).
399                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
400
401             final DataTree mockDataTree = Mockito.mock(DataTree.class);
402             final String expPrimaryPath = "akka://test-system/find-primary-shard";
403             ActorContext actorContext =
404                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
405                             mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
406                         @Override
407                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
408                             return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
409                         }
410                     };
411
412             Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
413             PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
414
415             assertNotNull(actual);
416             assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
417             assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
418             assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
419                     expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
420             assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
421
422             Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
423
424             PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
425
426             assertEquals(cachedInfo, actual);
427
428             actorContext.getPrimaryShardInfoCache().remove("foobar");
429
430             cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
431
432             assertNull(cached);
433     }
434
435     @Test
436     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
437         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
438     }
439
440     @Test
441     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
442         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
443     }
444
445     private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
446         TestActorRef<MessageCollectorActor> shardManager =
447             TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
448
449         DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
450             logicalStoreType(LogicalDatastoreType.CONFIGURATION).
451             shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
452
453         ActorContext actorContext =
454             new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
455                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
456                 @Override
457                 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
458                     return Futures.successful(expectedException);
459                 }
460             };
461
462         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
463
464         try {
465             Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
466             fail("Expected" + expectedException.getClass().toString());
467         } catch(Exception e){
468             if(!expectedException.getClass().isInstance(e)) {
469                 fail("Expected Exception of type " + expectedException.getClass().toString());
470             }
471         }
472
473         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
474
475         assertNull(cached);
476     }
477
478     @Test
479     public void testBroadcast() {
480         new JavaTestKit(getSystem()) {{
481             ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
482             ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
483
484             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
485             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
486             shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
487                     DataStoreVersions.CURRENT_VERSION));
488             shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
489                     DataStoreVersions.CURRENT_VERSION));
490             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
491
492             Configuration mockConfig = mock(Configuration.class);
493             doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
494                     when(mockConfig).getAllShardNames();
495
496             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
497                     mock(ClusterWrapper.class), mockConfig,
498                     DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
499
500             actorContext.broadcast(new TestMessage());
501
502             MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
503             MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
504         }};
505     }
506
507 }