if (context.getReplicatedLog().size() > 0) {
self().tell(new InitiateCaptureSnapshot(), self());
- LOG.info("Snapshot capture initiated after recovery");
+ LOG.info("{}: Snapshot capture initiated after recovery", persistenceId());
} else {
- LOG.info("Snapshot capture NOT initiated after recovery, journal empty");
+ LOG.info("{}: Snapshot capture NOT initiated after recovery, journal empty", persistenceId());
}
}
}
.snapshotIndex(replicatedLog().getSnapshotIndex())
.snapshotTerm(replicatedLog().getSnapshotTerm())
.votedFor(context.getTermInformation().getVotedFor())
- .peerAddresses(peerAddresses);
+ .peerAddresses(peerAddresses)
+ .customRaftPolicyClassName(context.getConfigParams().getCustomRaftPolicyImplementationClass());
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
if (lastLogEntry != null) {
private String raftState;
private String votedFor;
private boolean isSnapshotCaptureInitiated;
+ private String customRaftPolicyClassName;
private List<FollowerInfo> followerInfoList = Collections.emptyList();
private Map<String, String> peerAddresses = Collections.emptyMap();
return peerAddresses;
}
+ /**
+  * Returns the class name of the custom RaftPolicy implementation in effect, or
+  * null when the default policy is in use (see the assertNull check in the test hunk).
+  */
+ public String getCustomRaftPolicyClassName() {
+ return customRaftPolicyClassName;
+ }
+
public static class Builder {
private final OnDemandRaftState stats = new OnDemandRaftState();
return this;
}
+ /**
+  * Sets the custom RaftPolicy implementation class name on the stats instance
+  * being built; returns this Builder for call chaining.
+  */
+ public Builder customRaftPolicyClassName(String className) {
+ stats.customRaftPolicyClassName = className;
+ return this;
+ }
+
public OnDemandRaftState build() {
return stats;
}
private DatastoreSnapshot restoreFromSnapshot;
+ private ShardManagerSnapshot recoveredSnapshot;
+
private final Set<String> shardReplicaOperationsInProgress = new HashSet<>();
private final String persistenceId;
}
private void onCreateShard(CreateShard createShard) {
+ LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
+
Object reply;
try {
String shardName = createShard.getModuleShardConfig().getShardName();
if(localShards.containsKey(shardName)) {
+ LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
reply = new akka.actor.Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
doCreateShard(createShard);
reply = new akka.actor.Status.Success(null);
}
} catch (Exception e) {
- LOG.error("onCreateShard failed", e);
+ LOG.error("{}: onCreateShard failed", persistenceId(), e);
reply = new akka.actor.Status.Failure(e);
}
ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+ boolean shardWasInRecoveredSnapshot = recoveredSnapshot != null &&
+ recoveredSnapshot.getShardList().contains(shardName);
+
Map<String, String> peerAddresses;
boolean isActiveMember;
- if(configuration.getMembersFromShardName(shardName).contains(cluster.getCurrentMemberName())) {
+ if(shardWasInRecoveredSnapshot || configuration.getMembersFromShardName(shardName).
+ contains(cluster.getCurrentMemberName())) {
peerAddresses = getPeerAddresses(shardName);
isActiveMember = true;
} else {
- // The local member is not in the given shard member configuration. In this case we'll create
+ // The local member is not in the static shard member configuration and the shard did not
+ // previously exist (ie !shardWasInRecoveredSnapshot). In this case we'll create
// the shard with no peers and with elections disabled so it stays as follower. A
// subsequent AddServer request will be needed to make it an active member.
isActiveMember = false;
customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()).build();
}
- LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
- moduleShardConfig.getShardMemberNames(), peerAddresses);
+ LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
+ persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
+ isActiveMember);
ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
deleteMessages(lastSequenceNr());
createLocalShards();
} else if (message instanceof SnapshotOffer) {
- handleShardRecovery((SnapshotOffer) message);
+ onSnapshotOffer((SnapshotOffer) message);
}
}
for(String shardName : memberShardNames){
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+
+ LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
+
Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
newShardDatastoreContext(shardName), Shard.builder().restoreFromSnapshot(
saveSnapshot(new ShardManagerSnapshot(shardList));
}
- private void handleShardRecovery(SnapshotOffer offer) {
- LOG.debug ("{}: in handleShardRecovery", persistenceId());
- ShardManagerSnapshot snapshot = (ShardManagerSnapshot)offer.snapshot();
+ private void onSnapshotOffer(SnapshotOffer offer) {
+ recoveredSnapshot = (ShardManagerSnapshot)offer.snapshot();
+
+ LOG.debug ("{}: onSnapshotOffer: {}", persistenceId(), recoveredSnapshot);
+
String currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
new HashSet<>(configuration.getMemberShardNames(currentMember));
- for (String shard : snapshot.getShardList()) {
+ for (String shard : recoveredSnapshot.getShardList()) {
if (!configuredShardList.contains(shard)) {
// add the current member as a replica for the shard
LOG.debug ("{}: adding shard {}", persistenceId(), shard);
protected void onLeaderChanged(String oldLeader, String newLeader) {
super.onLeaderChanged(oldLeader, newLeader);
+ boolean isLeader = isLeader();
LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
- newLeader, isLeader());
+ newLeader, isLeader);
- if(isLeader()) {
+ if(isLeader) {
// We were just elected leader. If the old leader is down, select new owners for the entities
// owned by the down leader.
if(downPeerMemberNames.contains(oldLeaderMemberName)) {
selectNewOwnerForEntitiesOwnedBy(oldLeaderMemberName);
}
+ } else {
+ // The leader changed - notify the coordinator to check if pending modifications need to be sent.
+ // While onStateChanged also does this, this method handles the case where the shard hears from a
+ // leader and stays in the follower state. In that case no behavior state change occurs.
+ commitCoordinator.onStateChanged(this, isLeader);
}
}
peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
downPeerMemberNames.remove(peerUp.getMemberName());
+
+ // Notify the coordinator to check if pending modifications need to be sent. We do this here
+ // to handle the case where the leader's peer address isn't known yet when a prior state or
+ // leader change occurred.
+ commitCoordinator.onStateChanged(this, isLeader());
}
private void selectNewOwnerForEntitiesOwnedBy(String owner) {
package org.opendaylight.controller.cluster.datastore.entityownership;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.AdditionalMatchers.or;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
import static org.opendaylight.controller.cluster.datastore.entityownership.AbstractEntityOwnershipTest.ownershipChange;
import static org.opendaylight.controller.cluster.datastore.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
import akka.actor.Status.Failure;
import akka.actor.Status.Success;
+import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Arrays;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.MemberNode;
+import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
// The queued candidate registration should proceed
verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+ reset(leaderMockListener);
+
+ candidateReg.close();
+ verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, false));
+ reset(leaderMockListener);
+
+ // Restart follower1 and verify the entity ownership shard is re-instated by registering.
+ Cluster.get(leaderNode.kit().getSystem()).down(Cluster.get(follower1Node.kit().getSystem()).selfAddress());
+ follower1Node.cleanup();
+
+ follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+ follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+
+ follower1EntityOwnershipService.registerCandidate(ENTITY1);
+ verify(leaderMockListener, timeout(20000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+
+ verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertNull("Custom RaftPolicy class name", raftState.getCustomRaftPolicyClassName());
+ assertEquals("Peer count", 1, raftState.getPeerAddresses().keySet().size());
+ assertThat("Peer Id", Iterables.<String>getLast(raftState.getPeerAddresses().keySet()),
+ org.hamcrest.CoreMatchers.containsString("member-1"));
+ }
+ });
}
private static void verifyGetOwnershipState(EntityOwnershipService service, Entity entity,