2b8b2bb528bd6e4bc2019cfdb8d6d02ee82df502
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore.utils;
10
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotEquals;
14 import static org.junit.Assert.assertNotNull;
15 import static org.junit.Assert.assertNull;
16 import static org.junit.Assert.assertSame;
17 import static org.junit.Assert.assertTrue;
18 import static org.junit.Assert.fail;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
21
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.ActorSystem;
25 import akka.actor.Address;
26 import akka.actor.Props;
27 import akka.actor.UntypedAbstractActor;
28 import akka.dispatch.Futures;
29 import akka.japi.Creator;
30 import akka.testkit.TestActorRef;
31 import akka.testkit.javadsl.TestKit;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Maps;
35 import com.google.common.collect.Sets;
36 import com.typesafe.config.ConfigFactory;
37 import java.util.Arrays;
38 import java.util.Map;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.Assert;
41 import org.junit.Test;
42 import org.mockito.Mockito;
43 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
44 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
45 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
47 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
48 import org.opendaylight.controller.cluster.datastore.config.Configuration;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
52 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
53 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
57 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
58 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
59 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
60 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
61 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import scala.concurrent.Await;
66 import scala.concurrent.Future;
67 import scala.concurrent.duration.Duration;
68 import scala.concurrent.duration.FiniteDuration;
69
70 public class ActorContextTest extends AbstractActorTest {
71
72     static final Logger LOG = LoggerFactory.getLogger(ActorContextTest.class);
73
74     private static class TestMessage {
75     }
76
77     private static final class MockShardManager extends UntypedAbstractActor {
78
79         private final boolean found;
80         private final ActorRef actorRef;
81         private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
82
83         private MockShardManager(final boolean found, final ActorRef actorRef) {
84
85             this.found = found;
86             this.actorRef = actorRef;
87         }
88
89         @Override public void onReceive(final Object message) {
90             if (message instanceof FindPrimary) {
91                 FindPrimary fp = (FindPrimary)message;
92                 Object resp = findPrimaryResponses.get(fp.getShardName());
93                 if (resp == null) {
94                     LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
95                 } else {
96                     getSender().tell(resp, getSelf());
97                 }
98
99                 return;
100             }
101
102             if (found) {
103                 getSender().tell(new LocalShardFound(actorRef), getSelf());
104             } else {
105                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
106             }
107         }
108
109         void addFindPrimaryResp(final String shardName, final Object resp) {
110             findPrimaryResponses.put(shardName, resp);
111         }
112
113         private static Props props(final boolean found, final ActorRef actorRef) {
114             return Props.create(new MockShardManagerCreator(found, actorRef));
115         }
116
117         private static Props props() {
118             return Props.create(new MockShardManagerCreator());
119         }
120
121         @SuppressWarnings("serial")
122         private static class MockShardManagerCreator implements Creator<MockShardManager> {
123             final boolean found;
124             final ActorRef actorRef;
125
126             MockShardManagerCreator() {
127                 this.found = false;
128                 this.actorRef = null;
129             }
130
131             MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
132                 this.found = found;
133                 this.actorRef = actorRef;
134             }
135
136             @Override
137             public MockShardManager create() {
138                 return new MockShardManager(found, actorRef);
139             }
140         }
141     }
142
143     @Test
144     public void testFindLocalShardWithShardFound() {
145         final TestKit testKit = new TestKit(getSystem());
146         testKit.within(testKit.duration("1 seconds"), () -> {
147             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
148
149             ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
150
151             ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
152                 mock(ClusterWrapper.class), mock(Configuration.class));
153
154             Optional<ActorRef> out = actorContext.findLocalShard("default");
155
156             assertEquals(shardActorRef, out.get());
157
158             testKit.expectNoMessage();
159             return null;
160         });
161     }
162
163     @Test
164     public void testFindLocalShardWithShardNotFound() {
165         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
166
167         ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class),
168             mock(Configuration.class));
169
170         Optional<ActorRef> out = actorContext.findLocalShard("default");
171         assertFalse(out.isPresent());
172     }
173
174     @Test
175     public void testExecuteRemoteOperation() {
176         ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
177
178         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
179
180         ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
181             mock(ClusterWrapper.class), mock(Configuration.class));
182
183         ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
184
185         Object out = actorContext.executeOperation(actor, "hello");
186
187         assertEquals("hello", out);
188     }
189
190     @Test
191     public void testExecuteRemoteOperationAsync() throws Exception {
192         ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
193
194         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
195
196         ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
197             mock(ClusterWrapper.class), mock(Configuration.class));
198
199         ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
200
201         Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
202
203         Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
204         assertEquals("Result", "hello", result);
205     }
206
207     @Test
208     public void testIsPathLocal() {
209         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
210         ActorContext actorContext = null;
211
212         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
213         assertEquals(false, actorContext.isPathLocal(null));
214         assertEquals(false, actorContext.isPathLocal(""));
215
216         clusterWrapper.setSelfAddress(null);
217         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
218         assertEquals(false, actorContext.isPathLocal(""));
219
220         // even if the path is in local format, match the primary path (first 3 elements) and return true
221         clusterWrapper.setSelfAddress(new Address("akka", "test"));
222         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
223         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
224
225         clusterWrapper.setSelfAddress(new Address("akka", "test"));
226         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
227         assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
228
229         clusterWrapper.setSelfAddress(new Address("akka", "test"));
230         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
231         assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
232
233         // self address of remote format,but Tx path local format.
234         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
235         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
236         assertEquals(true, actorContext.isPathLocal(
237             "akka://system/user/shardmanager/shard/transaction"));
238
239         // self address of local format,but Tx path remote format.
240         clusterWrapper.setSelfAddress(new Address("akka", "system"));
241         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
242         assertEquals(false, actorContext.isPathLocal(
243             "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
244
245         //local path but not same
246         clusterWrapper.setSelfAddress(new Address("akka", "test"));
247         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
248         assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
249
250         //ip and port same
251         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
252         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
253         assertEquals(true, actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
254
255         // forward-slash missing in address
256         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
257         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
258         assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
259
260         //ips differ
261         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
262         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
263         assertEquals(false, actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
264
265         //ports differ
266         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
267         actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
268         assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
269     }
270
271     @Test
272     public void testClientDispatcherIsGlobalDispatcher() {
273         ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
274                 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
275
276         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
277     }
278
279     @Test
280     public void testClientDispatcherIsNotGlobalDispatcher() {
281         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
282                 ConfigFactory.load("application-with-custom-dispatchers.conf"));
283
284         ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
285                 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
286
287         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
288
289         actorSystem.terminate();
290     }
291
292     @Test
293     public void testSetDatastoreContext() {
294         final TestKit testKit = new TestKit(getSystem());
295         ActorContext actorContext = new ActorContext(getSystem(), testKit.getRef(),
296             mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
297             .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
298             new PrimaryShardInfoFutureCache());
299
300         assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
301         assertEquals("getTransactionCommitOperationTimeout", 7,
302             actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
303
304         DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
305                 .shardTransactionCommitTimeoutInSeconds(8).build();
306
307         DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
308         Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
309
310         actorContext.setDatastoreContext(mockContextFactory);
311
312         testKit.expectMsgClass(testKit.duration("5 seconds"), DatastoreContextFactory.class);
313
314         Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
315
316         assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
317         assertEquals("getTransactionCommitOperationTimeout", 8,
318             actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
319     }
320
321     @Test
322     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
323
324         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
325
326         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
327                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
328                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
329
330         final String expPrimaryPath = "akka://test-system/find-primary-shard";
331         final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
332         ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
333                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
334             @Override
335             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
336                 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
337             }
338         };
339
340         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
341         PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
342
343         assertNotNull(actual);
344         assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
345         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
346                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
347         assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
348
349         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
350
351         PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
352
353         assertEquals(cachedInfo, actual);
354
355         actorContext.getPrimaryShardInfoCache().remove("foobar");
356
357         cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
358
359         assertNull(cached);
360     }
361
362     @Test
363     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
364
365         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
366
367         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
368                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
369                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
370
371         final DataTree mockDataTree = Mockito.mock(DataTree.class);
372         final String expPrimaryPath = "akka://test-system/find-primary-shard";
373         ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
374                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
375             @Override
376             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
377                 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
378             }
379         };
380
381         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
382         PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
383
384         assertNotNull(actual);
385         assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
386         assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
387         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
388                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
389         assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
390
391         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
392
393         PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
394
395         assertEquals(cachedInfo, actual);
396
397         actorContext.getPrimaryShardInfoCache().remove("foobar");
398
399         cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
400
401         assertNull(cached);
402     }
403
404     @Test
405     public void testFindPrimaryShardAsyncPrimaryNotFound() {
406         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
407     }
408
409     @Test
410     public void testFindPrimaryShardAsyncActorNotInitialized() {
411         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
412     }
413
414     @SuppressWarnings("checkstyle:IllegalCatch")
415     private static void testFindPrimaryExceptions(final Object expectedException) {
416         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
417
418         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
419                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
420                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
421
422         ActorContext actorContext =
423             new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
424                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
425                 @Override
426                 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
427                     return Futures.successful(expectedException);
428                 }
429             };
430
431         Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
432
433         try {
434             Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
435             fail("Expected" + expectedException.getClass().toString());
436         } catch (Exception e) {
437             if (!expectedException.getClass().isInstance(e)) {
438                 fail("Expected Exception of type " + expectedException.getClass().toString());
439             }
440         }
441
442         Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
443
444         assertNull(cached);
445     }
446
447     @Test
448     public void testBroadcast() {
449         ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
450         ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
451
452         TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
453             MockShardManager.props());
454         MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
455         shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
456             shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
457         shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
458             shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
459         shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
460
461         Configuration mockConfig = mock(Configuration.class);
462         doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
463         .getAllShardNames();
464
465         ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
466             mock(ClusterWrapper.class), mockConfig,
467             DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
468             new PrimaryShardInfoFutureCache());
469
470         actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
471
472         MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
473         MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
474     }
475 }