Bug 2187: EOS shard recovery after AddShardReplica
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index fabfd096104dcab91ee6a4ffabd5758a50466e64..e33d7cdce694eb3715eb4781a0a40c176855677b 100644 (file)
@@ -139,6 +139,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreSnapshot restoreFromSnapshot;
 
+    private ShardManagerSnapshot recoveredSnapshot;
+
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
     private final String persistenceId;
@@ -293,17 +295,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void onCreateShard(CreateShard createShard) {
+        LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
+
         Object reply;
         try {
             String shardName = createShard.getModuleShardConfig().getShardName();
             if(localShards.containsKey(shardName)) {
+                LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
             } else {
                 doCreateShard(createShard);
                 reply = new akka.actor.Status.Success(null);
             }
         } catch (Exception e) {
-            LOG.error("onCreateShard failed", e);
+            LOG.error("{}: onCreateShard failed", persistenceId(), e);
             reply = new akka.actor.Status.Failure(e);
         }
 
@@ -328,13 +333,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
+        boolean shardWasInRecoveredSnapshot = recoveredSnapshot != null &&
+                recoveredSnapshot.getShardList().contains(shardName);
+
         Map<String, String> peerAddresses;
         boolean isActiveMember;
-        if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+        if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
+                contains(cluster.getCurrentMemberName())) {
             peerAddresses = getPeerAddresses(shardName);
             isActiveMember = true;
         } else {
-            // The local member is not in the given shard member configuration. In this case we'll create
+            // The local member is not in the static shard member configuration and the shard did not
+            // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
             // the shard with no peers and with elections disabled so it stays as follower. A
             // subsequent AddServer request will be needed to make it an active member.
             isActiveMember = false;
@@ -343,8 +353,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
         }
 
-        LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
-                moduleShardConfig.getShardMemberNames(), peerAddresses);
+        LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+                persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
+                isActiveMember);
 
         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
@@ -506,7 +517,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             deleteMessages(lastSequenceNr());
             createLocalShards();
         } else if (message instanceof SnapshotOffer) {
-            handleShardRecovery((SnapshotOffer) message);
+            onSnapshotOffer((SnapshotOffer) message);
         }
     }
 
@@ -807,6 +818,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+
+            LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
+
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
@@ -1056,13 +1070,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         saveSnapshot(new ShardManagerSnapshot(shardList));
     }
 
-    private void handleShardRecovery(SnapshotOffer offer) {
-        LOG.debug ("{}: in handleShardRecovery", persistenceId());
-        ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+    private void onSnapshotOffer(SnapshotOffer offer) {
+        recoveredSnapshot = (ShardManagerSnapshot)offer.snapshot();
+
+        LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), recoveredSnapshot);
+
         String currentMember = cluster.getCurrentMemberName();
         Set<String> configuredShardList =
             new HashSet<>(configuration.getMemberShardNames(currentMember));
-        for (String shard : snapshot.getShardList()) {
+        for (String shard : recoveredSnapshot.getShardList()) {
             if (!configuredShardList.contains(shard)) {
                 // add the current member as a replica for the shard
                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);