2 * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNotEquals;
13 import static org.junit.Assert.assertNotNull;
14 import static org.junit.Assert.assertNull;
15 import static org.junit.Assert.assertSame;
16 import static org.junit.Assert.assertTrue;
17 import static org.junit.Assert.fail;
18 import static org.mockito.Mockito.doReturn;
19 import static org.mockito.Mockito.mock;
20 import akka.actor.ActorRef;
21 import akka.actor.ActorSelection;
22 import akka.actor.ActorSystem;
23 import akka.actor.Address;
24 import akka.actor.Props;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Futures;
27 import akka.japi.Creator;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import akka.util.Timeout;
31 import com.google.common.base.Optional;
32 import com.google.common.collect.Maps;
33 import com.google.common.collect.Sets;
34 import com.typesafe.config.ConfigFactory;
35 import java.util.Arrays;
37 import java.util.concurrent.TimeUnit;
38 import java.util.function.Function;
39 import org.junit.Assert;
40 import org.junit.Test;
41 import org.mockito.Mockito;
42 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
43 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
44 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
47 import org.opendaylight.controller.cluster.datastore.config.Configuration;
48 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
49 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
50 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
51 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
52 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
53 import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
54 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
55 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
56 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
57 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
58 import org.opendaylight.controller.cluster.raft.utils.EchoActor;
59 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
60 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64 import scala.concurrent.Await;
65 import scala.concurrent.Future;
66 import scala.concurrent.duration.Duration;
67 import scala.concurrent.duration.FiniteDuration;
69 public class ActorContextTest extends AbstractActorTest{
// Logger for this test class; MockShardManager uses it to report a FindPrimary request that has no canned response.
71 static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
// Message type used by testBroadcast: the broadcast function creates instances of it and the test
// then looks for them in the collector actors.
73 private static class TestMessage {
// Stand-in for the shard manager actor. It replies to FindPrimary with canned responses registered
// through addFindPrimaryResp, and replies to local-shard lookups with LocalShardFound or LocalShardNotFound.
76 private static class MockShardManager extends UntypedActor {
// Flag supplied at construction.
// NOTE(review): presumably selects between the LocalShardFound and LocalShardNotFound replies; the
// branch condition is not visible in this view - confirm.
78 private final boolean found;
// Actor reference carried in the LocalShardFound reply.
79 private final ActorRef actorRef;
// Canned FindPrimary replies, keyed by shard name.
80 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
// Private constructor; instances are created by MockShardManagerCreator.create().
82 private MockShardManager(boolean found, ActorRef actorRef){
85 this.actorRef = actorRef;
// Message handler: replies to the sender according to the type of the incoming message.
88 @Override public void onReceive(Object message) throws Exception {
89 if(message instanceof FindPrimary) {
90 FindPrimary fp = (FindPrimary)message;
// Look up the canned reply registered for the requested shard name.
91 Object resp = findPrimaryResponses.get(fp.getShardName());
// NOTE(review): the guard around this error log is not visible in this view; presumably it fires
// only when no reply was registered for the shard - confirm.
93 log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
95 getSender().tell(resp, getSelf());
// Local-shard lookup replies; the conditions choosing between the next two replies are not visible here.
102 getSender().tell(new LocalShardFound(actorRef), getSelf());
// The not-found reply echoes the shard name taken from the FindLocalShard request.
104 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
// Registers the object that will be sent back for a FindPrimary request naming shardName.
108 void addFindPrimaryResp(String shardName, Object resp) {
109 findPrimaryResponses.put(shardName, resp);
// Props factory taking the found flag and the actor reference to hand to the mock.
112 private static Props props(final boolean found, final ActorRef actorRef){
113 return Props.create(new MockShardManagerCreator(found, actorRef) );
// Props factory using the creator's no-argument constructor (which sets a null actorRef).
116 private static Props props(){
117 return Props.create(new MockShardManagerCreator() );
// Creator implementation that builds MockShardManager instances from the stored arguments.
120 @SuppressWarnings("serial")
121 private static class MockShardManagerCreator implements Creator<MockShardManager> {
123 final ActorRef actorRef;
// No-argument form: no actor reference is supplied.
125 MockShardManagerCreator() {
127 this.actorRef = null;
130 MockShardManagerCreator(boolean found, ActorRef actorRef) {
132 this.actorRef = actorRef;
// Builds the actor from the stored found flag and actor reference.
136 public MockShardManager create() throws Exception {
137 return new MockShardManager(found, actorRef);
// Asserts that findLocalShard("default") returns an Optional holding the shard actor when the mock
// shard manager is created with found=true and that actor's reference.
143 public void testFindLocalShardWithShardFound(){
144 new JavaTestKit(getSystem()) {{
// The body runs inside a Within block created with a "1 seconds" duration.
146 new Within(duration("1 seconds")) {
148 protected void run() {
150 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
152 ActorRef shardManagerActorRef = getSystem()
153 .actorOf(MockShardManager.props(true, shardActorRef));
155 ActorContext actorContext =
156 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
157 mock(Configuration.class));
159 Optional<ActorRef> out = actorContext.findLocalShard("default");
// The returned reference must be the actor handed to the mock shard manager.
161 assertEquals(shardActorRef, out.get());
// Asserts that findLocalShard("default") returns an absent Optional when the mock shard manager is
// created with found=false and no actor reference.
172 public void testFindLocalShardWithShardNotFound(){
173 new JavaTestKit(getSystem()) {{
174 ActorRef shardManagerActorRef = getSystem()
175 .actorOf(MockShardManager.props(false, null));
177 ActorContext actorContext =
178 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
179 mock(Configuration.class));
181 Optional<ActorRef> out = actorContext.findLocalShard("default");
182 assertTrue(!out.isPresent());
// Asserts that executeOperation, sent to an EchoActor through an ActorSelection, returns the
// message that was sent ("hello").
188 public void testExecuteRemoteOperation() {
189 new JavaTestKit(getSystem()) {{
190 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
192 ActorRef shardManagerActorRef = getSystem()
193 .actorOf(MockShardManager.props(true, shardActorRef));
195 ActorContext actorContext =
196 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
197 mock(Configuration.class));
// The selection is built from the path of the actor created above in the test's own actor system.
199 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
201 Object out = actorContext.executeOperation(actor, "hello");
203 assertEquals("hello", out);
// Asserts that the future returned by executeOperationAsync, sent to an EchoActor through an
// ActorSelection, completes with the message that was sent ("hello").
208 public void testExecuteRemoteOperationAsync() {
209 new JavaTestKit(getSystem()) {{
210 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
212 ActorRef shardManagerActorRef = getSystem()
213 .actorOf(MockShardManager.props(true, shardActorRef));
215 ActorContext actorContext =
216 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
217 mock(Configuration.class));
219 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
221 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
// Wait up to 3 seconds for the reply.
224 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
225 assertEquals("Result", "hello", result);
// Any exception caught here is rethrown as an AssertionError so the test fails with the cause attached.
226 } catch(Exception e) {
227 throw new AssertionError(e);
// Exercises ActorContext.isPathLocal against a series of self-address / path combinations. Each case
// sets the self address on the MockClusterWrapper, builds a fresh ActorContext and checks one path.
233 public void testIsPathLocal() {
234 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
235 ActorContext actorContext = null;
// null and empty paths are expected to be non-local
237 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
238 assertEquals(false, actorContext.isPathLocal(null));
239 assertEquals(false, actorContext.isPathLocal(""));
// with a null self address an empty path is still expected to be non-local
241 clusterWrapper.setSelfAddress(null);
242 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
243 assertEquals(false, actorContext.isPathLocal(""));
245 // even if the path is in local format, match the primary path (first 3 elements) and return true
246 clusterWrapper.setSelfAddress(new Address("akka", "test"));
247 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
248 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
// same self address and path as the previous case, repeated
250 clusterWrapper.setSelfAddress(new Address("akka", "test"));
251 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
252 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
// same self address with a deeper path
254 clusterWrapper.setSelfAddress(new Address("akka", "test"));
255 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
256 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
258 // self address in remote format, but Tx path in local format.
259 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
260 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
261 assertEquals(true, actorContext.isPathLocal(
262 "akka://system/user/shardmanager/shard/transaction"));
264 // self address in local format, but Tx path in remote format.
265 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
266 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
267 assertEquals(false, actorContext.isPathLocal(
268 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
270 // local path whose system name (test1) differs from the self address (test); still expected to be local
271 clusterWrapper.setSelfAddress(new Address("akka", "test"));
272 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
273 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
// host and port in the path equal those of the self address
276 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
277 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
278 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
280 // forward-slash missing in address
281 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
282 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
283 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
// host in the path (127.1.0.1) differs from the self address (127.0.0.1)
286 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
287 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
288 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
// port in the path (2551) differs from the self address (2550)
291 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
292 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
293 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
// Asserts that, with the shared test actor system, getClientDispatcher() equals the system's
// default global dispatcher.
297 public void testClientDispatcherIsGlobalDispatcher(){
298 ActorContext actorContext =
299 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
300 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
302 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
// Asserts that, with an actor system loaded from application-with-custom-dispatchers.conf,
// getClientDispatcher() differs from that system's default global dispatcher.
307 public void testClientDispatcherIsNotGlobalDispatcher(){
// A dedicated actor system is created for this test rather than using the shared one.
308 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
310 ActorContext actorContext =
311 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
312 mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache());
314 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
// NOTE(review): no try/finally is visible around the assertion above; if there is none, a failed
// assertion would skip this shutdown and leave the extra actor system running - confirm.
316 actorSystem.shutdown();
// Asserts that setDatastoreContext replaces the context held by ActorContext, updates the derived
// timeouts, and sends the DatastoreContextFactory to the actor passed as the second constructor argument.
321 public void testSetDatastoreContext() {
322 new JavaTestKit(getSystem()) {{
// The test kit's own ref is passed where other tests pass the shard manager, so messages sent to it
// can be checked with expectMsgClass below.
323 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
324 mock(Configuration.class), DatastoreContext.newBuilder().
325 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache());
// Initial timeouts reflect the context given to the constructor (5 and 7 seconds).
327 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
328 assertEquals("getTransactionCommitOperationTimeout", 7,
329 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
331 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
332 shardTransactionCommitTimeoutInSeconds(8).build();
// The mocked factory hands out the new context as its base datastore context.
334 DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class);
335 Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext();
337 actorContext.setDatastoreContext(mockContextFactory);
// The test kit ref must receive a DatastoreContextFactory message within 5 seconds.
339 expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class);
341 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
// Timeouts now reflect the new context (6 and 8 seconds).
343 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
344 assertEquals("getTransactionCommitOperationTimeout", 8,
345 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
// Checks findPrimaryShardAsync when the ask to the shard manager yields a RemotePrimaryShardFound:
// the resulting PrimaryShardInfo has no local data tree, carries the expected path and version,
// and the future is stored in the primary shard info cache.
350 public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
352 TestActorRef<MessageCollectorActor> shardManager =
353 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
355 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
356 logicalStoreType(LogicalDatastoreType.CONFIGURATION).
357 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
359 final String expPrimaryPath = "akka://test-system/find-primary-shard";
360 final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION;
361 ActorContext actorContext =
362 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
363 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
// doAsk is overridden to return an already-completed future holding the canned reply.
365 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
366 return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion));
370 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
371 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
373 assertNotNull(actual);
374 assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
// The expected path must end with the path string of the returned primary shard actor.
375 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
376 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
377 assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion());
// The cache entry for "foobar" must resolve to an equal PrimaryShardInfo.
379 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
381 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
383 assertEquals(cachedInfo, actual);
385 actorContext.getPrimaryShardInfoCache().remove("foobar");
// NOTE(review): the assertion on this second lookup is not visible in this view; presumably it
// checks that the entry is gone after remove - confirm.
387 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Checks findPrimaryShardAsync when the ask to the shard manager yields a LocalPrimaryShardFound:
// the resulting PrimaryShardInfo exposes the same DataTree instance, carries the expected path and
// the current version, and the future is stored in the primary shard info cache.
393 public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
395 TestActorRef<MessageCollectorActor> shardManager =
396 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
398 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
399 logicalStoreType(LogicalDatastoreType.CONFIGURATION).
400 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
402 final DataTree mockDataTree = Mockito.mock(DataTree.class);
403 final String expPrimaryPath = "akka://test-system/find-primary-shard";
404 ActorContext actorContext =
405 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
406 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
// doAsk is overridden to return an already-completed future holding the canned reply.
408 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
409 return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
413 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
414 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
416 assertNotNull(actual);
// The local data tree must be present and be the very instance supplied in the reply.
417 assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
418 assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
419 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
420 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
421 assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion());
// The cache entry for "foobar" must resolve to an equal PrimaryShardInfo.
423 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
425 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
427 assertEquals(cachedInfo, actual);
429 actorContext.getPrimaryShardInfoCache().remove("foobar");
// NOTE(review): the assertion on this second lookup is not visible in this view; presumably it
// checks that the entry is gone after remove - confirm.
431 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Runs the shared exception scenario with a PrimaryNotFoundException as the shard manager's reply.
437 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
438 testFindPrimaryExceptions(new PrimaryNotFoundException("not found"));
// Runs the shared exception scenario with a NotInitializedException as the shard manager's reply.
442 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
443 testFindPrimaryExceptions(new NotInitializedException("not initialized"));
// Shared scenario: the ask to the shard manager completes successfully with expectedException as its
// value, and awaiting findPrimaryShardAsync must then throw an exception of that same class.
446 private void testFindPrimaryExceptions(final Object expectedException) throws Exception {
447 TestActorRef<MessageCollectorActor> shardManager =
448 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
450 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().
451 logicalStoreType(LogicalDatastoreType.CONFIGURATION).
452 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
454 ActorContext actorContext =
455 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
456 mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) {
// The exception object is delivered as the successful value of the ask future, not as a failure.
458 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
459 return Futures.successful(expectedException);
463 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
// Awaiting the result must throw; reaching the fail() call below means nothing was thrown.
466 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
467 fail("Expected" + expectedException.getClass().toString());
468 } catch(Exception e){
// Any exception that is not an instance of the expected class fails the test.
469 if(!expectedException.getClass().isInstance(e)) {
470 fail("Expected Exception of type " + expectedException.getClass().toString());
// NOTE(review): the assertion on this cache lookup is not visible in this view; presumably it
// checks that a failed lookup is not cached - confirm.
474 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Checks that broadcast delivers the message built by the supplied function to the shards whose
// FindPrimary lookup yields a RemotePrimaryShardFound (shard1 and shard2). shard3 is registered
// with a NoShardLeaderException reply instead.
480 public void testBroadcast() {
481 new JavaTestKit(getSystem()) {{
// Collector actors standing in for the two reachable shards.
482 ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
483 ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
// A TestActorRef gives direct access to the underlying mock so canned replies can be registered.
485 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
486 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
487 shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(),
488 DataStoreVersions.CURRENT_VERSION));
489 shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(),
490 DataStoreVersions.CURRENT_VERSION));
491 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
// The mocked configuration reports the three shard names in insertion order.
493 Configuration mockConfig = mock(Configuration.class);
494 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
495 when(mockConfig).getAllShardNames();
497 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
498 mock(ClusterWrapper.class), mockConfig,
499 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache());
// The function ignores its Short argument and always produces a new TestMessage.
501 actorContext.broadcast(new Function<Short, Object>() {
503 public Object apply(Short v) {
504 return new TestMessage();
// Both collector actors must have received a TestMessage.
508 MessageCollectorActor.expectFirstMatching(shardActorRef1, TestMessage.class);
509 MessageCollectorActor.expectFirstMatching(shardActorRef2, TestMessage.class);