BUG 2584 : Block Datastore creation till Shards are ready 71/14171/4
authorMoiz Raja <moraja@cisco.com>
Thu, 15 Jan 2015 15:27:09 +0000 (07:27 -0800)
committerMoiz Raja <moraja@cisco.com>
Fri, 6 Mar 2015 21:15:38 +0000 (13:15 -0800)
Change-Id: I4bef70ea9e6b0dd2cdb82344749abc5028cf3184
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/MessageTracker.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

index 51182deb1dcd6586ee1d421d477e3072363d4a83..3029ef7e399a4db99c6ed2ad18a7e7701cb2f8ee 100644 (file)
@@ -11,6 +11,8 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSystem;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
@@ -38,14 +40,22 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
-    public static final int REGISTER_DATA_CHANGE_LISTENER_TIMEOUT_FACTOR = 24; // 24 times the usual operation timeout
+    private static final String UNKNOWN_TYPE = "unknown";
+
+    private static final long READY_WAIT_FACTOR = 3;
 
     private final ActorContext actorContext;
+    private final long waitTillReadyTimeInMillis;
+
 
     private AutoCloseable closeable;
 
     private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
 
+    private CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
+
+    private final String type;
+
     public DistributedDataStore(ActorSystem actorSystem, ClusterWrapper cluster,
             Configuration configuration, DatastoreContext datastoreContext) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
@@ -53,7 +63,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
         Preconditions.checkNotNull(configuration, "configuration should not be null");
         Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
 
-        String type = datastoreContext.getDataStoreType();
+        this.type = datastoreContext.getDataStoreType();
 
         String shardManagerId = ShardManagerIdentifier.builder().type(type).build().toString();
 
@@ -63,10 +73,14 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
                 new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
 
         actorContext = new ActorContext(actorSystem, actorSystem.actorOf(
-                ShardManager.props(cluster, configuration, datastoreContext)
+                ShardManager.props(cluster, configuration, datastoreContext, waitTillReadyCountDownLatch)
                         .withDispatcher(shardDispatcher).withMailbox(ActorContext.MAILBOX), shardManagerId ),
                 cluster, configuration, datastoreContext);
 
+        this.waitTillReadyTimeInMillis =
+                actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
+
+
         datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(datastoreContext.getDataStoreMXBeanType());
         datastoreConfigMXBean.setContext(datastoreContext);
         datastoreConfigMXBean.registerMBean();
@@ -74,6 +88,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
 
     public DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+        this.type = UNKNOWN_TYPE;
+        this.waitTillReadyTimeInMillis =
+                actorContext.getDatastoreContext().getShardLeaderElectionTimeout().duration().toMillis() * READY_WAIT_FACTOR;
+
     }
 
     public void setCloseable(AutoCloseable closeable) {
@@ -155,4 +173,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener,
     ActorContext getActorContext() {
         return actorContext;
     }
+
+    public void waitTillReady(){
+        LOG.info("Beginning to wait for data store to become ready : {}", type);
+
+        try {
+            waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS);
+
+            LOG.debug("Data store {} is now ready", type);
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted when trying to wait for shards to become leader in a reasonable amount of time - giving up");
+        }
+    }
+
+    @VisibleForTesting
+    public CountDownLatch getWaitTillReadyCountDownLatch() {
+        return waitTillReadyCountDownLatch;
+    }
 }
index ee9f4f3ad5276e5586da6135902835f0896f2b6f..8199e33294874f729dcc886fa482c9937eb73f1e 100644 (file)
@@ -40,6 +40,7 @@ public class DistributedDataStoreFactory {
         schemaService.registerSchemaContextListener(dataStore);
 
         dataStore.setCloseable(overlay);
+        dataStore.waitTillReady();
         return dataStore;
     }
 
index c509580ac657f920564cc2c4db1a43f91fa7d325..a5abd2fc69059f4af377ab85ac161372de15cbed 100644 (file)
@@ -63,6 +63,7 @@ import org.opendaylight.controller.cluster.datastore.modification.MutableComposi
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
 import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
 import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
@@ -215,7 +216,7 @@ public class Shard extends RaftActor {
     private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
         ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
             RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
-        return Optional.<ActorRef>of(shardRoleChangeNotifier);
+        return Optional.of(shardRoleChangeNotifier);
     }
 
     @Override
