2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSelection;
23 import akka.actor.ActorSystem;
24 import akka.actor.Address;
25 import akka.actor.Props;
26 import akka.actor.UntypedActor;
27 import akka.dispatch.Futures;
28 import akka.japi.Creator;
29 import akka.testkit.TestActorRef;
30 import akka.testkit.javadsl.TestKit;
31 import akka.util.Timeout;
32 import com.google.common.base.Optional;
33 import com.google.common.collect.Maps;
34 import com.google.common.collect.Sets;
35 import com.typesafe.config.ConfigFactory;
36 import java.util.Arrays;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.Assert;
40 import org.junit.Test;
41 import org.mockito.Mockito;
42 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
43 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
44 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
47 import org.opendaylight.controller.cluster.datastore.config.Configuration;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
51 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
52 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
56 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
57 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
58 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
59 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
60 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.Await;
65 import scala.concurrent.Future;
66 import scala.concurrent.duration.Duration;
67 import scala.concurrent.duration.FiniteDuration;
69 public class ActorContextTest extends AbstractActorTest {
71 static final Logger LOG = LoggerFactory.getLogger(ActorContextTest.class);
73 private static class TestMessage {
76 private static final class MockShardManager extends UntypedActor {
78 private final boolean found;
79 private final ActorRef actorRef;
80 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
82 private MockShardManager(final boolean found, final ActorRef actorRef) {
85 this.actorRef = actorRef;
88 @Override public void onReceive(final Object message) throws Exception {
89 if (message instanceof FindPrimary) {
90 FindPrimary fp = (FindPrimary)message;
91 Object resp = findPrimaryResponses.get(fp.getShardName());
93 LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
95 getSender().tell(resp, getSelf());
102 getSender().tell(new LocalShardFound(actorRef), getSelf());
104 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
108 void addFindPrimaryResp(final String shardName, final Object resp) {
109 findPrimaryResponses.put(shardName, resp);
112 private static Props props(final boolean found, final ActorRef actorRef) {
113 return Props.create(new MockShardManagerCreator(found, actorRef));
116 private static Props props() {
117 return Props.create(new MockShardManagerCreator());
120 @SuppressWarnings("serial")
121 private static class MockShardManagerCreator implements Creator<MockShardManager> {
123 final ActorRef actorRef;
125 MockShardManagerCreator() {
127 this.actorRef = null;
130 MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
132 this.actorRef = actorRef;
136 public MockShardManager create() throws Exception {
137 return new MockShardManager(found, actorRef);
143 public void testFindLocalShardWithShardFound() {
144 new TestKit(getSystem()) {
146 within(duration("1 seconds"), () -> {
147 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
149 ActorRef shardManagerActorRef = getSystem()
150 .actorOf(MockShardManager.props(true, shardActorRef));
152 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
153 mock(ClusterWrapper.class), mock(Configuration.class));
155 Optional<ActorRef> out = actorContext.findLocalShard("default");
157 assertEquals(shardActorRef, out.get());
168 public void testFindLocalShardWithShardNotFound() {
169 new TestKit(getSystem()) {
171 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
173 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
174 mock(ClusterWrapper.class), mock(Configuration.class));
176 Optional<ActorRef> out = actorContext.findLocalShard("default");
177 assertTrue(!out.isPresent());
184 public void testExecuteRemoteOperation() {
185 new TestKit(getSystem()) {
187 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
189 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
191 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
192 mock(ClusterWrapper.class), mock(Configuration.class));
194 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
196 Object out = actorContext.executeOperation(actor, "hello");
198 assertEquals("hello", out);
204 @SuppressWarnings("checkstyle:IllegalCatch")
205 public void testExecuteRemoteOperationAsync() {
206 new TestKit(getSystem()) {
208 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
210 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
212 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
213 mock(ClusterWrapper.class), mock(Configuration.class));
215 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
217 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
220 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
221 assertEquals("Result", "hello", result);
222 } catch (Exception e) {
223 throw new AssertionError(e);
230 public void testIsPathLocal() {
231 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
232 ActorContext actorContext = null;
234 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
235 assertEquals(false, actorContext.isPathLocal(null));
236 assertEquals(false, actorContext.isPathLocal(""));
238 clusterWrapper.setSelfAddress(null);
239 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
240 assertEquals(false, actorContext.isPathLocal(""));
242 // even if the path is in local format, match the primary path (first 3 elements) and return true
243 clusterWrapper.setSelfAddress(new Address("akka", "test"));
244 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
245 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
247 clusterWrapper.setSelfAddress(new Address("akka", "test"));
248 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
249 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
251 clusterWrapper.setSelfAddress(new Address("akka", "test"));
252 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
253 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
255 // self address of remote format,but Tx path local format.
256 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
257 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
258 assertEquals(true, actorContext.isPathLocal(
259 "akka://system/user/shardmanager/shard/transaction"));
261 // self address of local format,but Tx path remote format.
262 clusterWrapper.setSelfAddress(new Address("akka", "system"));
263 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
264 assertEquals(false, actorContext.isPathLocal(
265 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
267 //local path but not same
268 clusterWrapper.setSelfAddress(new Address("akka", "test"));
269 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
270 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
273 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
274 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
275 assertEquals(true, actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
277 // forward-slash missing in address
278 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
279 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
280 assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
283 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
284 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
285 assertEquals(false, actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
288 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
289 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
290 assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
294 public void testClientDispatcherIsGlobalDispatcher() {
295 ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
296 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
298 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
302 public void testClientDispatcherIsNotGlobalDispatcher() {
303 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
304 ConfigFactory.load("application-with-custom-dispatchers.conf"));
306 ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
307 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
309 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
311 actorSystem.terminate();
315 public void testSetDatastoreContext() {
316 new TestKit(getSystem()) {
318 ActorContext actorContext = new ActorContext(getSystem(), getRef(),
319 mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
320 .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
321 new PrimaryShardInfoFutureCache());
323 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
324 assertEquals("getTransactionCommitOperationTimeout", 7,
325 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
327 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
328 .shardTransactionCommitTimeoutInSeconds(8).build();
330 DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
331 Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
333 actorContext.setDatastoreContext(mockContextFactory);
335 expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
337 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
339 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
340 assertEquals("getTransactionCommitOperationTimeout", 8,
341 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
347 public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
349 ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
351 DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
352 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
353 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
355 final String expPrimaryPath = "akka://test-system/find-primary-shard";
356 final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
357 ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
358 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
360 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
361 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
365 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
366 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
368 assertNotNull(actual);
369 assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
370 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
371 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
372 assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
374 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
376 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
378 assertEquals(cachedInfo, actual);
380 actorContext.getPrimaryShardInfoCache().remove("foobar");
382 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
388 public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
390 ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
392 DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
393 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
394 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
396 final DataTree mockDataTree = Mockito.mock(DataTree.class);
397 final String expPrimaryPath = "akka://test-system/find-primary-shard";
398 ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
399 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
401 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
402 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
406 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
407 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
409 assertNotNull(actual);
410 assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
411 assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
412 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
413 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
414 assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
416 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
418 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
420 assertEquals(cachedInfo, actual);
422 actorContext.getPrimaryShardInfoCache().remove("foobar");
424 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
430 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
431 testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
435 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
436 testFindPrimaryExceptions(new NotInitializedException("not initialized"));
439 @SuppressWarnings("checkstyle:IllegalCatch")
440 private static void testFindPrimaryExceptions(final Object expectedException) throws Exception {
441 ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
443 DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
444 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
445 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
447 ActorContext actorContext =
448 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
449 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
451 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
452 return Futures.successful(expectedException);
456 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
459 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
460 fail("Expected" + expectedException.getClass().toString());
461 } catch (Exception e) {
462 if (!expectedException.getClass().isInstance(e)) {
463 fail("Expected Exception of type " + expectedException.getClass().toString());
467 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
473 public void testBroadcast() {
474 new TestKit(getSystem()) {
476 ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
477 ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
479 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
480 MockShardManager.props());
481 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
482 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
483 shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
484 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
485 shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
486 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
488 Configuration mockConfig = mock(Configuration.class);
489 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
492 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
493 mock(ClusterWrapper.class), mockConfig,
494 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
495 new PrimaryShardInfoFutureCache());
497 actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
499 MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
500 MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);