Avoid IllegalArgument on missing source
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / utils / ActorContextTest.java
index fd41c49390b484fd0d4343befa2f920d542e2f74..6bd732e038a00055bb2407ccc416c7f192059405 100644 (file)
@@ -2,7 +2,10 @@ package org.opendaylight.controller.cluster.datastore.utils;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
@@ -11,9 +14,13 @@ import akka.actor.ActorSystem;
 import akka.actor.Address;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
+import akka.dispatch.Futures;
 import akka.japi.Creator;
 import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import akka.util.Timeout;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.StopWatch;
@@ -23,12 +30,18 @@ import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.Configuration;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 public class ActorContextTest extends AbstractActorTest{
 
@@ -278,6 +291,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
         doReturn("config").when(mockDataStoreContext).getDataStoreType();
+        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
 
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
@@ -311,6 +325,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
         doReturn("config").when(mockDataStoreContext).getDataStoreType();
+        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
 
         ActorContext actorContext =
                 new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
@@ -327,6 +342,7 @@ public class ActorContextTest extends AbstractActorTest{
 
         doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
         doReturn("config").when(mockDataStoreContext).getDataStoreType();
+        doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
 
         ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf"));
 
@@ -365,4 +381,122 @@ public class ActorContextTest extends AbstractActorTest{
                     actorContext.getTransactionCommitOperationTimeout().duration().toSeconds());
         }};
     }
+
+    @Test
+    public void testFindPrimaryShardAsyncPrimaryFound() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+            doReturn("config").when(mockDataStoreContext).getDataStoreType();
+            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), mockDataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new PrimaryFound("akka://test-system/test"));
+                        }
+                    };
+
+
+            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+            ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS));
+
+            assertNotNull(actual);
+
+            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS));
+
+            assertEquals(cachedSelection, actual);
+
+            // Wait for 200 Milliseconds. The cached entry should have been removed.
+
+            Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
+
+            cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            assertNull(cached);
+
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+            doReturn("config").when(mockDataStoreContext).getDataStoreType();
+            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), mockDataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new PrimaryNotFound("foobar"));
+                        }
+                    };
+
+
+            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+            try {
+                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+                fail("Expected PrimaryNotFoundException");
+            } catch(PrimaryNotFoundException e){
+
+            }
+
+            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            assertNull(cached);
+
+    }
+
+    @Test
+    public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception {
+
+            TestActorRef<MessageCollectorActor> shardManager =
+                    TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+            DatastoreContext mockDataStoreContext = mock(DatastoreContext.class);
+
+            doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit();
+            doReturn("config").when(mockDataStoreContext).getDataStoreType();
+            doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout();
+
+            ActorContext actorContext =
+                    new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class),
+                            mock(Configuration.class), mockDataStoreContext) {
+                        @Override
+                        protected Future<Object> doAsk(ActorRef actorRef, Object message, Timeout timeout) {
+                            return Futures.successful((Object) new ActorNotInitialized());
+                        }
+                    };
+
+
+            Future<ActorSelection> foobar = actorContext.findPrimaryShardAsync("foobar");
+
+            try {
+                Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS));
+                fail("Expected NotInitializedException");
+            } catch(NotInitializedException e){
+
+            }
+
+            Future<ActorSelection> cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar");
+
+            assertNull(cached);
+
+    }
+
 }