From e04c7f93b0b614580c45318585f7709192465757 Mon Sep 17 00:00:00 2001
From: Tom Pantelis
Date: Wed, 18 Nov 2015 00:50:09 -0500
Subject: [PATCH] Bug 2187: EOS shard recovery after AddShardReplica

On restart after an EOS shard replica is added and persisted, the
ShardManager recovers its snapshot and attempts to add the local member
to the shard replicas in the configuration. However, since there's no
static module configuration for the EOS shard, the ShardManager can't
create the shard when recovery completes. The shard does get created on
the subsequent CreateShard message; however, since there are no local
shards in the static configuration, it creates the shard as inactive,
i.e. with the DisableElectionsRaftPolicy, which we don't want.

To alleviate this, the ShardManager now stores its recovered snapshot
and, on CreateShard, if the shard was in the recovered shard list then
it was pre-existing and thus is not initialized with the
DisableElectionsRaftPolicy.

I extended
DistributedEntityOwnershipIntegrationTest::testEntityOwnershipShardBootstrapping
to restart the newly created replica and verify it's reinstated
properly. I also added the customRaftPolicyClassName to the
OnDemandRaftState so the test can verify it.

Testing revealed some timing issues in the EntityOwnershipShard on
reinstatement where pending modifications weren't sent to the leader.
The EntityOwnershipShard does respond to raft behavior state changes to
send pending modifications but, on startup, if the shard stays in the
follower state then no behavior change occurs. In that case the
leaderId changes and onLeaderChanged occurs, so I changed it to also
notify the commit coordinator to commit the next batched transaction,
if any. I did the same for onPeerUp since, in some test runs, the
MemberUp event hadn't occurred yet.
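In short, the gating added to ShardManager.doCreateShard amounts to the
following (a condensed illustration of the diff below, not a compilable
excerpt; names match the patch):

    // A shard present in the recovered ShardManagerSnapshot was created
    // previously (e.g. via AddShardReplica), so treat it as pre-existing.
    boolean shardWasInRecoveredSnapshot = recoveredSnapshot != null &&
            recoveredSnapshot.getShardList().contains(shardName);

    if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
            contains(cluster.getCurrentMemberName())) {
        // Pre-existing or statically configured: create as an active member
        // with its peers so elections proceed normally.
        isActiveMember = true;
    } else {
        // Unknown shard: create it inactive with DisableElectionsRaftPolicy;
        // a subsequent AddServer request will make it an active member.
        isActiveMember = false;
    }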
Signed-off-by: Tom Pantelis
Change-Id: Id6bf966e0aa9a0f12f30327c617cb84f10e6b10f
---
 .../controller/cluster/raft/RaftActor.java         |  7 ++--
 .../client/messages/OnDemandRaftState.java         | 10 ++++++
 .../cluster/datastore/ShardManager.java            | 36 +++++++++++++------
 .../entityownership/EntityOwnershipShard.java      | 15 ++++++--
 ...ributedEntityOwnershipIntegrationTest.java      | 34 ++++++++++++++++++
 5 files changed, 87 insertions(+), 15 deletions(-)

diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
index b93ea4dd8a..df40cc5c55 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
@@ -178,9 +178,9 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
             if (context.getReplicatedLog().size() > 0) {
                 self().tell(new InitiateCaptureSnapshot(), self());
-                LOG.info("Snapshot capture initiated after recovery");
+                LOG.info("{}: Snapshot capture initiated after recovery", persistenceId());
             } else {
-                LOG.info("Snapshot capture NOT initiated after recovery, journal empty");
+                LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId());
             }
         }
     }
@@ -303,7 +303,8 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                 .snapshotIndex(replicatedLog().getSnapshotIndex())
                 .snapshotTerm(replicatedLog().getSnapshotTerm())
                 .votedFor(context.getTermInformation().getVotedFor())
-                .peerAddresses(peerAddresses);
+                .peerAddresses(peerAddresses)
+                .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());

         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
         if (lastLogEntry != null) {
diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
index 8c2986f6d1..57f8beb005 100644
--- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
+++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/client/messages/OnDemandRaftState.java
@@ -33,6 +33,7 @@ public class OnDemandRaftState {
     private String raftState;
     private String votedFor;
     private boolean isSnapshotCaptureInitiated;
+    private String customRaftPolicyClassName;

     private List<FollowerInfo> followerInfoList = Collections.emptyList();
     private Map<String, String> peerAddresses = Collections.emptyMap();
@@ -116,6 +117,10 @@ public class OnDemandRaftState {
         return peerAddresses;
     }

+    public String getCustomRaftPolicyClassName() {
+        return customRaftPolicyClassName;
+    }
+
     public static class Builder {
         private final OnDemandRaftState stats = new OnDemandRaftState();
@@ -209,6 +214,11 @@ public class OnDemandRaftState {
             return this;
         }

+        public Builder customRaftPolicyClassName(String className) {
+            stats.customRaftPolicyClassName = className;
+            return this;
+        }
+
         public OnDemandRaftState build() {
             return stats;
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index fabfd09610..e33d7cdce6 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -139,6 +139,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {

     private DatastoreSnapshot restoreFromSnapshot;

+    private ShardManagerSnapshot recoveredSnapshot;
+
     private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();

     private final String persistenceId;
@@ -293,17 +295,20 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }

     private void onCreateShard(CreateShard createShard) {
+        LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
+
         Object reply;
         try {
             String shardName = createShard.getModuleShardConfig().getShardName();
             if(localShards.containsKey(shardName)) {
+                LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
                 reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
             } else {
                 doCreateShard(createShard);
                 reply = new akka.actor.Status.Success(null);
             }
         } catch (Exception e) {
-            LOG.error("onCreateShard failed", e);
+            LOG.error("{}: onCreateShard failed", persistenceId(), e);
             reply = new akka.actor.Status.Failure(e);
         }
@@ -328,13 +333,18 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {

         ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);

+        boolean shardWasInRecoveredSnapshot = recoveredSnapshot != null &&
+                recoveredSnapshot.getShardList().contains(shardName);
+
         Map<String, String> peerAddresses;
         boolean isActiveMember;
-        if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+        if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
+                contains(cluster.getCurrentMemberName())) {
             peerAddresses = getPeerAddresses(shardName);
             isActiveMember = true;
         } else {
-            // The local member is not in the given shard member configuration. In this case we'll create
+            // The local member is not in the static shard member configuration and the shard did not
+            // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
             // the shard with no peers and with elections disabled so it stays as follower. A
             // subsequent AddServer request will be needed to make it an active member.
             isActiveMember = false;
             shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).
                     customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
         }
-        LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
-                moduleShardConfig.getShardMemberNames(), peerAddresses);
+        LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+                persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
+                isActiveMember);

         ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
                 shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
@@ -506,7 +517,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             deleteMessages(lastSequenceNr());
             createLocalShards();
         } else if (message instanceof SnapshotOffer) {
-            handleShardRecovery((SnapshotOffer) message);
+            onSnapshotOffer((SnapshotOffer) message);
         }
     }
@@ -807,6 +818,9 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         for(String shardName : memberShardNames){
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+
+            LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
+
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
                     newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
@@ -1056,13 +1070,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         saveSnapshot(new ShardManagerSnapshot(shardList));
     }

-    private void handleShardRecovery(SnapshotOffer offer) {
-        LOG.debug ("{}: in handleShardRecovery", persistenceId());
-        ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+    private void onSnapshotOffer(SnapshotOffer offer) {
+        recoveredSnapshot = (ShardManagerSnapshot)offer.snapshot();
+
+        LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), recoveredSnapshot);
+
         String currentMember = cluster.getCurrentMemberName();
         Set<String> configuredShardList = new HashSet<>(configuration.getMemberShardNames(currentMember));
-        for (String shard : snapshot.getShardList()) {
+        for (String shard : recoveredSnapshot.getShardList()) {
             if (!configuredShardList.contains(shard)) {
                 // add the current member as a replica for the shard
                 LOG.debug ("{}: adding shard {}", persistenceId(), shard);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
index bd07dc550a..30fafbbb10 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
@@ -256,10 +256,11 @@ class EntityOwnershipShard extends Shard {
     protected void onLeaderChanged(String oldLeader, String newLeader) {
         super.onLeaderChanged(oldLeader, newLeader);

+        boolean isLeader = isLeader();
         LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
-                newLeader, isLeader());
+                newLeader, isLeader);

-        if(isLeader()) {
+        if(isLeader) {
             // We were just elected leader. If the old leader is down, select new owners for the entities
             // owned by the down leader.
@@ -270,6 +271,11 @@ class EntityOwnershipShard extends Shard {
             if(downPeerMemberNames.contains(oldLeaderMemberName)) {
                 selectNewOwnerForEntitiesOwnedBy(oldLeaderMemberName);
             }
+        } else {
+            // The leader changed - notify the coordinator to check if pending modifications need to be sent.
+            // While onStateChanged also does this, this method handles the case where the shard hears from a
+            // leader and stays in the follower state. In that case no behavior state change occurs.
+            commitCoordinator.onStateChanged(this, isLeader);
         }
     }
@@ -346,6 +352,11 @@ class EntityOwnershipShard extends Shard {

         peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());

         downPeerMemberNames.remove(peerUp.getMemberName());
+
+        // Notify the coordinator to check if pending modifications need to be sent. We do this here
+        // to handle the case where the leader's peer address isn't known yet when a prior state or
+        // leader change occurred.
+        commitCoordinator.onStateChanged(this, isLeader());
     }

     private void selectNewOwnerForEntitiesOwnedBy(String owner) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java
index 46ca5f284c..e1a5759a41 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java
@@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.datastore.entityownership;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.AdditionalMatchers.or;
 import static org.mockito.Mockito.atMost;
@@ -15,16 +17,19 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
 import static org.opendaylight.controller.cluster.datastore.entityownership.AbstractEntityOwnershipTest.ownershipChange;
 import static org.opendaylight.controller.cluster.datastore.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
 import akka.actor.Status.Failure;
 import akka.actor.Status.Success;
+import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,8 +46,10 @@ import org.mockito.MockitoAnnotations;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.MemberNode;
+import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -370,6 +377,33 @@ public class DistributedEntityOwnershipIntegrationTest {

         // The queued candidate registration should proceed
         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+        reset(leaderMockListener);
+
+        candidateReg.close();
+        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, false));
+        reset(leaderMockListener);
+
+        // Restart follower1 and verify the entity ownership shard is re-instated by registering.
+        Cluster.get(leaderNode.kit().getSystem()).down(Cluster.get(follower1Node.kit().getSystem()).selfAddress());
+        follower1Node.cleanup();
+
+        follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+                moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+                datastoreContextBuilder(followerDatastoreContextBuilder).build();
+        follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+
+        follower1EntityOwnershipService.registerCandidate(ENTITY1);
+        verify(leaderMockListener, timeout(20000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+
+        verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, new RaftStateVerifier() {
+            @Override
+            public void verify(OnDemandRaftState raftState) {
+                assertNull("Custom RaftPolicy class name", raftState.getCustomRaftPolicyClassName());
+                assertEquals("Peer count", 1, raftState.getPeerAddresses().keySet().size());
+                assertThat("Peer Id", Iterables.getLast(raftState.getPeerAddresses().keySet()),
+                        org.hamcrest.CoreMatchers.containsString("member-1"));
+            }
+        });
     }

     private static void verifyGetOwnershipState(EntityOwnershipService service, Entity entity,
-- 
2.36.6