1 package org.opendaylight.controller.cluster.datastore.utils;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertSame;
8 import static org.junit.Assert.assertTrue;
9 import static org.junit.Assert.fail;
10 import static org.mockito.Mockito.doReturn;
11 import static org.mockito.Mockito.mock;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Futures;
19 import akka.japi.Creator;
20 import akka.testkit.JavaTestKit;
21 import akka.testkit.TestActorRef;
22 import akka.util.Timeout;
23 import com.google.common.base.Optional;
24 import com.google.common.collect.Maps;
25 import com.google.common.collect.Sets;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import com.typesafe.config.ConfigFactory;
28 import java.util.Arrays;
30 import java.util.concurrent.TimeUnit;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.mockito.Mockito;
34 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
35 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
36 import org.opendaylight.controller.cluster.datastore.Configuration;
37 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
40 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
45 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
46 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
47 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51 import scala.concurrent.Await;
52 import scala.concurrent.Future;
53 import scala.concurrent.duration.Duration;
54 import scala.concurrent.duration.FiniteDuration;
56 public class ActorContextTest extends AbstractActorTest{
58 static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
60 private static class TestMessage {
63 private static class MockShardManager extends UntypedActor {
65 private final boolean found;
66 private final ActorRef actorRef;
67 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
69 private MockShardManager(boolean found, ActorRef actorRef){
72 this.actorRef = actorRef;
75 @Override public void onReceive(Object message) throws Exception {
76 if(message instanceof FindPrimary) {
77 FindPrimary fp = (FindPrimary)message;
78 Object resp = findPrimaryResponses.get(fp.getShardName());
80 log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
82 getSender().tell(resp, getSelf());
89 getSender().tell(new LocalShardFound(actorRef), getSelf());
91 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
95 void addFindPrimaryResp(String shardName, Object resp) {
96 findPrimaryResponses.put(shardName, resp);
99 private static Props props(final boolean found, final ActorRef actorRef){
100 return Props.create(new MockShardManagerCreator(found, actorRef) );
103 private static Props props(){
104 return Props.create(new MockShardManagerCreator() );
107 @SuppressWarnings("serial")
108 private static class MockShardManagerCreator implements Creator<MockShardManager> {
110 final ActorRef actorRef;
112 MockShardManagerCreator() {
114 this.actorRef = null;
117 MockShardManagerCreator(boolean found, ActorRef actorRef) {
119 this.actorRef = actorRef;
123 public MockShardManager create() throws Exception {
124 return new MockShardManager(found, actorRef);
130 public void testFindLocalShardWithShardFound(){
131 new JavaTestKit(getSystem()) {{
133 new Within(duration("1 seconds")) {
135 protected void run() {
137 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
139 ActorRef shardManagerActorRef = getSystem()
140 .actorOf(MockShardManager.props(true, shardActorRef));
142 ActorContext actorContext =
143 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
144 mock(Configuration.class));
146 Optional<ActorRef> out = actorContext.findLocalShard("default");
148 assertEquals(shardActorRef, out.get());
159 public void testFindLocalShardWithShardNotFound(){
160 new JavaTestKit(getSystem()) {{
161 ActorRef shardManagerActorRef = getSystem()
162 .actorOf(MockShardManager.props(false, null));
164 ActorContext actorContext =
165 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
166 mock(Configuration.class));
168 Optional<ActorRef> out = actorContext.findLocalShard("default");
169 assertTrue(!out.isPresent());
175 public void testExecuteRemoteOperation() {
176 new JavaTestKit(getSystem()) {{
177 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
179 ActorRef shardManagerActorRef = getSystem()
180 .actorOf(MockShardManager.props(true, shardActorRef));
182 ActorContext actorContext =
183 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
184 mock(Configuration.class));
186 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
188 Object out = actorContext.executeOperation(actor, "hello");
190 assertEquals("hello", out);
195 public void testExecuteRemoteOperationAsync() {
196 new JavaTestKit(getSystem()) {{
197 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
199 ActorRef shardManagerActorRef = getSystem()
200 .actorOf(MockShardManager.props(true, shardActorRef));
202 ActorContext actorContext =
203 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
204 mock(Configuration.class));
206 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
208 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
211 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
212 assertEquals("Result", "hello", result);
213 } catch(Exception e) {
214 throw new AssertionError(e);
220 public void testIsPathLocal() {
221 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
222 ActorContext actorContext = null;
224 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
225 assertEquals(false, actorContext.isPathLocal(null));
226 assertEquals(false, actorContext.isPathLocal(""));
228 clusterWrapper.setSelfAddress(null);
229 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
230 assertEquals(false, actorContext.isPathLocal(""));
232 // even if the path is in local format, match the primary path (first 3 elements) and return true
233 clusterWrapper.setSelfAddress(new Address("akka", "test"));
234 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
235 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
237 clusterWrapper.setSelfAddress(new Address("akka", "test"));
238 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
239 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
241 clusterWrapper.setSelfAddress(new Address("akka", "test"));
242 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
243 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
245 // self address of remote format,but Tx path local format.
246 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
247 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
248 assertEquals(true, actorContext.isPathLocal(
249 "akka://system/user/shardmanager/shard/transaction"));
251 // self address of local format,but Tx path remote format.
252 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
253 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
254 assertEquals(false, actorContext.isPathLocal(
255 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
257 //local path but not same
258 clusterWrapper.setSelfAddress(new Address("akka", "test"));
259 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
260 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
263 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
264 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
265 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
267 // forward-slash missing in address
268 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
269 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
270 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
273 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
274 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
275 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
278 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
279 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
280 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
284 public void testResolvePathForRemoteActor() {
285 ActorContext actorContext =
286 new ActorContext(getSystem(), mock(ActorRef.class), mock(
287 ClusterWrapper.class),
288 mock(Configuration.class));
290 String actual = actorContext.resolvePath(
291 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
292 "akka://system/user/shardmanager/shard/transaction");
294 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
296 assertEquals(expected, actual);
300 public void testResolvePathForLocalActor() {
301 ActorContext actorContext =
302 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
303 mock(Configuration.class));
305 String actual = actorContext.resolvePath(
306 "akka://system/user/shardmanager/shard",
307 "akka://system/user/shardmanager/shard/transaction");
309 String expected = "akka://system/user/shardmanager/shard/transaction";
311 assertEquals(expected, actual);
315 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
316 ActorContext actorContext =
317 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
318 mock(Configuration.class));
320 String actual = actorContext.resolvePath(
321 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
322 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
324 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
326 assertEquals(expected, actual);
331 public void testClientDispatcherIsGlobalDispatcher(){
332 ActorContext actorContext =
333 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
334 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
336 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
341 public void testClientDispatcherIsNotGlobalDispatcher(){
342 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
344 ActorContext actorContext =
345 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
346 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
348 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
350 actorSystem.shutdown();
355 public void testSetDatastoreContext() {
356 new JavaTestKit(getSystem()) {{
357 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
358 mock(Configuration.class), DatastoreContext.newBuilder().
359 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
361 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
362 assertEquals("getTransactionCommitOperationTimeout", 7,
363 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
365 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
366 shardTransactionCommitTimeoutInSeconds(8).build();
368 actorContext.setDatastoreContext(newContext);
370 expectMsgClass(duration("5 seconds"), DatastoreContext.class);
372 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
374 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
375 assertEquals("getTransactionCommitOperationTimeout", 8,
376 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
381 public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
383 TestActorRef<MessageCollectorActor> shardManager =
384 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
386 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
387 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
389 final String expPrimaryPath = "akka://test-system/find-primary-shard";
390 ActorContext actorContext =
391 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
392 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
394 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
395 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
399 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
400 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
402 assertNotNull(actual);
403 assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
404 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
405 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
407 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
409 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
411 assertEquals(cachedInfo, actual);
413 actorContext.getPrimaryShardInfoCache().remove("foobar");
415 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
421 public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
423 TestActorRef<MessageCollectorActor> shardManager =
424 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
426 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
427 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
429 final DataTree mockDataTree = Mockito.mock(DataTree.class);
430 final String expPrimaryPath = "akka://test-system/find-primary-shard";
431 ActorContext actorContext =
432 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
433 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
435 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
436 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
440 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
441 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
443 assertNotNull(actual);
444 assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
445 assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
446 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
447 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
449 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
451 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
453 assertEquals(cachedInfo, actual);
455 actorContext.getPrimaryShardInfoCache().remove("foobar");
457 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
463 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
465 TestActorRef<MessageCollectorActor> shardManager =
466 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
468 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
469 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
471 ActorContext actorContext =
472 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
473 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
475 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
476 return Futures.successful((Object) new PrimaryNotFoundException("not found"));
481 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
484 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
485 fail("Expected PrimaryNotFoundException");
486 } catch(PrimaryNotFoundException e){
490 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
496 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
498 TestActorRef<MessageCollectorActor> shardManager =
499 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
501 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
502 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
504 ActorContext actorContext =
505 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
506 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
508 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
509 return Futures.successful((Object) new NotInitializedException("not iniislized"));
514 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
517 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
518 fail("Expected NotInitializedException");
519 } catch(NotInitializedException e){
523 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
529 public void testBroadcast() {
530 new JavaTestKit(getSystem()) {{
531 ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
532 ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
534 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
535 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
536 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
537 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
538 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
540 Configuration mockConfig = mock(Configuration.class);
541 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
542 when(mockConfig).getAllShardNames();
544 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
545 mock(ClusterWrapper.class), mockConfig,
546 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
548 actorContext.broadcast(new TestMessage());
550 expectFirstMatching(shardActorRef1, TestMessage.class);
551 expectFirstMatching(shardActorRef2, TestMessage.class);
555 private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
556 int count = 5000 / 50;
557 for(int i = 0; i < count; i++) {
559 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
560 if(message != null) {
563 } catch (Exception e) {}
565 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
568 Assert.fail("Did not receive message of type " + clazz);