Bug 2187: EOS shard recovery after AddShardReplica 04/29904/6
authorTom Pantelis <tpanteli@brocade.com>
Wed, 18 Nov 2015 05:50:09 +0000 (00:50 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 2 Dec 2015 11:03:03 +0000 (11:03 +0000)
On restart after an EOS shard replica is added and persisted, the
ShardManager recovers its snapshot and attempts to add the local member
to the shard replicas in the configuration. However, since there's no
static module conguration for the EOS shard, the ShardManager can't
create the shard on recovery complete. The shard does get created on
the subsequent CreateShard message however, if there's no local shards
in the static configuration, it creates the shard as inactive, ie with
the DisableElectionsRaftPolicy which we don't want.

To alleviate this, the ShardManager now stores its recovered snapshot
and, on CreateShard, if the shard was in the recovered shard list then
it was pre-existing so is not initialized with the
DisableElectionsRaftPolicy.

I extended
DistributedEntityOwnershipIntegrationTest::testEntityOwnershipShardBootstrapping
to restart the newly created replica and verify it's re-instated
properly. I added the customRaftPolicyClassName to the OnDemandRaftState
so the test can verify.

Testing revealed some timing issues in the EntityOwnershipShard on
re-instatement where pending modifications weren't sent to the leader.
The EntityOwnershipShard does respond to raft behavior state changes to send
pending modifications but, on startup, if the shard stays in the
follower state then no behavior change occurs. In that case the leaderId
changes and onLeaderChanged occurs so I changed it to also notify the
commit coordinator to commit the next batched transaction, if any. I
also did the same for onPeerUp since, in some test runs, the MemberUp
event hadn't occured yet.

Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
Change-Id: Id6bf966e0aa9a0f12f30327c617cb84f10e6b10f

opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java

index b93ea4dd8a8b3e6028d2943f87c84d17ee3eb01a..df40cc5c553ccabe274e1ffe06508f3a63f31354 100644 (file)
@@ -178,9 +178,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
 
             if (context.getReplicatedLog().size() > 0) {
                 self().tell(new InitiateCaptureSnapshot(), self());
-                LOG.info("Snapshot capture initiated after recovery");
+                LOG.info("{}: Snapshot capture initiated after recovery", persistenceId());
             } else {
-                LOG.info("Snapshot capture NOT initiated after recovery, journal empty");
+                LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId());
             }
         }
     }
@@ -303,7 +303,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .snapshotIndex(replicatedLog().getSnapshotIndex())
                 .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
-                .peerAddresses(peerAddresses);
+                .peerAddresses(peerAddresses)
+                .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
 
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
         if (lastLogEntry != null) {
index 8c2986f6d19f402137d40efd551667b22a28c151..57f8beb0054118e8a26549c55dac770e8ee10593 100644 (file)
@@ -33,6 +33,7 @@ public class OnDemandRaftState {
     private String raftState;
     private String votedFor;
     private boolean isSnapshotCaptureInitiated;
+    private String customRaftPolicyClassName;
 
     private List<FollowerInfo> followerInfoList = Collections.emptyList();
     private Map<String, String> peerAddresses = Collections.emptyMap();
@@ -116,6 +117,10 @@ public class OnDemandRaftState {
         return peerAddresses;
     }
 
+    public String getCustomRaftPolicyClassName() {
+        return customRaftPolicyClassName;
+    }
+
     public static class Builder {
         private final OnDemandRaftState stats = new OnDemandRaftState();
 
@@ -209,6 +214,11 @@ public class OnDemandRaftState {
             return this;
         }
 
+        public Builder customRaftPolicyClassName(String className) {
+            stats.customRaftPolicyClassName = className;
+            return this;
+        }
+
         public OnDemandRaftState build() {
             return stats;
         }
index fabfd096104dcab91ee6a4ffabd5758a50466e64..e33d7cdce694eb3715eb4781a0a40c176855677b 100644 (file)
@@ -139,6 +139,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     private DatastoreSnapshot restoreFromSnapshot;
 
+    private ShardManagerSnapshot recoveredSnapshot;
+
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
 
     private final String persistenceId;
@@ -293,17 +295,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void onCreateShard(CreateShard createShard) {
+        LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
+
         Object reply;
         try {
             String shardName = createShard.getModuleShardConfig().getShardName();
             if(localShards.containsKey(shardName)) {
+                LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
             } else {
                 doCreateShard(createShard);
                 reply = new akka.actor.Status.Success(null);
             }
         } catch (Exception e) {
-            LOG.error("onCreateShard failed", e);
+            LOG.error("{}: onCreateShard failed", persistenceId(), e);
             reply = new akka.actor.Status.Failure(e);
         }
 
@@ -328,13 +333,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
 
+        boolean shardWasInRecoveredSnapshot = recoveredSnapshot != null &&
+                recoveredSnapshot.getShardList().contains(shardName);
+
         Map<String, String> peerAddresses;
         boolean isActiveMember;
-        if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+        if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
+                contains(cluster.getCurrentMemberName())) {
             peerAddresses = getPeerAddresses(shardName);
             isActiveMember = true;
         } else {
-            // The local member is not in the given shard member configuration. In this case we'll create
+            // The local member is not in the static shard member configuration and the shard did not
+            // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
             // the shard with no peers and with elections disabled so it stays as follower. A
             // subsequent AddServer request will be needed to make it an active member.
             isActiveMember = false;
@@ -343,8 +353,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
         }
 
-        LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
-                moduleShardConfig.getShardMemberNames(), peerAddresses);
+        LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+                persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
+                isActiveMember);
 
         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
@@ -506,7 +517,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             deleteMessages(lastSequenceNr());
             createLocalShards();
         } else if (message instanceof SnapshotOffer) {
-            handleShardRecovery((SnapshotOffer) message);
+            onSnapshotOffer((SnapshotOffer) message);
         }
     }
 