@@ -288,6 +289,8 @@ public class Shard extends RaftActor {
                 handleTransactionCommitTimeoutCheck();
             } else if(message instanceof DatastoreContext) {
                 onDatastoreContext((DatastoreContext)message);
+            } else if(message instanceof RegisterRoleChangeListener){
+                roleChangeNotifier.get().forward(message, context());
             } else {
                 super.onReceiveCommand(message);
             }
index 775cae35e22843cafd223bce22eb5232a230868f..d836a347c514b434db26a32cbb172d1671e99253 100644 (file)
@@ -22,6 +22,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.RecoveryFailure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -34,6 +35,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -50,6 +52,9 @@ import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
@@ -97,10 +102,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private final DataPersistenceProvider dataPersistenceProvider;
 
+    private final CountDownLatch waitTillReadyCountdownLatch;
+
     /**
      */
     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
-            DatastoreContext datastoreContext) {
+            DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
 
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
@@ -109,6 +116,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.type = datastoreContext.getDataStoreType();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+        this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -123,12 +131,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     public static Props props(
         final ClusterWrapper cluster,
         final Configuration configuration,
-        final DatastoreContext datastoreContext) {
+        final DatastoreContext datastoreContext,
+        final CountDownLatch waitTillReadyCountdownLatch) {
 
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
+        Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
 
-        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
+        return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
     }
 
     @Override
@@ -156,12 +166,54 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             ignoreMessage(message);
         } else if(message instanceof DatastoreContext) {
             onDatastoreContext((DatastoreContext)message);
+        } else if(message instanceof RoleChangeNotification){
+            onRoleChangeNotification((RoleChangeNotification) message);
         } else{
             unknownMessage(message);
         }
 
     }
 
+    private void onRoleChangeNotification(RoleChangeNotification message) {
+        RoleChangeNotification roleChanged = message;
+        LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
+                roleChanged.getOldRole(), roleChanged.getNewRole());
+
+        ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
+        if(shardInformation != null) {
+            shardInformation.setRole(roleChanged.getNewRole());
+
+            if (isReady()) {
+                LOG.info("All Shards are ready - data store {} is ready, available count is {}", type,
+                        waitTillReadyCountdownLatch.getCount());
+
+                waitTillReadyCountdownLatch.countDown();
+            }
+        }
+    }
+
+
+    private ShardInformation findShardInformation(String memberId) {
+        for(ShardInformation info : localShards.values()){
+            if(info.getShardId().toString().equals(memberId)){
+                return info;
+            }
+        }
+
+        return null;
+    }
+
+    private boolean isReady() {
+        boolean isReady = true;
+        for (ShardInformation info : localShards.values()) {
+            if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){
+                isReady = false;
+                break;
+            }
+        }
+        return isReady;
+    }
+
     private void onActorInitialized(Object message) {
         final ActorRef sender = getSender();
 
@@ -305,11 +357,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     for (ShardInformation info : localShards.values()) {
                         if (info.getActor() == null) {
                             info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
-                                            info.getPeerAddresses(), datastoreContext, schemaContext)
+                                    info.getPeerAddresses(), datastoreContext, schemaContext)
                                             .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
                         } else {
                             info.getActor().tell(message, getSelf());
                         }
+                        info.getActor().tell(new RegisterRoleChangeListener(), self());
                     }
                 }
 
@@ -477,6 +530,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private boolean actorInitialized = false;
 
         private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
+        private String role ;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
                 Map<ShardIdentifier, String> peerAddresses) {
@@ -544,6 +598,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void addRunnableOnInitialized(Runnable runnable) {
             runnablesOnInitialized.add(runnable);
         }
+
+        public void setRole(String newRole) {
+            this.role = newRole;
+        }
+
+        public String getRole(){
+            return this.role;
+        }
+
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
@@ -552,17 +615,19 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         final ClusterWrapper cluster;
         final Configuration configuration;
         final DatastoreContext datastoreContext;
+        private final CountDownLatch waitTillReadyCountdownLatch;
 
         ShardManagerCreator(ClusterWrapper cluster,
-                Configuration configuration, DatastoreContext datastoreContext) {
+                            Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
             this.cluster = cluster;
             this.configuration = configuration;
             this.datastoreContext = datastoreContext;
+            this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(cluster, configuration, datastoreContext);
+            return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
         }
     }
 
