Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorUtilsTest.java
1 /*
2  * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.utils;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.ActorSystem;
24 import akka.actor.Address;
25 import akka.actor.Props;
26 import akka.actor.UntypedAbstractActor;
27 import akka.dispatch.Futures;
28 import akka.japi.Creator;
29 import akka.testkit.TestActorRef;
30 import akka.testkit.javadsl.TestKit;
31 import akka.util.Timeout;
32 import com.google.common.collect.Sets;
33 import com.typesafe.config.ConfigFactory;
34 import java.time.Duration;
35 import java.util.Arrays;
36 import java.util.HashMap;
37 import java.util.Map;
38 import java.util.Optional;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.Assert;
41 import org.junit.Test;
42 import org.mockito.Mockito;
43 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
44 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
45 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
47 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
48 import org.opendaylight.controller.cluster.datastore.config.Configuration;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
52 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
53 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
57 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
58 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
59 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
60 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
61 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
62 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import scala.concurrent.Await;
66 import scala.concurrent.Future;
67 import scala.concurrent.duration.FiniteDuration;
68
69 public class ActorUtilsTest extends AbstractActorTest {
70     static final Logger LOG = LoggerFactory.getLogger(ActorUtilsTest.class);
71
72     private static final class TestMessage {
73
74     }
75
76     private static final class MockShardManager extends UntypedAbstractActor {
77         private final Map<String,Object> findPrimaryResponses = new HashMap<>();
78         private final boolean found;
79         private final ActorRef actorRef;
80
81         private MockShardManager(final boolean found, final ActorRef actorRef) {
82
83             this.found = found;
84             this.actorRef = actorRef;
85         }
86
87         @Override public void onReceive(final Object message) {
88             if (message instanceof FindPrimary fp) {
89                 Object resp = findPrimaryResponses.get(fp.getShardName());
90                 if (resp == null) {
91                     LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
92                 } else {
93                     getSender().tell(resp, getSelf());
94                 }
95
96                 return;
97             }
98
99             if (found) {
100                 getSender().tell(new LocalShardFound(actorRef), getSelf());
101             } else {
102                 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
103             }
104         }
105
106         void addFindPrimaryResp(final String shardName, final Object resp) {
107             findPrimaryResponses.put(shardName, resp);
108         }
109
110         private static Props props(final boolean found, final ActorRef actorRef) {
111             return Props.create(MockShardManager.class, new MockShardManagerCreator(found, actorRef));
112         }
113
114         private static Props props() {
115             return Props.create(MockShardManager.class, new MockShardManagerCreator());
116         }
117
118         @SuppressWarnings("serial")
119         private static class MockShardManagerCreator implements Creator<MockShardManager> {
120             final boolean found;
121             final ActorRef actorRef;
122
123             MockShardManagerCreator() {
124                 found = false;
125                 actorRef = null;
126             }
127
128             MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
129                 this.found = found;
130                 this.actorRef = actorRef;
131             }
132
133             @Override
134             public MockShardManager create() {
135                 return new MockShardManager(found, actorRef);
136             }
137         }
138     }
139
140     @Test
141     public void testFindLocalShardWithShardFound() {
142         final TestKit testKit = new TestKit(getSystem());
143         testKit.within(Duration.ofSeconds(1), () -> {
144             ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
145
146             ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
147
148             ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
149                 mock(ClusterWrapper.class), mock(Configuration.class));
150
151             assertEquals(Optional.of(shardActorRef), actorUtils.findLocalShard("default"));
152
153             testKit.expectNoMessage();
154             return null;
155         });
156     }
157
158     @Test
159     public void testFindLocalShardWithShardNotFound() {
160         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
161
162         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class),
163             mock(Configuration.class));
164
165         Optional<ActorRef> out = actorUtils.findLocalShard("default");
166         assertFalse(out.isPresent());
167     }
168
169     @Test
170     public void testExecuteRemoteOperation() {
171         ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
172
173         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
174
175         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
176             mock(ClusterWrapper.class), mock(Configuration.class));
177
178         ActorSelection actor = actorUtils.actorSelection(shardActorRef.path());
179
180         Object out = actorUtils.executeOperation(actor, "hello");
181
182         assertEquals("hello", out);
183     }
184
185     @Test
186     public void testExecuteRemoteOperationAsync() throws Exception {
187         ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
188
189         ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
190
191         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
192             mock(ClusterWrapper.class), mock(Configuration.class));
193
194         ActorSelection actor = actorUtils.actorSelection(shardActorRef.path());
195
196         Future<Object> future = actorUtils.executeOperationAsync(actor, "hello");
197
198         Object result = Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
199         assertEquals("Result", "hello", result);
200     }
201
202     @Test
203     public void testIsPathLocal() {
204         MockClusterWrapper clusterWrapper = new MockClusterWrapper();
205         ActorUtils actorUtils = null;
206
207         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
208         assertFalse(actorUtils.isPathLocal(null));
209         assertFalse(actorUtils.isPathLocal(""));
210
211         clusterWrapper.setSelfAddress(null);
212         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
213         assertFalse(actorUtils.isPathLocal(""));
214
215         // even if the path is in local format, match the primary path (first 3 elements) and return true
216         clusterWrapper.setSelfAddress(new Address("akka", "test"));
217         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
218         assertTrue(actorUtils.isPathLocal("akka://test/user/$a"));
219
220         clusterWrapper.setSelfAddress(new Address("akka", "test"));
221         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
222         assertTrue(actorUtils.isPathLocal("akka://test/user/$a"));
223
224         clusterWrapper.setSelfAddress(new Address("akka", "test"));
225         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
226         assertTrue(actorUtils.isPathLocal("akka://test/user/token2/token3/$a"));
227
228         // self address of remote format,but Tx path local format.
229         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
230         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
231         assertTrue(actorUtils.isPathLocal("akka://system/user/shardmanager/shard/transaction"));
232
233         // self address of local format,but Tx path remote format.
234         clusterWrapper.setSelfAddress(new Address("akka", "system"));
235         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
236         assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
237
238         //local path but not same
239         clusterWrapper.setSelfAddress(new Address("akka", "test"));
240         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
241         assertTrue(actorUtils.isPathLocal("akka://test1/user/$a"));
242
243         //ip and port same
244         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
245         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
246         assertTrue(actorUtils.isPathLocal("akka://system@127.0.0.1:2550/"));
247
248         // forward-slash missing in address
249         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
250         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
251         assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2550"));
252
253         //ips differ
254         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
255         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
256         assertFalse(actorUtils.isPathLocal("akka://system@127.1.0.1:2550/"));
257
258         //ports differ
259         clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
260         actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
261         assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2551/"));
262     }
263
264     @Test
265     public void testClientDispatcherIsGlobalDispatcher() {
266         ActorUtils actorUtils = new ActorUtils(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
267                 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
268
269         assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorUtils.getClientDispatcher());
270     }
271
272     @Test
273     public void testClientDispatcherIsNotGlobalDispatcher() {
274         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
275                 ConfigFactory.load("application-with-custom-dispatchers.conf"));
276
277         ActorUtils actorUtils = new ActorUtils(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
278                 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
279
280         assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorUtils.getClientDispatcher());
281
282         actorSystem.terminate();
283     }
284
285     @Test
286     public void testSetDatastoreContext() {
287         final TestKit testKit = new TestKit(getSystem());
288         ActorUtils actorUtils = new ActorUtils(getSystem(), testKit.getRef(),
289             mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
290             .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
291             new PrimaryShardInfoFutureCache());
292
293         assertEquals("getOperationDuration", 5, actorUtils.getOperationDuration().toSeconds());
294         assertEquals("getTransactionCommitOperationTimeout", 7,
295             actorUtils.getTransactionCommitOperationTimeout().duration().toSeconds());
296
297         DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
298                 .shardTransactionCommitTimeoutInSeconds(8).build();
299
300         DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
301         Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
302
303         actorUtils.setDatastoreContext(mockContextFactory);
304
305         testKit.expectMsgClass(Duration.ofSeconds(5), DatastoreContextFactory.class);
306
307         Assert.assertSame("getDatastoreContext", newContext, actorUtils.getDatastoreContext());
308
309         assertEquals("getOperationDuration", 6, actorUtils.getOperationDuration().toSeconds());
310         assertEquals("getTransactionCommitOperationTimeout", 8,
311             actorUtils.getTransactionCommitOperationTimeout().duration().toSeconds());
312     }
313
314     @Test
315     public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
316
317         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
318
319         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
320                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
321                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
322
323         final String expPrimaryPath = "akka://test-system/find-primary-shard";
324         final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
325         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
326                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
327             @Override
328             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
329                 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
330             }
331         };
332
333         Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
334         PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
335
336         assertNotNull(actual);
337         assertFalse("LocalShardDataTree present", actual.getLocalShardDataTree().isPresent());
338         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
339                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
340         assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
341
342         Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
343
344         PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
345
346         assertEquals(cachedInfo, actual);
347
348         actorUtils.getPrimaryShardInfoCache().remove("foobar");
349
350         cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
351
352         assertNull(cached);
353     }
354
355     @Test
356     public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
357
358         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
359
360         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
361                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
362                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
363
364         final DataTree mockDataTree = Mockito.mock(DataTree.class);
365         final String expPrimaryPath = "akka://test-system/find-primary-shard";
366         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
367                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
368             @Override
369             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
370                 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
371             }
372         };
373
374         Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
375         PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
376
377         assertNotNull(actual);
378         assertTrue("LocalShardDataTree present", actual.getLocalShardDataTree().isPresent());
379         assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().orElseThrow());
380         assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
381                 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
382         assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
383
384         Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
385
386         PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
387
388         assertEquals(cachedInfo, actual);
389
390         actorUtils.getPrimaryShardInfoCache().remove("foobar");
391
392         cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
393
394         assertNull(cached);
395     }
396
397     @Test
398     public void testFindPrimaryShardAsyncPrimaryNotFound() {
399         testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
400     }
401
402     @Test
403     public void testFindPrimaryShardAsyncActorNotInitialized() {
404         testFindPrimaryExceptions(new NotInitializedException("not initialized"));
405     }
406
407     @SuppressWarnings("checkstyle:IllegalCatch")
408     private static void testFindPrimaryExceptions(final Object expectedException) {
409         ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
410
411         DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
412                 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
413                 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
414
415         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
416                 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
417             @Override
418             protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
419                 return Futures.successful(expectedException);
420             }
421         };
422
423         Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
424
425         try {
426             Await.result(foobar, FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
427             fail("Expected" + expectedException.getClass().toString());
428         } catch (Exception e) {
429             if (!expectedException.getClass().isInstance(e)) {
430                 fail("Expected Exception of type " + expectedException.getClass().toString());
431             }
432         }
433
434         Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
435
436         assertNull(cached);
437     }
438
439     @Test
440     public void testBroadcast() {
441         ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
442         ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
443
444         TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
445             MockShardManager.props());
446         MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
447         shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
448             shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
449         shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
450             shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
451         shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
452
453         Configuration mockConfig = mock(Configuration.class);
454         doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
455         .getAllShardNames();
456
457         ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
458             mock(ClusterWrapper.class), mockConfig,
459             DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
460             new PrimaryShardInfoFutureCache());
461
462         actorUtils.broadcast(v -> new TestMessage(), TestMessage.class);
463
464         MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
465         MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);
466     }
467 }