From: Moiz Raja Date: Thu, 15 Jan 2015 15:27:09 +0000 (-0800) Subject: BUG 2584 : Block Datastore creation till Shards are ready X-Git-Tag: release/lithium~451 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=8177f76b3e8bd802cc8e7a05ba3f192f219ab0ee;hp=95771c01e72fca4414c3b4d75734126d7c53f6df BUG 2584 : Block Datastore creation till Shards are ready Change-Id: I4bef70ea9e6b0dd2cdb82344749abc5028cf3184 Signed-off-by: Moiz Raja --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index 51182deb1d..3029ef7e39 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -11,6 +11,8 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorSystem; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier; import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl; import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; @@ -38,14 +40,22 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, DatastoreContextConfigAdminOverlay.Listener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class); - public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout + private static final String UNKNOWN_TYPE = "unknown"; + + private static final long READY_WAIT_FACTOR = 3; private final ActorContext actorContext; + private final long waitTillReadyTimeInMillis; + private AutoCloseable closeable; private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean; + private CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1); + + private final String type; + public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext) { Preconditions.checkNotNull(actorSystem, "actorSystem should not be null"); @@ -53,7 +63,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Preconditions.checkNotNull(configuration, "configuration should not be null"); Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null"); - String type = datastoreContext.getDataStoreType(); + this.type = datastoreContext.getDataStoreType(); String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString(); @@ -63,10 +73,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); actorContext = new ActorContext(actorSystem, actorSystem.actorOf( - ShardManager.props(cluster, configuration, datastoreContext) + ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch) .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ), cluster, configuration, datastoreContext); + this.waitTillReadyTimeInMillis = + actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; + + datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType()); datastoreConfigMXBean.setContext(datastoreContext); datastoreConfigMXBean.registerMBean(); @@ -74,6 +88,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, public DistributedDataStore(ActorContext actorContext) { this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null"); + this.type = UNKNOWN_TYPE; + this.waitTillReadyTimeInMillis = + actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR; + } public void setCloseable(AutoCloseable closeable) { @@ -155,4 +173,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, ActorContext getActorContext() { return actorContext; } + + public void waitTillReady(){ + LOG.info("Beginning to wait for data store to become ready : {}", type); + + try { + waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS); + + LOG.debug("Data store {} is now ready", type); + } catch (InterruptedException e) { + LOG.error("Interrupted when trying to wait for shards to become leader in a reasonable amount of time - giving up"); + } + } + + @VisibleForTesting + public CountDownLatch getWaitTillReadyCountDownLatch() { + return waitTillReadyCountDownLatch; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java index ee9f4f3ad5..8199e33294 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java @@ -40,6 +40,7 @@ public class DistributedDataStoreFactory { schemaService.registerSchemaContextListener(dataStore); dataStore.setCloseable(overlay); + dataStore.waitTillReady(); return dataStore; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c509580ac6..a5abd2fc69 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -63,6 +63,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.datastore.utils.MessageTracker; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; import org.opendaylight.controller.cluster.raft.RaftActor; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; @@ -215,7 +216,7 @@ public class Shard extends RaftActor { private Optional createRoleChangeNotifier(String shardId) { ActorRef shardRoleChangeNotifier = this.getContext().actorOf( RoleChangeNotifier.getProps(shardId), shardId + "-notifier"); - return Optional.of(shardRoleChangeNotifier); + return Optional.of(shardRoleChangeNotifier); } @Override @@ -288,6 +289,8 @@ public class Shard extends RaftActor { handleTransactionCommitTimeoutCheck(); } else if(message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); + } else if(message instanceof RegisterRoleChangeListener){ + roleChangeNotifier.get().forward(message, context()); } else { super.onReceiveCommand(message); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 775cae35e2..d836a347c5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -22,6 +22,7 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.RecoveryFailure; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -34,6 +35,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; @@ -50,6 +52,9 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; @@ -97,10 +102,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private final DataPersistenceProvider dataPersistenceProvider; + private final CountDownLatch waitTillReadyCountdownLatch; + /** */ protected ShardManager(ClusterWrapper cluster, Configuration configuration, - DatastoreContext datastoreContext) { + DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); @@ -109,6 +116,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { this.type = datastoreContext.getDataStoreType(); this.shardDispatcherPath = new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard); + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; // Subscribe this actor to cluster member events cluster.subscribeToMemberEvents(getSelf()); @@ -123,12 +131,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { public static Props props( final ClusterWrapper cluster, final Configuration configuration, - final DatastoreContext datastoreContext) { + final DatastoreContext datastoreContext, + final CountDownLatch waitTillReadyCountdownLatch) { Preconditions.checkNotNull(cluster, "cluster should not be null"); Preconditions.checkNotNull(configuration, "configuration should not be null"); + Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null"); - return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext)); + return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch)); } @Override @@ -156,12 +166,54 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { ignoreMessage(message); } else if(message instanceof DatastoreContext) { onDatastoreContext((DatastoreContext)message); + } else if(message instanceof RoleChangeNotification){ + onRoleChangeNotification((RoleChangeNotification) message); } else{ unknownMessage(message); } } + private void onRoleChangeNotification(RoleChangeNotification message) { + RoleChangeNotification roleChanged = message; + LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(), + roleChanged.getOldRole(), roleChanged.getNewRole()); + + ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId()); + if(shardInformation != null) { + shardInformation.setRole(roleChanged.getNewRole()); + + if (isReady()) { + LOG.info("All Shards are ready - data store {} is ready, available count is {}", type, + waitTillReadyCountdownLatch.getCount()); + + waitTillReadyCountdownLatch.countDown(); + } + } + } + + + private ShardInformation findShardInformation(String memberId) { + for(ShardInformation info : localShards.values()){ + if(info.getShardId().toString().equals(memberId)){ + return info; + } + } + + return null; + } + + private boolean isReady() { + boolean isReady = true; + for (ShardInformation info : localShards.values()) { + if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){ + isReady = false; + break; + } + } + return isReady; + } + private void onActorInitialized(Object message) { final ActorRef sender = getSender(); @@ -305,11 +357,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { for (ShardInformation info : localShards.values()) { if (info.getActor() == null) { info.setActor(getContext().actorOf(Shard.props(info.getShardId(), - info.getPeerAddresses(), datastoreContext, schemaContext) + info.getPeerAddresses(), datastoreContext, schemaContext) .withDispatcher(shardDispatcherPath), info.getShardId().toString())); } else { info.getActor().tell(message, getSelf()); } + info.getActor().tell(new RegisterRoleChangeListener(), self()); } } @@ -477,6 +530,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private boolean actorInitialized = false; private final List runnablesOnInitialized = Lists.newArrayList(); + private String role ; private ShardInformation(String shardName, ShardIdentifier shardId, Map peerAddresses) { @@ -544,6 +598,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void addRunnableOnInitialized(Runnable runnable) { runnablesOnInitialized.add(runnable); } + + public void setRole(String newRole) { + this.role = newRole; + } + + public String getRole(){ + return this.role; + } + } private static class ShardManagerCreator implements Creator { @@ -552,17 +615,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { final ClusterWrapper cluster; final Configuration configuration; final DatastoreContext datastoreContext; + private final CountDownLatch waitTillReadyCountdownLatch; ShardManagerCreator(ClusterWrapper cluster, - Configuration configuration, DatastoreContext datastoreContext) { + Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) { this.cluster = cluster; this.configuration = configuration; this.datastoreContext = datastoreContext; + this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch; } @Override public ShardManager create() throws Exception { - return new ShardManager(cluster, configuration, datastoreContext); + return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java index 2757d2f5f6..74e61c189f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java @@ -217,7 +217,8 @@ public class MessageTracker { boolean done = true; public void reset(){ - Preconditions.checkState(done); + Preconditions.checkState(done, + String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage)); done = false; stopwatch.reset().start(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index 4ec035ee3b..3034004bb0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -1,8 +1,13 @@ package org.opendaylight.controller.cluster.datastore; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import akka.util.Timeout; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -10,6 +15,7 @@ import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.duration.FiniteDuration; public class DistributedDataStoreTest extends AbstractActorTest { @@ -18,6 +24,12 @@ public class DistributedDataStoreTest extends AbstractActorTest { @Mock private ActorContext actorContext; + @Mock + private DatastoreContext datastoreContext; + + @Mock + private Timeout shardElectionTimeout; + @Before public void setUp() throws Exception { MockitoAnnotations.initMocks(this); @@ -58,4 +70,45 @@ public class DistributedDataStoreTest extends AbstractActorTest { verify(actorContext, times(0)).acquireTxCreationPermit(); } + @Test + public void testWaitTillReadyBlocking(){ + doReturn(datastoreContext).when(actorContext).getDatastoreContext(); + doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout(); + doReturn(FiniteDuration.apply(50, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration(); + DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext); + + long start = System.currentTimeMillis(); + + distributedDataStore.waitTillReady(); + + long end = System.currentTimeMillis(); + + assertTrue("Expected to be blocked for 50 millis", (end-start) >= 50); + } + + @Test + public void testWaitTillReadyCountDown(){ + final DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext); + doReturn(datastoreContext).when(actorContext).getDatastoreContext(); + doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout(); + doReturn(FiniteDuration.apply(5000, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration(); + + Executors.newSingleThreadExecutor().submit(new Runnable() { + @Override + public void run() { + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + distributedDataStore.getWaitTillReadyCountDownLatch().countDown(); + } + }); + + long start = System.currentTimeMillis(); + + distributedDataStore.waitTillReady(); + + long end = System.currentTimeMillis(); + + assertTrue("Expected to be released in 500 millis", (end-start) < 5000); + + } + } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 596761ddc8..f0cdacc9ef 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -4,6 +4,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import akka.actor.ActorRef; import akka.actor.Props; @@ -26,6 +29,8 @@ import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized; @@ -41,6 +46,8 @@ import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor; import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; +import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; +import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -53,10 +60,15 @@ public class ShardManagerTest extends AbstractActorTest { private final String shardMrgIDSuffix = "config" + ID_COUNTER++; private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix; + @Mock + private static CountDownLatch ready; + private static ActorRef mockShardActor; @Before public void setUp() { + MockitoAnnotations.initMocks(this); + InMemoryJournal.clear(); if(mockShardActor == null) { @@ -71,10 +83,10 @@ public class ShardManagerTest extends AbstractActorTest { } private Props newShardMgrProps() { - DatastoreContext.Builder builder = DatastoreContext.newBuilder(); builder.dataStoreType(shardMrgIDSuffix); - return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build()); + return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), + builder.build(), ready); } @Test @@ -351,8 +363,10 @@ public class ShardManagerTest extends AbstractActorTest { public void testRecoveryApplicable(){ new JavaTestKit(getSystem()) { { - final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build()); + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); final TestActorRef persistentShardManager = TestActorRef.create(getSystem(), persistentProps); @@ -360,8 +374,10 @@ public class ShardManagerTest extends AbstractActorTest { assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable()); - final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build()); + final Props nonPersistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(false).build(), ready); final TestActorRef nonPersistentShardManager = TestActorRef.create(getSystem(), nonPersistentProps); @@ -382,8 +398,7 @@ public class ShardManagerTest extends AbstractActorTest { private static final long serialVersionUID = 1L; @Override public ShardManager create() throws Exception { - return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) { + return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) { @Override protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) { DataPersistenceProviderMonitor dataPersistenceProviderMonitor @@ -417,6 +432,42 @@ public class ShardManagerTest extends AbstractActorTest { }}; } + @Test + public void testRoleChangeNotificationReleaseReady() throws Exception { + new JavaTestKit(getSystem()) { + { + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + + verify(ready, times(1)).countDown(); + + }}; + } + + @Test + public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception { + new JavaTestKit(getSystem()) { + { + final Props persistentProps = ShardManager.props( + new MockClusterWrapper(), + new MockConfiguration(), + DatastoreContext.newBuilder().persistent(true).build(), ready); + final TestActorRef shardManager = + TestActorRef.create(getSystem(), persistentProps); + + shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name())); + + verify(ready, never()).countDown(); + + }}; + } + private static class TestShardManager extends ShardManager { @@ -424,7 +475,7 @@ public class ShardManagerTest extends AbstractActorTest { TestShardManager(String shardMrgIDSuffix) { super(new MockClusterWrapper(), new MockConfiguration(), - DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()); + DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 1ebd1b91dd..7dfbd668b8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -32,6 +32,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -69,8 +70,11 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal; import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore; +import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor; import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; +import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; @@ -1591,6 +1595,33 @@ public class ShardTest extends AbstractActorTest { }}; } + @Test + public void testRegisterRoleChangeListener() throws Exception { + new ShardTestKit(getSystem()) { + { + final TestActorRef shard = TestActorRef.create(getSystem(), + newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), + "testRegisterRoleChangeListener"); + + waitUntilLeader(shard); + + TestActorRef listener = + TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class)); + + shard.tell(new RegisterRoleChangeListener(), listener); + + // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore + // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary + // sleep. + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + + List allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class); + + assertEquals(1, allMatching.size()); + }}; + } + + private NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); CheckedFuture>, ReadFailedException> read =