Refactor AbstractBrokerAwareActivator
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
11 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
13 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_QNAME;
14 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
15 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
16 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
17 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
18 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
19 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
20 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_QNAME;
21 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidateNodeKey;
22 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
23 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
24
25 import akka.actor.ActorRef;
26 import akka.actor.ActorSelection;
27 import akka.actor.Cancellable;
28 import akka.cluster.Cluster;
29 import akka.cluster.ClusterEvent.CurrentClusterState;
30 import akka.cluster.Member;
31 import akka.cluster.MemberStatus;
32 import akka.pattern.Patterns;
33 import com.google.common.base.Preconditions;
34 import com.google.common.base.Strings;
35 import com.google.common.collect.ImmutableList;
36 import com.google.common.collect.ImmutableSet;
37 import java.util.ArrayList;
38 import java.util.Collection;
39 import java.util.HashMap;
40 import java.util.HashSet;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Optional;
44 import java.util.Set;
45 import java.util.concurrent.TimeUnit;
46 import org.opendaylight.controller.cluster.access.concepts.MemberName;
47 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
48 import org.opendaylight.controller.cluster.datastore.Shard;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
51 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
52 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
53 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RemoveAllCandidates;
54 import org.opendaylight.controller.cluster.datastore.entityownership.messages.SelectOwner;
55 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
56 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
57 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
58 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
59 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
60 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
61 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
62 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
63 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
64 import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
65 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
66 import org.opendaylight.controller.cluster.datastore.modification.Modification;
67 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
68 import org.opendaylight.controller.cluster.raft.RaftState;
69 import org.opendaylight.controller.cluster.raft.VotingState;
70 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
73 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
74 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
75 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
76 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
77 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
78 import scala.concurrent.Future;
79 import scala.concurrent.duration.FiniteDuration;
80
81 /**
82  * Special Shard for EntityOwnership.
83  *
84  * @author Thomas Pantelis
85  */
86 class EntityOwnershipShard extends Shard {
    // Name of the local cluster member: registered/unregistered as a candidate by this shard and compared
    // against an entity's owner leaf to decide whether this member is the owner.
    private final MemberName localMemberName;
    // Commits the Modifications this shard builds (candidate merges/deletes and owner writes).
    private final EntityOwnershipShardCommitCoordinator commitCoordinator;
    // Holds the registered ownership listeners and the jeopardy flag, and dispatches listener notifications.
    private final EntityOwnershipListenerSupport listenerSupport;
    // Members currently believed to be down. Maintained from PeerDown/PeerUp, from CandidateAdded, and,
    // when this shard becomes leader, re-seeded from the akka Cluster state.
    private final Set<MemberName> downPeerMemberNames = new HashSet<>();
    // Source of per-entity-type owner selection strategies; its strategies are cleared on becoming leader.
    private final EntityOwnerSelectionStrategyConfig strategyConfig;
    // Pending delayed owner-selection tasks, keyed by entity path, so they can be cancelled once an owner is written.
    private final Map<YangInstanceIdentifier, Cancellable> entityToScheduledOwnershipTask = new HashMap<>();
    // Ownership statistics initialized from the data store and handed to strategies when they are created.
    private final EntityOwnershipStatistics entityOwnershipStatistics;
    // One-shot flag: true until the startup RemoveAllCandidates handling has run for the first known leader.
    private boolean removeAllInitialCandidates = true;
95
96     protected EntityOwnershipShard(final Builder builder) {
97         super(builder);
98         this.localMemberName = builder.localMemberName;
99         this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(builder.localMemberName, LOG);
100         this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId());
101         this.strategyConfig = builder.ownerSelectionStrategyConfig;
102         this.entityOwnershipStatistics = new EntityOwnershipStatistics();
103         this.entityOwnershipStatistics.init(getDataStore());
104     }
105
106     private static DatastoreContext noPersistenceDatastoreContext(final DatastoreContext datastoreContext) {
107         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
108     }
109
110     @Override
111     protected void onDatastoreContext(final DatastoreContext context) {
112         super.onDatastoreContext(noPersistenceDatastoreContext(context));
113     }
114
115     @Override
116     protected void onRecoveryComplete() {
117         super.onRecoveryComplete();
118
119         new CandidateListChangeListener(getSelf(), persistenceId()).init(getDataStore());
120         new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
121     }
122
123     @Override
124     public void handleNonRaftCommand(final Object message) {
125         if (message instanceof RegisterCandidateLocal) {
126             onRegisterCandidateLocal((RegisterCandidateLocal) message);
127         } else if (message instanceof UnregisterCandidateLocal) {
128             onUnregisterCandidateLocal((UnregisterCandidateLocal) message);
129         } else if (message instanceof CandidateAdded) {
130             onCandidateAdded((CandidateAdded) message);
131         } else if (message instanceof CandidateRemoved) {
132             onCandidateRemoved((CandidateRemoved) message);
133         } else if (message instanceof PeerDown) {
134             onPeerDown((PeerDown) message);
135         } else if (message instanceof PeerUp) {
136             onPeerUp((PeerUp) message);
137         } else if (message instanceof RegisterListenerLocal) {
138             onRegisterListenerLocal((RegisterListenerLocal) message);
139         } else if (message instanceof UnregisterListenerLocal) {
140             onUnregisterListenerLocal((UnregisterListenerLocal) message);
141         } else if (message instanceof SelectOwner) {
142             onSelectOwner((SelectOwner) message);
143         } else if (message instanceof RemoveAllCandidates) {
144             onRemoveAllCandidates((RemoveAllCandidates) message);
145         } else if (!commitCoordinator.handleMessage(message, this)) {
146             super.handleNonRaftCommand(message);
147         }
148     }
149
150     private void onRemoveAllCandidates(final RemoveAllCandidates message) {
151         LOG.debug("{}: onRemoveAllCandidates: {}", persistenceId(), message);
152
153         removeCandidateFromEntities(message.getMemberName());
154     }
155
156     private void onSelectOwner(final SelectOwner selectOwner) {
157         LOG.debug("{}: onSelectOwner: {}", persistenceId(), selectOwner);
158
159         String currentOwner = getCurrentOwner(selectOwner.getEntityPath());
160         if (Strings.isNullOrEmpty(currentOwner)) {
161             writeNewOwner(selectOwner.getEntityPath(), newOwner(currentOwner, selectOwner.getAllCandidates(),
162                     selectOwner.getOwnerSelectionStrategy()));
163
164             Cancellable cancellable = entityToScheduledOwnershipTask.get(selectOwner.getEntityPath());
165             if (cancellable != null) {
166                 if (!cancellable.isCancelled()) {
167                     cancellable.cancel();
168                 }
169                 entityToScheduledOwnershipTask.remove(selectOwner.getEntityPath());
170             }
171         }
172     }
173
174     private void onRegisterCandidateLocal(final RegisterCandidateLocal registerCandidate) {
175         LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
176
177         NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
178                 registerCandidate.getEntity().getIdentifier(), localMemberName.getName());
179         commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
180
181         getSender().tell(SuccessReply.INSTANCE, getSelf());
182     }
183
184     private void onUnregisterCandidateLocal(final UnregisterCandidateLocal unregisterCandidate) {
185         LOG.debug("{}: onUnregisterCandidateLocal: {}", persistenceId(), unregisterCandidate);
186
187         DOMEntity entity = unregisterCandidate.getEntity();
188         YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getIdentifier(),
189                 localMemberName.getName());
190         commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
191
192         getSender().tell(SuccessReply.INSTANCE, getSelf());
193     }
194
195     private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) {
196         LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener);
197
198         listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
199
200         getSender().tell(SuccessReply.INSTANCE, getSelf());
201
202         searchForEntities((entityTypeNode, entityNode) -> {
203             Optional<DataContainerChild<?, ?>> possibleType = entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
204             String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
205             if (registerListener.getEntityType().equals(entityType)) {
206                 final boolean hasOwner;
207                 final boolean isOwner;
208
209                 Optional<DataContainerChild<?, ?>> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID);
210                 if (possibleOwner.isPresent()) {
211                     isOwner = localMemberName.getName().equals(possibleOwner.get().getValue().toString());
212                     hasOwner = true;
213                 } else {
214                     isOwner = false;
215                     hasOwner = false;
216                 }
217
218                 DOMEntity entity = new DOMEntity(entityType,
219                     (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
220
221                 listenerSupport.notifyEntityOwnershipListener(entity, false, isOwner, hasOwner,
222                     registerListener.getListener());
223             }
224         });
225     }
226
227     private void onUnregisterListenerLocal(final UnregisterListenerLocal unregisterListener) {
228         LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener);
229
230         listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(),
231                 unregisterListener.getListener());
232
233         getSender().tell(SuccessReply.INSTANCE, getSelf());
234     }
235
236     void tryCommitModifications(final BatchedModifications modifications) {
237         if (isLeader()) {
238             LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(),
239                     modifications.getTransactionId());
240
241             // Note that it's possible the commit won't get consensus and will timeout and not be applied
242             // to the state. However we don't need to retry it in that case b/c it will be committed to
243             // the journal first and, once a majority of followers come back on line and it is replicated,
244             // it will be applied at that point.
245             handleBatchedModificationsLocal(modifications, self());
246         } else {
247             final ActorSelection leader = getLeader();
248             if (leader != null) {
249                 possiblyRemoveAllInitialCandidates(leader);
250
251                 LOG.debug("{}: Sending BatchedModifications {} to leader {}", persistenceId(),
252                         modifications.getTransactionId(), leader);
253
254                 Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
255                         getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
256
257                 Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
258             }
259         }
260     }
261
262     void possiblyRemoveAllInitialCandidates(final ActorSelection leader) {
263         // The following handles removing all candidates on startup when re-joining with a remote leader. When a
264         // follower is detected as down, the leader will re-assign new owners to entities that were owned by the
265         // down member but doesn't remove the down member as a candidate, as the down node may actually be isolated
266         // and still running. Therefore on startup we send an initial message to the remote leader to remove any
267         // potential stale candidates we had previously registered, as it's possible a candidate may not be
268         // registered by a client in the new incarnation. We have to send the RemoveAllCandidates message prior to any
269         // pending registrations.
270         if (removeAllInitialCandidates && leader != null) {
271             removeAllInitialCandidates = false;
272             if (!isLeader()) {
273                 LOG.debug("{} - got new leader {} on startup - sending RemoveAllCandidates", persistenceId(), leader);
274
275                 leader.tell(new RemoveAllCandidates(localMemberName), ActorRef.noSender());
276             }
277         }
278     }
279
280     boolean hasLeader() {
281         return getLeader() != null && (!isLeader() || isLeaderActive());
282     }
283
284     /**
285      * Determine if we are in jeopardy based on observed RAFT state.
286      */
287     private static boolean inJeopardy(final RaftState state) {
288         switch (state) {
289             case Candidate:
290             case Follower:
291             case Leader:
292             case PreLeader:
293                 return false;
294             case IsolatedLeader:
295                 return true;
296             default:
297                 throw new IllegalStateException("Unsupported RAFT state " + state);
298         }
299     }
300
301     private void notifyAllListeners() {
302         searchForEntities((entityTypeNode, entityNode) -> {
303             Optional<DataContainerChild<?, ?>> possibleType = entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
304             if (possibleType.isPresent()) {
305                 final boolean hasOwner;
306                 final boolean isOwner;
307
308                 Optional<DataContainerChild<?, ?>> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID);
309                 if (possibleOwner.isPresent()) {
310                     isOwner = localMemberName.getName().equals(possibleOwner.get().getValue().toString());
311                     hasOwner = true;
312                 } else {
313                     isOwner = false;
314                     hasOwner = false;
315                 }
316
317                 DOMEntity entity = new DOMEntity(possibleType.get().getValue().toString(),
318                     (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
319
320                 listenerSupport.notifyEntityOwnershipListeners(entity, isOwner, isOwner, hasOwner);
321             }
322         });
323     }
324
325     @Override
326     protected void onStateChanged() {
327         boolean isLeader = isLeader();
328         LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader());
329
330         // Examine current RAFT state to see if we are in jeopardy, potentially notifying all listeners
331         final boolean inJeopardy = inJeopardy(getRaftState());
332         final boolean wasInJeopardy = listenerSupport.setInJeopardy(inJeopardy);
333         if (inJeopardy != wasInJeopardy) {
334             LOG.debug("{}: {} jeopardy state, notifying all listeners", persistenceId(),
335                 inJeopardy ? "entered" : "left");
336             notifyAllListeners();
337         }
338
339         commitCoordinator.onStateChanged(this, isLeader);
340
341         super.onStateChanged();
342     }
343
344     @Override
345     protected void onLeaderChanged(final String oldLeader, final String newLeader) {
346         boolean isLeader = isLeader();
347         LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
348                 newLeader, isLeader);
349
350         if (isLeader) {
351
352             // Re-initialize the downPeerMemberNames from the current akka Cluster state. The previous leader, if any,
353             // is most likely down however it's possible we haven't received the PeerDown message yet.
354             initializeDownPeerMemberNamesFromClusterState();
355
356             // Clear all existing strategies so that they get re-created when we call createStrategy again
357             // This allows the strategies to be re-initialized with existing statistics maintained by
358             // EntityOwnershipStatistics
359             strategyConfig.clearStrategies();
360
361             // Re-assign owners for all members that are known to be down. In a cluster which has greater than
362             // 3 nodes it is possible for some node beside the leader being down when the leadership transitions
363             // it makes sense to use this event to re-assign owners for those downed nodes.
364             Set<String> ownedBy = new HashSet<>(downPeerMemberNames.size() + 1);
365             for (MemberName downPeerName : downPeerMemberNames) {
366                 ownedBy.add(downPeerName.getName());
367             }
368
369             // Also try to assign owners for entities that have no current owner. See explanation in onPeerUp.
370             ownedBy.add("");
371             selectNewOwnerForEntitiesOwnedBy(ownedBy);
372         } else {
373             // The leader changed - notify the coordinator to check if pending modifications need to be sent.
374             // While onStateChanged also does this, this method handles the case where the shard hears from a
375             // leader and stays in the follower state. In that case no behavior state change occurs.
376             commitCoordinator.onStateChanged(this, isLeader);
377         }
378
379         super.onLeaderChanged(oldLeader, newLeader);
380     }
381
382     @Override
383     protected void onVotingStateChangeComplete() {
384         // Re-evaluate ownership for all entities - if a member changed from voting to non-voting it should lose
385         // ownership and vice versa it now is a candidate to become owner.
386         final List<Modification> modifications = new ArrayList<>();
387         searchForEntities((entityTypeNode, entityNode) -> {
388             YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH)
389                     .node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier())
390                     .node(ENTITY_OWNER_NODE_ID).build();
391
392             Optional<String> possibleOwner =
393                     entityNode.getChild(ENTITY_OWNER_NODE_ID).map(node -> node.getValue().toString());
394             String newOwner = newOwner(possibleOwner.orElse(null), getCandidateNames(entityNode),
395                     getEntityOwnerElectionStrategy(entityPath));
396
397             if (!newOwner.equals(possibleOwner.orElse(""))) {
398                 modifications.add(new WriteModification(entityPath,
399                         ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
400             }
401         });
402
403         commitCoordinator.commitModifications(modifications, this);
404     }
405
406     private void initializeDownPeerMemberNamesFromClusterState() {
407         Optional<Cluster> cluster = getRaftActorContext().getCluster();
408         if (!cluster.isPresent()) {
409             return;
410         }
411
412         CurrentClusterState state = cluster.get().state();
413         Set<Member> unreachable = state.getUnreachable();
414
415         LOG.debug(
416             "{}: initializeDownPeerMemberNamesFromClusterState - current downPeerMemberNames: {}, unreachable: {}",
417             persistenceId(), downPeerMemberNames, unreachable);
418
419         downPeerMemberNames.clear();
420         for (Member m: unreachable) {
421             downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next()));
422         }
423
424         for (Member m: state.getMembers()) {
425             if (m.status() != MemberStatus.up() && m.status() != MemberStatus.weaklyUp()) {
426                 LOG.debug("{}: Adding down member with status {}", persistenceId(), m.status());
427                 downPeerMemberNames.add(MemberName.forName(m.getRoles().iterator().next()));
428             }
429         }
430
431         LOG.debug("{}: new downPeerMemberNames: {}", persistenceId(), downPeerMemberNames);
432     }
433
434     private void onCandidateRemoved(final CandidateRemoved message) {
435         LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
436
437         if (isLeader()) {
438             String currentOwner = getCurrentOwner(message.getEntityPath());
439             writeNewOwner(message.getEntityPath(),
440                     newOwner(currentOwner, message.getRemainingCandidates(),
441                             getEntityOwnerElectionStrategy(message.getEntityPath())));
442         }
443     }
444
445     private EntityOwnerSelectionStrategy getEntityOwnerElectionStrategy(final YangInstanceIdentifier entityPath) {
446         final String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath);
447         return strategyConfig.createStrategy(entityType, entityOwnershipStatistics.byEntityType(entityType));
448     }
449
450     private void onCandidateAdded(final CandidateAdded message) {
451         if (!isLeader()) {
452             return;
453         }
454
455         LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
456
457         // Since a node's candidate member is only added by the node itself, we can assume the node is up so
458         // remove it from the downPeerMemberNames.
459         downPeerMemberNames.remove(MemberName.forName(message.getNewCandidate()));
460
461         final String currentOwner = getCurrentOwner(message.getEntityPath());
462         final EntityOwnerSelectionStrategy strategy = getEntityOwnerElectionStrategy(message.getEntityPath());
463
464         // Available members is all the known peers - the number of peers that are down + self
465         // So if there are 2 peers and 1 is down then availableMembers will be 2
466         final int availableMembers = getRaftActorContext().getPeerIds().size() - downPeerMemberNames.size() + 1;
467
468         LOG.debug("{}: Using strategy {} to select owner, currentOwner = {}", persistenceId(), strategy, currentOwner);
469
470         if (strategy.getSelectionDelayInMillis() == 0L) {
471             writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
472                     strategy));
473         } else if (message.getAllCandidates().size() == availableMembers) {
474             LOG.debug("{}: Received the maximum candidates requests : {} writing new owner",
475                     persistenceId(), availableMembers);
476             cancelOwnerSelectionTask(message.getEntityPath());
477             writeNewOwner(message.getEntityPath(), newOwner(currentOwner, message.getAllCandidates(),
478                     strategy));
479         } else {
480             scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy);
481         }
482     }
483
484     private void onPeerDown(final PeerDown peerDown) {
485         LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
486
487         MemberName downMemberName = peerDown.getMemberName();
488         if (downPeerMemberNames.add(downMemberName) && isLeader()) {
489             // Select new owners for entities owned by the down peer and which have other candidates. For an entity for
490             // which the down peer is the only candidate, we leave it as the owner and don't clear it. This is done to
491             // handle the case where the peer member process is actually still running but the node is partitioned.
492             // When the partition is healed, the peer just remains as the owner. If the peer process actually restarted,
493             // it will first remove all its candidates on startup. If another candidate is registered during the time
494             // the peer is down, the new candidate will be selected as the new owner.
495
496             selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(downMemberName.getName()));
497         }
498     }
499
500     private void selectNewOwnerForEntitiesOwnedBy(final Set<String> ownedBy) {
501         final List<Modification> modifications = new ArrayList<>();
502         searchForEntitiesOwnedBy(ownedBy, (entityTypeNode, entityNode) -> {
503             YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH)
504                     .node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier())
505                     .node(ENTITY_OWNER_NODE_ID).build();
506             String newOwner = newOwner(getCurrentOwner(entityPath), getCandidateNames(entityNode),
507                     getEntityOwnerElectionStrategy(entityPath));
508
509             if (!newOwner.isEmpty()) {
510                 LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
511
512                 modifications.add(new WriteModification(entityPath,
513                     ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
514
515             } else {
516                 LOG.debug("{}: Found entity {} but no other candidates - not clearing owner", persistenceId(),
517                         entityPath);
518             }
519         });
520
521         commitCoordinator.commitModifications(modifications, this);
522     }
523
524     private void onPeerUp(final PeerUp peerUp) {
525         LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
526
527         downPeerMemberNames.remove(peerUp.getMemberName());
528
529         // Notify the coordinator to check if pending modifications need to be sent. We do this here
530         // to handle the case where the leader's peer address isn't known yet when a prior state or
531         // leader change occurred.
532         commitCoordinator.onStateChanged(this, isLeader());
533
534         if (isLeader()) {
535             // Try to assign owners for entities that have no current owner. It's possible the peer that is now up
536             // had previously registered as a candidate and was the only candidate but the owner write tx couldn't be
537             // committed due to a leader change. Eg, the leader is able to successfully commit the candidate add tx but
538             // becomes isolated before it can commit the owner change and switches to follower. The majority partition
539             // with a new leader has the candidate but the entity has no owner. When the partition is healed and the
540             // previously isolated leader reconnects, we'll receive onPeerUp and, if there's still no owner, the
541             // previous leader will gain ownership.
542             selectNewOwnerForEntitiesOwnedBy(ImmutableSet.of(""));
543         }
544     }
545
546     private static Collection<String> getCandidateNames(final MapEntryNode entity) {
547         return entity.getChild(CANDIDATE_NODE_ID).map(child -> {
548             Collection<MapEntryNode> candidates = ((MapNode) child).getValue();
549             Collection<String> candidateNames = new ArrayList<>(candidates.size());
550             for (MapEntryNode candidate: candidates) {
551                 candidateNames.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
552             }
553             return candidateNames;
554         }).orElse(ImmutableList.of());
555     }
556
557     private void searchForEntitiesOwnedBy(final Set<String> ownedBy, final EntityWalker walker) {
558         LOG.debug("{}: Searching for entities owned by {}", persistenceId(), ownedBy);
559
560         searchForEntities((entityTypeNode, entityNode) -> {
561             Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
562                     entityNode.getChild(ENTITY_OWNER_NODE_ID);
563             String currentOwner = possibleOwner.isPresent() ? possibleOwner.get().getValue().toString() : "";
564             if (ownedBy.contains(currentOwner)) {
565                 walker.onEntity(entityTypeNode, entityNode);
566             }
567         });
568     }
569
570     private void removeCandidateFromEntities(final MemberName member) {
571         final List<Modification> modifications = new ArrayList<>();
572         searchForEntities((entityTypeNode, entityNode) -> {
573             if (hasCandidate(entityNode, member)) {
574                 YangInstanceIdentifier entityId =
575                         (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
576                 YangInstanceIdentifier candidatePath = candidatePath(
577                         entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
578                         entityId, member.getName());
579
580                 LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
581                         member, candidatePath);
582
583                 modifications.add(new DeleteModification(candidatePath));
584             }
585         });
586
587         commitCoordinator.commitModifications(modifications, this);
588     }
589
590     private static boolean hasCandidate(final MapEntryNode entity, final MemberName candidateName) {
591         return entity.getChild(CANDIDATE_NODE_ID)
592                 .flatMap(child -> ((MapNode)child).getChild(candidateNodeKey(candidateName.getName())))
593                 .isPresent();
594     }
595
596     private void searchForEntities(final EntityWalker walker) {
597         Optional<NormalizedNode<?, ?>> possibleEntityTypes = getDataStore().readNode(ENTITY_TYPES_PATH);
598         if (!possibleEntityTypes.isPresent()) {
599             return;
600         }
601
602         for (MapEntryNode entityType : ((MapNode) possibleEntityTypes.get()).getValue()) {
603             Optional<DataContainerChild<?, ?>> possibleEntities = entityType.getChild(ENTITY_NODE_ID);
604             if (!possibleEntities.isPresent()) {
605                 // shouldn't happen but handle anyway
606                 continue;
607             }
608
609             for (MapEntryNode entity:  ((MapNode) possibleEntities.get()).getValue()) {
610                 walker.onEntity(entityType, entity);
611             }
612         }
613     }
614
615     private void writeNewOwner(final YangInstanceIdentifier entityPath, final String newOwner) {
616         LOG.debug("{}: Writing new owner {} for entity {}", persistenceId(), newOwner, entityPath);
617
618         commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
619                 ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
620     }
621
622     /**
623      * Schedule a new owner selection job. Cancelling any outstanding job if it has not been cancelled.
624      */
625     private void scheduleOwnerSelection(final YangInstanceIdentifier entityPath, final Collection<String> allCandidates,
626                                        final EntityOwnerSelectionStrategy strategy) {
627         cancelOwnerSelectionTask(entityPath);
628
629         LOG.debug("{}: Scheduling owner selection after {} ms", persistenceId(), strategy.getSelectionDelayInMillis());
630
631         final Cancellable lastScheduledTask = context().system().scheduler().scheduleOnce(
632                 FiniteDuration.apply(strategy.getSelectionDelayInMillis(), TimeUnit.MILLISECONDS), self(),
633                 new SelectOwner(entityPath, allCandidates, strategy), context().system().dispatcher(), self());
634
635         entityToScheduledOwnershipTask.put(entityPath, lastScheduledTask);
636     }
637
638     private void cancelOwnerSelectionTask(final YangInstanceIdentifier entityPath) {
639         final Cancellable lastScheduledTask = entityToScheduledOwnershipTask.get(entityPath);
640         if (lastScheduledTask != null && !lastScheduledTask.isCancelled()) {
641             lastScheduledTask.cancel();
642         }
643     }
644
645     private String newOwner(final String currentOwner, final Collection<String> candidates,
646             final EntityOwnerSelectionStrategy ownerSelectionStrategy) {
647         Collection<String> viableCandidates = getViableCandidates(candidates);
648         if (viableCandidates.isEmpty()) {
649             return "";
650         }
651         return ownerSelectionStrategy.newOwner(currentOwner, viableCandidates);
652     }
653
654     private Collection<String> getViableCandidates(final Collection<String> candidates) {
655         Map<MemberName, VotingState> memberToVotingState = new HashMap<>();
656         getRaftActorContext().getPeers().forEach(peerInfo -> memberToVotingState.put(
657                 ShardIdentifier.fromShardIdString(peerInfo.getId()).getMemberName(), peerInfo.getVotingState()));
658
659         Collection<String> viableCandidates = new ArrayList<>();
660
661         for (String candidate : candidates) {
662             MemberName memberName = MemberName.forName(candidate);
663             if (memberToVotingState.get(memberName) != VotingState.NON_VOTING
664                     && !downPeerMemberNames.contains(memberName)) {
665                 viableCandidates.add(candidate);
666             }
667         }
668         return viableCandidates;
669     }
670
671     private String getCurrentOwner(final YangInstanceIdentifier entityId) {
672         return getDataStore().readNode(entityId.node(ENTITY_OWNER_QNAME))
673                 .map(owner -> owner.getValue().toString())
674                 .orElse(null);
675     }
676
677     @FunctionalInterface
678     private interface EntityWalker {
679         void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
680     }
681
682     public static Builder newBuilder() {
683         return new Builder();
684     }
685
686     static class Builder extends Shard.AbstractBuilder<Builder, EntityOwnershipShard> {
687         private MemberName localMemberName;
688         private EntityOwnerSelectionStrategyConfig ownerSelectionStrategyConfig;
689
690         protected Builder() {
691             super(EntityOwnershipShard.class);
692         }
693
694         Builder localMemberName(final MemberName newLocalMemberName) {
695             checkSealed();
696             this.localMemberName = newLocalMemberName;
697             return this;
698         }
699
700         Builder ownerSelectionStrategyConfig(final EntityOwnerSelectionStrategyConfig newOwnerSelectionStrategyConfig) {
701             checkSealed();
702             this.ownerSelectionStrategyConfig = newOwnerSelectionStrategyConfig;
703             return this;
704         }
705
706         @Override
707         protected void verify() {
708             super.verify();
709             Preconditions.checkNotNull(localMemberName, "localMemberName should not be null");
710             Preconditions.checkNotNull(ownerSelectionStrategyConfig, "ownerSelectionStrategyConfig should not be null");
711         }
712     }
713 }