2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertFalse;
13 import static org.junit.Assert.assertNotEquals;
14 import static org.junit.Assert.assertNotNull;
15 import static org.junit.Assert.assertNull;
16 import static org.junit.Assert.assertSame;
17 import static org.junit.Assert.assertTrue;
18 import static org.junit.Assert.fail;
19 import static org.mockito.Mockito.doReturn;
20 import static org.mockito.Mockito.mock;
22 import akka.actor.ActorRef;
23 import akka.actor.ActorSelection;
24 import akka.actor.ActorSystem;
25 import akka.actor.Address;
26 import akka.actor.Props;
27 import akka.actor.UntypedAbstractActor;
28 import akka.dispatch.Futures;
29 import akka.japi.Creator;
30 import akka.testkit.TestActorRef;
31 import akka.testkit.javadsl.TestKit;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.collect.Maps;
35 import com.google.common.collect.Sets;
36 import com.typesafe.config.ConfigFactory;
37 import java.util.Arrays;
39 import java.util.concurrent.TimeUnit;
40 import org.junit.Assert;
41 import org.junit.Test;
42 import org.mockito.Mockito;
43 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
44 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
45 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
47 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
48 import org.opendaylight.controller.cluster.datastore.config.Configuration;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
51 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
52 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
53 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
56 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
57 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
58 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
59 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
60 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
61 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import scala.concurrent.Await;
66 import scala.concurrent.Future;
67 import scala.concurrent.duration.Duration;
68 import scala.concurrent.duration.FiniteDuration;
// Tests for ActorContext: local shard lookup, synchronous and asynchronous operation execution,
// path locality checks, client dispatcher selection, datastore context updates, primary shard
// lookup with caching, and broadcast.
70 public class ActorContextTest extends AbstractActorTest {
// Shared logger; the nested MockShardManager uses it to report FindPrimary requests that have no
// registered response.
72 static final Logger LOG = LoggerFactory.getLogger(ActorContextTest.class);
// Empty message type; testBroadcast supplies instances of it to ActorContext.broadcast and then
// looks for them in the shard collectors.
74 private static class TestMessage {
// Stand-in for the shard manager actor. FindPrimary requests are answered from the per-shard
// responses registered through addFindPrimaryResp; the other visible replies are LocalShardFound
// (carrying actorRef) and LocalShardNotFound (carrying the requested shard name).
// NOTE(review): several lines of this class (the assignment of 'found' and the branch conditions in
// onReceive) are not visible in this view, so the exact reply selection should be confirmed.
77 private static final class MockShardManager extends UntypedAbstractActor {
// Flag supplied by the test; presumably chooses between LocalShardFound and LocalShardNotFound - TODO confirm.
79 private final boolean found;
// Actor reference returned to the sender inside LocalShardFound.
80 private final ActorRef actorRef;
// Registered FindPrimary replies, keyed by shard name.
81 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
// Private constructor: instances are created through the Creator returned by the props() factories below.
83 private MockShardManager(final boolean found, final ActorRef actorRef) {
86 this.actorRef = actorRef;
// Replies to the sender according to the type of the incoming message.
89 @Override public void onReceive(final Object message) {
90 if (message instanceof FindPrimary) {
91 FindPrimary fp = (FindPrimary)message;
// Look up the reply registered for the requested shard name.
92 Object resp = findPrimaryResponses.get(fp.getShardName());
// Reported when nothing was registered for that shard name.
94 LOG.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
96 getSender().tell(resp, getSelf());
// Replies used for local shard lookups.
103 getSender().tell(new LocalShardFound(actorRef), getSelf());
105 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
// Registers the reply this actor sends for a FindPrimary that names the given shard.
109 void addFindPrimaryResp(final String shardName, final Object resp) {
110 findPrimaryResponses.put(shardName, resp);
// Props for a manager created with an explicit found flag and shard actor reference.
113 private static Props props(final boolean found, final ActorRef actorRef) {
114 return Props.create(new MockShardManagerCreator(found, actorRef));
// Props for a manager built from the creator's no-argument constructor (its actorRef is null).
117 private static Props props() {
118 return Props.create(new MockShardManagerCreator());
// Creator handed to Props.create; it instantiates MockShardManager with the captured arguments.
121 @SuppressWarnings("serial")
122 private static class MockShardManagerCreator implements Creator<MockShardManager> {
124 final ActorRef actorRef;
126 MockShardManagerCreator() {
128 this.actorRef = null;
131 MockShardManagerCreator(final boolean found, final ActorRef actorRef) {
133 this.actorRef = actorRef;
137 public MockShardManager create() {
138 return new MockShardManager(found, actorRef);
// Verifies that findLocalShard("default") returns the shard actor when the mock shard manager is
// created with found == true.
144 public void testFindLocalShardWithShardFound() {
145 final TestKit testKit = new TestKit(getSystem());
// The steps below run inside a within() block bounded by a one-second duration.
146 testKit.within(testKit.duration("1 seconds"), () -> {
// The EchoActor plays the role of the local shard.
147 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
149 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
151 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
152 mock(ClusterWrapper.class), mock(Configuration.class));
154 Optional<ActorRef> out = actorContext.findLocalShard("default");
// The returned reference must be the very shard actor handed to the mock shard manager.
156 assertEquals(shardActorRef, out.get());
// The test kit itself is expected to have received no message.
158 testKit.expectNoMessage();
164 public void testFindLocalShardWithShardNotFound() {
165 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(false, null));
167 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class),
168 mock(Configuration.class));
170 Optional<ActorRef> out = actorContext.findLocalShard("default");
171 assertFalse(out.isPresent());
175 public void testExecuteRemoteOperation() {
176 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
178 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
180 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
181 mock(ClusterWrapper.class), mock(Configuration.class));
183 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
185 Object out = actorContext.executeOperation(actor, "hello");
187 assertEquals("hello", out);
191 public void testExecuteRemoteOperationAsync() throws Exception {
192 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
194 ActorRef shardManagerActorRef = getSystem().actorOf(MockShardManager.props(true, shardActorRef));
196 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
197 mock(ClusterWrapper.class), mock(Configuration.class));
199 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
201 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
203 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
204 assertEquals("Result", "hello", result);
208 public void testIsPathLocal() {
209 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
210 ActorContext actorContext = null;
212 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
213 assertEquals(false, actorContext.isPathLocal(null));
214 assertEquals(false, actorContext.isPathLocal(""));
216 clusterWrapper.setSelfAddress(null);
217 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
218 assertEquals(false, actorContext.isPathLocal(""));
220 // even if the path is in local format, match the primary path (first 3 elements) and return true
221 clusterWrapper.setSelfAddress(new Address("akka", "test"));
222 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
223 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
225 clusterWrapper.setSelfAddress(new Address("akka", "test"));
226 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
227 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
229 clusterWrapper.setSelfAddress(new Address("akka", "test"));
230 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
231 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
233 // self address of remote format,but Tx path local format.
234 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
235 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
236 assertEquals(true, actorContext.isPathLocal(
237 "akka://system/user/shardmanager/shard/transaction"));
239 // self address of local format,but Tx path remote format.
240 clusterWrapper.setSelfAddress(new Address("akka", "system"));
241 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
242 assertEquals(false, actorContext.isPathLocal(
243 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
245 //local path but not same
246 clusterWrapper.setSelfAddress(new Address("akka", "test"));
247 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
248 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
251 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
252 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
253 assertEquals(true, actorContext.isPathLocal("akka://system@127.0.0.1:2550/"));
255 // forward-slash missing in address
256 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
257 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
258 assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2550"));
261 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
262 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
263 assertEquals(false, actorContext.isPathLocal("akka://system@127.1.0.1:2550/"));
266 clusterWrapper.setSelfAddress(new Address("akka", "system", "127.0.0.1", 2550));
267 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
268 assertEquals(false, actorContext.isPathLocal("akka://system@127.0.0.1:2551/"));
272 public void testClientDispatcherIsGlobalDispatcher() {
273 ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
274 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
276 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
280 public void testClientDispatcherIsNotGlobalDispatcher() {
281 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers",
282 ConfigFactory.load("application-with-custom-dispatchers.conf"));
284 ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
285 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
287 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
289 actorSystem.terminate();
293 public void testSetDatastoreContext() {
294 final TestKit testKit = new TestKit(getSystem());
295 ActorContext actorContext = new ActorContext(getSystem(), testKit.getRef(),
296 mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder()
297 .operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(),
298 new PrimaryShardInfoFutureCache());
300 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
301 assertEquals("getTransactionCommitOperationTimeout", 7,
302 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
304 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6)
305 .shardTransactionCommitTimeoutInSeconds(8).build();
307 DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
308 Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
310 actorContext.setDatastoreContext(mockContextFactory);
312 testKit.expectMsgClass(testKit.duration("5 seconds"), DatastoreContextFactory.class);
314 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
316 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
317 assertEquals("getTransactionCommitOperationTimeout", 8,
318 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
// Verifies findPrimaryShardAsync when the ask is answered with RemotePrimaryShardFound: the result
// has no local data tree, carries the expected actor path and version, and is cached under the
// shard name.
322 public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
324 ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
326 DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
327 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
328 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
330 final String expPrimaryPath = "akka://test-system/find-primary-shard";
331 final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
// Anonymous subclass: doAsk is overridden to return an already-completed future holding the
// RemotePrimaryShardFound reply.
332 ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
333 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
335 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
336 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
340 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
341 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
343 assertNotNull(actual);
// A remote primary carries no local data tree.
344 assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
// The resolved actor's path string must be a suffix of the expected primary path.
345 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
346 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
347 assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
// The lookup result is expected to be available from the cache under the shard name.
349 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
351 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
// NOTE(review): arguments are in (actual, expected) order relative to JUnit's convention; this only
// affects the wording of a failure message.
353 assertEquals(cachedInfo, actual);
// Remove the entry and read the cache again.
// NOTE(review): the check on the re-read value is not visible in this view.
355 actorContext.getPrimaryShardInfoCache().remove("foobar");
357 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Verifies findPrimaryShardAsync when the ask is answered with LocalPrimaryShardFound: the result
// exposes the very same DataTree instance, carries the expected actor path and the current
// datastore version, and is cached under the shard name.
363 public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
365 ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
367 DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
368 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
369 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
371 final DataTree mockDataTree = Mockito.mock(DataTree.class);
372 final String expPrimaryPath = "akka://test-system/find-primary-shard";
// Anonymous subclass: doAsk is overridden to return an already-completed future holding the
// LocalPrimaryShardFound reply.
373 ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
374 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
376 protected Future<Object> doAsk(final ActorRef actorRef, final Object message, final Timeout timeout) {
377 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
381 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
382 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
384 assertNotNull(actual);
// A local primary must expose the data tree, and it must be the same instance that was supplied.
385 assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
386 assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
// The resolved actor's path string must be a suffix of the expected primary path.
387 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
388 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
389 assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
// The lookup result is expected to be available from the cache under the shard name.
391 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
393 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
// NOTE(review): arguments are in (actual, expected) order relative to JUnit's convention; this only
// affects the wording of a failure message.
395 assertEquals(cachedInfo, actual);
// Remove the entry and read the cache again.
// NOTE(review): the check on the re-read value is not visible in this view.
397 actorContext.getPrimaryShardInfoCache().remove("foobar");
399 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
405 public void testFindPrimaryShardAsyncPrimaryNotFound() {
406 testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
410 public void testFindPrimaryShardAsyncActorNotInitialized() {
411 testFindPrimaryExceptions(new NotInitializedException("not initialized"));
// Shared body for the failure cases: doAsk completes successfully with the given exception object
// as the reply, and awaiting findPrimaryShardAsync("foobar") is expected to throw an exception of
// that same class.
414 @SuppressWarnings("checkstyle:IllegalCatch")
415 private static void testFindPrimaryExceptions(final Object expectedException) {
416 ActorRef shardManager = getSystem().actorOf(MessageCollectorActor.props());
418 DatastoreContext dataStoreContext = DatastoreContext.newBuilder()
419 .logicalStoreType(LogicalDatastoreType.CONFIGURATION)
420 .shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
// Anonymous subclass: doAsk returns an already-completed future whose value is the exception object.
422 ActorContext actorContext =
423 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
424 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
426 protected Future<Object> doAsk(final Object message, final Timeout timeout) {
427 return Futures.successful(expectedException);
431 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
434 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
// Reached only when Await.result did not throw. The AssertionError raised by fail() is not an
// Exception, so the catch below does not intercept it.
// NOTE(review): the message lacks a space after "Expected".
435 fail("Expected" + expectedException.getClass().toString());
436 } catch (Exception e) {
// Any exception that is not an instance of the expected class fails the test.
437 if (!expectedException.getClass().isInstance(e)) {
438 fail("Expected Exception of type " + expectedException.getClass().toString());
// Cache lookup after the failed attempt.
// NOTE(review): the check on this value is not visible in this view.
442 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Verifies that broadcast() delivers a TestMessage to the actors registered as primaries of shard1
// and shard2, while the shard manager answers the shard3 lookup with a NoShardLeaderException.
448 public void testBroadcast() {
// Two collectors act as the primary shard actors.
449 ActorRef shardActorRef1 = getSystem().actorOf(MessageCollectorActor.props());
450 ActorRef shardActorRef2 = getSystem().actorOf(MessageCollectorActor.props());
// TestActorRef gives direct access to the MockShardManager instance so replies can be registered on it.
452 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(),
453 MockShardManager.props());
454 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
455 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(
456 shardActorRef1.path().toString(), DataStoreVersions.CURRENT_VERSION));
457 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(
458 shardActorRef2.path().toString(), DataStoreVersions.CURRENT_VERSION));
459 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
461 Configuration mockConfig = mock(Configuration.class);
// Stubs the mocked Configuration to return the three shard names in insertion order.
// NOTE(review): the stubbed method call itself is on a line not visible in this view.
462 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).when(mockConfig)
465 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
466 mock(ClusterWrapper.class), mockConfig,
467 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(),
468 new PrimaryShardInfoFutureCache());
// The message factory ignores its argument and always creates a fresh TestMessage.
470 actorContext.broadcast(v -> new TestMessage(), TestMessage.class);
// Both shards with a registered primary must have received a TestMessage.
472 MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
473 MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);