X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Ftest%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Futils%2FActorContextTest.java;h=7e484ebde3d79bcadbb182162c55c23c56675d78;hb=refs%2Fchanges%2F20%2F28420%2F6;hp=6b4f6337785a753e68e1330f8296927aeb70b005;hpb=3940ce6060e027fe870244346e5309229cc8dc48;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java index 6b4f633778..7e484ebde3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/ActorContextTest.java @@ -1,9 +1,18 @@ +/* + * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore.utils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doReturn; @@ -27,21 +36,26 @@ import com.typesafe.config.ConfigFactory; import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang.time.StopWatch; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.ClusterWrapper; -import org.opendaylight.controller.cluster.datastore.Configuration; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory; +import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException; import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound; import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound; -import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo; +import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Await; @@ -322,41 +336,12 @@ public class ActorContextTest extends AbstractActorTest{ assertEquals(expected, actual); } - @Test - public void testRateLimiting(){ - DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). - transactionCreationInitialRateLimit(155L).build(); - - ActorContext actorContext = - new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext); - - // Check that the initial value is being picked up from DataStoreContext - assertEquals(dataStoreContext.getTransactionCreationInitialRateLimit(), actorContext.getTxCreationLimit(), 1e-15); - - actorContext.setTxCreationLimit(1.0); - - assertEquals(1.0, actorContext.getTxCreationLimit(), 1e-15); - - - StopWatch watch = new StopWatch(); - - watch.start(); - - actorContext.acquireTxCreationPermit(); - actorContext.acquireTxCreationPermit(); - actorContext.acquireTxCreationPermit(); - - watch.stop(); - - assertTrue("did not take as much time as expected", watch.getTime() > 1000); - } @Test public void testClientDispatcherIsGlobalDispatcher(){ ActorContext actorContext = new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), DatastoreContext.newBuilder().build()); + mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()); assertEquals(getSystem().dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -368,7 +353,7 @@ public class ActorContextTest extends AbstractActorTest{ ActorContext actorContext = new ActorContext(actorSystem, mock(ActorRef.class), mock(ClusterWrapper.class), - mock(Configuration.class), DatastoreContext.newBuilder().build()); + mock(Configuration.class), DatastoreContext.newBuilder().build(), new PrimaryShardInfoFutureCache()); assertNotEquals(actorSystem.dispatchers().defaultGlobalDispatcher(), actorContext.getClientDispatcher()); @@ -381,7 +366,7 @@ public class ActorContextTest extends AbstractActorTest{ new JavaTestKit(getSystem()) {{ ActorContext actorContext = new ActorContext(getSystem(), getRef(), mock(ClusterWrapper.class), mock(Configuration.class), DatastoreContext.newBuilder(). - operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build()); + operationTimeoutInSeconds(5).shardTransactionCommitTimeoutInSeconds(7).build(), new PrimaryShardInfoFutureCache()); assertEquals("getOperationDuration", 5, actorContext.getOperationDuration().toSeconds()); assertEquals("getTransactionCommitOperationTimeout", 7, @@ -390,9 +375,12 @@ public class ActorContextTest extends AbstractActorTest{ DatastoreContext newContext = DatastoreContext.newBuilder().operationTimeoutInSeconds(6). shardTransactionCommitTimeoutInSeconds(8).build(); - actorContext.setDatastoreContext(newContext); + DatastoreContextFactory mockContextFactory = mock(DatastoreContextFactory.class); + Mockito.doReturn(newContext).when(mockContextFactory).getBaseDatastoreContext(); + + actorContext.setDatastoreContext(mockContextFactory); - expectMsgClass(duration("5 seconds"), DatastoreContext.class); + expectMsgClass(duration("5 seconds"), DatastoreContextFactory.class); Assert.assertSame("getDatastoreContext", newContext, actorContext.getDatastoreContext()); @@ -403,7 +391,7 @@ public class ActorContextTest extends AbstractActorTest{ } @Test - public void testFindPrimaryShardAsyncPrimaryFound() throws Exception { + public void testFindPrimaryShardAsyncRemotePrimaryFound() throws Exception { TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); @@ -411,39 +399,41 @@ public class ActorContextTest extends AbstractActorTest{ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; + final short expPrimaryVersion = DataStoreVersions.CURRENT_VERSION; ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext) { + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryFound("akka://test-system/test")); + return Futures.successful((Object) new RemotePrimaryShardFound(expPrimaryPath, expPrimaryVersion)); } }; - - Future foobar = actorContext.findPrimaryShardAsync("foobar"); - ActorSelection actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); assertNotNull(actual); + assertEquals("LocalShardDataTree present", false, actual.getLocalShardDataTree().isPresent()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); + assertEquals("getPrimaryShardVersion", expPrimaryVersion, actual.getPrimaryShardVersion()); - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); - - ActorSelection cachedSelection = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - assertEquals(cachedSelection, actual); + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); - // Wait for 200 Milliseconds. The cached entry should have been removed. + assertEquals(cachedInfo, actual); - Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + actorContext.getPrimaryShardInfoCache().remove("foobar"); - cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); - } @Test - public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception { + public void testFindPrimaryShardAsyncLocalPrimaryFound() throws Exception { TestActorRef shardManager = TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); @@ -451,61 +441,80 @@ public class ActorContextTest extends AbstractActorTest{ DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); + final DataTree mockDataTree = Mockito.mock(DataTree.class); + final String expPrimaryPath = "akka://test-system/find-primary-shard"; ActorContext actorContext = new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext) { + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { @Override protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new PrimaryNotFoundException("not found")); + return Futures.successful((Object) new LocalPrimaryShardFound(expPrimaryPath, mockDataTree)); } }; + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + PrimaryShardInfo actual = Await.result(foobar, Duration.apply(5000, TimeUnit.MILLISECONDS)); - Future foobar = actorContext.findPrimaryShardAsync("foobar"); + assertNotNull(actual); + assertEquals("LocalShardDataTree present", true, actual.getLocalShardDataTree().isPresent()); + assertSame("LocalShardDataTree", mockDataTree, actual.getLocalShardDataTree().get()); + assertTrue("Unexpected PrimaryShardActor path " + actual.getPrimaryShardActor().path(), + expPrimaryPath.endsWith(actual.getPrimaryShardActor().pathString())); + assertEquals("getPrimaryShardVersion", DataStoreVersions.CURRENT_VERSION, actual.getPrimaryShardVersion()); - try { - Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); - fail("Expected PrimaryNotFoundException"); - } catch(PrimaryNotFoundException e){ + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - } + PrimaryShardInfo cachedInfo = Await.result(cached, FiniteDuration.apply(1, TimeUnit.MILLISECONDS)); + + assertEquals(cachedInfo, actual); + + actorContext.getPrimaryShardInfoCache().remove("foobar"); - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); assertNull(cached); } @Test - public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception { + public void testFindPrimaryShardAsyncPrimaryNotFound() throws Exception { + testFindPrimaryExceptions(new PrimaryNotFoundException("not found")); + } - TestActorRef shardManager = - TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + @Test + public void testFindPrimaryShardAsyncActorNotInitialized() throws Exception { + testFindPrimaryExceptions(new NotInitializedException("not initialized")); + } - DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). - shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); - - ActorContext actorContext = - new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), - mock(Configuration.class), dataStoreContext) { - @Override - protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { - return Futures.successful((Object) new NotInitializedException("not iniislized")); - } - }; + private void testFindPrimaryExceptions(final Object expectedException) throws Exception { + TestActorRef shardManager = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + DatastoreContext dataStoreContext = DatastoreContext.newBuilder().dataStoreType("config"). + shardLeaderElectionTimeout(100, TimeUnit.MILLISECONDS).build(); - Future foobar = actorContext.findPrimaryShardAsync("foobar"); + ActorContext actorContext = + new ActorContext(getSystem(), shardManager, mock(ClusterWrapper.class), + mock(Configuration.class), dataStoreContext, new PrimaryShardInfoFutureCache()) { + @Override + protected Future doAsk(ActorRef actorRef, Object message, Timeout timeout) { + return Futures.successful(expectedException); + } + }; - try { - Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); - fail("Expected NotInitializedException"); - } catch(NotInitializedException e){ + Future foobar = actorContext.findPrimaryShardAsync("foobar"); + try { + Await.result(foobar, Duration.apply(100, TimeUnit.MILLISECONDS)); + fail("Expected" + expectedException.getClass().toString()); + } catch(Exception e){ + if(!expectedException.getClass().isInstance(e)) { + fail("Expected Exception of type " + expectedException.getClass().toString()); } + } - Future cached = actorContext.getPrimaryShardActorSelectionCache().getIfPresent("foobar"); + Future cached = actorContext.getPrimaryShardInfoCache().getIfPresent("foobar"); - assertNull(cached); + assertNull(cached); } @Test @@ -516,8 +525,10 @@ public class ActorContextTest extends AbstractActorTest{ TestActorRef shardManagerActorRef = TestActorRef.create(getSystem(), MockShardManager.props()); MockShardManager shardManagerActor = shardManagerActorRef.underlyingActor(); - shardManagerActor.addFindPrimaryResp("shard1", new PrimaryFound(shardActorRef1.path().toString())); - shardManagerActor.addFindPrimaryResp("shard2", new PrimaryFound(shardActorRef2.path().toString())); + shardManagerActor.addFindPrimaryResp("shard1", new RemotePrimaryShardFound(shardActorRef1.path().toString(), + DataStoreVersions.CURRENT_VERSION)); + shardManagerActor.addFindPrimaryResp("shard2", new RemotePrimaryShardFound(shardActorRef2.path().toString(), + DataStoreVersions.CURRENT_VERSION)); shardManagerActor.addFindPrimaryResp("shard3", new NoShardLeaderException("not found")); Configuration mockConfig = mock(Configuration.class); @@ -526,7 +537,7 @@ public class ActorContextTest extends AbstractActorTest{ ActorContext actorContext = new ActorContext(getSystem(), shardManagerActorRef, mock(ClusterWrapper.class), mockConfig, - DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build()); + DatastoreContext.newBuilder().shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), new PrimaryShardInfoFutureCache()); actorContext.broadcast(new TestMessage()); @@ -535,10 +546,11 @@ public class ActorContextTest extends AbstractActorTest{ }}; } - private T expectFirstMatching(ActorRef actor, Class clazz) { + private static T expectFirstMatching(ActorRef actor, Class clazz) { int count = 5000 / 50; for(int i = 0; i < count; i++) { try { + @SuppressWarnings("unchecked") T message = (T) MessageCollectorActor.getFirstMatching(actor, clazz); if(message != null) { return message;