BUG 3125 : Set Rate Limit just before acquiring a permit to avoid contention
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
index 6b4f6337785a753e68e1330f8296927aeb70b005..377c05bf8c2eaed0bd6ac6b4278b780d20c95499 100644 (file)
@@ -4,6 +4,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
@@ -27,9 +28,9 @@ import com.typesafe.config.ConfigFactory;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.time.StopWatch;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
@@ -39,9 +40,12 @@ import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedEx
 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -322,35 +326,6 @@ public class ActorContextTest extends AbstractActorTest{
         assertEquals(expected, actual);
     }
 
-    @Test
-    public void testRateLimiting(){
-        DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
-                transactionCreationInitialRateLimit(155L).build();
-
-        ActorContext actorContext =
-                new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
-                        mock(Configuration.class), dataStoreContext);
-
-        // Check that the initial value is being picked up from DataStoreContext
-        assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15);
-
-        actorContext.setTxCreationLimit(1.0);
-
-        assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15);
-
-
-        StopWatch watch = new StopWatch();
-
-        watch.start();
-
-        actorContext.acquireTxCreationPermit();
-        actorContext.acquireTxCreationPermit();
-        actorContext.acquireTxCreationPermit();
-
-        watch.stop();
-
-        assertTrue("did not take as much time as expected", watch.getTime() > 1000);
-    }
 
     @Test
     public void testClientDispatcherIsGlobalDispatcher(){
@@ -403,7 +378,7 @@ public class ActorContextTest extends AbstractActorTest{
     }
 
     @Test
-    public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+    public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception {
 
             TestActorRef<MessageCollectorActor> shardManager =
                     TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
@@ -411,35 +386,81 @@ public class ActorContextTest extends AbstractActorTest{
             DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
                     shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
 
+            final String expPrimaryPath = "akka://test-system/find-primary-shard";
             ActorContext actorContext =
                     new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
                             mock(Configuration.class), dataStoreContext) {
                         @Override
                         protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
-                            return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
+                            return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath));
                         }
                     };
 
-
-            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
-            ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+            PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
 
             assertNotNull(actual);
+            assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent());
+            assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+                    expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
 
-            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
-            ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+            PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
 
-            assertEquals(cachedSelection, actual);
+            assertEquals(cachedInfo, actual);
 
             // Wait for 200 Milliseconds. The cached entry should have been removed.
 
             Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
 
-            cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+            cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
             assertNull(cached);
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config").
+                    shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build();
+
+            final DataTree mockDataTree = Mockito.mock(DataTree.class);
+            final String expPrimaryPath = "akka://test-system/find-primary-shard";
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), dataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree));
+                        }
+                    };
+
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
+            PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
 
+            assertNotNull(actual);
+            assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent());
+            assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get());
+            assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(),
+                    expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString()));
+
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+            PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+            assertEquals(cachedInfo, actual);
+
+            // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+            cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
+
+            assertNull(cached);
     }
 
     @Test
@@ -461,7 +482,7 @@ public class ActorContextTest extends AbstractActorTest{
                     };
 
 
-            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
 
             try {
                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
@@ -470,7 +491,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             }
 
-            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
             assertNull(cached);
     }
@@ -494,7 +515,7 @@ public class ActorContextTest extends AbstractActorTest{
                     };
 
 
-            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+            Future<PrimaryShardInfo> foobar = actorContext.findPrimaryShardAsync("foobar");
 
             try {
                 Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
@@ -503,7 +524,7 @@ public class ActorContextTest extends AbstractActorTest{
 
             }
 
-            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+            Future<PrimaryShardInfo> cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar");
 
             assertNull(cached);
     }
@@ -516,8 +537,8 @@ public class ActorContextTest extends AbstractActorTest{
 
             TestActorRef<MockShardManager> shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props());
             MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor();
-            shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString()));
-            shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString()));
+            shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString()));
             shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found"));
 
             Configuration mockConfig = mock(Configuration.class);