import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
+import akka.cluster.Cluster;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.pattern.Patterns;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
private final EntityOwnershipShardCommitCoordinator commitCoordinator;
private final EntityOwnershipListenerSupport listenerSupport;
private final Set<MemberName> downPeerMemberNames = new HashSet<>();
- private final Map<String, MemberName> peerIdToMemberNames = new HashMap<>();
private final EntityOwnerSelectionStrategyConfig strategyConfig;
private final Map<YangInstanceIdentifier, Cancellable> entityToScheduledOwnershipTask = new HashMap<>();
private final EntityOwnershipStatistics entityOwnershipStatistics;
this.strategyConfig = builder.ownerSelectionStrategyConfig;
this.entityOwnershipStatistics = new EntityOwnershipStatistics();
this.entityOwnershipStatistics.init(getDataStore());
-
- for(String peerId: getRaftActorContext().getPeerIds()) {
- ShardIdentifier shardId = ShardIdentifier.fromShardIdString(peerId);
- peerIdToMemberNames.put(peerId, shardId.getMemberName());
- }
}
@Override
if (isLeader) {
+ // Re-initialize the downPeerMemberNames from the current akka Cluster state. The previous leader, if any,
+ // is most likely down; however, it's possible we haven't received the PeerDown message yet.
+ initializeDownPeerMemberNamesFromClusterState();
+
// Clear all existing strategies so that they get re-created when we call createStrategy again
// This allows the strategies to be re-initialized with existing statistics maintained by
// EntityOwnershipStatistics
strategyConfig.clearStrategies();
// Re-assign owners for all members that are known to be down. In a cluster which has greater than
- // 3 nodes it is possible for a some node beside the leader being down when the leadership transitions
- // it makes sense to use this event to re-assign owners for those downed nodes
+ // 3 nodes it is possible for some node besides the leader to be down when the leadership transitions,
+ // so it makes sense to use this event to re-assign owners for those downed nodes.
+ Set<String> ownedBy = new HashSet<>(downPeerMemberNames.size() + 1);
for (MemberName downPeerName : downPeerMemberNames) {
- selectNewOwnerForEntitiesOwnedBy(downPeerName);
+ ownedBy.add(downPeerName.getName());
}
+
+ // Also try to assign owners for entities that have no current owner. See explanation in onPeerUp.
+ ownedBy.add("");
+ selectNewOwnerForEntitiesOwnedBy(ownedBy);
} else {
// The leader changed - notify the coordinator to check if pending modifications need to be sent.
// While onStateChanged also does this, this method handles the case where the shard hears from a
super.onLeaderChanged(oldLeader, newLeader);
}
+ /**
+  * Rebuilds {@code downPeerMemberNames} from the current akka Cluster state.
+  *
+  * <p>Does nothing when the raft actor context has no {@link Cluster}. Otherwise the set is cleared and
+  * repopulated with every member the cluster reports as unreachable plus every member whose status is
+  * neither {@code Up} nor {@code WeaklyUp}.
+  */
+ private void initializeDownPeerMemberNamesFromClusterState() {
+     java.util.Optional<Cluster> cluster = getRaftActorContext().getCluster();
+     if(!cluster.isPresent()) {
+         return;
+     }
+
+     CurrentClusterState state = cluster.get().state();
+     Set<Member> unreachable = state.getUnreachable();
+
+     LOG.debug("{}: initializeDownPeerMemberNamesFromClusterState - current downPeerMemberNames: {}, unreachable: {}",
+             persistenceId(), downPeerMemberNames, unreachable);
+
+     downPeerMemberNames.clear();
+     for(Member m: unreachable) {
+         addDownPeerMemberName(m);
+     }
+
+     for(Member m: state.getMembers()) {
+         if(m.status() != MemberStatus.up() && m.status() != MemberStatus.weaklyUp()) {
+             LOG.debug("{}: Adding down member with status {}", persistenceId(), m.status());
+             addDownPeerMemberName(m);
+         }
+     }
+
+     LOG.debug("{}: new downPeerMemberNames: {}", persistenceId(), downPeerMemberNames);
+ }
+
+ /**
+  * Records the given cluster member as down, using the first role returned by its role set as the member name.
+  *
+  * <p>A member that carries no roles cannot be mapped to a {@code MemberName}; it is skipped with a warning
+  * instead of letting {@code iterator().next()} throw {@code NoSuchElementException}.
+  *
+  * @param member the cluster member to record as down
+  */
+ private void addDownPeerMemberName(Member member) {
+     // NOTE(review): presumably the member name is configured as the member's only cluster role - confirm.
+     Set<String> roles = member.getRoles();
+     if(roles.isEmpty()) {
+         LOG.warn("{}: Ignoring cluster member {} with no roles", persistenceId(), member);
+         return;
+     }
+
+     downPeerMemberNames.add(MemberName.forName(roles.iterator().next()));
+ }
+
private void onCandidateRemoved(CandidateRemoved message) {
LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
// Available members is all the known peers - the number of peers that are down + self
// So if there are 2 peers and 1 is down then availableMembers will be 2
- final int availableMembers = peerIdToMemberNames.size() - downPeerMemberNames.size() + 1;
+ final int availableMembers = getRaftActorContext().getPeerIds().size() - downPeerMemberNames.size() + 1;
LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner);
// it will first remove all its candidates on startup. If another candidate is registered during the time
// the peer is down, the new candidate will be selected as the new owner.
- selectNewOwnerForEntitiesOwnedBy(downMemberName);
+ selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(downMemberName.getName()));
}
}
- private void selectNewOwnerForEntitiesOwnedBy(MemberName downMemberName) {
+ private void selectNewOwnerForEntitiesOwnedBy(Set<String> ownedBy) {
final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
- searchForEntitiesOwnedBy(downMemberName.getName(), (entityTypeNode, entityNode) -> {
+ searchForEntitiesOwnedBy(ownedBy, (entityTypeNode, entityNode) -> {
YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
node(ENTITY_OWNER_NODE_ID).build();
private void onPeerUp(PeerUp peerUp) {
LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
- peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
downPeerMemberNames.remove(peerUp.getMemberName());
// Notify the coordinator to check if pending modifications need to be sent. We do this here
// to handle the case where the leader's peer address isn't known yet when a prior state or
// leader change occurred.
commitCoordinator.onStateChanged(this, isLeader());
+
+ if(isLeader()) {
+ // Try to assign owners for entities that have no current owner; the empty string passed below is what
+ // searchForEntitiesOwnedBy substitutes for a missing owner node. It's possible the peer that is now up had
+ // previously registered as a candidate and was the only candidate but the owner write tx couldn't be
+ // committed due to a leader change. E.g., the leader successfully commits the candidate add tx but becomes
+ // isolated before it can commit the owner change and switches to follower. The majority partition with a
+ // new leader has the candidate but the entity has no owner. When the partition is healed and the previously
+ // isolated leader reconnects, we'll receive onPeerUp and, if there's still no owner, it will gain ownership.
+ selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(""));
+ }
}
private Collection<String> getCandidateNames(MapEntryNode entity) {
return candidateNames;
}
- private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
- LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
+ private void searchForEntitiesOwnedBy(Set<String> ownedBy, EntityWalker walker) {
+ LOG.debug("{}: Searching for entities owned by {}", persistenceId(), ownedBy);
searchForEntities((entityTypeNode, entityNode) -> {
Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
entityNode.getChild(ENTITY_OWNER_NODE_ID);
- if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+ String currentOwner = possibleOwner.isPresent() ? possibleOwner.get().getValue().toString() : "";
+ if(ownedBy.contains(currentOwner)) {
walker.onEntity(entityTypeNode, entityNode);
}
});
*/
package org.opendaylight.controller.cluster.datastore.entityownership;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
+import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
import static org.junit.Assert.assertEquals;
import static org.mockito.AdditionalMatchers.or;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.ShardTestKit;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
-import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
kit.expectMsgClass(SuccessReply.class);
- MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
+ expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
// Send a bunch of registration messages quickly and verify.
leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
- MessageCollectorActor.clearMessages(leaderShard.collectorActor());
+ clearMessages(leaderShard.collectorActor());
int max = 100;
List<YangInstanceIdentifier> entityIds = new ArrayList<>();
testLog.info("testLeaderIsolation ending");
}
+ /**
+  * Verifies owner assignment when a leader becomes isolated while its owner writes are still uncommitted.
+  *
+  * <p>Scenario exercised by this test:
+  * <ol>
+  * <li>The leader commits candidates for entity1 and entity2, but the resulting owner writes never reach the
+  *     followers because AppendEntries carrying log entries are dropped.</li>
+  * <li>The leader is isolated and peer1 is elected leader; neither entity has an owner at that point.</li>
+  * <li>peer1 registers as a candidate for entity2 and becomes its owner.</li>
+  * <li>The isolation is removed and PeerUp is sent for the previous leader, which then becomes the owner of
+  *     entity1 while entity2 stays owned by peer1.</li>
+  * </ol>
+  */
+ @Test
+ public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
+     testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
+
+     ShardTestKit kit = new ShardTestKit(getSystem());
+
+     ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+     ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
+     ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
+
+     dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
+         shardIsolatedLeaderCheckIntervalInMillis(100000);
+
+     TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+             newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
+             actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString());
+     peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+     TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+             newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
+             actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
+     peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
+
+     dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
+         shardIsolatedLeaderCheckIntervalInMillis(500);
+
+     TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
+             newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
+             actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
+
+     ShardTestKit.waitUntilLeader(leader);
+
+     // Add listeners on all members
+
+     DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
+             "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
+     leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
+     kit.expectMsgClass(SuccessReply.class);
+
+     DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
+             "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
+     peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
+     kit.expectMsgClass(SuccessReply.class);
+
+     DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
+             "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
+     peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
+     kit.expectMsgClass(SuccessReply.class);
+
+     // Drop the CandidateAdded message to the leader for now.
+
+     leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
+
+     // Add entity candidates for the leader. Since we've blocked the CandidateAdded message, the leader won't
+     // be assigned as the owner yet.
+
+     DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
+     leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
+     kit.expectMsgClass(SuccessReply.class);
+     verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+     verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+     verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+     DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
+     leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+     kit.expectMsgClass(SuccessReply.class);
+     verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+     verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+     verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
+
+     // Capture the CandidateAdded messages.
+
+     List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
+             CandidateAdded.class, 2);
+
+     // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
+     // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
+
+     peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
+     peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
+
+     // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
+
+     leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
+     leader.tell(candidateAdded.get(0), leader);
+     leader.tell(candidateAdded.get(1), leader);
+
+     expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
+             ae -> ae.getEntries().size() > 0);
+
+     // Verify no owner assigned.
+
+     verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
+     verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
+
+     // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
+
+     leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
+     leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+     peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
+             ae -> ae.getLeaderId().equals(leaderId.toString()));
+     peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
+
+     // Send PeerDown to the isolated leader - should be a no-op since there are no owned entities.
+
+     leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
+     leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
+
+     // Verify the leader transitions to IsolatedLeader.
+
+     verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
+             state.getRaftState()));
+
+     // Send PeerDown to the new leader peer1.
+
+     peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+     // Make peer1 start an election and become leader by sending the TimeoutNow message.
+
+     peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
+
+     // Verify peer1 transitions to Leader.
+
+     verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
+             state.getRaftState()));
+
+     verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
+     verifyNoOwnerSet(peer2, entity2.getType(), entity2.getIdentifier());
+
+     verifyNoMoreInteractions(peer1Listener);
+     verifyNoMoreInteractions(peer2Listener);
+
+     // Add a peer1 candidate for entity2.
+
+     peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
+
+     verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+     verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
+     verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
+
+     reset(leaderListener, peer1Listener, peer2Listener);
+
+     // Remove the isolation.
+
+     leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
+     leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+     peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+     peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
+
+     // Previous leader should switch to Follower.
+
+     verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
+             state.getRaftState()));
+
+     // Send PeerUp to peer1 and peer2.
+
+     peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+     peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
+
+     // The previous leader should become the owner of entity1.
+
+     verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
+
+     // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
+     //     - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
+     //     - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
+     //     - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
+     //     - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
+     verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(or(ownershipChange(entity1, false, false, false),
+             ownershipChange(entity2, false, false, false)), or(ownershipChange(entity1, false, true, true),
+             ownershipChange(entity2, false, false, true))));
+
+     verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+     verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
+
+     // Verify entity2's owner doesn't change.
+
+     Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+     verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
+
+     verifyNoMoreInteractions(leaderListener);
+     verifyNoMoreInteractions(peer1Listener);
+     verifyNoMoreInteractions(peer2Listener);
+
+     testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
+ }
+
@Test
public void testListenerRegistration() throws Exception {
testLog.info("testListenerRegistration starting");
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void handleCommand(Object message) {
- if(collectorActor != null) {
- collectorActor.tell(message, ActorRef.noSender());
- }
-
Predicate drop = dropMessagesOfType.get(message.getClass());
if(drop == null || !drop.test(message)) {
super.handleCommand(message);
}
+ // Forward every message to the collector, dropped or not, only after the shard has had its chance to handle it.
+ if(collectorActor != null) {
+ collectorActor.tell(message, ActorRef.noSender());
+ }
}
void startDroppingMessagesOfType(Class<?> msgClass) {