Merge "BUG 2584 : Datastore is ready when all local shards have a leader"
authorTom Pantelis <tpanteli@brocade.com>
Fri, 27 Mar 2015 12:05:08 +0000 (12:05 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 27 Mar 2015 12:05:08 +0000 (12:05 +0000)
1  2 
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java

index 55a86ceeea966aab6d67f2b3ba1692d772e74640,1632d5646640101ed47a8d1b0e50417b39686a0d..52762b4eb352ff9de295e44969b13c4410b7f2f0
@@@ -43,19 -43,19 +43,19 @@@ import java.util.concurrent.CountDownLa
  import org.opendaylight.controller.cluster.DataPersistenceProvider;
  import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
  import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
  import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
  import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
  import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
  import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
  import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
  import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
  import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
  import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
  import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
  import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
  import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
  import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@@ -96,8 -96,6 +96,8 @@@ public class ShardManager extends Abstr
      // A data store could be of type config/operational
      private final String type;
  
 +    private final String shardManagerIdentifierString;
 +
      private final ClusterWrapper cluster;
  
      private final Configuration configuration;
          this.datastoreContext = datastoreContext;
          this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
          this.type = datastoreContext.getDataStoreType();
 +        this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
          this.shardDispatcherPath =
                  new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
          this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
  
      @Override
      public void handleCommand(Object message) throws Exception {
 -        if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
 -            findPrimary(FindPrimary.fromSerializable(message));
 +        if (message  instanceof FindPrimary) {
 +            findPrimary((FindPrimary)message);
          } else if(message instanceof FindLocalShard){
              findLocalShard((FindLocalShard) message);
          } else if (message instanceof UpdateSchemaContext) {
          ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
          if(shardInformation != null) {
              shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
+             if (isReadyWithLeaderId()) {
+                 LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
+                         persistenceId(), type, waitTillReadyCountdownLatch.getCount());
+                 waitTillReadyCountdownLatch.countDown();
+             }
          } else {
              LOG.debug("No shard found with member Id {}", leaderStateChanged.getMemberId());
          }
          ShardInformation shardInfo = message.getShardInfo();
  
          LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
 -                shardInfo.getShardId());
 +                shardInfo.getShardName());
  
          shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
  
          if(!shardInfo.isShardInitialized()) {
 -            message.getSender().tell(new ActorNotInitialized(), getSelf());
 +            LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
 +            message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
          } else {
 +            LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
              message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
          }
      }
          if(shardInformation != null) {
              shardInformation.setRole(roleChanged.getNewRole());
  
-             if (isReady()) {
+             if (isReadyWithLeaderId()) {
                  LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
                          persistenceId(), type, waitTillReadyCountdownLatch.getCount());
  
          return null;
      }
  
-     private boolean isReady() {
+     private boolean isReadyWithLeaderId() {
          boolean isReady = true;
          for (ShardInformation info : localShards.values()) {
-             if(!info.isShardReady()){
+             if(!info.isShardReadyWithLeaderId()){
                  isReady = false;
                  break;
              }
      }
  
      private void markShardAsInitialized(String shardName) {
 -        LOG.debug("Initializing shard [{}]", shardName);
 +        LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
  
          ShardInformation shardInformation = localShards.get(shardName);
          if (shardInformation != null) {
  
                  shardInformation.addOnShardInitialized(onShardInitialized);
  
 +                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
 +
                  Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                          datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
                          new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
                  onShardInitialized.setTimeoutSchedule(timeoutSchedule);
  
              } else if (!shardInformation.isShardInitialized()) {
 -                getSender().tell(new ActorNotInitialized(), getSelf());
 +                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
 +                        shardInformation.getShardName());
 +                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
              } else {
 +                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
 +                        shardInformation.getShardName());
                  getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
              }
  
                  "recovering and a leader is being elected. Try again later.", shardId));
      }
  
 +    private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
 +        return new NotInitializedException(String.format(
 +                "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
 +    }
 +
      private void memberRemoved(ClusterEvent.MemberRemoved message) {
 +        String memberName = message.member().roles().head();
 +
 +        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
 +                message.member().address());
 +
          memberNameToAddress.remove(message.member().roles().head());
      }
  
      private void memberUp(ClusterEvent.MemberUp message) {
          String memberName = message.member().roles().head();
  
 +        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
 +                message.member().address());
 +
          memberNameToAddress.put(memberName, message.member().address());
  
          for(ShardInformation info : localShards.values()){
  
      }
  
 +    @VisibleForTesting
 +    protected ClusterWrapper getCluster() {
 +        return cluster;
 +    }
 +
      @VisibleForTesting
      protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
          return getContext().actorOf(Shard.props(info.getShardId(),
      }
  
      private void findPrimary(FindPrimary message) {
 +        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
 +
          final String shardName = message.getShardName();
  
          // First see if the there is a local replica for the shard
              sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                  @Override
                  public Object get() {
 -                    Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
 +                    Object found = new PrimaryFound(info.getSerializedLeaderActor());
  
                      if(LOG.isDebugEnabled()) {
 -                        LOG.debug("{}: Found primary for {}: {}", shardName, found);
 +                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
                      }
  
                      return found;
              return;
          }
  
 -        List<String> members = configuration.getMembersFromShardName(shardName);
 +        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
 +            if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
 +                String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
  
 -        if(cluster.getCurrentMemberName() != null) {
 -            members.remove(cluster.getCurrentMemberName());
 -        }
 +                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
 +                        shardName, path);
  
 -        /**
 -         * FIXME: Instead of sending remote shard actor path back to sender,
 -         * forward FindPrimary message to remote shard manager
 -         */
 -        // There is no way for us to figure out the primary (for now) so assume
 -        // that one of the remote nodes is a primary
 -        for(String memberName : members) {
 -            Address address = memberNameToAddress.get(memberName);
 -            if(address != null){
 -                String path =
 -                    getShardActorPath(shardName, memberName);
 -                getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
 +                getContext().actorSelection(path).forward(message, getContext());
                  return;
              }
          }
 -        getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
 +
 +        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
 +
 +        getSender().tell(new PrimaryNotFoundException(
 +                String.format("No primary shard found for %s.", shardName)), getSelf());
 +    }
 +
 +    private StringBuilder getShardManagerActorPathBuilder(Address address) {
 +        StringBuilder builder = new StringBuilder();
 +        builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
 +        return builder;
      }
  
      private String getShardActorPath(String shardName, String memberName) {
          Address address = memberNameToAddress.get(memberName);
          if(address != null) {
 -            StringBuilder builder = new StringBuilder();
 -            builder.append(address.toString())
 -                .append("/user/")
 -                .append(ShardManagerIdentifier.builder().type(type).build().toString())
 -                .append("/")
 +            StringBuilder builder = getShardManagerActorPathBuilder(address);
 +            builder.append("/")
                  .append(getShardIdentifier(memberName, shardName));
              return builder.toString();
          }
index 95b1b78a198e511d15347637f2f1558b86d81e65,578bf1d1725bc34053939f5be4a541c5df0156d2..b676cf225c801039d1e679298e6d7cb23d2808ac
@@@ -9,23 -9,16 +9,23 @@@ import static org.mockito.Mockito.times
  import static org.mockito.Mockito.verify;
  import static org.mockito.Mockito.when;
  import akka.actor.ActorRef;
 +import akka.actor.ActorSystem;
 +import akka.actor.AddressFromURIString;
  import akka.actor.Props;
 +import akka.cluster.Cluster;
 +import akka.cluster.ClusterEvent;
 +import akka.dispatch.Dispatchers;
  import akka.japi.Creator;
  import akka.pattern.Patterns;
  import akka.persistence.RecoveryCompleted;
  import akka.testkit.JavaTestKit;
  import akka.testkit.TestActorRef;
  import akka.util.Timeout;
 +import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.ImmutableSet;
  import com.google.common.collect.Sets;
  import com.google.common.util.concurrent.Uninterruptibles;
 +import com.typesafe.config.ConfigFactory;
  import java.net.URI;
  import java.util.Arrays;
  import java.util.Collection;
@@@ -42,16 -35,15 +42,16 @@@ import org.mockito.Mock
  import org.mockito.MockitoAnnotations;
  import org.opendaylight.controller.cluster.DataPersistenceProvider;
  import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 +import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
 +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
  import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 +import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
  import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
 -import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
  import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
  import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
  import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
  import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
  import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 -import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
  import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
  import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor;
  import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
@@@ -83,11 -75,6 +83,11 @@@ public class ShardManagerTest extends A
      private final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
              dataStoreType(shardMrgIDSuffix).shardInitializationTimeout(600, TimeUnit.MILLISECONDS);
  
 +    private static ActorRef newMockShardActor(ActorSystem system, String shardName, String memberName) {
 +        String name = new ShardIdentifier(shardName, memberName,"config").toString();
 +        return TestActorRef.create(system, Props.create(MessageCollectorActor.class), name);
 +    }
 +
      @Before
      public void setUp() {
          MockitoAnnotations.initMocks(this);
      }
  
      private Props newPropsShardMgrWithMockShardActor() {
 +        return newPropsShardMgrWithMockShardActor("shardManager", mockShardActor, new MockClusterWrapper(),
 +                new MockConfiguration());
 +    }
 +
 +    private Props newPropsShardMgrWithMockShardActor(final String name, final ActorRef shardActor,
 +            final ClusterWrapper clusterWrapper, final Configuration config) {
          Creator<ShardManager> creator = new Creator<ShardManager>() {
              private static final long serialVersionUID = 1L;
              @Override
              public ShardManager create() throws Exception {
 -                return new ShardManager(new MockClusterWrapper(), new MockConfiguration(),
 -                        datastoreContextBuilder.build(), ready) {
 -                    @Override
 -                    protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
 -                        return mockShardActor;
 -                    }
 -                };
 +                return new ForwardingShardManager(clusterWrapper, config, datastoreContextBuilder.build(),
 +                        ready, name, shardActor);
              }
          };
  
 -        return Props.create(new DelegatingShardManagerCreator(creator));
 +        return Props.create(new DelegatingShardManagerCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId());
      }
  
      @Test
  
              shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
  
 -            shardManager.tell(new FindPrimary("non-existent", false).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary("non-existent", false), getRef());
  
 -            expectMsgEquals(duration("5 seconds"), new PrimaryNotFound("non-existent").toSerializable());
 +            expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
          }};
      }
  
              shardManager.tell((new RoleChangeNotification(memberId, RaftState.Candidate.name(),
                      RaftState.Leader.name())), mockShardActor);
  
 -            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
  
 -            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 +            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
              assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                      primaryFound.getPrimaryPath().contains("member-1-shard-default"));
          }};
                      RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
              shardManager.tell(new LeaderStateChanged(memberId1, memberId2), mockShardActor);
  
 -            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
  
 -            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 +            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
              assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                      primaryFound.getPrimaryPath().contains("member-2-shard-default"));
          }};
          new JavaTestKit(getSystem()) {{
              final ActorRef shardManager = getSystem().actorOf(newPropsShardMgrWithMockShardActor());
  
 -            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
  
 -            expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
 +            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
          }};
      }
  
              shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
              shardManager.tell(new ActorInitialized(), mockShardActor);
  
 -            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
  
              expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
          }};
              shardManager.tell(new RoleChangeNotification(memberId,
                      RaftState.Candidate.name(), RaftState.Follower.name()), mockShardActor);
  
 -            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
  
              expectMsgClass(duration("5 seconds"), NoShardLeaderException.class);
  
              shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
  
 -            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, false), getRef());
  
 -            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 +            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
              assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                      primaryFound.getPrimaryPath().contains("member-1-shard-default"));
          }};
  
              // We're passing waitUntilInitialized = true to FindPrimary so the response should be
              // delayed until we send ActorInitialized and RoleChangeNotification.
 -            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
  
              expectNoMsg(FiniteDuration.create(150, TimeUnit.MILLISECONDS));
  
  
              shardManager.tell(new LeaderStateChanged(memberId, memberId), mockShardActor);
  
 -            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 +            PrimaryFound primaryFound = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
              assertTrue("Unexpected primary path " +  primaryFound.getPrimaryPath(),
                      primaryFound.getPrimaryPath().contains("member-1-shard-default"));
  
  
              shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
  
 -            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
  
 -            expectMsgClass(duration("2 seconds"), ActorNotInitialized.class);
 +            expectMsgClass(duration("2 seconds"), NotInitializedException.class);
  
              shardManager.tell(new ActorInitialized(), mockShardActor);
  
              shardManager.tell(new RoleChangeNotification("member-1-shard-default-" + shardMrgIDSuffix,
                      null, RaftState.Candidate.name()), mockShardActor);
  
 -            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
  
              expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
          }};
              shardManager.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
              shardManager.tell(new ActorInitialized(), mockShardActor);
  
 -            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true).toSerializable(), getRef());
 +            shardManager.tell(new FindPrimary(Shard.DEFAULT_NAME, true), getRef());
  
              expectMsgClass(duration("2 seconds"), NoShardLeaderException.class);
          }};
      }
  
 +    @Test
 +    public void testOnReceiveFindPrimaryForRemoteShard() throws Exception {
 +        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
 +
 +        // Create an ActorSystem ShardManager actor for member-1.
 +
 +        final ActorSystem system1 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
 +        Cluster.get(system1).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
 +
 +        ActorRef mockShardActor1 = newMockShardActor(system1, Shard.DEFAULT_NAME, "member-1");
 +
 +        final TestActorRef<ForwardingShardManager> shardManager1 = TestActorRef.create(system1,
 +                newPropsShardMgrWithMockShardActor("shardManager1", mockShardActor1, new ClusterWrapperImpl(system1),
 +                        new MockConfiguration()), shardManagerID);
 +
 +        // Create an ActorSystem ShardManager actor for member-2.
 +
 +        final ActorSystem system2 = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
 +
 +        Cluster.get(system2).join(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
 +
 +        final ActorRef mockShardActor2 = newMockShardActor(system2, "astronauts", "member-2");
 +
 +        MockConfiguration mockConfig2 = new MockConfiguration(ImmutableMap.<String, List<String>>builder().
 +                put("default", Arrays.asList("member-1", "member-2")).
 +                put("astronauts", Arrays.asList("member-2")).build());
 +
 +        final TestActorRef<ForwardingShardManager> shardManager2 = TestActorRef.create(system2,
 +                newPropsShardMgrWithMockShardActor("shardManager2", mockShardActor2, new ClusterWrapperImpl(system2),
 +                        mockConfig2), shardManagerID);
 +
 +        new JavaTestKit(system1) {{
 +
 +            shardManager1.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 +            shardManager2.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 +
 +            shardManager2.tell(new ActorInitialized(), mockShardActor2);
 +
 +            String memberId2 = "member-2-shard-astronauts-" + shardMrgIDSuffix;
 +            shardManager2.tell(new LeaderStateChanged(memberId2, memberId2), mockShardActor2);
 +            shardManager2.tell(new RoleChangeNotification(memberId2,
 +                    RaftState.Candidate.name(), RaftState.Leader.name()), mockShardActor2);
 +
 +            shardManager1.underlyingActor().waitForMemberUp();
 +
 +            shardManager1.tell(new FindPrimary("astronauts", false), getRef());
 +
 +            PrimaryFound found = expectMsgClass(duration("5 seconds"), PrimaryFound.class);
 +            String path = found.getPrimaryPath();
 +            assertTrue("Unexpected primary path " + path, path.contains("member-2-shard-astronauts-config"));
 +
 +            shardManager2.underlyingActor().verifyFindPrimary();
 +
 +            Cluster.get(system2).down(AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558"));
 +
 +            shardManager1.underlyingActor().waitForMemberRemoved();
 +
 +            shardManager1.tell(new FindPrimary("astronauts", false), getRef());
 +
 +            expectMsgClass(duration("5 seconds"), PrimaryNotFoundException.class);
 +        }};
 +
 +        JavaTestKit.shutdownActorSystem(system1);
 +        JavaTestKit.shutdownActorSystem(system2);
 +    }
 +
      @Test
      public void testOnReceiveFindLocalShardForNonExistentShard() throws Exception {
          new JavaTestKit(getSystem()) {{
  
              shardManager.tell(new FindLocalShard(Shard.DEFAULT_NAME, false), getRef());
  
 -            expectMsgClass(duration("5 seconds"), ActorNotInitialized.class);
 +            expectMsgClass(duration("5 seconds"), NotInitializedException.class);
          }};
      }
  
          }};
      }
  
 -    @Test
 -    public void testOnReceiveMemberUp() throws Exception {
 -        new JavaTestKit(getSystem()) {{
 -            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 -
 -            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
 -
 -            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
 -
 -            PrimaryFound found = PrimaryFound.fromSerializable(expectMsgClass(duration("5 seconds"),
 -                    PrimaryFound.SERIALIZABLE_CLASS));
 -            String path = found.getPrimaryPath();
 -            assertTrue("Found path contains " + path, path.contains("member-2-shard-astronauts-config"));
 -        }};
 -    }
 -
 -    @Test
 -    public void testOnReceiveMemberDown() throws Exception {
 -
 -        new JavaTestKit(getSystem()) {{
 -            final ActorRef shardManager = getSystem().actorOf(newShardMgrProps());
 -
 -            MockClusterWrapper.sendMemberUp(shardManager, "member-2", getRef().path().toString());
 -
 -            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
 -
 -            expectMsgClass(duration("5 seconds"), PrimaryFound.SERIALIZABLE_CLASS);
 -
 -            MockClusterWrapper.sendMemberRemoved(shardManager, "member-2", getRef().path().toString());
 -
 -            shardManager.tell(new FindPrimary("astronauts", false).toSerializable(), getRef());
 -
 -            expectMsgClass(duration("5 seconds"), PrimaryNotFound.SERIALIZABLE_CLASS);
 -        }};
 -    }
 -
      @Test
      public void testOnRecoveryJournalIsCleaned() {
          InMemoryJournal.addEntry(shardMgrID, 1L, new ShardManager.SchemaContextModules(
      }
  
      @Test
-     public void testRoleChangeNotificationReleaseReady() throws Exception {
+     public void testRoleChangeNotificationAndLeaderStateChangedReleaseReady() throws Exception {
          new JavaTestKit(getSystem()) {
              {
                  TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
                  shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
                          memberId, RaftState.Candidate.name(), RaftState.Leader.name()));
  
+                 verify(ready, never()).countDown();
+                 shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, memberId));
+                 verify(ready, times(1)).countDown();
+             }};
+     }
+     @Test
+     public void testRoleChangeNotificationToFollowerWithLeaderStateChangedReleaseReady() throws Exception {
+         new JavaTestKit(getSystem()) {
+             {
+                 TestActorRef<ShardManager> shardManager = TestActorRef.create(getSystem(), newShardMgrProps());
+                 String memberId = "member-1-shard-default-" + shardMrgIDSuffix;
+                 shardManager.underlyingActor().onReceiveCommand(new RoleChangeNotification(
+                         memberId, null, RaftState.Follower.name()));
+                 verify(ready, never()).countDown();
+                 shardManager.underlyingActor().onReceiveCommand(new LeaderStateChanged(memberId, "member-2-shard-default-" + shardMrgIDSuffix));
                  verify(ready, times(1)).countDown();
  
              }};
      }
  
      @Test
      public void testRoleChangeNotificationDoNothingForUnknownShard() throws Exception {
          new JavaTestKit(getSystem()) {
              return delegate.create();
          }
      }
 +
 +    private static class ForwardingShardManager extends ShardManager {
 +        private CountDownLatch findPrimaryMessageReceived = new CountDownLatch(1);
 +        private CountDownLatch memberUpReceived = new CountDownLatch(1);
 +        private CountDownLatch memberRemovedReceived = new CountDownLatch(1);
 +        private final ActorRef shardActor;
 +        private final String name;
 +
 +        protected ForwardingShardManager(ClusterWrapper cluster, Configuration configuration,
 +                DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch, String name,
 +                ActorRef shardActor) {
 +            super(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
 +            this.shardActor = shardActor;
 +            this.name = name;
 +        }
 +
 +        @Override
 +        public void handleCommand(Object message) throws Exception {
 +            try{
 +                super.handleCommand(message);
 +            } finally {
 +                if(message instanceof FindPrimary) {
 +                    findPrimaryMessageReceived.countDown();
 +                } else if(message instanceof ClusterEvent.MemberUp) {
 +                    String role = ((ClusterEvent.MemberUp)message).member().roles().head();
 +                    if(!getCluster().getCurrentMemberName().equals(role)) {
 +                        memberUpReceived.countDown();
 +                    }
 +                } else if(message instanceof ClusterEvent.MemberRemoved) {
 +                    String role = ((ClusterEvent.MemberRemoved)message).member().roles().head();
 +                    if(!getCluster().getCurrentMemberName().equals(role)) {
 +                        memberRemovedReceived.countDown();
 +                    }
 +                }
 +            }
 +        }
 +
 +        @Override
 +        public String persistenceId() {
 +            return name;
 +        }
 +
 +        @Override
 +        protected ActorRef newShardActor(SchemaContext schemaContext, ShardInformation info) {
 +            return shardActor;
 +        }
 +
 +        void waitForMemberUp() {
 +            assertEquals("MemberUp received", true,
 +                    Uninterruptibles.awaitUninterruptibly(memberUpReceived, 5, TimeUnit.SECONDS));
 +            memberUpReceived = new CountDownLatch(1);
 +        }
 +
 +        void waitForMemberRemoved() {
 +            assertEquals("MemberRemoved received", true,
 +                    Uninterruptibles.awaitUninterruptibly(memberRemovedReceived, 5, TimeUnit.SECONDS));
 +            memberRemovedReceived = new CountDownLatch(1);
 +        }
 +
 +        void verifyFindPrimary() {
 +            assertEquals("FindPrimary received", true,
 +                    Uninterruptibles.awaitUninterruptibly(findPrimaryMessageReceived, 5, TimeUnit.SECONDS));
 +            findPrimaryMessageReceived = new CountDownLatch(1);
 +        }
 +    }
  }