2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.google.common.util.concurrent.Uninterruptibles;
35 import com.typesafe.config.ConfigFactory;
36 import java.util.Arrays;
38 import java.util.concurrent.TimeUnit;
39 import org.junit.Assert;
40 import org.junit.Test;
41 import org.mockito.Mockito;
42 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
43 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
44 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
57 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60 import scala.concurrent.Await;
61 import scala.concurrent.Future;
62 import scala.concurrent.duration.Duration;
63 import scala.concurrent.duration.FiniteDuration;
// Tests for ActorContext: local shard lookup, operation execution, path handling,
// dispatcher selection, datastore-context updates, primary shard discovery and broadcast.
65 public class ActorContextTest extends AbstractActorTest{
// Shared logger; MockShardManager uses it to report a FindPrimary it has no canned response for.
67 static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
// Marker message type: testBroadcast broadcasts an instance and then looks it up by class.
69 private static class TestMessage {
// Stand-in for the shard manager actor used by the tests in this class. FindPrimary is
// answered from canned responses keyed by shard name; the other visible replies are
// LocalShardFound and LocalShardNotFound.
72 private static class MockShardManager extends UntypedActor {
// NOTE(review): presumably selects between the LocalShardFound and LocalShardNotFound
// replies; the condition that reads it is not visible in this view - confirm.
74 private final boolean found;
// Actor reference wrapped in the LocalShardFound reply.
75 private final ActorRef actorRef;
// Canned FindPrimary replies keyed by shard name; populated via addFindPrimaryResp.
76 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
// Private constructor: instances are created by MockShardManagerCreator.create().
78 private MockShardManager(boolean found, ActorRef actorRef){
81 this.actorRef = actorRef;
// Message handler. A FindPrimary is served by looking up the canned response registered
// for the requested shard name.
84 @Override public void onReceive(Object message) throws Exception {
85 if(message instanceof FindPrimary) {
86 FindPrimary fp = (FindPrimary)message;
87 Object resp = findPrimaryResponses.get(fp.getShardName());
// NOTE(review): presumably logged only when no response was registered for the shard
// name; the guarding condition is not visible here.
89 log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
// Send the canned response back to the asker.
91 getSender().tell(resp, getSelf());
// Reply carrying the actor reference this mock was constructed with.
98 getSender().tell(new LocalShardFound(actorRef), getSelf());
// The cast is unconditional: a message of any other type reaching this line would
// throw ClassCastException.
100 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
// Registers the reply to send when a FindPrimary names shardName.
104 void addFindPrimaryResp(String shardName, Object resp) {
105 findPrimaryResponses.put(shardName, resp);
// Props for a mock built with an explicit found flag and shard actor reference.
108 private static Props props(final boolean found, final ActorRef actorRef){
109 return Props.create(new MockShardManagerCreator(found, actorRef) );
// Props for a mock built through the creator's no-arg constructor, which sets actorRef to null.
112 private static Props props(){
113 return Props.create(new MockShardManagerCreator() );
// Creator handed to Props.create; builds a MockShardManager from the values it holds.
116 @SuppressWarnings("serial")
117 private static class MockShardManagerCreator implements Creator<MockShardManager> {
119 final ActorRef actorRef;
// No-arg variant used by props(): no shard actor reference.
121 MockShardManagerCreator() {
123 this.actorRef = null;
126 MockShardManagerCreator(boolean found, ActorRef actorRef) {
128 this.actorRef = actorRef;
// Instantiates the mock with the stored found flag and actor reference.
132 public MockShardManager create() throws Exception {
133 return new MockShardManager(found, actorRef);
// Verifies that findLocalShard returns the shard actor reference when the mock shard
// manager is created with found=true.
139 public void testFindLocalShardWithShardFound(){
140 new JavaTestKit(getSystem()) {{
// The scenario runs inside a 1-second Within block.
142 new Within(duration("1 seconds")) {
144 protected void run() {
// Actor that plays the role of the local shard.
146 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
// Mock shard manager constructed with found=true and the shard actor above.
148 ActorRef shardManagerActorRef = getSystem()
149 .actorOf(MockShardManager.props(true, shardActorRef));
151 ActorContext actorContext =
152 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
153 mock(Configuration.class));
155 Optional<ActorRef> out = actorContext.findLocalShard("default");
// The returned reference must be the actor given to the mock shard manager.
157 assertEquals(shardActorRef, out.get());
// Verifies that findLocalShard returns an absent Optional when the mock shard manager is
// created with found=false and no shard actor.
168 public void testFindLocalShardWithShardNotFound(){
169 new JavaTestKit(getSystem()) {{
170 ActorRef shardManagerActorRef = getSystem()
171 .actorOf(MockShardManager.props(false, null));
173 ActorContext actorContext =
174 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
175 mock(Configuration.class));
177 Optional<ActorRef> out = actorContext.findLocalShard("default");
// NOTE(review): assertFalse(out.isPresent()) would read more directly, but it needs an
// additional static import.
178 assertTrue(!out.isPresent());
// Verifies that executeOperation sends a message to an actor selection and returns the
// reply: "hello" is sent and "hello" is expected back.
184 public void testExecuteRemoteOperation() {
185 new JavaTestKit(getSystem()) {{
// Target of the operation; presumably replies with whatever it receives - see EchoActor.
186 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
188 ActorRef shardManagerActorRef = getSystem()
189 .actorOf(MockShardManager.props(true, shardActorRef));
191 ActorContext actorContext =
192 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
193 mock(Configuration.class));
// Address the shard actor through a selection built from its path.
195 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
197 Object out = actorContext.executeOperation(actor, "hello");
199 assertEquals("hello", out);
// Verifies that executeOperationAsync returns a Future that completes with the reply:
// "hello" is sent and "hello" is expected as the result.
204 public void testExecuteRemoteOperationAsync() {
205 new JavaTestKit(getSystem()) {{
// Target of the operation; presumably replies with whatever it receives - see EchoActor.
206 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
208 ActorRef shardManagerActorRef = getSystem()
209 .actorOf(MockShardManager.props(true, shardActorRef));
211 ActorContext actorContext =
212 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
213 mock(Configuration.class));
215 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
217 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
// Wait at most 3 seconds for the reply.
220 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
221 assertEquals("Result", "hello", result);
// Any exception from the wait is rethrown as an AssertionError with the cause attached.
222 } catch(Exception e) {
223 throw new AssertionError(e);
// Exercises isPathLocal against a series of (self address, path) combinations. Each
// scenario sets the cluster wrapper's self address, builds a fresh ActorContext and
// asserts the expected result for one path.
229 public void testIsPathLocal() {
230 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
231 ActorContext actorContext = null;
// Null and empty paths are not local (self address left as MockClusterWrapper provides it).
233 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
234 assertEquals(false, actorContext.isPathLocal(null));
235 assertEquals(false, actorContext.isPathLocal(""));
// Null self address with an empty path: not local.
237 clusterWrapper.setSelfAddress(null);
238 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
239 assertEquals(false, actorContext.isPathLocal(""));
241 // even if the path is in local format, match the primary path (first 3 elements) and return true
242 clusterWrapper.setSelfAddress(new Address("akka", "test"));
243 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
244 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
// NOTE(review): this scenario repeats the previous one with identical inputs.
246 clusterWrapper.setSelfAddress(new Address("akka", "test"));
247 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
248 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
// Same self address, deeper path: still local.
250 clusterWrapper.setSelfAddress(new Address("akka", "test"));
251 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
252 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
254 // self address of remote format,but Tx path local format.
255 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
256 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
257 assertEquals(true, actorContext.isPathLocal(
258 "akka://system/user/shardmanager/shard/transaction"));
260 // self address of local format,but Tx path remote format.
// NOTE(review): the address below uses the "akka.tcp" protocol without host or port,
// while the path uses "akka://" with host and port - confirm this is the intended input.
261 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
262 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
263 assertEquals(false, actorContext.isPathLocal(
264 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
266 //local path but not same
// NOTE(review): the system name differs ("test1" vs "test") yet true is expected.
267 clusterWrapper.setSelfAddress(new Address("akka", "test"));
268 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
269 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
// Path whose protocol, system, host and port all equal the self address: local.
272 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
273 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
274 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
276 // forward-slash missing in address
277 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
278 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
279 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
// Host differs from the self address (127.1.0.1 vs 127.0.0.1): not local.
282 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
283 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
284 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
// Port differs from the self address (2551 vs 2550): not local.
287 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
288 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
289 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
// Verifies resolvePath when the first argument carries a remote address and the second is
// a local-format path: the expected result is the second path prefixed with the remote
// protocol, host and port of the first.
293 public void testResolvePathForRemoteActor() {
294 ActorContext actorContext =
295 new ActorContext(getSystem(), mock(ActorRef.class), mock(
296 ClusterWrapper.class),
297 mock(Configuration.class));
299 String actual = actorContext.resolvePath(
300 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
301 "akka://system/user/shardmanager/shard/transaction");
303 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
305 assertEquals(expected, actual);
// Verifies resolvePath when both arguments are local-format paths: the second path is
// expected back unchanged.
309 public void testResolvePathForLocalActor() {
310 ActorContext actorContext =
311 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
312 mock(Configuration.class));
314 String actual = actorContext.resolvePath(
315 "akka://system/user/shardmanager/shard",
316 "akka://system/user/shardmanager/shard/transaction");
318 String expected = "akka://system/user/shardmanager/shard/transaction";
320 assertEquals(expected, actual);
// Verifies resolvePath when both arguments already carry the same remote address: the
// second path is expected back unchanged.
324 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
325 ActorContext actorContext =
326 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
327 mock(Configuration.class));
329 String actual = actorContext.resolvePath(
330 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
331 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
333 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
335 assertEquals(expected, actual);
// Verifies that, on the shared test actor system, getClientDispatcher returns the system's
// default global dispatcher.
340 public void testClientDispatcherIsGlobalDispatcher(){
341 ActorContext actorContext =
342 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
343 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
345 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
// Verifies that, on an actor system loaded from application-with-custom-dispatchers.conf,
// getClientDispatcher does not return the default global dispatcher.
350 public void testClientDispatcherIsNotGlobalDispatcher(){
// Dedicated actor system for this test, created from the custom-dispatchers configuration.
351 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
353 ActorContext actorContext =
354 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
355 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
357 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
// NOTE(review): shutdown() is skipped if the assertion above throws, leaving the extra
// actor system running; consider wrapping the body in try/finally.
359 actorSystem.shutdown();
// Verifies that setDatastoreContext replaces the context held by ActorContext, that the
// derived timeouts follow the new values, and that a DatastoreContext message arrives at
// the actor passed as the second constructor argument (here the test kit's own ref).
364 public void testSetDatastoreContext() {
365 new JavaTestKit(getSystem()) {{
// Initial context: 5s operation timeout, 7s transaction commit timeout.
366 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
367 mock(Configuration.class), DatastoreContext.newBuilder().
368 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
370 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
371 assertEquals("getTransactionCommitOperationTimeout", 7,
372 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
// Replacement context: 6s operation timeout, 8s transaction commit timeout.
374 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
375 shardTransactionCommitTimeoutInSeconds(8).build();
377 actorContext.setDatastoreContext(newContext);
// The test kit's ref must receive a DatastoreContext within 5 seconds.
379 expectMsgClass(duration("5 seconds"), DatastoreContext.class);
// The very same instance must be returned afterwards.
381 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
383 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
384 assertEquals("getTransactionCommitOperationTimeout", 8,
385 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
// Verifies findPrimaryShardAsync when the shard manager answers RemotePrimaryShardFound:
// the result has no local data tree, carries the expected actor path and version, and is
// available from the primary shard info cache under the shard name.
390 public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
392 TestActorRef<MessageCollectorActor> shardManager =
393 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
395 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
396 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
398 final String expPrimaryPath = "akka://test-system/find-primary-shard";
399 final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
// Anonymous subclass that overrides doAsk so the reply is supplied directly as an
// already-successful future.
400 ActorContext actorContext =
401 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
402 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
404 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
405 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
409 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
410 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
412 assertNotNull(actual);
// A remote primary must not expose a local data tree.
413 assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
414 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
415 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
416 assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
// The lookup is expected to be cached under the shard name.
418 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// 1 ms wait; presumably the cached future is already complete at this point.
420 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
422 assertEquals(cachedInfo, actual);
424 actorContext.getPrimaryShardInfoCache().remove("foobar");
// Re-read after removal; the assertion on this value is not visible in this view.
426 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Verifies findPrimaryShardAsync when the shard manager answers LocalPrimaryShardFound:
// the result exposes the supplied data tree, carries the expected actor path and the
// current version, and is available from the primary shard info cache under the shard name.
432 public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
434 TestActorRef<MessageCollectorActor> shardManager =
435 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
437 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
438 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
// Mock data tree placed in the LocalPrimaryShardFound reply.
440 final DataTree mockDataTree = Mockito.mock(DataTree.class);
441 final String expPrimaryPath = "akka://test-system/find-primary-shard";
// Anonymous subclass that overrides doAsk so the reply is supplied directly as an
// already-successful future.
442 ActorContext actorContext =
443 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
444 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
446 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
447 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
451 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
452 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
454 assertNotNull(actual);
// A local primary must expose exactly the data tree from the reply.
455 assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
456 assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
457 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
458 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
459 assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
// The lookup is expected to be cached under the shard name.
461 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// 1 ms wait; presumably the cached future is already complete at this point.
463 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
465 assertEquals(cachedInfo, actual);
467 actorContext.getPrimaryShardInfoCache().remove("foobar");
// Re-read after removal; the assertion on this value is not visible in this view.
469 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Runs the shared exception scenario with a PrimaryNotFoundException as the shard manager's reply.
475 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
476 testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
// Runs the shared exception scenario with a NotInitializedException as the shard manager's reply.
480 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
481 testFindPrimaryExceptions(new NotInitializedException("not initialized"));
// Shared scenario for the exception cases: the overridden doAsk completes successfully with
// expectedException as the reply value, and awaiting findPrimaryShardAsync is then
// expected to throw an exception of that same class.
484 private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
485 TestActorRef<MessageCollectorActor> shardManager =
486 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
488 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
489 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
491 ActorContext actorContext =
492 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
493 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
// The exception object is delivered as a normal reply value, not as a failed future.
495 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
496 return Futures.successful(expectedException);
500 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
503 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
// Reaching this line means nothing was thrown. fail() raises AssertionError, which the
// catch (Exception) below does not intercept.
// NOTE(review): the message lacks a separator between "Expected" and the class name.
504 fail("Expected" + expectedException.getClass().toString());
505 } catch(Exception e){
// Any exception of a different class (a timeout from Await included) fails the test.
506 if(!expectedException.getClass().isInstance(e)) {
507 fail("Expected Exception of type " + expectedException.getClass().toString());
// Cache lookup after the failure; the assertion on this value is not visible in this view.
511 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Verifies that broadcast delivers the message to the primaries of the configured shards:
// shard1 and shard2 resolve to collector actors and must each receive a TestMessage, while
// shard3's FindPrimary is answered with a NoShardLeaderException.
517 public void testBroadcast() {
518 new JavaTestKit(getSystem()) {{
// Collector actors standing in for the two reachable shard primaries.
519 ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
520 ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
// TestActorRef gives direct access to the mock so canned responses can be registered.
522 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
523 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
524 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
525 DataStoreVersions.CURRENT_VERSION));
526 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
527 DataStoreVersions.CURRENT_VERSION));
528 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
// The mocked configuration reports the three shard names in insertion order.
530 Configuration mockConfig = mock(Configuration.class);
531 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
532 when(mockConfig).getAllShardNames();
534 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
535 mock(ClusterWrapper.class), mockConfig,
536 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
538 actorContext.broadcast(new TestMessage());
// Both reachable shards must have collected a TestMessage.
540 expectFirstMatching(shardActorRef1, TestMessage.class);
541 expectFirstMatching(shardActorRef2, TestMessage.class);
// Polls a collector actor for the first collected message of the given class, checking up
// to 100 times (5000 / 50) with a 50 ms sleep between checks, and fails the test if none
// is found. The statement that hands a found message back to the caller is not visible in
// this view.
545 private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
// Number of polling attempts: a 5000 ms budget divided into 50 ms steps.
546 int count = 5000 / 50;
547 for(int i = 0; i < count; i++) {
549 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
550 if(message != null) {
// NOTE(review): exceptions thrown by the lookup are silently discarded and the loop
// simply retries; consider at least logging them.
553 } catch (Exception e) {}
555 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
// Reached only when no attempt produced a matching message.
558 Assert.fail("Did not receive message of type " + clazz);