1 package org.opendaylight.controller.cluster.datastore.utils;
3 import static org.junit.Assert.assertEquals;
4 import static org.junit.Assert.assertNotEquals;
5 import static org.junit.Assert.assertTrue;
6 import static org.mockito.Mockito.doReturn;
7 import static org.mockito.Mockito.mock;
8 import akka.actor.ActorRef;
9 import akka.actor.ActorSelection;
10 import akka.actor.ActorSystem;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.actor.UntypedActor;
14 import akka.japi.Creator;
15 import akka.testkit.JavaTestKit;
16 import com.google.common.base.Optional;
17 import com.typesafe.config.ConfigFactory;
18 import java.util.concurrent.TimeUnit;
19 import org.apache.commons.lang.time.StopWatch;
20 import org.junit.Assert;
21 import org.junit.Test;
22 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
23 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
24 import org.opendaylight.controller.cluster.datastore.Configuration;
25 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
26 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
27 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
28 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
29 import scala.concurrent.Await;
30 import scala.concurrent.Future;
31 import scala.concurrent.duration.Duration;
33 public class ActorContextTest extends AbstractActorTest{
35 private static class MockShardManager extends UntypedActor {
37 private final boolean found;
38 private final ActorRef actorRef;
40 private MockShardManager(boolean found, ActorRef actorRef){
43 this.actorRef = actorRef;
46 @Override public void onReceive(Object message) throws Exception {
48 getSender().tell(new LocalShardFound(actorRef), getSelf());
50 getSender().tell(new LocalShardNotFound(((FindLocalShard) message).getShardName()), getSelf());
54 private static Props props(final boolean found, final ActorRef actorRef){
55 return Props.create(new MockShardManagerCreator(found, actorRef) );
58 @SuppressWarnings("serial")
59 private static class MockShardManagerCreator implements Creator<MockShardManager> {
61 final ActorRef actorRef;
63 MockShardManagerCreator(boolean found, ActorRef actorRef) {
65 this.actorRef = actorRef;
69 public MockShardManager create() throws Exception {
70 return new MockShardManager(found, actorRef);
76 public void testFindLocalShardWithShardFound(){
77 new JavaTestKit(getSystem()) {{
79 new Within(duration("1 seconds")) {
81 protected void run() {
83 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
85 ActorRef shardManagerActorRef = getSystem()
86 .actorOf(MockShardManager.props(true, shardActorRef));
88 ActorContext actorContext =
89 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
90 mock(Configuration.class));
92 Optional<ActorRef> out = actorContext.findLocalShard("default");
94 assertEquals(shardActorRef, out.get());
105 public void testFindLocalShardWithShardNotFound(){
106 new JavaTestKit(getSystem()) {{
107 ActorRef shardManagerActorRef = getSystem()
108 .actorOf(MockShardManager.props(false, null));
110 ActorContext actorContext =
111 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
112 mock(Configuration.class));
114 Optional<ActorRef> out = actorContext.findLocalShard("default");
115 assertTrue(!out.isPresent());
121 public void testExecuteRemoteOperation() {
122 new JavaTestKit(getSystem()) {{
123 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
125 ActorRef shardManagerActorRef = getSystem()
126 .actorOf(MockShardManager.props(true, shardActorRef));
128 ActorContext actorContext =
129 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
130 mock(Configuration.class));
132 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
134 Object out = actorContext.executeOperation(actor, "hello");
136 assertEquals("hello", out);
141 public void testExecuteRemoteOperationAsync() {
142 new JavaTestKit(getSystem()) {{
143 ActorRef shardActorRef = getSystem().actorOf(Props.create(EchoActor.class));
145 ActorRef shardManagerActorRef = getSystem()
146 .actorOf(MockShardManager.props(true, shardActorRef));
148 ActorContext actorContext =
149 new ActorContext(getSystem(), shardManagerActorRef , mock(ClusterWrapper.class),
150 mock(Configuration.class));
152 ActorSelection actor = actorContext.actorSelection(shardActorRef.path());
154 Future<Object> future = actorContext.executeOperationAsync(actor, "hello");
157 Object result = Await.result(future, Duration.create(3, TimeUnit.SECONDS));
158 assertEquals("Result", "hello", result);
159 } catch(Exception e) {
160 throw new AssertionError(e);
166 public void testIsPathLocal() {
167 MockClusterWrapper clusterWrapper = new MockClusterWrapper();
168 ActorContext actorContext = null;
170 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
171 assertEquals(false, actorContext.isPathLocal(null));
172 assertEquals(false, actorContext.isPathLocal(""));
174 clusterWrapper.setSelfAddress(null);
175 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
176 assertEquals(false, actorContext.isPathLocal(""));
178 // even if the path is in local format, match the primary path (first 3 elements) and return true
179 clusterWrapper.setSelfAddress(new Address("akka", "test"));
180 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
181 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
183 clusterWrapper.setSelfAddress(new Address("akka", "test"));
184 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
185 assertEquals(true, actorContext.isPathLocal("akka://test/user/$a"));
187 clusterWrapper.setSelfAddress(new Address("akka", "test"));
188 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
189 assertEquals(true, actorContext.isPathLocal("akka://test/user/token2/token3/$a"));
191 // self address of remote format,but Tx path local format.
192 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
193 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
194 assertEquals(true, actorContext.isPathLocal(
195 "akka://system/user/shardmanager/shard/transaction"));
197 // self address of local format,but Tx path remote format.
198 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system"));
199 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
200 assertEquals(false, actorContext.isPathLocal(
201 "akka://system@127.0.0.1:2550/user/shardmanager/shard/transaction"));
203 //local path but not same
204 clusterWrapper.setSelfAddress(new Address("akka", "test"));
205 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
206 assertEquals(true, actorContext.isPathLocal("akka://test1/user/$a"));
209 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
210 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
211 assertEquals(true, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550/"));
213 // forward-slash missing in address
214 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
215 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
216 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2550"));
219 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
220 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
221 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.1.0.1:2550/"));
224 clusterWrapper.setSelfAddress(new Address("akka.tcp", "system", "127.0.0.1", 2550));
225 actorContext = new ActorContext(getSystem(), null, clusterWrapper, mock(Configuration.class));
226 assertEquals(false, actorContext.isPathLocal("akka.tcp://system@127.0.0.1:2551/"));
230 public void testResolvePathForRemoteActor() {
231 ActorContext actorContext =
232 new ActorContext(getSystem(), mock(ActorRef.class), mock(
233 ClusterWrapper.class),
234 mock(Configuration.class));
236 String actual = actorContext.resolvePath(
237 "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
238 "akka://system/user/shardmanager/shard/transaction");
240 String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
242 assertEquals(expected, actual);
246 public void testResolvePathForLocalActor() {
247 ActorContext actorContext =
248 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
249 mock(Configuration.class));
251 String actual = actorContext.resolvePath(
252 "akka://system/user/shardmanager/shard",
253 "akka://system/user/shardmanager/shard/transaction");
255 String expected = "akka://system/user/shardmanager/shard/transaction";
257 assertEquals(expected, actual);
261 public void testResolvePathForRemoteActorWithProperRemoteAddress() {
262 ActorContext actorContext =
263 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
264 mock(Configuration.class));
266 String actual = actorContext.resolvePath(
267 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard",
268 "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction");
270 String expected = "akka.tcp://system@7.0.0.1:2550/user/shardmanager/shard/transaction";
272 assertEquals(expected, actual);
276 public void testRateLimiting(){
277 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
279 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
280 doReturn("config").when(mockDataStoreContext).getDataStoreType();
282 ActorContext actorContext =
283 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
284 mock(Configuration.class), mockDataStoreContext);
286 // Check that the initial value is being picked up from DataStoreContext
287 assertEquals(mockDataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
289 actorContext.setTxCreationLimit(1.0);
291 assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
294 StopWatch watch = new StopWatch();
298 actorContext.acquireTxCreationPermit();
299 actorContext.acquireTxCreationPermit();
300 actorContext.acquireTxCreationPermit();
304 assertTrue("did not take as much time as expected", watch.getTime() > 1000);
308 public void testClientDispatcherIsGlobalDispatcher(){
310 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
312 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
313 doReturn("config").when(mockDataStoreContext).getDataStoreType();
315 ActorContext actorContext =
316 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
317 mock(Configuration.class), mockDataStoreContext);
319 assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
324 public void testClientDispatcherIsNotGlobalDispatcher(){
326 DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
328 doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
329 doReturn("config").when(mockDataStoreContext).getDataStoreType();
331 ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
333 ActorContext actorContext =
334 new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class),
335 mock(Configuration.class), mockDataStoreContext);
337 assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher());
339 actorSystem.shutdown();
344 public void testSetDatastoreContext() {
345 new JavaTestKit(getSystem()) {{
346 ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class),
347 mock(Configuration.class), DatastoreContext.newBuilder().
348 operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build());
350 assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds());
351 assertEquals("getTransactionCommitOperationTimeout", 7,
352 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
354 DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6).
355 shardTransactionCommitTimeoutInSeconds(8).build();
357 actorContext.setDatastoreContext(newContext);
359 expectMsgClass(duration("5 seconds"), DatastoreContext.class);
361 Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext());
363 assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds());
364 assertEquals("getTransactionCommitOperationTimeout", 8,
365 actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());