1 package org.opendaylight.controller.cluster.datastore.utils;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.mock;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import akka.actor.UntypedActor;
17 import akka.dispatch.Futures;
18 import akka.japi.Creator;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.Arrays;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang.time.StopWatch;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
34 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
35 import org.opendaylight.controller.cluster.datastore.Configuration;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
41 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
42 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
44 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import scala.concurrent.Await;
48 import scala.concurrent.Future;
49 import scala.concurrent.duration.Duration;
50 import scala.concurrent.duration.FiniteDuration;
52 public class ActorContextTest extends AbstractActorTest{
// Shared SLF4J logger for this test class; MockShardManager uses it to report a FindPrimary
// request for which no canned response was registered.
54 static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
// Message type used by testBroadcast: an instance is passed to ActorContext.broadcast and the
// test then looks for a message of this class on the shard actors.
56 private static class TestMessage {
/**
 * Stand-in shard manager actor for these tests. It answers {@code FindPrimary} requests from a
 * table of canned responses and replies to other requests with {@code LocalShardFound} or
 * {@code LocalShardNotFound}.
 */
59 private static class MockShardManager extends UntypedActor {
// Presumably selects between the LocalShardFound and LocalShardNotFound replies in onReceive;
// the condition itself is not visible in this listing - TODO confirm.
61 private final boolean found;
// Actor reference sent back inside LocalShardFound.
62 private final ActorRef actorRef;
// Canned FindPrimary responses, keyed by shard name.
63 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
// Private constructor: instances are produced by MockShardManagerCreator.create().
65 private MockShardManager(boolean found, ActorRef actorRef){
68 this.actorRef = actorRef;
/**
 * Message handler. A {@code FindPrimary} is answered with the response registered for its
 * shard name; the remaining statements send a local-shard reply.
 *
 * @param message the incoming actor message
 */
71 @Override public void onReceive(Object message) throws Exception {
72 if(message instanceof FindPrimary) {
73 FindPrimary fp = (FindPrimary)message;
74 Object resp = findPrimaryResponses.get(fp.getShardName());
// Reports a shard name for which no response was registered.
// NOTE(review): the guard around this log call and the reply below is not visible in this
// listing - confirm that a missing (null) response is never sent to the sender.
76 log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
78 getSender().tell(resp, getSelf());
// Local-shard replies: LocalShardFound carries the configured actorRef.
85 getSender().tell(new LocalShardFound(actorRef), getSelf());
// The cast assumes every message that reaches this point is a FindLocalShard - TODO confirm.
87 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
// Registers the response to send for FindPrimary requests that name shardName.
91 void addFindPrimaryResp(String shardName, Object resp) {
92 findPrimaryResponses.put(shardName, resp);
// Props for a manager built with an explicit found flag and shard actor reference.
95 private static Props props(final boolean found, final ActorRef actorRef){
96 return Props.create(new MockShardManagerCreator(found, actorRef) );
// Props for a manager built by the creator's no-argument constructor (actorRef is null).
99 private static Props props(){
100 return Props.create(new MockShardManagerCreator() );
// Creator passed to Props.create; it holds the constructor arguments and builds the actor.
// NOTE(review): "serial" is presumably suppressed because Creator is serializable and no
// serialVersionUID is declared - confirm.
103 @SuppressWarnings("serial")
104 private static class MockShardManagerCreator implements Creator<MockShardManager> {
106 final ActorRef actorRef;
// No-argument variant: no shard actor reference is supplied.
108 MockShardManagerCreator() {
110 this.actorRef = null;
113 MockShardManagerCreator(boolean found, ActorRef actorRef) {
115 this.actorRef = actorRef;
// Builds the actor from the stored arguments.
119 public MockShardManager create() throws Exception {
120 return new MockShardManager(found, actorRef);
/**
 * Checks that {@code findLocalShard("default")} returns the shard actor reference that the
 * mock shard manager was configured with (found = true).
 */
126 public void testFindLocalShardWithShardFound(){
127 new JavaTestKit(getSystem()) {{
// The body runs inside a Within block created with a one-second duration.
129 new Within(duration("1 seconds")) {
131 protected void run() {
133 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
135 ActorRef shardManagerActorRef = getSystem()
136 .actorOf(MockShardManager.props(true, shardActorRef));
138 ActorContext actorContext =
139 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
140 mock(Configuration.class));
142 Optional<ActorRef> out = actorContext.findLocalShard("default");
// The returned Optional must hold exactly the reference handed to the mock manager.
144 assertEquals(shardActorRef, out.get());
/**
 * Checks that {@code findLocalShard("default")} returns an absent Optional when the mock shard
 * manager is configured with found = false and no shard actor.
 */
155 public void testFindLocalShardWithShardNotFound(){
156 new JavaTestKit(getSystem()) {{
157 ActorRef shardManagerActorRef = getSystem()
158 .actorOf(MockShardManager.props(false, null));
160 ActorContext actorContext =
161 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
162 mock(Configuration.class));
164 Optional<ActorRef> out = actorContext.findLocalShard("default");
// NOTE(review): assertTrue(!x) reads more clearly as Assert.assertFalse(x).
165 assertTrue(!out.isPresent());
/**
 * Checks that {@code executeOperation} sends a message to the actor behind an ActorSelection
 * and returns the reply; the test expects "hello" to come back from the EchoActor.
 */
171 public void testExecuteRemoteOperation() {
172 new JavaTestKit(getSystem()) {{
// EchoActor is defined outside this listing; presumably it replies with the message it receives.
173 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
175 ActorRef shardManagerActorRef = getSystem()
176 .actorOf(MockShardManager.props(true, shardActorRef));
178 ActorContext actorContext =
179 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
180 mock(Configuration.class));
// Address the echo actor through a selection built from its path.
182 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
184 Object out = actorContext.executeOperation(actor, "hello");
186 assertEquals("hello", out);
/**
 * Asynchronous counterpart of testExecuteRemoteOperation: {@code executeOperationAsync} must
 * return a Future that completes with the echoed "hello".
 */
191 public void testExecuteRemoteOperationAsync() {
192 new JavaTestKit(getSystem()) {{
193 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
195 ActorRef shardManagerActorRef = getSystem()
196 .actorOf(MockShardManager.props(true, shardActorRef));
198 ActorContext actorContext =
199 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
200 mock(Configuration.class));
202 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
204 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
// Waits up to three seconds for the reply.
207 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
208 assertEquals("Result", "hello", result);
// Any exception from the wait is rethrown as an AssertionError, keeping the original as cause.
209 } catch(Exception e) {
210 throw new AssertionError(e);
/**
 * Exercises {@code ActorContext.isPathLocal} with a series of self-address / actor-path
 * combinations. A new ActorContext is constructed after every setSelfAddress call.
 */
216 public void testIsPathLocal() {
217 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
218 ActorContext actorContext = null;
// Null and empty paths are expected to be reported as not local.
220 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
221 assertEquals(false, actorContext.isPathLocal(null));
222 assertEquals(false, actorContext.isPathLocal(""));
// Same for an empty path when the cluster wrapper has no self address.
224 clusterWrapper.setSelfAddress(null);
225 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
226 assertEquals(false, actorContext.isPathLocal(""));
228 // even if the path is in local format, match the primary path (first 3 elements) and return true
229 clusterWrapper.setSelfAddress(new Address("akka", "test"));
230 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
231 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
// NOTE(review): this case repeats the previous one verbatim.
233 clusterWrapper.setSelfAddress(new Address("akka", "test"));
234 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
235 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
// Local-format path with additional path elements.
237 clusterWrapper.setSelfAddress(new Address("akka", "test"));
238 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
239 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
241 // self address of remote format, but Tx path local format.
242 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
243 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
244 assertEquals(true, actorContext.isPathLocal(
245 "akka://system/user/shardmanager/shard/transaction"));
247 // self address of local format, but Tx path remote format.
248 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
249 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
250 assertEquals(false, actorContext.isPathLocal(
251 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
// NOTE(review): the system name in the path ("test1") differs from the self address ("test"),
// yet the expected result is true - the label below does not explain why; confirm intent.
253 //local path but not same
254 clusterWrapper.setSelfAddress(new Address("akka", "test"));
255 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
256 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
// Remote self address and remote path with the same host and port.
259 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
260 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
261 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
263 // forward-slash missing in address
264 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
265 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
266 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
// Host differs (127.1.0.1 in the path vs 127.0.0.1 in the self address).
269 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
270 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
271 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
// Port differs (2551 in the path vs 2550 in the self address).
274 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
275 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
276 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
/**
 * Checks {@code resolvePath} when the first argument is a remote-format (akka.tcp, host:port)
 * path and the second is a local-format path: the result is expected to be the second path
 * expressed with the first path's remote address.
 */
280 public void testResolvePathForRemoteActor() {
281 ActorContext actorContext =
282 new ActorContext(getSystem(), mock(ActorRef.class), mock(
283 ClusterWrapper.class),
284 mock(Configuration.class));
286 String actual = actorContext.resolvePath(
287 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
288 "akka://system/user/shardmanager/shard/transaction");
290 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
292 assertEquals(expected, actual);
/**
 * Checks {@code resolvePath} when both arguments are local-format paths: the second path is
 * expected to be returned unchanged.
 */
296 public void testResolvePathForLocalActor() {
297 ActorContext actorContext =
298 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
299 mock(Configuration.class));
301 String actual = actorContext.resolvePath(
302 "akka://system/user/shardmanager/shard",
303 "akka://system/user/shardmanager/shard/transaction");
305 String expected = "akka://system/user/shardmanager/shard/transaction";
307 assertEquals(expected, actual);
/**
 * Checks {@code resolvePath} when both arguments already carry the same remote address: the
 * second path is expected to be returned unchanged.
 */
311 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
312 ActorContext actorContext =
313 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
314 mock(Configuration.class));
316 String actual = actorContext.resolvePath(
317 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
318 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
320 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
322 assertEquals(expected, actual);
/**
 * Checks that the transaction-creation rate limit is initialised from the DatastoreContext, can
 * be changed through setTxCreationLimit, and that acquiring permits at a limit of 1.0 takes
 * measurable time.
 */
326 public void testRateLimiting(){
327 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
328 transactionCreationInitialRateLimit(155L).build();
330 ActorContext actorContext =
331 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
332 mock(Configuration.class), dataStoreContext);
334 // Check that the initial value is being picked up from DataStoreContext
335 assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
// Lower the limit to 1.0 and confirm the getter reflects it.
337 actorContext.setTxCreationLimit(1.0);
339 assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
// NOTE(review): the StopWatch start/stop calls are not visible in this listing; presumably they
// bracket the three acquisitions below - confirm.
342 StopWatch watch = new StopWatch();
346 actorContext.acquireTxCreationPermit();
347 actorContext.acquireTxCreationPermit();
348 actorContext.acquireTxCreationPermit();
// The three acquisitions are expected to take more than 1000 ms in total.
// NOTE(review): this is a wall-clock assertion and makes the test take over a second.
352 assertTrue("did not take as much time as expected", watch.getTime() > 1000);
/**
 * Checks that, on the shared test actor system with a default DatastoreContext,
 * {@code getClientDispatcher()} equals the system's default global dispatcher.
 */
356 public void testClientDispatcherIsGlobalDispatcher(){
357 ActorContext actorContext =
358 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
359 mock(Configuration.class), DatastoreContext.newBuilder().build());
361 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
/**
 * Checks that, on an actor system created from application-with-custom-dispatchers.conf,
 * {@code getClientDispatcher()} differs from the system's default global dispatcher.
 */
366 public void testClientDispatcherIsNotGlobalDispatcher(){
// A dedicated actor system is created for this test instead of using the shared getSystem().
367 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
369 ActorContext actorContext =
370 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
371 mock(Configuration.class), DatastoreContext.newBuilder().build());
373 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
// NOTE(review): as far as this listing shows, shutdown() is not in a finally block, so a failed
// assertion above would leave the extra actor system running - consider try/finally.
375 actorSystem.shutdown();
/**
 * Checks that {@code setDatastoreContext} replaces the context held by the ActorContext,
 * updates the derived timeouts, and results in a DatastoreContext message arriving at the
 * shard manager reference (here the test kit's own ref).
 */
380 public void testSetDatastoreContext() {
381 new JavaTestKit(getSystem()) {{
// getRef() is passed as the shard manager so messages sent to it can be asserted below.
382 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
383 mock(Configuration.class), DatastoreContext.newBuilder().
384 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
// Values taken from the initial context.
386 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
387 assertEquals("getTransactionCommitOperationTimeout", 7,
388 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
390 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
391 shardTransactionCommitTimeoutInSeconds(8).build();
393 actorContext.setDatastoreContext(newContext);
// A DatastoreContext message must reach the test kit within five seconds.
395 expectMsgClass(duration("5 seconds"), DatastoreContext.class);
// The very same instance must be returned, and the timeouts must reflect the new values.
397 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
399 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
400 assertEquals("getTransactionCommitOperationTimeout", 8,
401 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
/**
 * Checks {@code findPrimaryShardAsync} when the shard manager answers with PrimaryFound: the
 * returned future yields a non-null ActorSelection and the same selection is available from
 * the primary-shard cache under the shard name.
 */
406 public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
408 TestActorRef<MessageCollectorActor> shardManager =
409 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
411 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
412 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
414 ActorContext actorContext =
415 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
416 mock(Configuration.class), dataStoreContext) {
// doAsk is overridden so the reply is an already-completed future rather than a real ask.
418 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
419 return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
424 Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
425 ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
427 assertNotNull(actual);
// The lookup result must have been cached under the shard name.
429 Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
431 ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
433 assertEquals(cachedSelection, actual);
// NOTE(review): presumably the cache expiry is tied to the 100 ms shardLeaderElectionTimeout
// configured above - confirm against ActorContext.
435 // Wait for 200 Milliseconds. The cached entry should have been removed.
437 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
// The assertion on this second lookup is not visible in this listing.
439 cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
/**
 * Checks {@code findPrimaryShardAsync} when the shard manager's reply is a
 * PrimaryNotFoundException: awaiting the returned future is expected to throw that exception.
 */
446 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
448 TestActorRef<MessageCollectorActor> shardManager =
449 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
451 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
452 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
454 ActorContext actorContext =
455 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
456 mock(Configuration.class), dataStoreContext) {
// The exception is delivered as the value of a successful future, not as a failed future.
458 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
459 return Futures.successful((Object) new PrimaryNotFoundException("not found"));
464 Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
// Reaching fail() means the await returned normally, which is the failure case.
467 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
468 fail("Expected PrimaryNotFoundException");
469 } catch(PrimaryNotFoundException e){
// The assertion on this cache lookup is not visible in this listing; presumably a failed lookup
// must not be cached - TODO confirm.
473 Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
/**
 * Checks {@code findPrimaryShardAsync} when the shard manager's reply is a
 * NotInitializedException: awaiting the returned future is expected to throw that exception.
 */
479 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
481 TestActorRef<MessageCollectorActor> shardManager =
482 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
484 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
485 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
487 ActorContext actorContext =
488 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
489 mock(Configuration.class), dataStoreContext) {
// The exception is delivered as the value of a successful future, not as a failed future.
// NOTE(review): the message text below contains a typo ("iniislized").
491 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
492 return Futures.successful((Object) new NotInitializedException("not iniislized"));
497 Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
// Reaching fail() means the await returned normally, which is the failure case.
500 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
501 fail("Expected NotInitializedException");
502 } catch(NotInitializedException e){
// The assertion on this cache lookup is not visible in this listing; presumably a failed lookup
// must not be cached - TODO confirm.
506 Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
/**
 * Checks that {@code broadcast} delivers a message to the primary of every configured shard
 * that can be resolved: shard1 and shard2 resolve to collector actors and must receive the
 * TestMessage, while shard3's lookup yields a NoShardLeaderException.
 */
512 public void testBroadcast() {
513 new JavaTestKit(getSystem()) {{
514 ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
515 ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
// A TestActorRef gives direct access to the actor instance so canned responses can be registered.
517 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
518 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
519 shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
520 shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
521 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
// The mocked configuration reports the three shard names in insertion order.
523 Configuration mockConfig = mock(Configuration.class);
524 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
525 when(mockConfig).getAllShardNames();
527 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
528 mock(ClusterWrapper.class), mockConfig,
529 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
531 actorContext.broadcast(new TestMessage());
// Both resolvable shards must have collected a TestMessage.
533 expectFirstMatching(shardActorRef1, TestMessage.class);
534 expectFirstMatching(shardActorRef2, TestMessage.class);
/**
 * Polls a MessageCollectorActor for the first collected message of the given class, failing
 * the test if none is seen.
 *
 * @param actor the collector actor to query
 * @param clazz the message class to look for
 */
538 private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
// 100 attempts with a 50 ms pause between them, i.e. roughly five seconds of polling.
539 int count = 5000 / 50;
540 for(int i = 0; i < count; i++) {
542 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
// The handling of a non-null message is not visible in this listing; presumably it is returned.
543 if(message != null) {
// NOTE(review): exceptions from the lookup are swallowed silently, so a persistent error only
// surfaces as the generic failure below - consider recording the last exception.
546 } catch (Exception e) {}
548 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
// Reached only when every attempt came up empty.
551 Assert.fail("Did not receive message of type " + clazz);