X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContextTest.java;h=6bd732e038a00055bb2407ccc416c7f192059405;hb=51500b537a2d903acf2794091da8f79cbf082d50;hp=3c6a0cef5c605fbb23e3c9501676f67d95805e9d;hpb=5c008222efa5c0af49cf8a52881a6299b1e249dc;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 3c6a0cef5c..6bd732e038 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -2,7 +2,10 @@ package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; @@ -11,23 +14,34 @@ import akka.actor.ActorSystem; import akka.actor.Address; import akka.actor.Props; import akka.actor.UntypedActor; +import akka.dispatch.Futures; import akka.japi.Creator; import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import akka.util.Timeout; import com.google.common.base.Optional; +import com.google.common.util.concurrent.Uninterruptibles; import com.typesafe.config.ConfigFactory; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.StopWatch; +import org.junit.Assert; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; import org.opendaylight.controller.cluster.datastore.Configuration; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; +import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; public class ActorContextTest extends AbstractActorTest{ @@ -277,6 +291,7 @@ public class ActorContextTest extends AbstractActorTest{ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), @@ -310,6 +325,7 @@ public class ActorContextTest extends AbstractActorTest{ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), @@ -326,6 +342,7 @@ public class ActorContextTest extends AbstractActorTest{ doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); ActorSystem actorSystem = ActorSystem.create("with-custom-dispatchers", ConfigFactory.load("application-with-custom-dispatchers.conf")); @@ -339,4 +356,147 @@ public class ActorContextTest extends AbstractActorTest{ } + @Test + public void testSetDatastoreContext() { + new JavaTestKit(getSystem()) {{ + ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), + mock(Configuration.class), DatastoreContext.newBuilder(). + operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build()); + + assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 7, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + + DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6). + shardTransactionCommitTimeoutInSeconds(8).build(); + + actorContext.setDatastoreContext(newContext); + + expectMsgClass(duration("5 seconds"), DatastoreContext.class); + + Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); + + assertEquals("getOperationDuration", 6, actorContext.getOperationDuration().toSeconds()); + assertEquals("getTransactionCommitOperationTimeout", 8, + actorContext.getTransactionCommitOperationTimeout().duration().toSeconds()); + }}; + } + + @Test + public void testFindPrimaryShardAsyncPrimaryFound() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + + assertNotNull(actual); + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + + assertEquals(cachedSelection, actual); + + // Wait for 200 Milliseconds. The cached entry should have been removed. + + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + + cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + + } + + @Test + public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new PrimaryNotFound("foobar")); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected PrimaryNotFoundException"); + } catch(PrimaryNotFoundException e){ + + } + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + + } + + @Test + public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception { + + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + DatastoreContext mockDataStoreContext = mock(DatastoreContext.class); + + doReturn(155L).when(mockDataStoreContext).getTransactionCreationInitialRateLimit(); + doReturn("config").when(mockDataStoreContext).getDataStoreType(); + doReturn(Timeout.apply(100, TimeUnit.MILLISECONDS)).when(mockDataStoreContext).getShardLeaderElectionTimeout(); + + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), mockDataStoreContext) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful((Object) new ActorNotInitialized()); + } + }; + + + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected NotInitializedException"); + } catch(NotInitializedException e){ + + } + + Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + + assertNull(cached); + + } + }