b7321afe5536f07f61136c6f3632df6b774d2ca3
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
11 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
13 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
14 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
15 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
16 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
17 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
18 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
19 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
20 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
21 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateNodeKey;
22 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
23 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
24
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSelection;
27 import akka.actor.Cancellable;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent.CurrentClusterState;
30 import akka.cluster.Member;
31 import akka.cluster.MemberStatus;
32 import akka.pattern.Patterns;
33 import com.google.common.base.Optional;
34 import com.google.common.base.Preconditions;
35 import com.google.common.base.Strings;
36 import com.google.common.collect.ImmutableSet;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.HashMap;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Set;
44 import java.util.concurrent.TimeUnit;
45 import org.opendaylight.controller.cluster.access.concepts.MemberName;
46 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
47 import org.opendaylight.controller.cluster.datastore.Shard;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
52 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RemoveAllCandidates;
53 import org.opendaylight.controller.cluster.datastore.entityownership.messages.SelectOwner;
54 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
55 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
56 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
57 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
58 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
59 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
60 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
61 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
62 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
63 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
64 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
65 import org.opendaylight.controller.cluster.datastore.modification.Modification;
66 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
67 import org.opendaylight.controller.cluster.raft.RaftState;
68 import org.opendaylight.controller.cluster.raft.VotingState;
69 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
70 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
72 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
73 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
74 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
75 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
76 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
77 import scala.concurrent.Future;
78 import scala.concurrent.duration.FiniteDuration;
79
80 /**
81  * Special Shard for EntityOwnership.
82  *
83  * @author Thomas Pantelis
84  */
85 class EntityOwnershipShard extends Shard {
    // Name of the local cluster member; used as this node's candidate name when registering and to detect
    // whether the local member is an entity's owner.
    private final MemberName localMemberName;
    // Coordinates commits of the entity-ownership modifications produced by this shard.
    private final EntityOwnershipShardCommitCoordinator commitCoordinator;
    // Tracks registered ownership listeners (and the jeopardy flag) and notifies them of ownership state.
    private final EntityOwnershipListenerSupport listenerSupport;
    // Members currently believed to be down: added on PeerDown, removed on PeerUp or when the member adds a
    // candidate, and rebuilt from the akka Cluster state when this shard becomes leader.
    private final Set<MemberName> downPeerMemberNames = new HashSet<>();
    // Source of the per-entity-type owner selection strategies; its strategies are cleared on gaining leadership.
    private final EntityOwnerSelectionStrategyConfig strategyConfig;
    // Pending delayed owner-selection tasks keyed by entity path (presumably populated by scheduleOwnerSelection);
    // a task is cancelled/removed once an owner is selected.
    private final Map<YangInstanceIdentifier, Cancellable> entityToScheduledOwnershipTask = new HashMap<>();
    // Ownership statistics initialized against this shard's data store and handed, per entity type, to the
    // owner selection strategies.
    private final EntityOwnershipStatistics entityOwnershipStatistics;
    // One-shot flag: cleared the first time possiblyRemoveAllInitialCandidates sees a known leader, so the startup
    // RemoveAllCandidates message is sent at most once.
    private boolean removeAllInitialCandidates = true;
94
95     protected EntityOwnershipShard(final Builder builder) {
96         super(builder);
97         this.localMemberName = builder.localMemberName;
98         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(builder.localMemberName, LOG);
99         this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId());
100         this.strategyConfig = builder.ownerSelectionStrategyConfig;
101         this.entityOwnershipStatistics = new EntityOwnershipStatistics();
102         this.entityOwnershipStatistics.init(getDataStore());
103     }
104
105     private static DatastoreContext noPersistenceDatastoreContext(final DatastoreContext datastoreContext) {
106         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
107     }
108
109     @Override
110     protected void onDatastoreContext(final DatastoreContext context) {
111         super.onDatastoreContext(noPersistenceDatastoreContext(context));
112     }
113
114     @Override
115     protected void onRecoveryComplete() {
116         super.onRecoveryComplete();
117
118         new CandidateListChangeListener(getSelf(), persistenceId()).init(getDataStore());
119         new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
120     }
121
122     @Override
123     public void handleNonRaftCommand(final Object message) {
124         if (message instanceof RegisterCandidateLocal) {
125             onRegisterCandidateLocal((RegisterCandidateLocal) message);
126         } else if (message instanceof UnregisterCandidateLocal) {
127             onUnregisterCandidateLocal((UnregisterCandidateLocal) message);
128         } else if (message instanceof CandidateAdded) {
129             onCandidateAdded((CandidateAdded) message);
130         } else if (message instanceof CandidateRemoved) {
131             onCandidateRemoved((CandidateRemoved) message);
132         } else if (message instanceof PeerDown) {
133             onPeerDown((PeerDown) message);
134         } else if (message instanceof PeerUp) {
135             onPeerUp((PeerUp) message);
136         } else if (message instanceof RegisterListenerLocal) {
137             onRegisterListenerLocal((RegisterListenerLocal) message);
138         } else if (message instanceof UnregisterListenerLocal) {
139             onUnregisterListenerLocal((UnregisterListenerLocal) message);
140         } else if (message instanceof SelectOwner) {
141             onSelectOwner((SelectOwner) message);
142         } else if (message instanceof RemoveAllCandidates) {
143             onRemoveAllCandidates((RemoveAllCandidates) message);
144         } else if (!commitCoordinator.handleMessage(message, this)) {
145             super.handleNonRaftCommand(message);
146         }
147     }
148
149     private void onRemoveAllCandidates(final RemoveAllCandidates message) {
150         LOG.debug("{}: onRemoveAllCandidates: {}", persistenceId(), message);
151
152         removeCandidateFromEntities(message.getMemberName());
153     }
154
155     private void onSelectOwner(final SelectOwner selectOwner) {
156         LOG.debug("{}: onSelectOwner: {}", persistenceId(), selectOwner);
157
158         String currentOwner = getCurrentOwner(selectOwner.getEntityPath());
159         if (Strings.isNullOrEmpty(currentOwner)) {
160             writeNewOwner(selectOwner.getEntityPath(), newOwner(currentOwner, selectOwner.getAllCandidates(),
161                     selectOwner.getOwnerSelectionStrategy()));
162
163             Cancellable cancellable = entityToScheduledOwnershipTask.get(selectOwner.getEntityPath());
164             if (cancellable != null) {
165                 if (!cancellable.isCancelled()) {
166                     cancellable.cancel();
167                 }
168                 entityToScheduledOwnershipTask.remove(selectOwner.getEntityPath());
169             }
170         }
171     }
172
173     private void onRegisterCandidateLocal(final RegisterCandidateLocal registerCandidate) {
174         LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
175
176         NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
177                 registerCandidate.getEntity().getIdentifier(), localMemberName.getName());
178         commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
179
180         getSender().tell(SuccessReply.INSTANCE, getSelf());
181     }
182
183     private void onUnregisterCandidateLocal(final UnregisterCandidateLocal unregisterCandidate) {
184         LOG.debug("{}: onUnregisterCandidateLocal: {}", persistenceId(), unregisterCandidate);
185
186         DOMEntity entity = unregisterCandidate.getEntity();
187         YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getIdentifier(),
188                 localMemberName.getName());
189         commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
190
191         getSender().tell(SuccessReply.INSTANCE, getSelf());
192     }
193
194     private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) {
195         LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener);
196
197         listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
198
199         getSender().tell(SuccessReply.INSTANCE, getSelf());
200
201         searchForEntities((entityTypeNode, entityNode) -> {
202             java.util.Optional<DataContainerChild<?, ?>> possibleType = entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
203             String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
204             if (registerListener.getEntityType().equals(entityType)) {
205                 final boolean hasOwner;
206                 final boolean isOwner;
207
208                 java.util.Optional<DataContainerChild<?, ?>> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID);
209                 if (possibleOwner.isPresent()) {
210                     isOwner = localMemberName.getName().equals(possibleOwner.get().getValue().toString());
211                     hasOwner = true;
212                 } else {
213                     isOwner = false;
214                     hasOwner = false;
215                 }
216
217                 DOMEntity entity = new DOMEntity(entityType,
218                     (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
219
220                 listenerSupport.notifyEntityOwnershipListener(entity, false, isOwner, hasOwner,
221                     registerListener.getListener());
222             }
223         });
224     }
225
226     private void onUnregisterListenerLocal(final UnregisterListenerLocal unregisterListener) {
227         LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener);
228
229         listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(),
230                 unregisterListener.getListener());
231
232         getSender().tell(SuccessReply.INSTANCE, getSelf());
233     }
234
235     void tryCommitModifications(final BatchedModifications modifications) {
236         if (isLeader()) {
237             LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(),
238                     modifications.getTransactionId());
239
240             // Note that it's possible the commit won't get consensus and will timeout and not be applied
241             // to the state. However we don't need to retry it in that case b/c it will be committed to
242             // the journal first and, once a majority of followers come back on line and it is replicated,
243             // it will be applied at that point.
244             handleBatchedModificationsLocal(modifications, self());
245         } else {
246             final ActorSelection leader = getLeader();
247             if (leader != null) {
248                 possiblyRemoveAllInitialCandidates(leader);
249
250                 LOG.debug("{}: Sending BatchedModifications {} to leader {}", persistenceId(),
251                         modifications.getTransactionId(), leader);
252
253                 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
254                         getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
255
256                 Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
257             }
258         }
259     }
260
261     void possiblyRemoveAllInitialCandidates(final ActorSelection leader) {
262         // The following handles removing all candidates on startup when re-joining with a remote leader. When a
263         // follower is detected as down, the leader will re-assign new owners to entities that were owned by the
264         // down member but doesn't remove the down member as a candidate, as the down node may actually be isolated
265         // and still running. Therefore on startup we send an initial message to the remote leader to remove any
266         // potential stale candidates we had previously registered, as it's possible a candidate may not be
267         // registered by a client in the new incarnation. We have to send the RemoveAllCandidates message prior to any
268         // pending registrations.
269         if (removeAllInitialCandidates && leader != null) {
270             removeAllInitialCandidates = false;
271             if (!isLeader()) {
272                 LOG.debug("{} - got new leader {} on startup - sending RemoveAllCandidates", persistenceId(), leader);
273
274                 leader.tell(new RemoveAllCandidates(localMemberName), ActorRef.noSender());
275             }
276         }
277     }
278
279     boolean hasLeader() {
280         return getLeader() != null && (!isLeader() || isLeaderActive());
281     }
282
283     /**
284      * Determine if we are in jeopardy based on observed RAFT state.
285      */
286     private static boolean inJeopardy(final RaftState state) {
287         switch (state) {
288             case Candidate:
289             case Follower:
290             case Leader:
291             case PreLeader:
292                 return false;
293             case IsolatedLeader:
294                 return true;
295             default:
296                 throw new IllegalStateException("Unsupported RAFT state " + state);
297         }
298     }
299
300     private void notifyAllListeners() {
301         searchForEntities((entityTypeNode, entityNode) -> {
302             java.util.Optional<DataContainerChild<?, ?>> possibleType = entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
303             if (possibleType.isPresent()) {
304                 final boolean hasOwner;
305                 final boolean isOwner;
306
307                 java.util.Optional<DataContainerChild<?, ?>> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID);
308                 if (possibleOwner.isPresent()) {
309                     isOwner = localMemberName.getName().equals(possibleOwner.get().getValue().toString());
310                     hasOwner = true;
311                 } else {
312                     isOwner = false;
313                     hasOwner = false;
314                 }
315
316                 DOMEntity entity = new DOMEntity(possibleType.get().getValue().toString(),
317                     (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
318
319                 listenerSupport.notifyEntityOwnershipListeners(entity, isOwner, isOwner, hasOwner);
320             }
321         });
322     }
323
324     @Override
325     protected void onStateChanged() {
326         boolean isLeader = isLeader();
327         LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader());
328
329         // Examine current RAFT state to see if we are in jeopardy, potentially notifying all listeners
330         final boolean inJeopardy = inJeopardy(getRaftState());
331         final boolean wasInJeopardy = listenerSupport.setInJeopardy(inJeopardy);
332         if (inJeopardy != wasInJeopardy) {
333             LOG.debug("{}: {} jeopardy state, notifying all listeners", persistenceId(),
334                 inJeopardy ? "entered" : "left");
335             notifyAllListeners();
336         }
337
338         commitCoordinator.onStateChanged(this, isLeader);
339
340         super.onStateChanged();
341     }
342
343     @Override
344     protected void onLeaderChanged(final String oldLeader, final String newLeader) {
345         boolean isLeader = isLeader();
346         LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
347                 newLeader, isLeader);
348
349         if (isLeader) {
350
351             // Re-initialize the downPeerMemberNames from the current akka Cluster state. The previous leader, if any,
352             // is most likely down however it's possible we haven't received the PeerDown message yet.
353             initializeDownPeerMemberNamesFromClusterState();
354
355             // Clear all existing strategies so that they get re-created when we call createStrategy again
356             // This allows the strategies to be re-initialized with existing statistics maintained by
357             // EntityOwnershipStatistics
358             strategyConfig.clearStrategies();
359
360             // Re-assign owners for all members that are known to be down. In a cluster which has greater than
361             // 3 nodes it is possible for some node beside the leader being down when the leadership transitions
362             // it makes sense to use this event to re-assign owners for those downed nodes.
363             Set<String> ownedBy = new HashSet<>(downPeerMemberNames.size() + 1);
364             for (MemberName downPeerName : downPeerMemberNames) {
365                 ownedBy.add(downPeerName.getName());
366             }
367
368             // Also try to assign owners for entities that have no current owner. See explanation in onPeerUp.
369             ownedBy.add("");
370             selectNewOwnerForEntitiesOwnedBy(ownedBy);
371         } else {
372             // The leader changed - notify the coordinator to check if pending modifications need to be sent.
373             // While onStateChanged also does this, this method handles the case where the shard hears from a
374             // leader and stays in the follower state. In that case no behavior state change occurs.
375             commitCoordinator.onStateChanged(this, isLeader);
376         }
377
378         super.onLeaderChanged(oldLeader, newLeader);
379     }
380
381     @Override
382     protected void onVotingStateChangeComplete() {
383         // Re-evaluate ownership for all entities - if a member changed from voting to non-voting it should lose
384         // ownership and vice versa it now is a candidate to become owner.
385         final List<Modification> modifications = new ArrayList<>();
386         searchForEntities((entityTypeNode, entityNode) -> {
387             YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH)
388                     .node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier())
389                     .node(ENTITY_OWNER_NODE_ID).build();
390
391             java.util.Optional<String> possibleOwner =
392                     entityNode.getChild(ENTITY_OWNER_NODE_ID).map(node -> node.getValue().toString());
393             String newOwner = newOwner(possibleOwner.orElse(null), getCandidateNames(entityNode),
394                     getEntityOwnerElectionStrategy(entityPath));
395
396             if (!newOwner.equals(possibleOwner.orElse(""))) {
397                 modifications.add(new WriteModification(entityPath,
398                         ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
399             }
400         });
401
402         commitCoordinator.commitModifications(modifications, this);
403     }
404
405     private void initializeDownPeerMemberNamesFromClusterState() {
406         java.util.Optional<Cluster> cluster = getRaftActorContext().getCluster();
407         if (!cluster.isPresent()) {
408             return;
409         }
410
411         CurrentClusterState state = cluster.get().state();
412         Set<Member> unreachable = state.getUnreachable();
413
414         LOG.debug(
415             "{}: initializeDownPeerMemberNamesFromClusterState - current downPeerMemberNames: {}, unreachable: {}",
416             persistenceId(), downPeerMemberNames, unreachable);
417
418         downPeerMemberNames.clear();
419         for (Member m: unreachable) {
420             downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next()));
421         }
422
423         for (Member m: state.getMembers()) {
424             if (m.status() != MemberStatus.up() && m.status() != MemberStatus.weaklyUp()) {
425                 LOG.debug("{}: Adding down member with status {}", persistenceId(), m.status());
426                 downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next()));
427             }
428         }
429
430         LOG.debug("{}: new downPeerMemberNames: {}", persistenceId(), downPeerMemberNames);
431     }
432
433     private void onCandidateRemoved(final CandidateRemoved message) {
434         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
435
436         if (isLeader()) {
437             String currentOwner = getCurrentOwner(message.getEntityPath());
438             writeNewOwner(message.getEntityPath(),
439                     newOwner(currentOwner, message.getRemainingCandidates(),
440                             getEntityOwnerElectionStrategy(message.getEntityPath())));
441         }
442     }
443
444     private EntityOwnerSelectionStrategy getEntityOwnerElectionStrategy(final YangInstanceIdentifier entityPath) {
445         final String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath);
446         return strategyConfig.createStrategy(entityType, entityOwnershipStatistics.byEntityType(entityType));
447     }
448
449     private void onCandidateAdded(final CandidateAdded message) {
450         if (!isLeader()) {
451             return;
452         }
453
454         LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
455
456         // Since a node's candidate member is only added by the node itself, we can assume the node is up so
457         // remove it from the downPeerMemberNames.
458         downPeerMemberNames.remove(MemberName.forName(message.getNewCandidate()));
459
460         final String currentOwner = getCurrentOwner(message.getEntityPath());
461         final EntityOwnerSelectionStrategy strategy = getEntityOwnerElectionStrategy(message.getEntityPath());
462
463         // Available members is all the known peers - the number of peers that are down + self
464         // So if there are 2 peers and 1 is down then availableMembers will be 2
465         final int availableMembers = getRaftActorContext().getPeerIds().size() - downPeerMemberNames.size() + 1;
466
467         LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner);
468
469         if (strategy.getSelectionDelayInMillis() == 0L) {
470             writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
471                     strategy));
472         } else if (message.getAllCandidates().size() == availableMembers) {
473             LOG.debug("{}: Received the maximum candidates requests : {} writing new owner",
474                     persistenceId(), availableMembers);
475             cancelOwnerSelectionTask(message.getEntityPath());
476             writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
477                     strategy));
478         } else {
479             scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy);
480         }
481     }
482
483     private void onPeerDown(final PeerDown peerDown) {
484         LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
485
486         MemberName downMemberName = peerDown.getMemberName();
487         if (downPeerMemberNames.add(downMemberName) && isLeader()) {
488             // Select new owners for entities owned by the down peer and which have other candidates. For an entity for
489             // which the down peer is the only candidate, we leave it as the owner and don't clear it. This is done to
490             // handle the case where the peer member process is actually still running but the node is partitioned.
491             // When the partition is healed, the peer just remains as the owner. If the peer process actually restarted,
492             // it will first remove all its candidates on startup. If another candidate is registered during the time
493             // the peer is down, the new candidate will be selected as the new owner.
494
495             selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(downMemberName.getName()));
496         }
497     }
498
499     private void selectNewOwnerForEntitiesOwnedBy(final Set<String> ownedBy) {
500         final List<Modification> modifications = new ArrayList<>();
501         searchForEntitiesOwnedBy(ownedBy, (entityTypeNode, entityNode) -> {
502             YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH)
503                     .node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier())
504                     .node(ENTITY_OWNER_NODE_ID).build();
505             String newOwner = newOwner(getCurrentOwner(entityPath), getCandidateNames(entityNode),
506                     getEntityOwnerElectionStrategy(entityPath));
507
508             if (!newOwner.isEmpty()) {
509                 LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
510
511                 modifications.add(new WriteModification(entityPath,
512                     ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
513
514             } else {
515                 LOG.debug("{}: Found entity {} but no other candidates - not clearing owner", persistenceId(),
516                         entityPath);
517             }
518         });
519
520         commitCoordinator.commitModifications(modifications, this);
521     }
522
523     private void onPeerUp(final PeerUp peerUp) {
524         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
525
526         downPeerMemberNames.remove(peerUp.getMemberName());
527
528         // Notify the coordinator to check if pending modifications need to be sent. We do this here
529         // to handle the case where the leader's peer address isn't known yet when a prior state or
530         // leader change occurred.
531         commitCoordinator.onStateChanged(this, isLeader());
532
533         if (isLeader()) {
534             // Try to assign owners for entities that have no current owner. It's possible the peer that is now up
535             // had previously registered as a candidate and was the only candidate but the owner write tx couldn't be
536             // committed due to a leader change. Eg, the leader is able to successfully commit the candidate add tx but
537             // becomes isolated before it can commit the owner change and switches to follower. The majority partition
538             // with a new leader has the candidate but the entity has no owner. When the partition is healed and the
539             // previously isolated leader reconnects, we'll receive onPeerUp and, if there's still no owner, the
540             // previous leader will gain ownership.
541             selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(""));
542         }
543     }
544
545     private static Collection<String> getCandidateNames(final MapEntryNode entity) {
546         Collection<MapEntryNode> candidates = ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getValue();
547         Collection<String> candidateNames = new ArrayList<>(candidates.size());
548         for (MapEntryNode candidate: candidates) {
549             candidateNames.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
550         }
551
552         return candidateNames;
553     }
554
555     private void searchForEntitiesOwnedBy(final Set<String> ownedBy, final EntityWalker walker) {
556         LOG.debug("{}: Searching for entities owned by {}", persistenceId(), ownedBy);
557
558         searchForEntities((entityTypeNode, entityNode) -> {
559             java.util.Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
560                     entityNode.getChild(ENTITY_OWNER_NODE_ID);
561             String currentOwner = possibleOwner.isPresent() ? possibleOwner.get().getValue().toString() : "";
562             if (ownedBy.contains(currentOwner)) {
563                 walker.onEntity(entityTypeNode, entityNode);
564             }
565         });
566     }
567
568     private void removeCandidateFromEntities(final MemberName member) {
569         final List<Modification> modifications = new ArrayList<>();
570         searchForEntities((entityTypeNode, entityNode) -> {
571             if (hasCandidate(entityNode, member)) {
572                 YangInstanceIdentifier entityId =
573                         (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
574                 YangInstanceIdentifier candidatePath = candidatePath(
575                         entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
576                         entityId, member.getName());
577
578                 LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
579                         member, candidatePath);
580
581                 modifications.add(new DeleteModification(candidatePath));
582             }
583         });
584
585         commitCoordinator.commitModifications(modifications, this);
586     }
587
588     private static boolean hasCandidate(final MapEntryNode entity, final MemberName candidateName) {
589         return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName.getName()))
590                 .isPresent();
591     }
592
593     private void searchForEntities(final EntityWalker walker) {
594         Optional<NormalizedNode<?, ?>> possibleEntityTypes = getDataStore().readNode(ENTITY_TYPES_PATH);
595         if (!possibleEntityTypes.isPresent()) {
596             return;
597         }
598
599         for (MapEntryNode entityType:  ((MapNode) possibleEntityTypes.get()).getValue()) {
600             java.util.Optional<DataContainerChild<?, ?>> possibleEntities = entityType.getChild(ENTITY_NODE_ID);
601             if (!possibleEntities.isPresent()) {
602                 // shouldn't happen but handle anyway
603                 continue;
604             }
605
606             for (MapEntryNode entity:  ((MapNode) possibleEntities.get()).getValue()) {
607                 walker.onEntity(entityType, entity);
608             }
609         }
610     }
611
612     private void writeNewOwner(final YangInstanceIdentifier entityPath, final String newOwner) {
613         LOG.debug("{}: Writing new owner {} for entity {}", persistenceId(), newOwner, entityPath);
614
615         commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
616                 ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
617     }
618
619     /**
620      * Schedule a new owner selection job. Cancelling any outstanding job if it has not been cancelled.
621      */
622     private void scheduleOwnerSelection(final YangInstanceIdentifier entityPath, final Collection<String> allCandidates,
623                                        final EntityOwnerSelectionStrategy strategy) {
624         cancelOwnerSelectionTask(entityPath);
625
626         LOG.debug("{}: Scheduling owner selection after {} ms", persistenceId(), strategy.getSelectionDelayInMillis());
627
628         final Cancellable lastScheduledTask = context().system().scheduler().scheduleOnce(
629                 FiniteDuration.apply(strategy.getSelectionDelayInMillis(), TimeUnit.MILLISECONDS), self(),
630                 new SelectOwner(entityPath, allCandidates, strategy), context().system().dispatcher(), self());
631
632         entityToScheduledOwnershipTask.put(entityPath, lastScheduledTask);
633     }
634
635     private void cancelOwnerSelectionTask(final YangInstanceIdentifier entityPath) {
636         final Cancellable lastScheduledTask = entityToScheduledOwnershipTask.get(entityPath);
637         if (lastScheduledTask != null && !lastScheduledTask.isCancelled()) {
638             lastScheduledTask.cancel();
639         }
640     }
641
642     private String newOwner(final String currentOwner, final Collection<String> candidates,
643             final EntityOwnerSelectionStrategy ownerSelectionStrategy) {
644         Collection<String> viableCandidates = getViableCandidates(candidates);
645         if (viableCandidates.isEmpty()) {
646             return "";
647         }
648         return ownerSelectionStrategy.newOwner(currentOwner, viableCandidates);
649     }
650
651     private Collection<String> getViableCandidates(final Collection<String> candidates) {
652         Map<MemberName, VotingState> memberToVotingState = new HashMap<>();
653         getRaftActorContext().getPeers().forEach(peerInfo -> memberToVotingState.put(
654                 ShardIdentifier.fromShardIdString(peerInfo.getId()).getMemberName(), peerInfo.getVotingState()));
655
656         Collection<String> viableCandidates = new ArrayList<>();
657
658         for (String candidate : candidates) {
659             MemberName memberName = MemberName.forName(candidate);
660             if (memberToVotingState.get(memberName) != VotingState.NON_VOTING
661                     && !downPeerMemberNames.contains(memberName)) {
662                 viableCandidates.add(candidate);
663             }
664         }
665         return viableCandidates;
666     }
667
668     private String getCurrentOwner(final YangInstanceIdentifier entityId) {
669         Optional<NormalizedNode<?, ?>> optionalEntityOwner = getDataStore().readNode(entityId.node(ENTITY_OWNER_QNAME));
670         if (optionalEntityOwner.isPresent()) {
671             return optionalEntityOwner.get().getValue().toString();
672         }
673         return null;
674     }
675
676     @FunctionalInterface
677     private interface EntityWalker {
678         void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
679     }
680
681     public static Builder newBuilder() {
682         return new Builder();
683     }
684
685     static class Builder extends Shard.AbstractBuilder<Builder, EntityOwnershipShard> {
686         private MemberName localMemberName;
687         private EntityOwnerSelectionStrategyConfig ownerSelectionStrategyConfig;
688
689         protected Builder() {
690             super(EntityOwnershipShard.class);
691         }
692
693         Builder localMemberName(final MemberName newLocalMemberName) {
694             checkSealed();
695             this.localMemberName = newLocalMemberName;
696             return this;
697         }
698
699         Builder ownerSelectionStrategyConfig(final EntityOwnerSelectionStrategyConfig newOwnerSelectionStrategyConfig) {
700             checkSealed();
701             this.ownerSelectionStrategyConfig = newOwnerSelectionStrategyConfig;
702             return this;
703         }
704
705         @Override
706         protected void verify() {
707             super.verify();
708             Preconditions.checkNotNull(localMemberName, "localMemberName should not be null");
709             Preconditions.checkNotNull(ownerSelectionStrategyConfig, "ownerSelectionStrategyConfig should not be null");
710         }
711     }
712 }