@@ -807,6 +818,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+
+            LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
+
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
@@ -1056,13 +1070,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         saveSnapshot(new ShardManagerSnapshot(shardList));
     }
 
-    private void handleShardRecovery(SnapshotOffer offer) {
-        LOG.debug ("{}: in handleShardRecovery", persistenceId());
-        ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+    private void onSnapshotOffer(SnapshotOffer offer) {
+        recoveredSnapshot = (ShardManagerSnapshot)offer.snapshot();
+
+        LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), recoveredSnapshot);
+
         String currentMember = cluster.getCurrentMemberName();
         Set<String> configuredShardList =
             new HashSet<>(configuration.getMemberShardNames(currentMember));
-        for (String shard : snapshot.getShardList()) {
+        for (String shard : recoveredSnapshot.getShardList()) {
             if (!configuredShardList.contains(shard)) {
                 // add the current member as a replica for the shard
                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
index bd07dc550a9f5397765ba5c9743d524f9ddb299a..30fafbbb10f4e78b1a09f0a5c6ec2559a4d88bf3 100644 (file)
@@ -256,10 +256,11 @@ class EntityOwnershipShard extends Shard {
     protected void onLeaderChanged(String oldLeader, String newLeader) {
         super.onLeaderChanged(oldLeader, newLeader);
 
+        boolean isLeader = isLeader();
         LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
-                newLeader, isLeader());
+                newLeader, isLeader);
 
-        if(isLeader()) {
+        if(isLeader) {
             // We were just elected leader. If the old leader is down, select new owners for the entities
             // owned by the down leader.
 
@@ -270,6 +271,11 @@ class EntityOwnershipShard extends Shard {
             if(downPeerMemberNames.contains(oldLeaderMemberName)) {
                 selectNewOwnerForEntitiesOwnedBy(oldLeaderMemberName);
             }
+        } else {
+            // The leader changed - notify the coordinator to check if pending modifications need to be sent.
+            // While onStateChanged also does this, this method handles the case where the shard hears from a
+            // leader and stays in the follower state. In that case no behavior state change occurs.
+            commitCoordinator.onStateChanged(this, isLeader);
         }
     }
 
@@ -346,6 +352,11 @@ class EntityOwnershipShard extends Shard {
 
         peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
         downPeerMemberNames.remove(peerUp.getMemberName());
+
+        // Notify the coordinator to check if pending modifications need to be sent. We do this here
+        // to handle the case where the leader's peer address isn't now yet when a prior state or
+        // leader change occurred.
+        commitCoordinator.onStateChanged(this, isLeader());
     }
 
     private void selectNewOwnerForEntitiesOwnedBy(String owner) {
index 46ca5f284c4fc5706e8bafb5669408f77f2ab9e1..e1a5759a41c68889c0e09cb7357850c97937058b 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.AdditionalMatchers.or;
 import static org.mockito.Mockito.atMost;
@@ -15,16 +17,19 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
 import static org.opendaylight.controller.cluster.datastore.entityownership.AbstractEntityOwnershipTest.ownershipChange;
 import static org.opendaylight.controller.cluster.datastore.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
 import akka.actor.Status.Failure;
 import akka.actor.Status.Success;
+import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,8 +46,10 @@ import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.MemberNode;
+import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -370,6 +377,33 @@ public class DistributedEntityOwnershipIntegrationTest {
 
         // The queued candidate registration should proceed
         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+        reset(leaderMockListener);
+
+        candidateReg.close();
+        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, false));
+        reset(leaderMockListener);
+
+        // Restart follower1 and verify the entity ownership shard is re-instated by registering.
+        Cluster.get(leaderNode.kit().getSystem()).down(Cluster.get(follower1Node.kit().getSystem()).selfAddress());
+        follower1Node.cleanup();
+
+        follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
+                moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+                datastoreContextBuilder(followerDatastoreContextBuilder).build();
+        follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+
+        follower1EntityOwnershipService.registerCandidate(ENTITY1);
+        verify(leaderMockListener, timeout(20000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+
+        verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertNull("Custom RaftPolicy class name", raftState.getCustomRaftPolicyClassName());
+                assertEquals("Peer count", 1, raftState.getPeerAddresses().keySet().size());
+                assertThat("Peer Id", Iterables.<String>getLast(raftState.getPeerAddresses().keySet()),
+                        org.hamcrest.CoreMatchers.containsString("member-1"));
+            }
+        });
     }
 
     private static void verifyGetOwnershipState(EntityOwnershipService service, Entity entity,