2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.typesafe.config.ConfigFactory;
35 import java.util.Arrays;
37 import java.util.concurrent.TimeUnit;
38 import org.junit.Assert;
39 import org.junit.Test;
40 import org.mockito.Mockito;
41 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
42 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
43 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
57 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
58 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
59 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62 import scala.concurrent.Await;
63 import scala.concurrent.Future;
64 import scala.concurrent.duration.Duration;
65 import scala.concurrent.duration.FiniteDuration;
67 public class ActorContextTest extends AbstractActorTest{
69 static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
71 private static class TestMessage {
74 private static class MockShardManager extends UntypedActor {
76 private final boolean found;
77 private final ActorRef actorRef;
78 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
80 private MockShardManager(boolean found, ActorRef actorRef){
83 this.actorRef = actorRef;
86 @Override public void onReceive(Object message) throws Exception {
87 if(message instanceof FindPrimary) {
88 FindPrimary fp = (FindPrimary)message;
89 Object resp = findPrimaryResponses.get(fp.getShardName());
91 log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
93 getSender().tell(resp, getSelf());
100 getSender().tell(new LocalShardFound(actorRef), getSelf());
102 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
106 void addFindPrimaryResp(String shardName, Object resp) {
107 findPrimaryResponses.put(shardName, resp);
110 private static Props props(final boolean found, final ActorRef actorRef){
111 return Props.create(new MockShardManagerCreator(found, actorRef) );
114 private static Props props(){
115 return Props.create(new MockShardManagerCreator() );
118 @SuppressWarnings("serial")
119 private static class MockShardManagerCreator implements Creator<MockShardManager> {
121 final ActorRef actorRef;
123 MockShardManagerCreator() {
125 this.actorRef = null;
128 MockShardManagerCreator(boolean found, ActorRef actorRef) {
130 this.actorRef = actorRef;
134 public MockShardManager create() throws Exception {
135 return new MockShardManager(found, actorRef);
141 public void testFindLocalShardWithShardFound(){
142 new JavaTestKit(getSystem()) {{
144 new Within(duration("1 seconds")) {
146 protected void run() {
148 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
150 ActorRef shardManagerActorRef = getSystem()
151 .actorOf(MockShardManager.props(true, shardActorRef));
153 ActorContext actorContext =
154 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
155 mock(Configuration.class));
157 Optional<ActorRef> out = actorContext.findLocalShard("default");
159 assertEquals(shardActorRef, out.get());
170 public void testFindLocalShardWithShardNotFound(){
171 new JavaTestKit(getSystem()) {{
172 ActorRef shardManagerActorRef = getSystem()
173 .actorOf(MockShardManager.props(false, null));
175 ActorContext actorContext =
176 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
177 mock(Configuration.class));
179 Optional<ActorRef> out = actorContext.findLocalShard("default");
180 assertTrue(!out.isPresent());
186 public void testExecuteRemoteOperation() {
187 new JavaTestKit(getSystem()) {{
188 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
190 ActorRef shardManagerActorRef = getSystem()
191 .actorOf(MockShardManager.props(true, shardActorRef));
193 ActorContext actorContext =
194 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
195 mock(Configuration.class));
197 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
199 Object out = actorContext.executeOperation(actor, "hello");
201 assertEquals("hello", out);
206 public void testExecuteRemoteOperationAsync() {
207 new JavaTestKit(getSystem()) {{
208 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
210 ActorRef shardManagerActorRef = getSystem()
211 .actorOf(MockShardManager.props(true, shardActorRef));
213 ActorContext actorContext =
214 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
215 mock(Configuration.class));
217 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
219 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
222 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
223 assertEquals("Result", "hello", result);
224 } catch(Exception e) {
225 throw new AssertionError(e);
231 public void testIsPathLocal() {
232 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
233 ActorContext actorContext = null;
235 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
236 assertEquals(false, actorContext.isPathLocal(null));
237 assertEquals(false, actorContext.isPathLocal(""));
239 clusterWrapper.setSelfAddress(null);
240 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
241 assertEquals(false, actorContext.isPathLocal(""));
243 // even if the path is in local format, match the primary path (first 3 elements) and return true
244 clusterWrapper.setSelfAddress(new Address("akka", "test"));
245 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
246 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
248 clusterWrapper.setSelfAddress(new Address("akka", "test"));
249 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
250 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
252 clusterWrapper.setSelfAddress(new Address("akka", "test"));
253 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
254 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
256 // self address of remote format,but Tx path local format.
257 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
258 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
259 assertEquals(true, actorContext.isPathLocal(
260 "akka://system/user/shardmanager/shard/transaction"));
262 // self address of local format,but Tx path remote format.
263 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
264 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
265 assertEquals(false, actorContext.isPathLocal(
266 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
268 //local path but not same
269 clusterWrapper.setSelfAddress(new Address("akka", "test"));
270 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
271 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
274 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
275 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
276 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
278 // forward-slash missing in address
279 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
280 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
281 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
284 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
285 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
286 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
289 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
290 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
291 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
295 public void testResolvePathForRemoteActor() {
296 ActorContext actorContext =
297 new ActorContext(getSystem(), mock(ActorRef.class), mock(
298 ClusterWrapper.class),
299 mock(Configuration.class));
301 String actual = actorContext.resolvePath(
302 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
303 "akka://system/user/shardmanager/shard/transaction");
305 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
307 assertEquals(expected, actual);
311 public void testResolvePathForLocalActor() {
312 ActorContext actorContext =
313 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
314 mock(Configuration.class));
316 String actual = actorContext.resolvePath(
317 "akka://system/user/shardmanager/shard",
318 "akka://system/user/shardmanager/shard/transaction");
320 String expected = "akka://system/user/shardmanager/shard/transaction";
322 assertEquals(expected, actual);
326 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
327 ActorContext actorContext =
328 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
329 mock(Configuration.class));
331 String actual = actorContext.resolvePath(
332 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
333 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
335 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
337 assertEquals(expected, actual);
342 public void testClientDispatcherIsGlobalDispatcher(){
343 ActorContext actorContext =
344 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
345 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
347 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
352 public void testClientDispatcherIsNotGlobalDispatcher(){
353 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
355 ActorContext actorContext =
356 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
357 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
359 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
361 actorSystem.shutdown();
366 public void testSetDatastoreContext() {
367 new JavaTestKit(getSystem()) {{
368 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
369 mock(Configuration.class), DatastoreContext.newBuilder().
370 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
372 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
373 assertEquals("getTransactionCommitOperationTimeout", 7,
374 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
376 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
377 shardTransactionCommitTimeoutInSeconds(8).build();
379 DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
380 Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
382 actorContext.setDatastoreContext(mockContextFactory);
384 expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
386 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
388 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
389 assertEquals("getTransactionCommitOperationTimeout", 8,
390 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
395 public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
397 TestActorRef<MessageCollectorActor> shardManager =
398 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
400 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
401 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
403 final String expPrimaryPath = "akka://test-system/find-primary-shard";
404 final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
405 ActorContext actorContext =
406 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
407 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
409 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
410 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
414 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
415 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
417 assertNotNull(actual);
418 assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
419 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
420 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
421 assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
423 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
425 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
427 assertEquals(cachedInfo, actual);
429 actorContext.getPrimaryShardInfoCache().remove("foobar");
431 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
437 public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
439 TestActorRef<MessageCollectorActor> shardManager =
440 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
442 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
443 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
445 final DataTree mockDataTree = Mockito.mock(DataTree.class);
446 final String expPrimaryPath = "akka://test-system/find-primary-shard";
447 ActorContext actorContext =
448 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
449 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
451 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
452 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
456 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
457 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
459 assertNotNull(actual);
460 assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
461 assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
462 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
463 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
464 assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
466 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
468 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
470 assertEquals(cachedInfo, actual);
472 actorContext.getPrimaryShardInfoCache().remove("foobar");
474 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
480 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
481 testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
485 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
486 testFindPrimaryExceptions(new NotInitializedException("not initialized"));
489 private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
490 TestActorRef<MessageCollectorActor> shardManager =
491 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
493 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
494 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
496 ActorContext actorContext =
497 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
498 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
500 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
501 return Futures.successful(expectedException);
505 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
508 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
509 fail("Expected" + expectedException.getClass().toString());
510 } catch(Exception e){
511 if(!expectedException.getClass().isInstance(e)) {
512 fail("Expected Exception of type " + expectedException.getClass().toString());
516 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
522 public void testBroadcast() {
523 new JavaTestKit(getSystem()) {{
524 ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
525 ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
527 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
528 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
529 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
530 DataStoreVersions.CURRENT_VERSION));
531 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
532 DataStoreVersions.CURRENT_VERSION));
533 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
535 Configuration mockConfig = mock(Configuration.class);
536 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
537 when(mockConfig).getAllShardNames();
539 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
540 mock(ClusterWrapper.class), mockConfig,
541 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
543 actorContext.broadcast(new TestMessage());
545 MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
546 MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);