+ @Override
+ protected void onRecoveryComplete() {
+ super.onRecoveryComplete();
+
+ new CandidateListChangeListener(getSelf(), persistenceId()).init(getDataStore());
+ new EntityOwnerChangeListener(localMemberName, listenerSupport).init(getDataStore());
+ }
+
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ if(message instanceof RegisterCandidateLocal) {
+ onRegisterCandidateLocal((RegisterCandidateLocal) message);
+ } else if(message instanceof UnregisterCandidateLocal) {
+ onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
+ } else if(message instanceof CandidateAdded){
+ onCandidateAdded((CandidateAdded) message);
+ } else if(message instanceof CandidateRemoved){
+ onCandidateRemoved((CandidateRemoved) message);
+ } else if(message instanceof PeerDown) {
+ onPeerDown((PeerDown) message);
+ } else if(message instanceof PeerUp) {
+ onPeerUp((PeerUp) message);
+ } else if(message instanceof RegisterListenerLocal) {
+ onRegisterListenerLocal((RegisterListenerLocal)message);
+ } else if(message instanceof UnregisterListenerLocal) {
+ onUnregisterListenerLocal((UnregisterListenerLocal) message);
+ } else if(message instanceof SelectOwner) {
+ onSelectOwner((SelectOwner) message);
+ } else if(!commitCoordinator.handleMessage(message, this)) {
+ super.onReceiveCommand(message);
+ }
+ }
+
+ private void onSelectOwner(SelectOwner selectOwner) {
+ String currentOwner = getCurrentOwner(selectOwner.getEntityPath());
+ if(Strings.isNullOrEmpty(currentOwner)) {
+ String entityType = EntityOwnersModel.entityTypeFromEntityPath(selectOwner.getEntityPath());
+ writeNewOwner(selectOwner.getEntityPath(), newOwner(selectOwner.getAllCandidates(),
+ entityOwnershipStatistics.byEntityType(entityType),
+ selectOwner.getOwnerSelectionStrategy()));
+
+ Cancellable cancellable = entityToScheduledOwnershipTask.get(selectOwner.getEntityPath());
+ if(cancellable != null){
+ if(!cancellable.isCancelled()){
+ cancellable.cancel();
+ }
+ entityToScheduledOwnershipTask.remove(selectOwner.getEntityPath());
+ }
+ }
+ }
+
+ private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
+ LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
+
+ listenerSupport.setHasCandidateForEntity(registerCandidate.getEntity());
+
+ NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
+ registerCandidate.getEntity().getId(), localMemberName);
+ commitCoordinator.commitModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners), this);
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
+ private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
+ LOG.debug("{}: onUnregisterCandidateLocal: {}", persistenceId(), unregisterCandidate);
+
+ Entity entity = unregisterCandidate.getEntity();
+ listenerSupport.unsetHasCandidateForEntity(entity);
+
+ YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName);
+ commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
+ private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) {
+ LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener);
+
+ listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+
+ searchForEntitiesOwnedBy(localMemberName, new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ Optional<DataContainerChild<? extends PathArgument, ?>> possibleType =
+ entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
+ String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
+ if (registerListener.getEntityType().equals(entityType)) {
+ Entity entity = new Entity(entityType,
+ (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
+ listenerSupport.notifyEntityOwnershipListener(entity, false, true, true, registerListener.getListener());
+ }
+ }
+ });
+ }
+
+ private void onUnregisterListenerLocal(UnregisterListenerLocal unregisterListener) {
+ LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener);
+
+ listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(), unregisterListener.getListener());
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
+ void tryCommitModifications(final BatchedModifications modifications) {
+ if(isLeader()) {
+ LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(), modifications.getTransactionID());
+
+ // Note that it's possible the commit won't get consensus and will timeout and not be applied
+ // to the state. However we don't need to retry it in that case b/c it will be committed to
+ // the journal first and, once a majority of followers come back on line and it is replicated,
+ // it will be applied at that point.
+ handleBatchedModificationsLocal(modifications, self());
+ } else {
+ final ActorSelection leader = getLeader();
+ if (leader != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: Sending BatchedModifications {} to leader {}", persistenceId(),
+ modifications.getTransactionID(), leader);
+ }
+
+ Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
+ getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
+
+ Patterns.pipe(future, getContext().dispatcher()).pipeTo(getSelf(), ActorRef.noSender());
+ }
+ }
+ }
+
+ boolean hasLeader() {
+ return getLeader() != null && !isIsolatedLeader();
+ }
+
+ @Override
+ protected void onStateChanged() {
+ super.onStateChanged();
+
+ boolean isLeader = isLeader();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("{}: onStateChanged: isLeader: {}, hasLeader: {}", persistenceId(), isLeader, hasLeader());
+ }
+
+ commitCoordinator.onStateChanged(this, isLeader);
+ }
+
+ @Override
+ protected void onLeaderChanged(String oldLeader, String newLeader) {
+ super.onLeaderChanged(oldLeader, newLeader);
+
+ LOG.debug("{}: onLeaderChanged: oldLeader: {}, newLeader: {}, isLeader: {}", persistenceId(), oldLeader,
+ newLeader, isLeader());
+
+ if(isLeader()) {
+ // We were just elected leader. If the old leader is down, select new owners for the entities
+ // owned by the down leader.
+
+ String oldLeaderMemberName = peerIdToMemberNames.get(oldLeader);
+
+ LOG.debug("{}: oldLeaderMemberName: {}", persistenceId(), oldLeaderMemberName);
+
+ if(downPeerMemberNames.contains(oldLeaderMemberName)) {
+ selectNewOwnerForEntitiesOwnedBy(oldLeaderMemberName);
+ }
+ }
+ }
+
+ private void onCandidateRemoved(CandidateRemoved message) {
+ LOG.debug("{}: onCandidateRemoved: {}", persistenceId(), message);
+
+ if(isLeader()) {
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ if(message.getRemovedCandidate().equals(currentOwner) || message.getRemainingCandidates().size() == 0){
+ String entityType = EntityOwnersModel.entityTypeFromEntityPath(message.getEntityPath());
+ writeNewOwner(message.getEntityPath(),
+ newOwner(message.getRemainingCandidates(), entityOwnershipStatistics.byEntityType(entityType),
+ getEntityOwnerElectionStrategy(message.getEntityPath())));
+ }
+ } else {
+ // We're not the leader. If the removed candidate is our local member then check if we actually
+ // have a local candidate registered. If we do then we must have been partitioned from the leader
+ // and the leader removed our candidate since the leader can't tell the difference between a
+ // temporary network partition and a node's process actually restarted. So, in that case, re-add
+ // our candidate.
+ if(localMemberName.equals(message.getRemovedCandidate()) &&
+ listenerSupport.hasCandidateForEntity(createEntity(message.getEntityPath()))) {
+ LOG.debug("Local candidate member was removed but a local candidate is registered for {}" +
+ " - adding back local candidate", message.getEntityPath());
+
+ commitCoordinator.commitModification(new MergeModification(
+ candidatePath(message.getEntityPath(), localMemberName),
+ candidateMapEntry(localMemberName)), this);
+ }
+ }
+ }
+
+ private EntityOwnerSelectionStrategy getEntityOwnerElectionStrategy(YangInstanceIdentifier entityPath) {
+ final String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath);
+ return strategyConfig.createStrategy(entityType);
+ }
+
+ private void onCandidateAdded(CandidateAdded message) {
+ if(!isLeader()){
+ return;
+ }
+
+ LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
+
+ // Since a node's candidate member is only added by the node itself, we can assume the node is up so
+ // remove it from the downPeerMemberNames.
+ downPeerMemberNames.remove(message.getNewCandidate());
+
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ EntityOwnerSelectionStrategy strategy = getEntityOwnerElectionStrategy(message.getEntityPath());
+ if(Strings.isNullOrEmpty(currentOwner)){
+ if(strategy.getSelectionDelayInMillis() == 0L) {
+ String entityType = EntityOwnersModel.entityTypeFromEntityPath(message.getEntityPath());
+ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(),
+ entityOwnershipStatistics.byEntityType(entityType), strategy));
+ } else {
+ scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy);
+ }
+ }
+ }
+
+ private void onPeerDown(PeerDown peerDown) {
+ LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
+
+ String downMemberName = peerDown.getMemberName();
+ if(downPeerMemberNames.add(downMemberName) && isLeader()) {
+ // Remove the down peer as a candidate from all entities.
+ removeCandidateFromEntities(downMemberName);
+ }