2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.utils;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertFalse;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.ActorSystem;
24 import akka.actor.Address;
25 import akka.actor.Props;
26 import akka.actor.UntypedAbstractActor;
27 import akka.dispatch.Futures;
28 import akka.japi.Creator;
29 import akka.testkit.TestActorRef;
30 import akka.testkit.javadsl.TestKit;
31 import akka.util.Timeout;
32 import com.google.common.collect.Sets;
33 import com.typesafe.config.ConfigFactory;
34 import java.time.Duration;
35 import java.util.Arrays;
36 import java.util.HashMap;
38 import java.util.Optional;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.Assert;
41 import org.junit.Test;
42 import org.mockito.Mockito;
43 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
44 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
45 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
47 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
48 import org.opendaylight.controller.cluster.datastore.config.Configuration;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
52 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
53 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
57 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
58 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
59 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
60 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
61 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import scala.concurrent.Await;
66 import scala.concurrent.Future;
67 import scala.concurrent.duration.FiniteDuration;
69 public class ActorUtilsTest extends AbstractActorTest {
71 static final Logger LOG = LoggerFactory.getLogger(ActorUtilsTest.class);
73 private static class TestMessage {
76 private static final class MockShardManager extends UntypedAbstractActor {
77 private final Map<String,Object> findPrimaryResponses = new HashMap<>();
78 private final boolean found;
79 private final ActorRef actorRef;
81 private MockShardManager(final boolean found, final ActorRef actorRef) {
84 this.actorRef = actorRef;
87 @Override public void onReceive(final Object message) {
88 if (message instanceof FindPrimary) {
89 FindPrimary fp = (FindPrimary)message;
90 Object resp = findPrimaryResponses.get(fp.getShardName());
92 LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
94 getSender().tell(resp, getSelf());
101 getSender().tell(new LocalShardFound(actorRef), getSelf());
103 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
107 void addFindPrimaryResp(final String shardName, final Object resp) {
108 findPrimaryResponses.put(shardName, resp);
111 private static Props props(final boolean found, final ActorRef actorRef) {
112 return Props.create(MockShardManager.class, new MockShardManagerCreator(found, actorRef));
115 private static Props props() {
116 return Props.create(MockShardManager.class, new MockShardManagerCreator());
119 @SuppressWarnings("serial")
120 private static class MockShardManagerCreator implements Creator<MockShardManager> {
122 final ActorRef actorRef;
124 MockShardManagerCreator() {
126 this.actorRef = null;
129 MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
131 this.actorRef = actorRef;
135 public MockShardManager create() {
136 return new MockShardManager(found, actorRef);
142 public void testFindLocalShardWithShardFound() {
143 final TestKit testKit = new TestKit(getSystem());
144 testKit.within(Duration.ofSeconds(1), () -> {
145 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
147 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
149 ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
150 mock(ClusterWrapper.class), mock(Configuration.class));
152 Optional<ActorRef> out = actorUtils.findLocalShard("default");
154 assertEquals(shardActorRef, out.get());
156 testKit.expectNoMessage();
162 public void testFindLocalShardWithShardNotFound() {
163 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
165 ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class),
166 mock(Configuration.class));
168 Optional<ActorRef> out = actorUtils.findLocalShard("default");
169 assertFalse(out.isPresent());
173 public void testExecuteRemoteOperation() {
174 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
176 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
178 ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
179 mock(ClusterWrapper.class), mock(Configuration.class));
181 ActorSelection actor = actorUtils.actorSelection(shardActorRef.path());
183 Object out = actorUtils.executeOperation(actor, "hello");
185 assertEquals("hello", out);
189 public void testExecuteRemoteOperationAsync() throws Exception {
190 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
192 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
194 ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
195 mock(ClusterWrapper.class), mock(Configuration.class));
197 ActorSelection actor = actorUtils.actorSelection(shardActorRef.path());
199 Future<Object> future = actorUtils.executeOperationAsync(actor, "hello");
201 Object result = Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
202 assertEquals("Result", "hello", result);
206 public void testIsPathLocal() {
207 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
208 ActorUtils actorUtils = null;
210 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
211 assertFalse(actorUtils.isPathLocal(null));
212 assertFalse(actorUtils.isPathLocal(""));
214 clusterWrapper.setSelfAddress(null);
215 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
216 assertFalse(actorUtils.isPathLocal(""));
218 // even if the path is in local format, match the primary path (first 3 elements) and return true
219 clusterWrapper.setSelfAddress(new Address("akka", "test"));
220 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
221 assertTrue(actorUtils.isPathLocal("akka://test/user/$a"));
223 clusterWrapper.setSelfAddress(new Address("akka", "test"));
224 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
225 assertTrue(actorUtils.isPathLocal("akka://test/user/$a"));
227 clusterWrapper.setSelfAddress(new Address("akka", "test"));
228 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
229 assertTrue(actorUtils.isPathLocal("akka://test/user/token2/token3/$a"));
231 // self address of remote format,but Tx path local format.
232 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
233 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
234 assertTrue(actorUtils.isPathLocal("akka://system/user/shardmanager/shard/transaction"));
236 // self address of local format,but Tx path remote format.
237 clusterWrapper.setSelfAddress(new Address("akka", "system"));
238 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
239 assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
241 //local path but not same
242 clusterWrapper.setSelfAddress(new Address("akka", "test"));
243 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
244 assertTrue(actorUtils.isPathLocal("akka://test1/user/$a"));
247 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
248 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
249 assertTrue(actorUtils.isPathLocal("akka://system@127.0.0.1:2550/"));
251 // forward-slash missing in address
252 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
253 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
254 assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2550"));
257 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
258 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
259 assertFalse(actorUtils.isPathLocal("akka://system@127.1.0.1:2550/"));
262 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
263 actorUtils = new ActorUtils(getSystem(), null, clusterWrapper, mock(Configuration.class));
264 assertFalse(actorUtils.isPathLocal("akka://system@127.0.0.1:2551/"));
268 public void testClientDispatcherIsGlobalDispatcher() {
269 ActorUtils actorUtils = new ActorUtils(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
270 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
272 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorUtils.getClientDispatcher());
276 public void testClientDispatcherIsNotGlobalDispatcher() {
277 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
278 ConfigFactory.load("application-with-custom-dispatchers.conf"));
280 ActorUtils actorUtils = new ActorUtils(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
281 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
283 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorUtils.getClientDispatcher());
285 actorSystem.terminate();
289 public void testSetDatastoreContext() {
290 final TestKit testKit = new TestKit(getSystem());
291 ActorUtils actorUtils = new ActorUtils(getSystem(), testKit.getRef(),
292 mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
293 .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
294 new PrimaryShardInfoFutureCache());
296 assertEquals("getOperationDuration", 5, actorUtils.getOperationDuration().toSeconds());
297 assertEquals("getTransactionCommitOperationTimeout", 7,
298 actorUtils.getTransactionCommitOperationTimeout().duration().toSeconds());
300 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
301 .shardTransactionCommitTimeoutInSeconds(8).build();
303 DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
304 Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
306 actorUtils.setDatastoreContext(mockContextFactory);
308 testKit.expectMsgClass(Duration.ofSeconds(5), DatastoreContextFactory.class);
310 Assert.assertSame("getDatastoreContext", newContext, actorUtils.getDatastoreContext());
312 assertEquals("getOperationDuration", 6, actorUtils.getOperationDuration().toSeconds());
313 assertEquals("getTransactionCommitOperationTimeout", 8,
314 actorUtils.getTransactionCommitOperationTimeout().duration().toSeconds());
318 public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
320 ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
322 DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
323 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
324 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
326 final String expPrimaryPath = "akka://test-system/find-primary-shard";
327 final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
328 ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
329 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
331 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
332 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
336 Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
337 PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
339 assertNotNull(actual);
340 assertFalse("LocalShardDataTree present", actual.getLocalShardDataTree().isPresent());
341 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
342 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
343 assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
345 Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
347 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
349 assertEquals(cachedInfo, actual);
351 actorUtils.getPrimaryShardInfoCache().remove("foobar");
353 cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
359 public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
361 ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
363 DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
364 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
365 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
367 final DataTree mockDataTree = Mockito.mock(DataTree.class);
368 final String expPrimaryPath = "akka://test-system/find-primary-shard";
369 ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
370 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
372 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
373 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
377 Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
378 PrimaryShardInfo actual = Await.result(foobar, FiniteDuration.apply(5000, TimeUnit.MILLISECONDS));
380 assertNotNull(actual);
381 assertTrue("LocalShardDataTree present", actual.getLocalShardDataTree().isPresent());
382 assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
383 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
384 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
385 assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
387 Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
389 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
391 assertEquals(cachedInfo, actual);
393 actorUtils.getPrimaryShardInfoCache().remove("foobar");
395 cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
401 public void testFindPrimaryShardAsyncPrimaryNotFound() {
402 testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
406 public void testFindPrimaryShardAsyncActorNotInitialized() {
407 testFindPrimaryExceptions(new NotInitializedException("not initialized"));
410 @SuppressWarnings("checkstyle:IllegalCatch")
411 private static void testFindPrimaryExceptions(final Object expectedException) {
412 ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
414 DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
415 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
416 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
418 ActorUtils actorUtils = new ActorUtils(getSystem(), shardManager, mock(ClusterWrapper.class),
419 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
421 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
422 return Futures.successful(expectedException);
426 Future<PrimaryShardInfo> foobar = actorUtils.findPrimaryShardAsync("foobar");
429 Await.result(foobar, FiniteDuration.apply(100, TimeUnit.MILLISECONDS));
430 fail("Expected" + expectedException.getClass().toString());
431 } catch (Exception e) {
432 if (!expectedException.getClass().isInstance(e)) {
433 fail("Expected Exception of type " + expectedException.getClass().toString());
437 Future<PrimaryShardInfo> cached = actorUtils.getPrimaryShardInfoCache().getIfPresent("foobar");
443 public void testBroadcast() {
444 ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
445 ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
447 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
448 MockShardManager.props());
449 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
450 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
451 shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
452 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
453 shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
454 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
456 Configuration mockConfig = mock(Configuration.class);
457 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
460 ActorUtils actorUtils = new ActorUtils(getSystem(), shardManagerActorRef,
461 mock(ClusterWrapper.class), mockConfig,
462 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
463 new PrimaryShardInfoFutureCache());
465 actorUtils.broadcast(v -> new TestMessage(), TestMessage.class);
467 MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
468 MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);