1 package org.opendaylight.controller.cluster.datastore.utils;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertNotNull;
6 import static org.junit.Assert.assertNull;
7 import static org.junit.Assert.assertTrue;
8 import static org.junit.Assert.fail;
9 import static org.mockito.Mockito.doReturn;
10 import static org.mockito.Mockito.mock;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSelection;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.Props;
16 import akka.actor.UntypedActor;
17 import akka.dispatch.Futures;
18 import akka.japi.Creator;
19 import akka.testkit.JavaTestKit;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.concurrent.TimeUnit;
26 import org.apache.commons.lang.time.StopWatch;
27 import org.junit.Assert;
28 import org.junit.Test;
29 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
30 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
31 import org.opendaylight.controller.cluster.datastore.Configuration;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
34 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
35 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
36 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
37 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
38 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
39 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
40 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
41 import scala.concurrent.Await;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.Duration;
44 import scala.concurrent.duration.FiniteDuration;
46 public class ActorContextTest extends AbstractActorTest{
// Stand-in shard manager actor for the findLocalShard tests. It holds a `found` flag and an
// ActorRef, and its onReceive replies to the sender with LocalShardFound(actorRef) and/or
// LocalShardNotFound(<shard name taken from the incoming FindLocalShard message>).
// NOTE(review): the statement that chooses between the two replies is not visible in this
// excerpt; presumably it branches on `found` - confirm against the full file.
48 private static class MockShardManager extends UntypedActor {
50 private final boolean found;
51 private final ActorRef actorRef;
// Private constructor: instances are only created through props(...) / MockShardManagerCreator.
53 private MockShardManager(boolean found, ActorRef actorRef){
56 this.actorRef = actorRef;
// Replies are addressed to getSender() with getSelf() as the reply's sender.
59 @Override public void onReceive(Object message) throws Exception {
61 getSender().tell(new LocalShardFound(actorRef), getSelf());
// The not-found reply casts the message to FindLocalShard to read the requested shard name.
63 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
// Builds the Props used by the tests to start this actor with the given flag and shard reference.
67 private static Props props(final boolean found, final ActorRef actorRef){
68 return Props.create(new MockShardManagerCreator(found, actorRef) );
// Creator handed to Props.create; it carries the constructor arguments for MockShardManager.
// NOTE(review): the "serial" suppression suggests Creator is Serializable - confirm.
71 @SuppressWarnings("serial")
72 private static class MockShardManagerCreator implements Creator<MockShardManager> {
74 final ActorRef actorRef;
76 MockShardManagerCreator(boolean found, ActorRef actorRef) {
78 this.actorRef = actorRef;
// Instantiates the actor with the captured arguments.
82 public MockShardManager create() throws Exception {
83 return new MockShardManager(found, actorRef);
/**
 * Checks that findLocalShard("default") returns the shard actor reference when the shard
 * manager is a MockShardManager created with found=true.
 */
89 public void testFindLocalShardWithShardFound(){
90 new JavaTestKit(getSystem()) {{
// The whole interaction is wrapped in a Within block with a one-second duration.
92 new Within(duration("1 seconds")) {
94 protected void run() {
// The actor that the mock shard manager is configured to hand back.
96 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
98 ActorRef shardManagerActorRef = getSystem()
99 .actorOf(MockShardManager.props(true, shardActorRef));
// ClusterWrapper and Configuration are plain Mockito mocks; only the shard manager matters here.
101 ActorContext actorContext =
102 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
103 mock(Configuration.class));
105 Optional<ActorRef> out = actorContext.findLocalShard("default");
// out.get() is called directly, so an absent Optional also fails the test.
107 assertEquals(shardActorRef, out.get());
// NOTE(review): the remainder of this method is not visible in this excerpt.
118 public void testFindLocalShardWithShardNotFound(){
119 new JavaTestKit(getSystem()) {{
120 ActorRef shardManagerActorRef = getSystem()
121 .actorOf(MockShardManager.props(false, null));
123 ActorContext actorContext =
124 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
125 mock(Configuration.class));
127 Optional<ActorRef> out = actorContext.findLocalShard("default");
128 assertTrue(!out.isPresent());
134 public void testExecuteRemoteOperation() {
135 new JavaTestKit(getSystem()) {{
136 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
138 ActorRef shardManagerActorRef = getSystem()
139 .actorOf(MockShardManager.props(true, shardActorRef));
141 ActorContext actorContext =
142 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
143 mock(Configuration.class));
145 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
147 Object out = actorContext.executeOperation(actor, "hello");
149 assertEquals("hello", out);
154 public void testExecuteRemoteOperationAsync() {
155 new JavaTestKit(getSystem()) {{
156 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
158 ActorRef shardManagerActorRef = getSystem()
159 .actorOf(MockShardManager.props(true, shardActorRef));
161 ActorContext actorContext =
162 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
163 mock(Configuration.class));
165 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
167 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
170 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
171 assertEquals("Result", "hello", result);
172 } catch(Exception e) {
173 throw new AssertionError(e);
179 public void testIsPathLocal() {
180 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
181 ActorContext actorContext = null;
183 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
184 assertEquals(false, actorContext.isPathLocal(null));
185 assertEquals(false, actorContext.isPathLocal(""));
187 clusterWrapper.setSelfAddress(null);
188 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
189 assertEquals(false, actorContext.isPathLocal(""));
191 // even if the path is in local format, match the primary path (first 3 elements) and return true
192 clusterWrapper.setSelfAddress(new Address("akka", "test"));
193 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
194 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
196 clusterWrapper.setSelfAddress(new Address("akka", "test"));
197 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
198 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
200 clusterWrapper.setSelfAddress(new Address("akka", "test"));
201 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
202 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
204 // self address of remote format,but Tx path local format.
205 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
206 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
207 assertEquals(true, actorContext.isPathLocal(
208 "akka://system/user/shardmanager/shard/transaction"));
210 // self address of local format,but Tx path remote format.
211 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
212 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
213 assertEquals(false, actorContext.isPathLocal(
214 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
216 //local path but not same
217 clusterWrapper.setSelfAddress(new Address("akka", "test"));
218 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
219 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
222 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
223 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
224 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
226 // forward-slash missing in address
227 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
228 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
229 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
232 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
233 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
234 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
237 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
238 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
239 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
243 public void testResolvePathForRemoteActor() {
244 ActorContext actorContext =
245 new ActorContext(getSystem(), mock(ActorRef.class), mock(
246 ClusterWrapper.class),
247 mock(Configuration.class));
249 String actual = actorContext.resolvePath(
250 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
251 "akka://system/user/shardmanager/shard/transaction");
253 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
255 assertEquals(expected, actual);
259 public void testResolvePathForLocalActor() {
260 ActorContext actorContext =
261 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
262 mock(Configuration.class));
264 String actual = actorContext.resolvePath(
265 "akka://system/user/shardmanager/shard",
266 "akka://system/user/shardmanager/shard/transaction");
268 String expected = "akka://system/user/shardmanager/shard/transaction";
270 assertEquals(expected, actual);
274 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
275 ActorContext actorContext =
276 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
277 mock(Configuration.class));
279 String actual = actorContext.resolvePath(
280 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
281 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
283 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
285 assertEquals(expected, actual);
/**
 * Checks that the transaction-creation limit is initialised from the DatastoreContext, can be
 * changed through setTxCreationLimit, and that acquiring three permits at a limit of 1.0
 * takes more than 1000 ms as measured by a StopWatch.
 */
289 public void testRateLimiting(){
// DatastoreContext is mocked; only the three getters stubbed below are given values.
290 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
292 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
293 doReturn("config").when(mockDataStoreContext).getDataStoreType();
294 doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
296 ActorContext actorContext =
297 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
298 mock(Configuration.class), mockDataStoreContext);
300 // Check that the initial value is being picked up from DataStoreContext
301 assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
// Lower the limit to 1.0 so that the three acquisitions below take a measurable time.
// NOTE(review): presumably the limit is in permits per second - confirm.
303 actorContext.setTxCreationLimit(1.0);
305 assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
// NOTE(review): the calls that start and stop this StopWatch are not visible in this excerpt.
308 StopWatch watch = new StopWatch();
312 actorContext.acquireTxCreationPermit();
313 actorContext.acquireTxCreationPermit();
314 actorContext.acquireTxCreationPermit();
// Wall-clock check: the measured time must exceed 1000 ms.
318 assertTrue("did not take as much time as expected", watch.getTime() > 1000);
322 public void testClientDispatcherIsGlobalDispatcher(){
324 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
326 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
327 doReturn("config").when(mockDataStoreContext).getDataStoreType();
328 doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
330 ActorContext actorContext =
331 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
332 mock(Configuration.class), mockDataStoreContext);
334 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
339 public void testClientDispatcherIsNotGlobalDispatcher(){
341 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
343 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
344 doReturn("config").when(mockDataStoreContext).getDataStoreType();
345 doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
347 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
349 ActorContext actorContext =
350 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
351 mock(Configuration.class), mockDataStoreContext);
353 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
355 actorSystem.shutdown();
360 public void testSetDatastoreContext() {
361 new JavaTestKit(getSystem()) {{
362 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
363 mock(Configuration.class), DatastoreContext.newBuilder().
364 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
366 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
367 assertEquals("getTransactionCommitOperationTimeout", 7,
368 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
370 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
371 shardTransactionCommitTimeoutInSeconds(8).build();
373 actorContext.setDatastoreContext(newContext);
375 expectMsgClass(duration("5 seconds"), DatastoreContext.class);
377 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
379 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
380 assertEquals("getTransactionCommitOperationTimeout", 8,
381 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
/**
 * Checks findPrimaryShardAsync("foobar") when the ask is answered with PrimaryFound: the
 * future yields a non-null ActorSelection, the same selection is available from the primary
 * shard cache, and the cache is read again after a 200 ms pause.
 */
386 public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
// The shard manager ref is only a placeholder; doAsk is overridden below.
388 TestActorRef<MessageCollectorActor> shardManager =
389 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
391 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
393 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
394 doReturn("config").when(mockDataStoreContext).getDataStoreType();
// NOTE(review): presumably this 100 ms timeout also controls how long cache entries live - confirm.
395 doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
397 ActorContext actorContext =
398 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
399 mock(Configuration.class), mockDataStoreContext) {
// Short-circuits the ask: every request completes immediately with a PrimaryFound reply.
401 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
402 return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
407 Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
408 ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
410 assertNotNull(actual);
// The lookup should have stored its future in the cache under the shard name.
412 Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
// Only 1 ms is allowed here, so the cached future is expected to be complete already.
414 ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
416 assertEquals(cachedSelection, actual);
418 // Wait for 200 Milliseconds. The cached entry should have been removed.
420 Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
// NOTE(review): the assertion on this re-read value is not visible in this excerpt.
422 cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
/**
 * Checks findPrimaryShardAsync("foobar") when the ask is answered with PrimaryNotFound:
 * awaiting the future must throw PrimaryNotFoundException, after which the primary shard
 * cache is read for the same shard name.
 */
429 public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
// The shard manager ref is only a placeholder; doAsk is overridden below.
431 TestActorRef<MessageCollectorActor> shardManager =
432 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
434 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
436 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
437 doReturn("config").when(mockDataStoreContext).getDataStoreType();
438 doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
440 ActorContext actorContext =
441 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
442 mock(Configuration.class), mockDataStoreContext) {
// Short-circuits the ask: every request completes immediately with a PrimaryNotFound reply.
444 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
445 return Futures.successful((Object) new PrimaryNotFound("foobar"));
450 Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
// Reaching fail(...) means Await.result returned normally, which is the failure case.
453 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
454 fail("Expected PrimaryNotFoundException");
// The expected exception is caught; no statement inside the catch body is visible in this excerpt.
455 } catch(PrimaryNotFoundException e){
// NOTE(review): the assertion on this cache lookup is not visible in this excerpt.
459 Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
466 public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
468 TestActorRef<MessageCollectorActor> shardManager =
469 TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
471 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
473 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
474 doReturn("config").when(mockDataStoreContext).getDataStoreType();
475 doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
477 ActorContext actorContext =
478 new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
479 mock(Configuration.class), mockDataStoreContext) {
481 protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
482 return Futures.successful((Object) new ActorNotInitialized());
487 Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
490 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
491 fail("Expected NotInitializedException");
492 } catch(NotInitializedException e){
496 Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");