1 package org.opendaylight.controller.cluster.datastore.utils;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.mock;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import akka.actor.UntypedActor;
17 import akka.dispatch.Futures;
18 import akka.japi.Creator;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import com.google.common.collect.Maps;
24 import com.google.common.collect.Sets;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.Arrays;
29 import java.util.concurrent.TimeUnit;
30 import org.apache.commons.lang.time.StopWatch;
31 import org.junit.Assert;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
34 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
35 import org.opendaylight.controller.cluster.datastore.Configuration;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
37 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
38 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
39 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
40 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
41 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
42 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
43 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
44 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
45 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
46 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49 import scala.concurrent.Await;
50 import scala.concurrent.Future;
51 import scala.concurrent.duration.Duration;
52 import scala.concurrent.duration.FiniteDuration;
54 public class ActorContextTest extends AbstractActorTest{
56 static final Logger log = LoggerFactory.getLogger(ActorContextTest.class);
58 private static class TestMessage {
61 private static class MockShardManager extends UntypedActor {
63 private final boolean found;
64 private final ActorRef actorRef;
65 private final Map<String,Object> findPrimaryResponses = Maps.newHashMap();
67 private MockShardManager(boolean found, ActorRef actorRef){
70 this.actorRef = actorRef;
73 @Override public void onReceive(Object message) throws Exception {
74 if(message instanceof FindPrimary) {
75 FindPrimary fp = (FindPrimary)message;
76 Object resp = findPrimaryResponses.get(fp.getShardName());
78 log.error("No expected FindPrimary response found for shard name {}", fp.getShardName());
80 getSender().tell(resp, getSelf());
87 getSender().tell(new LocalShardFound(actorRef), getSelf());
89 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
93 void addFindPrimaryResp(String shardName, Object resp) {
94 findPrimaryResponses.put(shardName, resp);
97 private static Props props(final boolean found, final ActorRef actorRef){
98 return Props.create(new MockShardManagerCreator(found, actorRef) );
101 private static Props props(){
102 return Props.create(new MockShardManagerCreator() );
105 @SuppressWarnings("serial")
106 private static class MockShardManagerCreator implements Creator<MockShardManager> {
108 final ActorRef actorRef;
110 MockShardManagerCreator() {
112 this.actorRef = null;
115 MockShardManagerCreator(boolean found, ActorRef actorRef) {
117 this.actorRef = actorRef;
121 public MockShardManager create() throws Exception {
122 return new MockShardManager(found, actorRef);
128 public void testFindLocalShardWithShardFound(){
129 new JavaTestKit(getSystem()) {{
131 new Within(duration("1 seconds")) {
133 protected void run() {
135 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
137 ActorRef shardManagerActorRef = getSystem()
138 .actorOf(MockShardManager.props(true, shardActorRef));
140 ActorContext actorContext =
141 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
142 mock(Configuration.class));
144 Optional<ActorRef> out = actorContext.findLocalShard("default");
146 assertEquals(shardActorRef, out.get());
157 public void testFindLocalShardWithShardNotFound(){
158 new JavaTestKit(getSystem()) {{
159 ActorRef shardManagerActorRef = getSystem()
160 .actorOf(MockShardManager.props(false, null));
162 ActorContext actorContext =
163 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
164 mock(Configuration.class));
166 Optional<ActorRef> out = actorContext.findLocalShard("default");
167 assertTrue(!out.isPresent());
173 public void testExecuteRemoteOperation() {
174 new JavaTestKit(getSystem()) {{
175 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
177 ActorRef shardManagerActorRef = getSystem()
178 .actorOf(MockShardManager.props(true, shardActorRef));
180 ActorContext actorContext =
181 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
182 mock(Configuration.class));
184 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
186 Object out = actorContext.executeOperation(actor, "hello");
188 assertEquals("hello", out);
193 public void testExecuteRemoteOperationAsync() {
194 new JavaTestKit(getSystem()) {{
195 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
197 ActorRef shardManagerActorRef = getSystem()
198 .actorOf(MockShardManager.props(true, shardActorRef));
200 ActorContext actorContext =
201 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
202 mock(Configuration.class));
204 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
206 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
209 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
210 assertEquals("Result", "hello", result);
211 } catch(Exception e) {
212 throw new AssertionError(e);
218 public void testIsPathLocal() {
219 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
220 ActorContext actorContext = null;
222 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
223 assertEquals(false, actorContext.isPathLocal(null));
224 assertEquals(false, actorContext.isPathLocal(""));
226 clusterWrapper.setSelfAddress(null);
227 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
228 assertEquals(false, actorContext.isPathLocal(""));
230 // even if the path is in local format, match the primary path (first 3 elements) and return true
231 clusterWrapper.setSelfAddress(new Address("akka", "test"));
232 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
233 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
235 clusterWrapper.setSelfAddress(new Address("akka", "test"));
236 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
237 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
239 clusterWrapper.setSelfAddress(new Address("akka", "test"));
240 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
241 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
243 // self address of remote format,but Tx path local format.
244 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
245 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
246 assertEquals(true, actorContext.isPathLocal(
247 "akka://system/user/shardmanager/shard/transaction"));
249 // self address of local format,but Tx path remote format.
250 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
251 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
252 assertEquals(false, actorContext.isPathLocal(
253 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
255 //local path but not same
256 clusterWrapper.setSelfAddress(new Address("akka", "test"));
257 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
258 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
261 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
262 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
263 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
265 // forward-slash missing in address
266 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
267 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
268 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
271 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
272 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
273 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
276 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
277 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
278 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
282 public void testResolvePathForRemoteActor() {
283 ActorContext actorContext =
284 new ActorContext(getSystem(), mock(ActorRef.class), mock(
285 ClusterWrapper.class),
286 mock(Configuration.class));
288 String actual = actorContext.resolvePath(
289 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
290 "akka://system/user/shardmanager/shard/transaction");
292 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
294 assertEquals(expected, actual);
298 public void testResolvePathForLocalActor() {
299 ActorContext actorContext =
300 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
301 mock(Configuration.class));
303 String actual = actorContext.resolvePath(
304 "akka://system/user/shardmanager/shard",
305 "akka://system/user/shardmanager/shard/transaction");
307 String expected = "akka://system/user/shardmanager/shard/transaction";
309 assertEquals(expected, actual);
313 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
314 ActorContext actorContext =
315 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
316 mock(Configuration.class));
318 String actual = actorContext.resolvePath(
319 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
320 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
322 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
324 assertEquals(expected, actual);
328 public void testRateLimiting(){
329 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
330 transactionCreationInitialRateLimit(155L).build();
332 ActorContext actorContext =
333 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
334 mock(Configuration.class), dataStoreContext);
336 // Check that the initial value is being picked up from DataStoreContext
337 assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
339 actorContext.setTxCreationLimit(1.0);
341 assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
344 StopWatch watch = new StopWatch();
348 actorContext.acquireTxCreationPermit();
349 actorContext.acquireTxCreationPermit();
350 actorContext.acquireTxCreationPermit();
354 assertTrue("did not take as much time as expected", watch.getTime() > 1000);
358 public void testClientDispatcherIsGlobalDispatcher(){
359 ActorContext actorContext =
360 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
361 mock(Configuration.class), DatastoreContext.newBuilder().build());
363 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
368 public void testClientDispatcherIsNotGlobalDispatcher(){
369 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
371 ActorContext actorContext =
372 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
373 mock(Configuration.class), DatastoreContext.newBuilder().build());
375 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
377 actorSystem.shutdown();
382 public void testSetDatastoreContext() {
383 new JavaTestKit(getSystem()) {{
384 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
385 mock(Configuration.class), DatastoreContext.newBuilder().
386 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
388 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
389 assertEquals("getTransactionCommitOperationTimeout", 7,
390 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
392 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
393 shardTransactionCommitTimeoutInSeconds(8).build();
395 actorContext.setDatastoreContext(newContext);
397 expectMsgClass(duration("5 seconds"), DatastoreContext.class);
399 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
401 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
402 assertEquals("getTransactionCommitOperationTimeout", 8,
403 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
408 public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
410 TestActorRef<MessageCollectorActor> shardManager =
411 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
413 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
414 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
416 ActorContext actorContext =
417 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
418 mock(Configuration.class), dataStoreContext) {
420 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
421 return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
426 Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
427 ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
429 assertNotNull(actual);
431 Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
433 ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
435 assertEquals(cachedSelection, actual);
437 // Wait for 200 Milliseconds. The cached entry should have been removed.
439 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
441 cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
448 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
450 TestActorRef<MessageCollectorActor> shardManager =
451 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
453 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
454 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
456 ActorContext actorContext =
457 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
458 mock(Configuration.class), dataStoreContext) {
460 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
461 return Futures.successful((Object) new PrimaryNotFound("foobar"));
466 Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
469 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
470 fail("Expected PrimaryNotFoundException");
471 } catch(PrimaryNotFoundException e){
475 Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
481 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
483 TestActorRef<MessageCollectorActor> shardManager =
484 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
486 DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
487 shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
489 ActorContext actorContext =
490 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
491 mock(Configuration.class), dataStoreContext) {
493 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
494 return Futures.successful((Object) new ActorNotInitialized());
499 Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
502 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
503 fail("Expected NotInitializedException");
504 } catch(NotInitializedException e){
508 Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
514 public void testBroadcast() {
515 new JavaTestKit(getSystem()) {{
516 ActorRef shardActorRef1 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
517 ActorRef shardActorRef2 = getSystem().actorOf(Props.create(MessageCollectorActor.class));
519 TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
520 MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
521 shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()).toSerializable());
522 shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()).toSerializable());
523 shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
525 Configuration mockConfig = mock(Configuration.class);
526 doReturn(Sets.newLinkedHashSet(Arrays.asList("shard1", "shard2", "shard3"))).
527 when(mockConfig).getAllShardNames();
529 ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef,
530 mock(ClusterWrapper.class), mockConfig,
531 DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build());
533 actorContext.broadcast(new TestMessage());
535 expectFirstMatching(shardActorRef1, TestMessage.class);
536 expectFirstMatching(shardActorRef2, TestMessage.class);
540 private <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
541 int count = 5000 / 50;
542 for(int i = 0; i < count; i++) {
544 T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz);
545 if(message != null) {
548 } catch (Exception e) {}
550 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
553 Assert.fail("Did not receive message of type " + clazz);