import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateMapEntry;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateNodeKey;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.createEntity;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
this.localMemberName = localMemberName;
this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
- this.listenerSupport = new EntityOwnershipListenerSupport(getContext());
+ this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId());
for(String peerId: peerAddresses.keySet()) {
ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
protected void onRecoveryComplete() {
super.onRecoveryComplete();
- new CandidateListChangeListener(getSelf()).init(getDataStore());
+ new CandidateListChangeListener(getSelf(), persistenceId()).init(getDataStore());
new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
}
onPeerDown((PeerDown) message);
} else if(message instanceof PeerUp) {
onPeerUp((PeerUp) message);
+ } if(message instanceof RegisterListenerLocal) {
+ onRegisterListenerLocal((RegisterListenerLocal)message);
+ } if(message instanceof UnregisterListenerLocal) {
+ onUnregisterListenerLocal((UnregisterListenerLocal)message);
} else if(!commitCoordinator.handleMessage(message, this)) {
super.onReceiveCommand(message);
}
private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
- listenerSupport.addEntityOwnershipListener(registerCandidate.getEntity(), registerCandidate.getCandidate());
+ listenerSupport.setHasCandidateForEntity(registerCandidate.getEntity());
NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
registerCandidate.getEntity().getId(), localMemberName);
LOG.debug("{}: onUnregisterCandidateLocal: {}", persistenceId(), unregisterCandidate);
Entity entity = unregisterCandidate.getEntity();
- listenerSupport.removeEntityOwnershipListener(entity, unregisterCandidate.getCandidate());
+ listenerSupport.unsetHasCandidateForEntity(entity);
YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName);
commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
+ private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) {
+ LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener);
+
+ listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+
+ searchForEntitiesOwnedBy(localMemberName, new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ Optional<DataContainerChild<? extends PathArgument, ?>> possibleType =
+ entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
+ String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
+ if(registerListener.getEntityType().equals(entityType)) {
+ Entity entity = new Entity(entityType,
+ (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
+ listenerSupport.notifyEntityOwnershipListener(entity, false, true, true, registerListener.getListener());
+ }
+ }
+ });
+ }
+
+ private void onUnregisterListenerLocal(UnregisterListenerLocal unregisterListener) {
+ LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener);
+
+ listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(), unregisterListener.getListener());
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
void tryCommitModifications(final BatchedModifications modifications) {
if(isLeader()) {
LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(), modifications.getTransactionID());
}
private void onCandidateRemoved(CandidateRemoved message) {
- if(!isLeader()) {
- return;
- }
-
LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
- String currentOwner = getCurrentOwner(message.getEntityPath());
- if(message.getRemovedCandidate().equals(currentOwner)){
- writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+ if(isLeader()) {
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ if(message.getRemovedCandidate().equals(currentOwner)){
+ writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+ }
+ } else {
+ // We're not the leader. If the removed candidate is our local member then check if we actually
+ // have a local candidate registered. If we do then we must have been partitioned from the leader
+ // and the leader removed our candidate since the leader can't tell the difference between a
+ // temporary network partition and a node's process actually restarted. So, in that case, re-add
+ // our candidate.
+ if(localMemberName.equals(message.getRemovedCandidate()) &&
+ listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) {
+ LOG.debug("Local candidate member was removed but a local candidate is registered for {}" +
+ " - adding back local candidate", message.getEntityPath());
+
+ commitCoordinator.commitModification(new MergeModification(
+ candidatePath(message.getEntityPath(), localMemberName),
+ candidateMapEntry(localMemberName)), this);
+ }
}
}
LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
+ // Since a node's candidate member is only added by the node itself, we can assume the node is up so
+ // remove it from the downPeerMemberNames.
+ downPeerMemberNames.remove(message.getNewCandidate());
+
String currentOwner = getCurrentOwner(message.getEntityPath());
if(Strings.isNullOrEmpty(currentOwner)){
writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
}
private void onPeerDown(PeerDown peerDown) {
- LOG.debug("{}: onPeerDown: {}", persistenceId(), peerDown);
+ LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
String downMemberName = peerDown.getMemberName();
if(downPeerMemberNames.add(downMemberName) && isLeader()) {
- // Select new owners for entities owned by the down peer.
- selectNewOwnerForEntitiesOwnedBy(downMemberName);
+ // Remove the down peer as a candidate from all entities.
+ removeCandidateFromEntities(downMemberName);
}
}
LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
-
- if(downPeerMemberNames.remove(peerUp.getMemberName()) && isLeader()) {
- // This peer was previously down - for its previously owned entities, if there were no other
- // candidates, the owner would have been cleared so handle that here by trying to re-assign
- // ownership for entities whose owner is cleared.
- selectNewOwnerForEntitiesOwnedBy("");
- }
+ downPeerMemberNames.remove(peerUp.getMemberName());
}
private void selectNewOwnerForEntitiesOwnedBy(String owner) {
+ final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+ searchForEntitiesOwnedBy(owner, new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ Object newOwner = newOwner(getCandidateNames(entityNode));
+ YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
+ node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
+ node(ENTITY_OWNER_NODE_ID).build();
+
+ LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
+
+ modifications.addModification(new WriteModification(entityPath,
+ ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+ }
+ });
+
+ commitCoordinator.commitModifications(modifications, this);
+ }
+
+ private void removeCandidateFromEntities(final String owner) {
+ final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+ searchForEntities(new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ if(hasCandidate(entityNode, owner)) {
+ YangInstanceIdentifier entityId =
+ (YangInstanceIdentifier)entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
+ YangInstanceIdentifier candidatePath = candidatePath(
+ entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
+ entityId, owner);
+
+ LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
+ owner, candidatePath);
+
+ modifications.addModification(new DeleteModification(candidatePath));
+ }
+ }
+ });
+
+ commitCoordinator.commitModifications(modifications, this);
+ }
+
+ private boolean hasCandidate(MapEntryNode entity, String candidateName) {
+ return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName)).isPresent();
+ }
+
+ private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
if(!possibleEntityTypes.isPresent()) {
LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
- BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+ searchForEntities(new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
+ entityNode.getChild(ENTITY_OWNER_NODE_ID);
+ if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+ walker.onEntity(entityTypeNode, entityNode);
+ }
+ }
+ });
+ }
+
+ private void searchForEntities(EntityWalker walker) {
+ DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
+ Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
+ if(!possibleEntityTypes.isPresent()) {
+ return;
+ }
+
for(MapEntryNode entityType: ((MapNode) possibleEntityTypes.get()).getValue()) {
Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
entityType.getChild(ENTITY_NODE_ID);
}
for(MapEntryNode entity: ((MapNode) possibleEntities.get()).getValue()) {
- Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
- entity.getChild(ENTITY_OWNER_NODE_ID);
- if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
- Object newOwner = newOwner(getCandidateNames(entity));
- YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
- node(entityType.getIdentifier()).node(ENTITY_NODE_ID).node(entity.getIdentifier()).
- node(ENTITY_OWNER_NODE_ID).build();
-
- LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
-
- modifications.addModification(new WriteModification(entityPath,
- ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
- }
+ walker.onEntity(entityType, entity);
}
}
-
- commitCoordinator.commitModifications(modifications, this);
}
private Collection<String> getCandidateNames(MapEntryNode entity) {
return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
}
}
+
+ private static interface EntityWalker {
+ void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
+ }
}