2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.google.common.util.concurrent.Uninterruptibles;
35 import com.typesafe.config.ConfigFactory;
36 import java.util.Arrays;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.Assert;
40 import org.junit.Test;
41 import org.mockito.Mockito;
42 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
43 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
44 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
47 import org.opendaylight.controller.cluster.datastore.config.Configuration;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
51 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
52 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
56 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
57 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
58 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61 import scala.concurrent.Await;
62 import scala.concurrent.Future;
63 import scala.concurrent.duration.Duration;
64 import scala.concurrent.duration.FiniteDuration;
66 public class ActorContextTest extends AbstractActorTest{
68 static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
70 private static class TestMessage {
73 private static class MockShardManager extends UntypedActor {
75 private final boolean found;
76 private final ActorRef actorRef;
77 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
79 private MockShardManager(boolean found, ActorRef actorRef){
82 this.actorRef = actorRef;
85 @Override public void onReceive(Object message) throws Exception {
86 if(message instanceof FindPrimary) {
87 FindPrimary fp = (FindPrimary)message;
88 Object resp = findPrimaryResponses.get(fp.getShardName());
90 log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
92 getSender().tell(resp, getSelf());
99 getSender().tell(new LocalShardFound(actorRef), getSelf());
101 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
105 void addFindPrimaryResp(String shardName, Object resp) {
106 findPrimaryResponses.put(shardName, resp);
109 private static Props props(final boolean found, final ActorRef actorRef){
110 return Props.create(new MockShardManagerCreator(found, actorRef) );
113 private static Props props(){
114 return Props.create(new MockShardManagerCreator() );
117 @SuppressWarnings("serial")
118 private static class MockShardManagerCreator implements Creator<MockShardManager> {
120 final ActorRef actorRef;
122 MockShardManagerCreator() {
124 this.actorRef = null;
127 MockShardManagerCreator(boolean found, ActorRef actorRef) {
129 this.actorRef = actorRef;
133 public MockShardManager create() throws Exception {
134 return new MockShardManager(found, actorRef);
140 public void testFindLocalShardWithShardFound(){
141 new JavaTestKit(getSystem()) {{
143 new Within(duration("1 seconds")) {
145 protected void run() {
147 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
149 ActorRef shardManagerActorRef = getSystem()
150 .actorOf(MockShardManager.props(true, shardActorRef));
152 ActorContext actorContext =
153 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
154 mock(Configuration.class));
156 Optional<ActorRef> out = actorContext.findLocalShard("default");
158 assertEquals(shardActorRef, out.get());
169 public void testFindLocalShardWithShardNotFound(){
170 new JavaTestKit(getSystem()) {{
171 ActorRef shardManagerActorRef = getSystem()
172 .actorOf(MockShardManager.props(false, null));
174 ActorContext actorContext =
175 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
176 mock(Configuration.class));
178 Optional<ActorRef> out = actorContext.findLocalShard("default");
179 assertTrue(!out.isPresent());
185 public void testExecuteRemoteOperation() {
186 new JavaTestKit(getSystem()) {{
187 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
189 ActorRef shardManagerActorRef = getSystem()
190 .actorOf(MockShardManager.props(true, shardActorRef));
192 ActorContext actorContext =
193 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
194 mock(Configuration.class));
196 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
198 Object out = actorContext.executeOperation(actor, "hello");
200 assertEquals("hello", out);
205 public void testExecuteRemoteOperationAsync() {
206 new JavaTestKit(getSystem()) {{
207 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
209 ActorRef shardManagerActorRef = getSystem()
210 .actorOf(MockShardManager.props(true, shardActorRef));
212 ActorContext actorContext =
213 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
214 mock(Configuration.class));
216 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
218 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
221 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
222 assertEquals("Result", "hello", result);
223 } catch(Exception e) {
224 throw new AssertionError(e);
230 public void testIsPathLocal() {
231 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
232 ActorContext actorContext = null;
234 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
235 assertEquals(false, actorContext.isPathLocal(null));
236 assertEquals(false, actorContext.isPathLocal(""));
238 clusterWrapper.setSelfAddress(null);
239 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
240 assertEquals(false, actorContext.isPathLocal(""));
242 // even if the path is in local format, match the primary path (first 3 elements) and return true
243 clusterWrapper.setSelfAddress(new Address("akka", "test"));
244 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
245 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
247 clusterWrapper.setSelfAddress(new Address("akka", "test"));
248 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
249 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
251 clusterWrapper.setSelfAddress(new Address("akka", "test"));
252 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
253 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
255 // self address of remote format,but Tx path local format.
256 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
257 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
258 assertEquals(true, actorContext.isPathLocal(
259 "akka://system/user/shardmanager/shard/transaction"));
261 // self address of local format,but Tx path remote format.
262 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
263 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
264 assertEquals(false, actorContext.isPathLocal(
265 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
267 //local path but not same
268 clusterWrapper.setSelfAddress(new Address("akka", "test"));
269 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
270 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
273 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
274 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
275 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
277 // forward-slash missing in address
278 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
279 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
280 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
283 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
284 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
285 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
288 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
289 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
290 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
294 public void testResolvePathForRemoteActor() {
295 ActorContext actorContext =
296 new ActorContext(getSystem(), mock(ActorRef.class), mock(
297 ClusterWrapper.class),
298 mock(Configuration.class));
300 String actual = actorContext.resolvePath(
301 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
302 "akka://system/user/shardmanager/shard/transaction");
304 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
306 assertEquals(expected, actual);
310 public void testResolvePathForLocalActor() {
311 ActorContext actorContext =
312 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
313 mock(Configuration.class));
315 String actual = actorContext.resolvePath(
316 "akka://system/user/shardmanager/shard",
317 "akka://system/user/shardmanager/shard/transaction");
319 String expected = "akka://system/user/shardmanager/shard/transaction";
321 assertEquals(expected, actual);
325 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
326 ActorContext actorContext =
327 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
328 mock(Configuration.class));
330 String actual = actorContext.resolvePath(
331 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
332 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
334 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
336 assertEquals(expected, actual);
341 public void testClientDispatcherIsGlobalDispatcher(){
342 ActorContext actorContext =
343 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
344 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
346 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
351 public void testClientDispatcherIsNotGlobalDispatcher(){
352 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
354 ActorContext actorContext =
355 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
356 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
358 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
360 actorSystem.shutdown();
365 public void testSetDatastoreContext() {
366 new JavaTestKit(getSystem()) {{
367 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
368 mock(Configuration.class), DatastoreContext.newBuilder().
369 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
371 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
372 assertEquals("getTransactionCommitOperationTimeout", 7,
373 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
375 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
376 shardTransactionCommitTimeoutInSeconds(8).build();
378 DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
379 Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
381 actorContext.setDatastoreContext(mockContextFactory);
383 expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
385 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
387 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
388 assertEquals("getTransactionCommitOperationTimeout", 8,
389 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
394 public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
396 TestActorRef<MessageCollectorActor> shardManager =
397 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
399 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
400 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
402 final String expPrimaryPath = "akka://test-system/find-primary-shard";
403 final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
404 ActorContext actorContext =
405 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
406 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
408 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
409 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
413 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
414 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
416 assertNotNull(actual);
417 assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
418 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
419 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
420 assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
422 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
424 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
426 assertEquals(cachedInfo, actual);
428 actorContext.getPrimaryShardInfoCache().remove("foobar");
430 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
436 public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
438 TestActorRef<MessageCollectorActor> shardManager =
439 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
441 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
442 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
444 final DataTree mockDataTree = Mockito.mock(DataTree.class);
445 final String expPrimaryPath = "akka://test-system/find-primary-shard";
446 ActorContext actorContext =
447 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
448 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
450 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
451 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
455 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
456 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
458 assertNotNull(actual);
459 assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
460 assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
461 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
462 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
463 assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
465 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
467 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
469 assertEquals(cachedInfo, actual);
471 actorContext.getPrimaryShardInfoCache().remove("foobar");
473 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
479 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
480 testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
484 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
485 testFindPrimaryExceptions(new NotInitializedException("not initialized"));
488 private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
489 TestActorRef<MessageCollectorActor> shardManager =
490 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
492 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
493 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
495 ActorContext actorContext =
496 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
497 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
499 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
500 return Futures.successful(expectedException);
504 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
507 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
508 fail("Expected" + expectedException.getClass().toString());
509 } catch(Exception e){
510 if(!expectedException.getClass().isInstance(e)) {
511 fail("Expected Exception of type " + expectedException.getClass().toString());
515 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
521 public void testBroadcast() {
522 new JavaTestKit(getSystem()) {{
523 ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
524 ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
526 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
527 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
528 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
529 DataStoreVersions.CURRENT_VERSION));
530 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
531 DataStoreVersions.CURRENT_VERSION));
532 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
534 Configuration mockConfig = mock(Configuration.class);
535 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
536 when(mockConfig).getAllShardNames();
538 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
539 mock(ClusterWrapper.class), mockConfig,
540 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
542 actorContext.broadcast(new TestMessage());
544 expectFirstMatching(shardActorRef1, TestMessage.class);
545 expectFirstMatching(shardActorRef2, TestMessage.class);
549 private static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
550 int count = 5000 / 50;
551 for(int i = 0; i < count; i++) {
553 @SuppressWarnings("unchecked")
554 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
555 if(message != null) {
558 } catch (Exception e) {}
560 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
563 Assert.fail("Did not receive message of type " + clazz);