Convert CDS implementation to use mdsal APIs
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.ActorSystem;
24 import akka.actor.Address;
25 import akka.actor.Props;
26 import akka.actor.UntypedActor;
27 import akka.dispatch.Futures;
28 import akka.japi.Creator;
29 import akka.testkit.TestActorRef;
30 import akka.testkit.javadsl.TestKit;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.collect.Maps;
34 import com.google.common.collect.Sets;
35 import com.typesafe.config.ConfigFactory;
36 import java.util.Arrays;
37 import java.util.Map;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.Assert;
40 import org.junit.Test;
41 import org.mockito.Mockito;
42 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
43 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
44 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
47 import org.opendaylight.controller.cluster.datastore.config.Configuration;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
51 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
52 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
56 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
57 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
58 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
59 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
60 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.Await;
65 import scala.concurrent.Future;
66 import scala.concurrent.duration.Duration;
67 import scala.concurrent.duration.FiniteDuration;
68
69 public class ActorContextTest extends AbstractActorTest {
70
71     static final Logger LOG = LoggerFactory.getLogger(ActorContextTest.class);
72
    /** Payload-free marker message; {@code testBroadcast} creates and broadcasts instances of it. */
    private static class TestMessage {
    }
75
76     private static final class MockShardManager extends UntypedActor {
77
78         private final boolean found;
79         private final ActorRef actorRef;
80         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
81
82         private MockShardManager(final boolean found, final ActorRef actorRef) {
83
84             this.found = found;
85             this.actorRef = actorRef;
86         }
87
88         @Override public void onReceive(final Object message) throws Exception {
89             if (message instanceof FindPrimary) {
90                 FindPrimary fp = (FindPrimary)message;
91                 Object resp = findPrimaryResponses.get(fp.getShardName());
92                 if (resp == null) {
93                     LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
94                 } else {
95                     getSender().tell(resp, getSelf());
96                 }
97
98                 return;
99             }
100
101             if (found) {
102                 getSender().tell(new LocalShardFound(actorRef), getSelf());
103             } else {
104                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
105             }
106         }
107
108         void addFindPrimaryResp(final String shardName, final Object resp) {
109             findPrimaryResponses.put(shardName, resp);
110         }
111
112         private static Props props(final boolean found, final ActorRef actorRef) {
113             return Props.create(new MockShardManagerCreator(found, actorRef));
114         }
115
116         private static Props props() {
117             return Props.create(new MockShardManagerCreator());
118         }
119
120         @SuppressWarnings("serial")
121         private static class MockShardManagerCreator implements Creator<MockShardManager> {
122             final boolean found;
123             final ActorRef actorRef;
124
125             MockShardManagerCreator() {
126                 this.found = false;
127                 this.actorRef = null;
128             }
129
130             MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
131                 this.found = found;
132                 this.actorRef = actorRef;
133             }
134
135             @Override
136             public MockShardManager create() throws Exception {
137                 return new MockShardManager(found, actorRef);
138             }
139         }
140     }
141
142     @Test
143     public void testFindLocalShardWithShardFound() {
144         new TestKit(getSystem()) {
145             {
146                 within(duration("1 seconds"), () -> {
147                     ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
148
149                     ActorRef shardManagerActorRef = getSystem()
150                             .actorOf(MockShardManager.props(true, shardActorRef));
151
152                     ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
153                         mock(ClusterWrapper.class), mock(Configuration.class));
154
155                     Optional<ActorRef> out = actorContext.findLocalShard("default");
156
157                     assertEquals(shardActorRef, out.get());
158
159                     expectNoMsg();
160                     return null;
161                 });
162             }
163         };
164
165     }
166
167     @Test
168     public void testFindLocalShardWithShardNotFound() {
169         new TestKit(getSystem()) {
170             {
171                 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
172
173                 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
174                         mock(ClusterWrapper.class), mock(Configuration.class));
175
176                 Optional<ActorRef> out = actorContext.findLocalShard("default");
177                 assertTrue(!out.isPresent());
178             }
179         };
180
181     }
182
183     @Test
184     public void testExecuteRemoteOperation() {
185         new TestKit(getSystem()) {
186             {
187                 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
188
189                 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
190
191                 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
192                         mock(ClusterWrapper.class), mock(Configuration.class));
193
194                 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
195
196                 Object out = actorContext.executeOperation(actor, "hello");
197
198                 assertEquals("hello", out);
199             }
200         };
201     }
202
203     @Test
204     @SuppressWarnings("checkstyle:IllegalCatch")
205     public void testExecuteRemoteOperationAsync() {
206         new TestKit(getSystem()) {
207             {
208                 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
209
210                 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
211
212                 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
213                         mock(ClusterWrapper.class), mock(Configuration.class));
214
215                 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
216
217                 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
218
219                 try {
220                     Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
221                     assertEquals("Result", "hello", result);
222                 } catch (Exception e) {
223                     throw new AssertionError(e);
224                 }
225             }
226         };
227     }
228
229     @Test
230     public void testIsPathLocal() {
231         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
232         ActorContext actorContext = null;
233
234         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
235         assertEquals(false, actorContext.isPathLocal(null));
236         assertEquals(false, actorContext.isPathLocal(""));
237
238         clusterWrapper.setSelfAddress(null);
239         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
240         assertEquals(false, actorContext.isPathLocal(""));
241
242         // even if the path is in local format, match the primary path (first 3 elements) and return true
243         clusterWrapper.setSelfAddress(new Address("akka", "test"));
244         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
245         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
246
247         clusterWrapper.setSelfAddress(new Address("akka", "test"));
248         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
249         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
250
251         clusterWrapper.setSelfAddress(new Address("akka", "test"));
252         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
253         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
254
255         // self address of remote format,but Tx path local format.
256         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
257         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
258         assertEquals(true, actorContext.isPathLocal(
259             "akka://system/user/shardmanager/shard/transaction"));
260
261         // self address of local format,but Tx path remote format.
262         clusterWrapper.setSelfAddress(new Address("akka", "system"));
263         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
264         assertEquals(false, actorContext.isPathLocal(
265             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
266
267         //local path but not same
268         clusterWrapper.setSelfAddress(new Address("akka", "test"));
269         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
270         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
271
272         //ip and port same
273         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
274         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
275         assertEquals(true, actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
276
277         // forward-slash missing in address
278         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
279         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
280         assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
281
282         //ips differ
283         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
284         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
285         assertEquals(false, actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
286
287         //ports differ
288         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
289         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
290         assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
291     }
292
293     @Test
294     public void testClientDispatcherIsGlobalDispatcher() {
295         ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
296                 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
297
298         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
299     }
300
301     @Test
302     public void testClientDispatcherIsNotGlobalDispatcher() {
303         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
304                 ConfigFactory.load("application-with-custom-dispatchers.conf"));
305
306         ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
307                 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
308
309         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
310
311         actorSystem.terminate();
312     }
313
314     @Test
315     public void testSetDatastoreContext() {
316         new TestKit(getSystem()) {
317             {
318                 ActorContext actorContext = new ActorContext(getSystem(), getRef(),
319                         mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
320                                 .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
321                         new PrimaryShardInfoFutureCache());
322
323                 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
324                 assertEquals("getTransactionCommitOperationTimeout", 7,
325                         actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
326
327                 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
328                         .shardTransactionCommitTimeoutInSeconds(8).build();
329
330                 DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
331                 Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
332
333                 actorContext.setDatastoreContext(mockContextFactory);
334
335                 expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
336
337                 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
338
339                 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
340                 assertEquals("getTransactionCommitOperationTimeout", 8,
341                         actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
342             }
343         };
344     }
345
346     @Test
347     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
348
349         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
350
351         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
352                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
353                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
354
355         final String expPrimaryPath = "akka://test-system/find-primary-shard";
356         final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
357         ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
358                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
359             @Override
360             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
361                 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
362             }
363         };
364
365         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
366         PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
367
368         assertNotNull(actual);
369         assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
370         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
371                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
372         assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
373
374         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
375
376         PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
377
378         assertEquals(cachedInfo, actual);
379
380         actorContext.getPrimaryShardInfoCache().remove("foobar");
381
382         cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
383
384         assertNull(cached);
385     }
386
387     @Test
388     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
389
390         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
391
392         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
393                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
394                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
395
396         final DataTree mockDataTree = Mockito.mock(DataTree.class);
397         final String expPrimaryPath = "akka://test-system/find-primary-shard";
398         ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
399                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
400             @Override
401             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
402                 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
403             }
404         };
405
406         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
407         PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
408
409         assertNotNull(actual);
410         assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
411         assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
412         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
413                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
414         assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
415
416         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
417
418         PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
419
420         assertEquals(cachedInfo, actual);
421
422         actorContext.getPrimaryShardInfoCache().remove("foobar");
423
424         cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
425
426         assertNull(cached);
427     }
428
429     @Test
430     public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
431         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
432     }
433
434     @Test
435     public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
436         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
437     }
438
439     @SuppressWarnings("checkstyle:IllegalCatch")
440     private static void testFindPrimaryExceptions(final Object expectedException) throws Exception {
441         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
442
443         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
444                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
445                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
446
447         ActorContext actorContext =
448             new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
449                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
450                 @Override
451                 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
452                     return Futures.successful(expectedException);
453                 }
454             };
455
456         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
457
458         try {
459             Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
460             fail("Expected" + expectedException.getClass().toString());
461         } catch (Exception e) {
462             if (!expectedException.getClass().isInstance(e)) {
463                 fail("Expected Exception of type " + expectedException.getClass().toString());
464             }
465         }
466
467         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
468
469         assertNull(cached);
470     }
471
472     @Test
473     public void testBroadcast() {
474         new TestKit(getSystem()) {
475             {
476                 ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
477                 ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
478
479                 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
480                         MockShardManager.props());
481                 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
482                 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
483                         shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
484                 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
485                         shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
486                 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
487
488                 Configuration mockConfig = mock(Configuration.class);
489                 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
490                         .getAllShardNames();
491
492                 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
493                         mock(ClusterWrapper.class), mockConfig,
494                         DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
495                         new PrimaryShardInfoFutureCache());
496
497                 actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
498
499                 MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
500                 MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
501             }
502         };
503     }
504 }