index 2757d2f5f68eef469361bd940866d2705b5494cc..74e61c189f828bb9228152ada1dedae2b16c53e3 100644 (file)
@@ -217,7 +217,8 @@ public class MessageTracker {
         boolean done = true;
 
         public void reset(){
-            Preconditions.checkState(done);
+            Preconditions.checkState(done,
+                    String.format("Trying to reset a context that is not done (%s). currentMessage = %s", done, currentMessage));
             done = false;
             stopwatch.reset().start();
         }
index 4ec035ee3b52308e6390d9a2e4dece6703438318..3034004bb0ad2209f43602749206a36f187c67ba 100644 (file)
@@ -1,8 +1,13 @@
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import akka.util.Timeout;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mock;
@@ -10,6 +15,7 @@ import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.FiniteDuration;
 
 public class DistributedDataStoreTest extends AbstractActorTest {
 
@@ -18,6 +24,12 @@ public class DistributedDataStoreTest extends AbstractActorTest {
     @Mock
     private ActorContext actorContext;
 
+    @Mock
+    private DatastoreContext datastoreContext;
+
+    @Mock
+    private Timeout shardElectionTimeout;
+
     @Before
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
@@ -58,4 +70,45 @@ public class DistributedDataStoreTest extends AbstractActorTest {
         verify(actorContext, times(0)).acquireTxCreationPermit();
     }
 
+    @Test
+    public void testWaitTillReadyBlocking(){
+        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+        doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
+        doReturn(FiniteDuration.apply(50, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
+        DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+
+        long start = System.currentTimeMillis();
+
+        distributedDataStore.waitTillReady();
+
+        long end = System.currentTimeMillis();
+
+        assertTrue("Expected to be blocked for 50 millis", (end-start) >= 50);
+    }
+
+    @Test
+    public void testWaitTillReadyCountDown(){
+        final DistributedDataStore distributedDataStore = new DistributedDataStore(actorContext);
+        doReturn(datastoreContext).when(actorContext).getDatastoreContext();
+        doReturn(shardElectionTimeout).when(datastoreContext).getShardLeaderElectionTimeout();
+        doReturn(FiniteDuration.apply(5000, TimeUnit.MILLISECONDS)).when(shardElectionTimeout).duration();
+
+        Executors.newSingleThreadExecutor().submit(new Runnable() {
+            @Override
+            public void run() {
+                Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+                distributedDataStore.getWaitTillReadyCountDownLatch().countDown();
+            }
+        });
+
+        long start = System.currentTimeMillis();
+
+        distributedDataStore.waitTillReady();
+
+        long end = System.currentTimeMillis();
+
+        assertTrue("Expected to be released in 500 millis", (end-start) < 5000);
+
+    }
+
 }
\ No newline at end of file
index 596761ddc8fa9e9d25b4c797c2f814f5a1c63a09..f0cdacc9ef2bc0e08ccb0348a8d990279de8f57a 100644 (file)
@@ -4,6 +4,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import akka.actor.ActorRef;
 import akka.actor.Props;
@@ -26,6 +29,8 @@ import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
@@ -41,6 +46,8 @@ import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
+import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
+import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -53,10 +60,15 @@ public class ShardManagerTest extends AbstractActorTest {
     private final String shardMrgIDSuffix = "config" + ID_COUNTER++;
     private final String shardMgrID = "shard-manager-" + shardMrgIDSuffix;
 
+    @Mock
+    private static CountDownLatch ready;
+
     private static ActorRef mockShardActor;
 
     @Before
     public void setUp() {
+        MockitoAnnotations.initMocks(this);
+
         InMemoryJournal.clear();
 
         if(mockShardActor == null) {
@@ -71,10 +83,10 @@ public class ShardManagerTest extends AbstractActorTest {
     }
 
     private Props newShardMgrProps() {
-
         DatastoreContext.Builder builder = DatastoreContext.newBuilder();
         builder.dataStoreType(shardMrgIDSuffix);
-        return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(), builder.build());
+        return ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
+                builder.build(), ready);
     }
 
     @Test
@@ -351,8 +363,10 @@ public class ShardManagerTest extends AbstractActorTest {
     public void testRecoveryApplicable(){
         new JavaTestKit(getSystem()) {
             {
-                final Props persistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(true).dataStoreType(shardMrgIDSuffix).build());
+                final Props persistentProps = ShardManager.props(
+                        new MockClusterWrapper(),
+                        new MockConfiguration(),
+                        DatastoreContext.newBuilder().persistent(true).build(), ready);
                 final TestActorRef<ShardManager> persistentShardManager =
                         TestActorRef.create(getSystem(), persistentProps);
 
@@ -360,8 +374,10 @@ public class ShardManagerTest extends AbstractActorTest {
 
                 assertTrue("Recovery Applicable", dataPersistenceProvider1.isRecoveryApplicable());
 
-                final Props nonPersistentProps = ShardManager.props(new MockClusterWrapper(), new MockConfiguration(),
-                        DatastoreContext.newBuilder().persistent(false).dataStoreType(shardMrgIDSuffix).build());
+                final Props nonPersistentProps = ShardManager.props(
+                        new MockClusterWrapper(),
+                        new MockConfiguration(),
+                        DatastoreContext.newBuilder().persistent(false).build(), ready);
                 final TestActorRef<ShardManager> nonPersistentShardManager =
                         TestActorRef.create(getSystem(), nonPersistentProps);
 
@@ -382,8 +398,7 @@ public class ShardManagerTest extends AbstractActorTest {
             private static final long serialVersionUID = 1L;
             @Override
             public ShardManager create() throws Exception {
-                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
-                        DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build()) {
+                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(), ready) {
                     @Override
                     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
                         DataPersistenceProviderMonitor dataPersistenceProviderMonitor
@@ -417,6 +432,42 @@ public class ShardManagerTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testRoleChangeNotificationReleaseReady() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                final Props persistentProps = ShardManager.props(
+                        new MockClusterWrapper(),
+                        new MockConfiguration(),
+                        DatastoreContext.newBuilder().persistent(true).build(), ready);
+                final TestActorRef<ShardManager> shardManager =
+                        TestActorRef.create(getSystem(), persistentProps);
+
+                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("member-1-shard-default-unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+
+                verify(ready, times(1)).countDown();
+
+            }};
+    }
+
+    @Test
+    public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
+        new JavaTestKit(getSystem()) {
+            {
+                final Props persistentProps = ShardManager.props(
+                        new MockClusterWrapper(),
+                        new MockConfiguration(),
+                        DatastoreContext.newBuilder().persistent(true).build(), ready);
+                final TestActorRef<ShardManager> shardManager =
+                        TestActorRef.create(getSystem(), persistentProps);
+
+                shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification("unknown", RaftState.Candidate.name(), RaftState.Leader.name()));
+
+                verify(ready, never()).countDown();
+
+            }};
+    }
+
 
 
     private static class TestShardManager extends ShardManager {
@@ -424,7 +475,7 @@ public class ShardManagerTest extends AbstractActorTest {
 
         TestShardManager(String shardMrgIDSuffix) {
             super(new MockClusterWrapper(), new MockConfiguration(),
-                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build());
+                    DatastoreContext.newBuilder().dataStoreType(shardMrgIDSuffix).build(), ready);
         }
 
         @Override
index 1ebd1b91dd474bf14ce0c5d9c32a9eb8cce64d84..7dfbd668b811231b4b32edd76a04987c671aba99 100644 (file)
@@ -32,6 +32,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -69,8 +70,11 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat
 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
+import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
 import org.opendaylight.controller.cluster.datastore.utils.MockDataChangeListener;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListenerReply;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
@@ -1591,6 +1595,33 @@ public class ShardTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testRegisterRoleChangeListener() throws Exception {
+        new ShardTestKit(getSystem()) {
+            {
+                final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                        newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                        "testRegisterRoleChangeListener");
+
+                waitUntilLeader(shard);
+
+                TestActorRef<MessageCollectorActor> listener =
+                        TestActorRef.create(getSystem(), Props.create(MessageCollectorActor.class));
+
+                shard.tell(new RegisterRoleChangeListener(), listener);
+
+                // TODO: MessageCollectorActor exists as a test util in both the akka-raft and distributed-datastore
+                // projects. Need to move it to commons as a regular utility and then we can get rid of this arbitrary
+                // sleep.
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+
+                List<Object> allMatching = MessageCollectorActor.getAllMatching(listener, RegisterRoleChangeListenerReply.class);
+
+                assertEquals(1, allMatching.size());
+            }};
+    }
+
+
     private NormalizedNode<?, ?> readStore(final InMemoryDOMDataStore store) throws ReadFailedException {
         DOMStoreReadTransaction transaction = store.newReadOnlyTransaction();
         CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read =