1 package org.opendaylight.controller.cluster.datastore.utils;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.junit.Assert.fail;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Futures;
19 import akka.japi.Creator;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Arrays;
30 import java.util.concurrent.TimeUnit;
31 import org.apache.commons.lang.time.StopWatch;
32 import org.junit.Assert;
33 import org.junit.Test;
34 import org.mockito.Mockito;
35 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
36 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
37 import org.opendaylight.controller.cluster.datastore.Configuration;
38 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
41 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
42 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
43 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
47 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
48 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
49 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52 import scala.concurrent.Await;
53 import scala.concurrent.Future;
54 import scala.concurrent.duration.Duration;
55 import scala.concurrent.duration.FiniteDuration;
57 public class ActorContextTest extends AbstractActorTest{
59 static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
61 private static class TestMessage {
64 private static class MockShardManager extends UntypedActor {
66 private final boolean found;
67 private final ActorRef actorRef;
68 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
70 private MockShardManager(boolean found, ActorRef actorRef){
73 this.actorRef = actorRef;
76 @Override public void onReceive(Object message) throws Exception {
77 if(message instanceof FindPrimary) {
78 FindPrimary fp = (FindPrimary)message;
79 Object resp = findPrimaryResponses.get(fp.getShardName());
81 log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
83 getSender().tell(resp, getSelf());
90 getSender().tell(new LocalShardFound(actorRef), getSelf());
92 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
96 void addFindPrimaryResp(String shardName, Object resp) {
97 findPrimaryResponses.put(shardName, resp);
100 private static Props props(final boolean found, final ActorRef actorRef){
101 return Props.create(new MockShardManagerCreator(found, actorRef) );
104 private static Props props(){
105 return Props.create(new MockShardManagerCreator() );
108 @SuppressWarnings("serial")
109 private static class MockShardManagerCreator implements Creator<MockShardManager> {
111 final ActorRef actorRef;
113 MockShardManagerCreator() {
115 this.actorRef = null;
118 MockShardManagerCreator(boolean found, ActorRef actorRef) {
120 this.actorRef = actorRef;
124 public MockShardManager create() throws Exception {
125 return new MockShardManager(found, actorRef);
131 public void testFindLocalShardWithShardFound(){
132 new JavaTestKit(getSystem()) {{
134 new Within(duration("1 seconds")) {
136 protected void run() {
138 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
140 ActorRef shardManagerActorRef = getSystem()
141 .actorOf(MockShardManager.props(true, shardActorRef));
143 ActorContext actorContext =
144 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
145 mock(Configuration.class));
147 Optional<ActorRef> out = actorContext.findLocalShard("default");
149 assertEquals(shardActorRef, out.get());
160 public void testFindLocalShardWithShardNotFound(){
161 new JavaTestKit(getSystem()) {{
162 ActorRef shardManagerActorRef = getSystem()
163 .actorOf(MockShardManager.props(false, null));
165 ActorContext actorContext =
166 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
167 mock(Configuration.class));
169 Optional<ActorRef> out = actorContext.findLocalShard("default");
170 assertTrue(!out.isPresent());
176 public void testExecuteRemoteOperation() {
177 new JavaTestKit(getSystem()) {{
178 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
180 ActorRef shardManagerActorRef = getSystem()
181 .actorOf(MockShardManager.props(true, shardActorRef));
183 ActorContext actorContext =
184 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
185 mock(Configuration.class));
187 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
189 Object out = actorContext.executeOperation(actor, "hello");
191 assertEquals("hello", out);
196 public void testExecuteRemoteOperationAsync() {
197 new JavaTestKit(getSystem()) {{
198 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
200 ActorRef shardManagerActorRef = getSystem()
201 .actorOf(MockShardManager.props(true, shardActorRef));
203 ActorContext actorContext =
204 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
205 mock(Configuration.class));
207 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
209 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
212 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
213 assertEquals("Result", "hello", result);
214 } catch(Exception e) {
215 throw new AssertionError(e);
221 public void testIsPathLocal() {
222 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
223 ActorContext actorContext = null;
225 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
226 assertEquals(false, actorContext.isPathLocal(null));
227 assertEquals(false, actorContext.isPathLocal(""));
229 clusterWrapper.setSelfAddress(null);
230 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
231 assertEquals(false, actorContext.isPathLocal(""));
233 // even if the path is in local format, match the primary path (first 3 elements) and return true
234 clusterWrapper.setSelfAddress(new Address("akka", "test"));
235 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
236 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
238 clusterWrapper.setSelfAddress(new Address("akka", "test"));
239 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
240 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
242 clusterWrapper.setSelfAddress(new Address("akka", "test"));
243 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
244 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
246 // self address of remote format,but Tx path local format.
247 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
248 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
249 assertEquals(true, actorContext.isPathLocal(
250 "akka://system/user/shardmanager/shard/transaction"));
252 // self address of local format,but Tx path remote format.
253 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
254 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
255 assertEquals(false, actorContext.isPathLocal(
256 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
258 //local path but not same
259 clusterWrapper.setSelfAddress(new Address("akka", "test"));
260 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
261 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
264 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
265 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
266 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
268 // forward-slash missing in address
269 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
270 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
271 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
274 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
275 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
276 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
279 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
280 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
281 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
285 public void testResolvePathForRemoteActor() {
286 ActorContext actorContext =
287 new ActorContext(getSystem(), mock(ActorRef.class), mock(
288 ClusterWrapper.class),
289 mock(Configuration.class));
291 String actual = actorContext.resolvePath(
292 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
293 "akka://system/user/shardmanager/shard/transaction");
295 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
297 assertEquals(expected, actual);
301 public void testResolvePathForLocalActor() {
302 ActorContext actorContext =
303 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
304 mock(Configuration.class));
306 String actual = actorContext.resolvePath(
307 "akka://system/user/shardmanager/shard",
308 "akka://system/user/shardmanager/shard/transaction");
310 String expected = "akka://system/user/shardmanager/shard/transaction";
312 assertEquals(expected, actual);
316 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
317 ActorContext actorContext =
318 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
319 mock(Configuration.class));
321 String actual = actorContext.resolvePath(
322 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
323 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
325 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
327 assertEquals(expected, actual);
331 public void testRateLimiting(){
332 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
333 transactionCreationInitialRateLimit(155L).build();
335 ActorContext actorContext =
336 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
337 mock(Configuration.class), dataStoreContext);
339 // Check that the initial value is being picked up from DataStoreContext
340 assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
342 actorContext.setTxCreationLimit(1.0);
344 assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
347 StopWatch watch = new StopWatch();
351 actorContext.acquireTxCreationPermit();
352 actorContext.acquireTxCreationPermit();
353 actorContext.acquireTxCreationPermit();
357 assertTrue("did not take as much time as expected", watch.getTime() > 1000);
361 public void testClientDispatcherIsGlobalDispatcher(){
362 ActorContext actorContext =
363 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
364 mock(Configuration.class), DatastoreContext.newBuilder().build());
366 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
371 public void testClientDispatcherIsNotGlobalDispatcher(){
372 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
374 ActorContext actorContext =
375 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
376 mock(Configuration.class), DatastoreContext.newBuilder().build());
378 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
380 actorSystem.shutdown();
385 public void testSetDatastoreContext() {
386 new JavaTestKit(getSystem()) {{
387 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
388 mock(Configuration.class), DatastoreContext.newBuilder().
389 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
391 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
392 assertEquals("getTransactionCommitOperationTimeout", 7,
393 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
395 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
396 shardTransactionCommitTimeoutInSeconds(8).build();
398 actorContext.setDatastoreContext(newContext);
400 expectMsgClass(duration("5 seconds"), DatastoreContext.class);
402 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
404 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
405 assertEquals("getTransactionCommitOperationTimeout", 8,
406 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
411 public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
413 TestActorRef<MessageCollectorActor> shardManager =
414 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
416 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
417 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
419 final String expPrimaryPath = "akka://test-system/find-primary-shard";
420 ActorContext actorContext =
421 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
422 mock(Configuration.class), dataStoreContext) {
424 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
425 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
429 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
430 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
432 assertNotNull(actual);
433 assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
434 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
435 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
437 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
439 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
441 assertEquals(cachedInfo, actual);
443 // Wait for 200 Milliseconds. The cached entry should have been removed.
445 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
447 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
453 public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
455 TestActorRef<MessageCollectorActor> shardManager =
456 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
458 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
459 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
461 final DataTree mockDataTree = Mockito.mock(DataTree.class);
462 final String expPrimaryPath = "akka://test-system/find-primary-shard";
463 ActorContext actorContext =
464 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
465 mock(Configuration.class), dataStoreContext) {
467 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
468 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
472 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
473 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
475 assertNotNull(actual);
476 assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
477 assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
478 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
479 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
481 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
483 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
485 assertEquals(cachedInfo, actual);
487 // Wait for 200 Milliseconds. The cached entry should have been removed.
489 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
491 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
497 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
499 TestActorRef<MessageCollectorActor> shardManager =
500 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
502 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
503 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
505 ActorContext actorContext =
506 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
507 mock(Configuration.class), dataStoreContext) {
509 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
510 return Futures.successful((Object) new PrimaryNotFoundException("not found"));
515 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
518 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
519 fail("Expected PrimaryNotFoundException");
520 } catch(PrimaryNotFoundException e){
524 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
530 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
532 TestActorRef<MessageCollectorActor> shardManager =
533 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
535 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
536 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
538 ActorContext actorContext =
539 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
540 mock(Configuration.class), dataStoreContext) {
542 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
543 return Futures.successful((Object) new NotInitializedException("not iniislized"));
548 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
551 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
552 fail("Expected NotInitializedException");
553 } catch(NotInitializedException e){
557 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
563 public void testBroadcast() {
564 new JavaTestKit(getSystem()) {{
565 ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
566 ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
568 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
569 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
570 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
571 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
572 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
574 Configuration mockConfig = mock(Configuration.class);
575 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
576 when(mockConfig).getAllShardNames();
578 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
579 mock(ClusterWrapper.class), mockConfig,
580 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
582 actorContext.broadcast(new TestMessage());
584 expectFirstMatching(shardActorRef1, TestMessage.class);
585 expectFirstMatching(shardActorRef2, TestMessage.class);
589 private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
590 int count = 5000 / 50;
591 for(int i = 0; i < count; i++) {
593 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
594 if(message != null) {
597 } catch (Exception e) {}
599 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
602 Assert.fail("Did not receive message of type " + clazz);