1 package org.opendaylight.controller.cluster.datastore.utils;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.mock;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import akka.actor.UntypedActor;
17 import akka.dispatch.Futures;
18 import akka.japi.Creator;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.Arrays;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang.time.StopWatch;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
34 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
35 import org.opendaylight.controller.cluster.datastore.Configuration;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
41 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
45 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import scala.concurrent.Await;
49 import scala.concurrent.Future;
50 import scala.concurrent.duration.Duration;
51 import scala.concurrent.duration.FiniteDuration;
53 public class ActorContextTest extends AbstractActorTest{
// Logger for this test class; MockShardManager uses it to report a FindPrimary request that has no registered response.
55 static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
// Message type used by testBroadcast: an instance is passed to ActorContext.broadcast and then looked up on the shard actors.
57 private static class TestMessage {
/**
 * Stand-in shard manager actor used by the tests in this class.
 * It replies to FindPrimary with the response registered for the requested shard name and
 * replies to local shard lookups with either LocalShardFound or LocalShardNotFound.
 * NOTE(review): this excerpt omits some lines of the class (for example the conditions that
 * choose between the replies and the closing braces) - confirm details against the full file.
 */
60 private static class MockShardManager extends UntypedActor {
// Presumably selects between the LocalShardFound and LocalShardNotFound replies; the branch is not visible here - verify.
62 private final boolean found;
// Actor reference wrapped in the LocalShardFound reply.
63 private final ActorRef actorRef;
// Registered FindPrimary responses, keyed by shard name.
64 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
66 private MockShardManager(boolean found, ActorRef actorRef){
69 this.actorRef = actorRef;
// Handles FindPrimary and local shard lookup messages by replying to the sender.
72 @Override public void onReceive(Object message) throws Exception {
73 if(message instanceof FindPrimary) {
74 FindPrimary fp = (FindPrimary)message;
// Fetch the response registered for the requested shard name.
75 Object resp = findPrimaryResponses.get(fp.getShardName());
// NOTE(review): the guard around this log call is not visible in this excerpt; presumably it runs when no response was registered.
77 log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
79 getSender().tell(resp, getSelf());
// Reply carrying the configured shard actor reference.
86 getSender().tell(new LocalShardFound(actorRef), getSelf());
// Reply carrying the shard name taken from the FindLocalShard request.
88 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
// Registers the object to send back when a FindPrimary for shardName arrives.
92 void addFindPrimaryResp(String shardName, Object resp) {
93 findPrimaryResponses.put(shardName, resp);
// Props that build a MockShardManager from the given found flag and shard actor reference.
96 private static Props props(final boolean found, final ActorRef actorRef){
97 return Props.create(new MockShardManagerCreator(found, actorRef) );
// Props that build a MockShardManager through the no-argument creator.
100 private static Props props(){
101 return Props.create(new MockShardManagerCreator() );
/**
 * Creator handed to Props.create so that Akka can instantiate MockShardManager.
 * NOTE(review): the declaration and assignments of the found field used by create are not
 * visible in this excerpt - confirm against the full file.
 */
104 @SuppressWarnings("serial")
105 private static class MockShardManagerCreator implements Creator<MockShardManager> {
// Shard actor reference passed on to the MockShardManager constructor.
107 final ActorRef actorRef;
// No-argument variant: leaves the shard actor reference null.
109 MockShardManagerCreator() {
111 this.actorRef = null;
114 MockShardManagerCreator(boolean found, ActorRef actorRef) {
116 this.actorRef = actorRef;
// Builds the actor from the stored found flag and actor reference.
120 public MockShardManager create() throws Exception {
121 return new MockShardManager(found, actorRef);
/**
 * Checks that findLocalShard returns the shard actor when the mock shard manager is
 * created with found set to true.
 */
127 public void testFindLocalShardWithShardFound(){
128 new JavaTestKit(getSystem()) {{
// The checks run inside a Within block created with duration 1 seconds.
130 new Within(duration("1 seconds")) {
132 protected void run() {
// Actor that stands in for the local shard.
134 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
// Shard manager mock configured with found = true and the shard actor above.
136 ActorRef shardManagerActorRef = getSystem()
137 .actorOf(MockShardManager.props(true, shardActorRef));
139 ActorContext actorContext =
140 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
141 mock(Configuration.class));
143 Optional<ActorRef> out = actorContext.findLocalShard("default");
// The returned Optional must hold the same shard actor reference.
145 assertEquals(shardActorRef, out.get());
/**
 * Checks that findLocalShard returns an absent Optional when the mock shard manager is
 * created with found set to false and no shard actor.
 */
156 public void testFindLocalShardWithShardNotFound(){
157 new JavaTestKit(getSystem()) {{
// Shard manager mock configured with found = false and a null shard actor.
158 ActorRef shardManagerActorRef = getSystem()
159 .actorOf(MockShardManager.props(false, null));
161 ActorContext actorContext =
162 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
163 mock(Configuration.class));
165 Optional<ActorRef> out = actorContext.findLocalShard("default");
// NOTE(review): assertFalse(out.isPresent()) would express the same check more directly.
166 assertTrue(!out.isPresent());
/**
 * Checks that executeOperation sends a message to the selected actor and returns its reply:
 * the string hello is sent to an EchoActor and the same string is expected back.
 */
172 public void testExecuteRemoteOperation() {
173 new JavaTestKit(getSystem()) {{
174 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
176 ActorRef shardManagerActorRef = getSystem()
177 .actorOf(MockShardManager.props(true, shardActorRef));
179 ActorContext actorContext =
180 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
181 mock(Configuration.class));
// Address the shard actor through a selection built from its path.
183 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
185 Object out = actorContext.executeOperation(actor, "hello");
187 assertEquals("hello", out);
/**
 * Checks that executeOperationAsync returns a future that completes with the reply of the
 * selected actor: the string hello is sent to an EchoActor and the same string is expected.
 */
192 public void testExecuteRemoteOperationAsync() {
193 new JavaTestKit(getSystem()) {{
194 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
196 ActorRef shardManagerActorRef = getSystem()
197 .actorOf(MockShardManager.props(true, shardActorRef));
199 ActorContext actorContext =
200 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
201 mock(Configuration.class));
203 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
205 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
// Wait up to 3 seconds for the reply.
208 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
209 assertEquals("Result", "hello", result);
// Any exception from the wait is rethrown as an AssertionError with the cause preserved.
210 } catch(Exception e) {
211 throw new AssertionError(e);
/**
 * Exercises isPathLocal with a series of self address and path combinations.
 * Each case sets the self address on a MockClusterWrapper, builds a fresh ActorContext
 * around it, and asserts the expected result for one path string.
 */
217 public void testIsPathLocal() {
218 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
219 ActorContext actorContext = null;
// null and empty paths are not local
221 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
222 assertEquals(false, actorContext.isPathLocal(null));
223 assertEquals(false, actorContext.isPathLocal(""));
// an empty path is not local when the self address is null either
225 clusterWrapper.setSelfAddress(null);
226 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
227 assertEquals(false, actorContext.isPathLocal(""));
229 // even if the path is in local format, match the primary path (first 3 elements) and return true
230 clusterWrapper.setSelfAddress(new Address("akka", "test"));
231 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
232 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
// NOTE(review): this case repeats the previous one with the same address and path.
234 clusterWrapper.setSelfAddress(new Address("akka", "test"));
235 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
236 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
// same self address, longer local-format path
238 clusterWrapper.setSelfAddress(new Address("akka", "test"));
239 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
240 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
242 // self address in remote format, but Tx path in local format.
243 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
244 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
245 assertEquals(true, actorContext.isPathLocal(
246 "akka://system/user/shardmanager/shard/transaction"));
248 // self address in local format, but Tx path in remote format.
// NOTE(review): the self address here has no host or port but does use the akka.tcp protocol.
249 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
250 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
251 assertEquals(false, actorContext.isPathLocal(
252 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
254 // local-format path whose system name (test1) differs from the self address (test); still expected to be local
255 clusterWrapper.setSelfAddress(new Address("akka", "test"));
256 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
257 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
// remote-format path whose protocol, system, host and port all match the self address
260 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
261 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
262 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
264 // forward-slash missing in address
265 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
266 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
267 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
// host differs from the self address (127.1.0.1 instead of 127.0.0.1)
270 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
271 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
272 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
// port differs from the self address (2551 instead of 2550)
275 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
276 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
277 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
/**
 * Checks resolvePath with a remote-format (akka.tcp with host and port) first argument and a
 * local-format second argument: the expected result is the second path expressed with the
 * protocol, host and port of the first.
 */
281 public void testResolvePathForRemoteActor() {
282 ActorContext actorContext =
283 new ActorContext(getSystem(), mock(ActorRef.class), mock(
284 ClusterWrapper.class),
285 mock(Configuration.class));
287 String actual = actorContext.resolvePath(
288 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
289 "akka://system/user/shardmanager/shard/transaction");
291 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
293 assertEquals(expected, actual);
/**
 * Checks resolvePath when both arguments are local-format paths: the second path is expected
 * to be returned unchanged.
 */
297 public void testResolvePathForLocalActor() {
298 ActorContext actorContext =
299 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
300 mock(Configuration.class));
302 String actual = actorContext.resolvePath(
303 "akka://system/user/shardmanager/shard",
304 "akka://system/user/shardmanager/shard/transaction");
306 String expected = "akka://system/user/shardmanager/shard/transaction";
308 assertEquals(expected, actual);
/**
 * Checks resolvePath when both arguments are already remote-format paths with the same
 * host and port: the second path is expected to be returned unchanged.
 */
312 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
313 ActorContext actorContext =
314 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
315 mock(Configuration.class));
317 String actual = actorContext.resolvePath(
318 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
319 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
321 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
323 assertEquals(expected, actual);
/**
 * Checks the transaction creation rate limit: the initial value comes from the
 * DatastoreContext, setTxCreationLimit replaces it, and acquiring three permits at a limit
 * of 1.0 is expected to take more than 1000 ms according to the StopWatch.
 * NOTE(review): the start and stop calls on the StopWatch are not visible in this excerpt.
 * NOTE(review): the final assertion depends on wall-clock timing and may be sensitive to
 * scheduling on a loaded machine.
 */
327 public void testRateLimiting(){
328 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
329 transactionCreationInitialRateLimit(155L).build();
331 ActorContext actorContext =
332 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
333 mock(Configuration.class), dataStoreContext);
335 // Check that the initial value is being picked up from DataStoreContext
336 assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
// Lower the limit to 1.0 (presumably permits per second - verify) so acquisitions are throttled.
338 actorContext.setTxCreationLimit(1.0);
340 assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
343 StopWatch watch = new StopWatch();
// Three consecutive acquisitions are timed.
347 actorContext.acquireTxCreationPermit();
348 actorContext.acquireTxCreationPermit();
349 actorContext.acquireTxCreationPermit();
353 assertTrue("did not take as much time as expected", watch.getTime() > 1000);
/**
 * Checks that, with the actor system supplied by getSystem, getClientDispatcher returns
 * the default global dispatcher of that system.
 */
357 public void testClientDispatcherIsGlobalDispatcher(){
358 ActorContext actorContext =
359 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
360 mock(Configuration.class), DatastoreContext.newBuilder().build());
362 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
/**
 * Checks that, with an actor system loaded from application-with-custom-dispatchers.conf,
 * getClientDispatcher returns something other than the default global dispatcher.
 * NOTE(review): as far as this excerpt shows, shutdown is reached only when the assertion
 * passes; consider a try/finally so the extra actor system is always stopped.
 */
367 public void testClientDispatcherIsNotGlobalDispatcher(){
// Dedicated actor system created only for this test.
368 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
370 ActorContext actorContext =
371 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
372 mock(Configuration.class), DatastoreContext.newBuilder().build());
374 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
// Stop the dedicated actor system.
376 actorSystem.shutdown();
/**
 * Checks setDatastoreContext: the timeouts exposed by the ActorContext follow the new
 * DatastoreContext, the new context instance is returned by getDatastoreContext, and a
 * DatastoreContext message reaches the test kit actor that was passed in as shard manager.
 */
381 public void testSetDatastoreContext() {
382 new JavaTestKit(getSystem()) {{
// The test kit actor (getRef) is used as the shard manager so messages sent to it can be observed.
383 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
384 mock(Configuration.class), DatastoreContext.newBuilder().
385 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
// Values from the initial context.
387 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
388 assertEquals("getTransactionCommitOperationTimeout", 7,
389 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
391 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
392 shardTransactionCommitTimeoutInSeconds(8).build();
394 actorContext.setDatastoreContext(newContext);
// A DatastoreContext message is expected at the test kit actor within 5 seconds.
396 expectMsgClass(duration("5 seconds"), DatastoreContext.class);
398 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
// Values from the replacement context.
400 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
401 assertEquals("getTransactionCommitOperationTimeout", 8,
402 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
/**
 * Checks findPrimaryShardAsync when the ask to the shard manager yields PrimaryFound.
 * The test verifies the returned PrimaryShardInfo, verifies that an equal result is
 * available from the primary shard info cache, and then re-reads the cache after a
 * 200 ms pause.
 * NOTE(review): the assertion that follows the final cache read is not visible in this
 * excerpt; per the existing comment the entry is expected to be gone by then.
 */
407 public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
409 TestActorRef<MessageCollectorActor> shardManager =
410 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
412 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
413 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
415 final String expPrimaryPath = "akka://test-system/find-primary-shard";
// doAsk is overridden so that the ask completes immediately with PrimaryFound for expPrimaryPath.
416 ActorContext actorContext =
417 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
418 mock(Configuration.class), dataStoreContext) {
420 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
421 return Futures.successful((Object) new PrimaryFound(expPrimaryPath));
426 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
427 PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
// No local shard data tree is expected, and the actor path must be a suffix of expPrimaryPath.
429 assertNotNull(actual);
430 assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
431 assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
432 expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
// The lookup result should now be available from the cache under the shard name.
434 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
// Presumably the cached future is already complete, hence the 1 ms wait - verify.
436 PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
438 assertEquals(cachedInfo, actual);
440 // Wait for 200 Milliseconds. The cached entry should have been removed.
442 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
444 cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
/**
 * Checks findPrimaryShardAsync when the ask to the shard manager yields a
 * PrimaryNotFoundException object: awaiting the result is expected to throw
 * PrimaryNotFoundException, after which the cache is read for the same shard name.
 * NOTE(review): the assertion on the cache read is not visible in this excerpt.
 */
451 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
453 TestActorRef<MessageCollectorActor> shardManager =
454 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
456 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
457 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
// doAsk is overridden so that the ask completes successfully with the exception object as its value.
459 ActorContext actorContext =
460 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
461 mock(Configuration.class), dataStoreContext) {
463 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
464 return Futures.successful((Object) new PrimaryNotFoundException("not found"));
469 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
// Reaching fail means the await returned normally instead of throwing.
472 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
473 fail("Expected PrimaryNotFoundException");
474 } catch(PrimaryNotFoundException e){
478 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
/**
 * Checks findPrimaryShardAsync when the ask to the shard manager yields a
 * NotInitializedException object: awaiting the result is expected to throw
 * NotInitializedException, after which the cache is read for the same shard name.
 * NOTE(review): the assertion on the cache read is not visible in this excerpt.
 */
484 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
486 TestActorRef<MessageCollectorActor> shardManager =
487 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
489 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
490 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
// doAsk is overridden so that the ask completes successfully with the exception object as its value.
492 ActorContext actorContext =
493 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
494 mock(Configuration.class), dataStoreContext) {
496 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
// NOTE(review): the exception message below contains a spelling mistake; it is test-only text.
497 return Futures.successful((Object) new NotInitializedException("not iniislized"));
502 Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
// Reaching fail means the await returned normally instead of throwing.
505 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
506 fail("Expected NotInitializedException");
507 } catch(NotInitializedException e){
511 Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
/**
 * Checks broadcast: with three configured shard names, of which two resolve to a primary
 * actor and one resolves to NoShardLeaderException, the broadcast message is expected to
 * arrive at both primary actors.
 */
517 public void testBroadcast() {
518 new JavaTestKit(getSystem()) {{
// Collector actors standing in for the primaries of shard1 and shard2.
519 ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
520 ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
// Register the FindPrimary response for each shard on the underlying mock shard manager.
522 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
523 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
524 shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
525 shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
526 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
// The mocked Configuration reports the three shard names in insertion order.
528 Configuration mockConfig = mock(Configuration.class);
529 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
530 when(mockConfig).getAllShardNames();
532 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
533 mock(ClusterWrapper.class), mockConfig,
534 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
536 actorContext.broadcast(new TestMessage());
// Both primaries must have received a TestMessage.
538 expectFirstMatching(shardActorRef1, TestMessage.class);
539 expectFirstMatching(shardActorRef2, TestMessage.class);
/**
 * Polls the given collector actor for the first message of type clazz, making up to
 * 5000 / 50 = 100 attempts with a 50 ms pause after each unsuccessful one, and fails the
 * test if none is found.
 * NOTE(review): the return statements are not visible in this excerpt; presumably the
 * matching message is returned as soon as it is found - verify.
 *
 * @param actor actor queried through MessageCollectorActor.getFirstMatching
 * @param clazz type of message to look for
 */
543 private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
// Total attempts: a 5000 ms budget divided into 50 ms polling steps.
544 int count = 5000 / 50;
545 for(int i = 0; i < count; i++) {
547 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
548 if(message != null) {
// NOTE(review): exceptions from the lookup are silently ignored here, so a persistent failure only shows up as the generic message below.
551 } catch (Exception e) {}
553 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
556 Assert.fail("Did not receive message of type " + clazz);