2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.typesafe.config.ConfigFactory;
35 import java.util.Arrays;
37 import java.util.concurrent.TimeUnit;
38 import org.junit.Assert;
39 import org.junit.Test;
40 import org.mockito.Mockito;
41 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
42 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
43 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
46 import org.opendaylight.controller.cluster.datastore.config.Configuration;
47 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
50 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
51 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
52 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
55 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
56 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
57 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
58 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
59 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
60 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63 import scala.concurrent.Await;
64 import scala.concurrent.Future;
65 import scala.concurrent.duration.Duration;
66 import scala.concurrent.duration.FiniteDuration;
// Tests for ActorContext: local-shard lookup, synchronous and asynchronous operation execution,
// path locality checks, client dispatcher selection, datastore-context replacement,
// primary-shard lookup (including the primary-shard-info cache) and broadcast.
68 public class ActorContextTest extends AbstractActorTest{
// Class-wide logger; the nested MockShardManager uses it to report a FindPrimary request
// for which no canned response was registered.
70 static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
// Message type used by testBroadcast: an instance is broadcast through the ActorContext and the
// collector actors are then checked for a received message of this class.
// NOTE(review): the class body is not visible in this view; presumably it is empty — confirm.
72 private static class TestMessage {
// Stand-in shard manager actor for these tests. It answers FindPrimary from a table of canned
// responses keyed by shard name, and answers local-shard lookups with either LocalShardFound
// or LocalShardNotFound.
// NOTE(review): several lines of this class are not visible in this view (for example the branch
// conditions in onReceive and the assignments of "found"), so the comments below describe only
// the statements that are shown.
75 private static class MockShardManager extends UntypedActor {
// Presumably selects between the LocalShardFound and LocalShardNotFound replies; the condition
// that reads it is not visible here — confirm.
77 private final boolean found;
// Shard actor reference carried in the LocalShardFound reply.
78 private final ActorRef actorRef;
// Canned FindPrimary replies keyed by shard name; filled through addFindPrimaryResp.
79 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
// Private constructor; instances are built by MockShardManagerCreator.create().
81 private MockShardManager(boolean found, ActorRef actorRef){
84 this.actorRef = actorRef;
// Message handler: replies to the sender according to the type of the incoming message.
87 @Override public void onReceive(Object message) throws Exception {
88 if(message instanceof FindPrimary) {
89 FindPrimary fp = (FindPrimary)message;
// Look up the canned reply registered for the requested shard name.
90 Object resp = findPrimaryResponses.get(fp.getShardName());
// Logged when no reply was registered (the guarding condition is not visible here).
92 log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
94 getSender().tell(resp, getSelf());
// Local-shard lookup replies: the "found" reply carries actorRef, the "not found" reply
// echoes the shard name taken from the FindLocalShard request.
101 getSender().tell(new LocalShardFound(actorRef), getSelf());
103 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
// Registers the reply that a FindPrimary request for shardName should receive.
107 void addFindPrimaryResp(String shardName, Object resp) {
108 findPrimaryResponses.put(shardName, resp);
// Props for a manager configured with the given found flag and shard actor reference.
111 private static Props props(final boolean found, final ActorRef actorRef){
112 return Props.create(new MockShardManagerCreator(found, actorRef) );
// Props for a manager built from the creator's no-argument constructor (actorRef is null).
115 private static Props props(){
116 return Props.create(new MockShardManagerCreator() );
// Creator handed to Props.create so the actor is constructed with the stored arguments.
119 @SuppressWarnings("serial")
120 private static class MockShardManagerCreator implements Creator<MockShardManager> {
122 final ActorRef actorRef;
// No-argument variant: no shard actor reference is supplied.
124 MockShardManagerCreator() {
126 this.actorRef = null;
129 MockShardManagerCreator(boolean found, ActorRef actorRef) {
131 this.actorRef = actorRef;
// Builds the actor instance from the stored found flag and actor reference.
135 public MockShardManager create() throws Exception {
136 return new MockShardManager(found, actorRef);
// Verifies that findLocalShard("default") returns the shard actor when the mock shard manager
// is created with found=true and that actor's reference.
142 public void testFindLocalShardWithShardFound(){
143 new JavaTestKit(getSystem()) {{
// Presumably bounds the run() body to one second via JavaTestKit's Within — confirm.
145 new Within(duration("1 seconds")) {
147 protected void run() {
// An EchoActor stands in for the shard.
149 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
151 ActorRef shardManagerActorRef = getSystem()
152 .actorOf(MockShardManager.props(true, shardActorRef));
// ClusterWrapper and Configuration are plain Mockito mocks with no stubbing in this test.
154 ActorContext actorContext =
155 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
156 mock(Configuration.class));
158 Optional<ActorRef> out = actorContext.findLocalShard("default");
// The returned reference must be the actor handed to the mock shard manager.
160 assertEquals(shardActorRef, out.get());
// Verifies that findLocalShard("default") returns an absent Optional when the mock shard manager
// is created with found=false and no shard actor reference.
171 public void testFindLocalShardWithShardNotFound(){
172 new JavaTestKit(getSystem()) {{
173 ActorRef shardManagerActorRef = getSystem()
174 .actorOf(MockShardManager.props(false, null));
176 ActorContext actorContext =
177 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
178 mock(Configuration.class));
180 Optional<ActorRef> out = actorContext.findLocalShard("default");
// No shard reference is expected in the result.
181 assertTrue(!out.isPresent());
// Verifies that executeOperation sends a message to the selected actor and returns its reply:
// "hello" is sent to an EchoActor and the result is expected to equal "hello".
187 public void testExecuteRemoteOperation() {
188 new JavaTestKit(getSystem()) {{
189 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
191 ActorRef shardManagerActorRef = getSystem()
192 .actorOf(MockShardManager.props(true, shardActorRef));
194 ActorContext actorContext =
195 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
196 mock(Configuration.class));
// Address the echo actor through an ActorSelection built from its path.
198 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
200 Object out = actorContext.executeOperation(actor, "hello");
202 assertEquals("hello", out);
// Verifies that executeOperationAsync returns a Future that completes with the actor's reply:
// "hello" is sent to an EchoActor and the awaited result is expected to equal "hello".
207 public void testExecuteRemoteOperationAsync() {
208 new JavaTestKit(getSystem()) {{
209 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
211 ActorRef shardManagerActorRef = getSystem()
212 .actorOf(MockShardManager.props(true, shardActorRef));
214 ActorContext actorContext =
215 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
216 mock(Configuration.class));
218 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
220 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
// Wait up to 3 seconds for the reply.
223 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
224 assertEquals("Result", "hello", result);
// Any exception from the await is rethrown as an AssertionError with the exception as its cause.
225 } catch(Exception e) {
226 throw new AssertionError(e);
// Exercises ActorContext.isPathLocal against a series of self-address / path combinations.
// Each case sets the self address on a MockClusterWrapper, builds a fresh ActorContext around it
// and asserts the expected result for one path string.
232 public void testIsPathLocal() {
233 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
234 ActorContext actorContext = null;
// With the wrapper's initial self address: null and empty paths are not local.
236 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
237 assertEquals(false, actorContext.isPathLocal(null));
238 assertEquals(false, actorContext.isPathLocal(""));
// Null self address: an empty path is not local.
240 clusterWrapper.setSelfAddress(null);
241 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
242 assertEquals(false, actorContext.isPathLocal(""));
244 // Even if the path is in local format, match the primary path (first 3 elements) and return true.
245 clusterWrapper.setSelfAddress(new Address("akka", "test"));
246 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
247 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
// Same self address and path as the previous case.
249 clusterWrapper.setSelfAddress(new Address("akka", "test"));
250 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
251 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
// Same self address with a deeper path below /user.
253 clusterWrapper.setSelfAddress(new Address("akka", "test"));
254 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
255 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
257 // Self address in remote format (protocol, system, host, port), but the path is in local format.
258 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
259 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
260 assertEquals(true, actorContext.isPathLocal(
261 "akka://system/user/shardmanager/shard/transaction"));
263 // Self address without host and port, but the path carries a host and port.
264 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
265 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
266 assertEquals(false, actorContext.isPathLocal(
267 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
269 // Local-format path whose system name ("test1") differs from the self address ("test"); still expected to be local.
270 clusterWrapper.setSelfAddress(new Address("akka", "test"));
271 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
272 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
// Remote-format path whose protocol, system, host and port all match the self address.
275 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
276 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
277 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
279 // Trailing forward slash missing from the address: not local.
280 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
281 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
282 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
// Host in the path (127.1.0.1) differs from the self address host (127.0.0.1): not local.
285 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
286 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
287 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
// Port in the path (2551) differs from the self address port (2550): not local.
290 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
291 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
292 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
// Verifies that an ActorContext built on the shared test actor system with a default
// DatastoreContext reports the system's default global dispatcher as its client dispatcher.
296 public void testClientDispatcherIsGlobalDispatcher(){
297 ActorContext actorContext =
298 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
299 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
301 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
// Verifies that an ActorContext built on an actor system loaded from
// "application-with-custom-dispatchers.conf" reports a client dispatcher that differs from that
// system's default global dispatcher.
306 public void testClientDispatcherIsNotGlobalDispatcher(){
// A dedicated actor system is created for this test rather than using the shared one.
307 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
309 ActorContext actorContext =
310 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
311 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
313 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
// Stop the dedicated actor system.
// NOTE(review): no try/finally is visible around the assertion; if there is none, a failed
// assertion skips this shutdown and leaves the extra actor system running — confirm.
315 actorSystem.shutdown();
// Verifies setDatastoreContext: the timeouts exposed by the ActorContext initially reflect the
// DatastoreContext passed to the constructor, and after setDatastoreContext they reflect the
// context returned by the factory; the factory is also expected to arrive at the test kit's actor.
320 public void testSetDatastoreContext() {
321 new JavaTestKit(getSystem()) {{
// The test kit's own ref is passed in the shard-manager position so that messages sent to it can be observed.
322 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
323 mock(Configuration.class), DatastoreContext.newBuilder().
324 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
// Initial values: 5 s operation timeout, 7 s transaction commit timeout.
326 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
327 assertEquals("getTransactionCommitOperationTimeout", 7,
328 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
// Replacement context with 6 s / 8 s timeouts.
330 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
331 shardTransactionCommitTimeoutInSeconds(8).build();
// Mock factory stubbed to hand out the replacement context.
333 DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
334 Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
336 actorContext.setDatastoreContext(mockContextFactory);
// A DatastoreContextFactory message must reach the test kit within 5 seconds.
338 expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
// The context now exposed must be the very instance returned by the factory.
340 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
342 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
343 assertEquals("getTransactionCommitOperationTimeout", 8,
344 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
// Verifies findPrimaryShardAsync when the ask is answered with RemotePrimaryShardFound: the
// resulting PrimaryShardInfo has no local data tree, carries the expected actor path and version,
// and is present in the primary-shard-info cache until it is removed.
349 public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
351 TestActorRef<MessageCollectorActor> shardManager =
352 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
354 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
355 logicalStoreType(LogicalDatastoreType.CONFIGURATION).
356 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
358 final String expPrimaryPath = "akka://test-system/find-primary-shard";
359 final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
// Anonymous subclass that short-circuits the ask with a canned, already-completed reply.
360 ActorContext actorContext =
361 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
362 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
364 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
365 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
369 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
370 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
372 assertNotNull(actual);
// A remote primary carries no local shard data tree.
373 assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
// The expected path must end with the path string of the returned actor selection.
374 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
375 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
376 assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
// The lookup result must now be retrievable from the cache under the shard name.
378 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// 1 ms wait: presumably the cached future is expected to be completed already — confirm.
380 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
382 assertEquals(cachedInfo, actual);
// Remove the entry and look it up again.
// NOTE(review): the assertion on this second lookup is not visible in this view.
384 actorContext.getPrimaryShardInfoCache().remove("foobar");
386 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Verifies findPrimaryShardAsync when the ask is answered with LocalPrimaryShardFound: the
// resulting PrimaryShardInfo exposes the supplied data tree, carries the expected actor path and
// the current datastore version, and is present in the primary-shard-info cache until removed.
392 public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
394 TestActorRef<MessageCollectorActor> shardManager =
395 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
397 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
398 logicalStoreType(LogicalDatastoreType.CONFIGURATION).
399 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
// Mocked data tree; only its identity is checked later.
401 final DataTree mockDataTree = Mockito.mock(DataTree.class);
402 final String expPrimaryPath = "akka://test-system/find-primary-shard";
// Anonymous subclass that short-circuits the ask with a canned, already-completed reply.
403 ActorContext actorContext =
404 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
405 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
407 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
408 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
412 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
413 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
415 assertNotNull(actual);
// A local primary must expose exactly the data tree instance from the reply.
416 assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
417 assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
// The expected path must end with the path string of the returned actor selection.
418 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
419 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
420 assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
// The lookup result must now be retrievable from the cache under the shard name.
422 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// 1 ms wait: presumably the cached future is expected to be completed already — confirm.
424 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
426 assertEquals(cachedInfo, actual);
// Remove the entry and look it up again.
// NOTE(review): the assertion on this second lookup is not visible in this view.
428 actorContext.getPrimaryShardInfoCache().remove("foobar");
430 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Runs the shared exception scenario with a PrimaryNotFoundException as the ask reply.
436 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
437 testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
// Runs the shared exception scenario with a NotInitializedException as the ask reply.
441 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
442 testFindPrimaryExceptions(new NotInitializedException("not initialized"));
// Shared scenario for the exception tests: the ask is answered with the given exception object
// as a successful reply value, and awaiting findPrimaryShardAsync is expected to throw an
// exception of that same class.
//
// expectedException: the object returned as the ask reply; its class is the exception type the
// await is expected to throw.
445 private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
446 TestActorRef<MessageCollectorActor> shardManager =
447 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
449 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
450 logicalStoreType(LogicalDatastoreType.CONFIGURATION).
451 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
// Anonymous subclass that short-circuits the ask; note the future itself succeeds and merely
// carries the exception object as its value.
453 ActorContext actorContext =
454 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
455 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
457 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
458 return Futures.successful(expectedException);
462 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
// Awaiting the result must throw; reaching fail(...) means it did not. fail(...) raises an
// AssertionError, which the catch (Exception) below does not intercept.
// NOTE(review): the message lacks a space after "Expected", so it renders as "Expectedclass ...".
465 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
466 fail("Expected" + expectedException.getClass().toString());
467 } catch(Exception e){
// The thrown exception must be an instance of the expected exception's class.
468 if(!expectedException.getClass().isInstance(e)) {
469 fail("Expected Exception of type " + expectedException.getClass().toString());
// Look up the cache entry for the shard after the failed lookup.
// NOTE(review): the assertion on this lookup is not visible in this view.
473 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Verifies broadcast: with three configured shard names, where two resolve to collector actors
// and the third resolves to a NoShardLeaderException, both collector actors are expected to
// receive the broadcast TestMessage.
479 public void testBroadcast() {
480 new JavaTestKit(getSystem()) {{
// Collector actors standing in for the primaries of shard1 and shard2.
481 ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
482 ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
// TestActorRef gives direct access to the underlying MockShardManager so replies can be registered.
484 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
485 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
486 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
487 DataStoreVersions.CURRENT_VERSION));
488 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
489 DataStoreVersions.CURRENT_VERSION));
// shard3 has no primary: its FindPrimary reply is an exception object.
490 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
// The mocked configuration reports the three shard names in insertion order.
492 Configuration mockConfig = mock(Configuration.class);
493 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
494 when(mockConfig).getAllShardNames();
496 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
497 mock(ClusterWrapper.class), mockConfig,
498 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
500 actorContext.broadcast(new TestMessage());
// Both collector actors must have received a TestMessage.
502 MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
503 MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);