+ private void onCandidateAdded(CandidateAdded message) {
+ if(!isLeader()){
+ return;
+ }
+
+ LOG.debug("{}: onCandidateAdded: {}", persistenceId(), message);
+
+ // Since a node's candidate member is only added by the node itself, we can assume the node is up so
+ // remove it from the downPeerMemberNames.
+ downPeerMemberNames.remove(message.getNewCandidate());
+
+ String currentOwner = getCurrentOwner(message.getEntityPath());
+ EntityOwnerSelectionStrategy strategy = getEntityOwnerElectionStrategy(message.getEntityPath());
+ if(Strings.isNullOrEmpty(currentOwner)){
+ if(strategy.getSelectionDelayInMillis() == 0L) {
+ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(), strategy));
+ } else {
+ scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy);
+ }
+ }
+ }
+
+ private void onPeerDown(PeerDown peerDown) {
+ LOG.info("{}: onPeerDown: {}", persistenceId(), peerDown);
+
+ String downMemberName = peerDown.getMemberName();
+ if(downPeerMemberNames.add(downMemberName) && isLeader()) {
+ // Remove the down peer as a candidate from all entities.
+ removeCandidateFromEntities(downMemberName);
+ }
+ }
+
+ private void onPeerUp(PeerUp peerUp) {
+ LOG.debug("{}: onPeerUp: {}", persistenceId(), peerUp);
+
+ peerIdToMemberNames.put(peerUp.getPeerId(), peerUp.getMemberName());
+ downPeerMemberNames.remove(peerUp.getMemberName());
+ }
+
+ private void selectNewOwnerForEntitiesOwnedBy(String owner) {
+ final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+ searchForEntitiesOwnedBy(owner, new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+
+ YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
+ node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
+ node(ENTITY_OWNER_NODE_ID).build();
+
+ Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategy(entityPath));
+
+ LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
+
+ modifications.addModification(new WriteModification(entityPath,
+ ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+ }
+ });
+
+ commitCoordinator.commitModifications(modifications, this);
+ }
+
+ private void removeCandidateFromEntities(final String owner) {
+ final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+ searchForEntities(new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ if (hasCandidate(entityNode, owner)) {
+ YangInstanceIdentifier entityId =
+ (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
+ YangInstanceIdentifier candidatePath = candidatePath(
+ entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
+ entityId, owner);
+
+ LOG.info("{}: Found entity {}, removing candidate {}, path {}", persistenceId(), entityId,
+ owner, candidatePath);
+
+ modifications.addModification(new DeleteModification(candidatePath));
+ }
+ }
+ });
+
+ commitCoordinator.commitModifications(modifications, this);
+ }
+
+ private static boolean hasCandidate(MapEntryNode entity, String candidateName) {
+ return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName)).isPresent();
+ }
+
+ private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
+ Optional<NormalizedNode<?, ?>> possibleEntityTypes = getDataStore().readNode(ENTITY_TYPES_PATH);
+ if(!possibleEntityTypes.isPresent()) {
+ return;
+ }
+
+ LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
+
+ searchForEntities(new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
+ entityNode.getChild(ENTITY_OWNER_NODE_ID);
+ if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+ walker.onEntity(entityTypeNode, entityNode);
+ }
+ }
+ });
+ }
+
+ private void searchForEntities(EntityWalker walker) {
+ Optional<NormalizedNode<?, ?>> possibleEntityTypes = getDataStore().readNode(ENTITY_TYPES_PATH);
+ if(!possibleEntityTypes.isPresent()) {
+ return;
+ }
+
+ for(MapEntryNode entityType: ((MapNode) possibleEntityTypes.get()).getValue()) {
+ Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
+ entityType.getChild(ENTITY_NODE_ID);
+ if(!possibleEntities.isPresent()) {
+ continue; // shouldn't happen but handle anyway
+ }
+
+ for(MapEntryNode entity: ((MapNode) possibleEntities.get()).getValue()) {
+ walker.onEntity(entityType, entity);
+ }
+ }
+ }
+
+ private Collection<String> getCandidateNames(MapEntryNode entity) {
+ Collection<MapEntryNode> candidates = ((MapNode) entity.getChild(CANDIDATE_NODE_ID).get()).getValue();
+ Collection<String> candidateNames = new ArrayList<>(candidates.size());
+ for(MapEntryNode candidate: candidates) {
+ candidateNames.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
+ }
+
+ return candidateNames;
+ }
+
+ private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
+ LOG.debug("{}: Writing new owner {} for entity {}", persistenceId(), newOwner, entityPath);
+
+ commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
+ ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
+ }
+
+ /**
+ * Schedule a new owner selection job. Cancelling any outstanding job if it has not been cancelled.
+ *
+ * @param entityPath
+ * @param allCandidates
+ */
+ public void scheduleOwnerSelection(YangInstanceIdentifier entityPath, Collection<String> allCandidates,
+ EntityOwnerSelectionStrategy strategy){
+ Cancellable lastScheduledTask = entityToScheduledOwnershipTask.get(entityPath);
+ if(lastScheduledTask != null && !lastScheduledTask.isCancelled()){
+ lastScheduledTask.cancel();
+ }
+ lastScheduledTask = context().system().scheduler().scheduleOnce(
+ FiniteDuration.apply(strategy.getSelectionDelayInMillis(), TimeUnit.MILLISECONDS)
+ , self(), new SelectOwner(entityPath, allCandidates, strategy)
+ , context().system().dispatcher(), self());
+
+ entityToScheduledOwnershipTask.put(entityPath, lastScheduledTask);
+ }
+
+ private String newOwner(Collection<String> candidates, EntityOwnerSelectionStrategy ownerSelectionStrategy) {
+ Collection<String> viableCandidates = getViableCandidates(candidates);
+ if(viableCandidates.size() == 0){
+ return "";
+ }
+ return ownerSelectionStrategy.newOwner(viableCandidates);
+ }
+
+ private Collection<String> getViableCandidates(Collection<String> candidates) {
+ Collection<String> viableCandidates = new ArrayList<>();
+
+ for (String candidate : candidates) {
+ if (!downPeerMemberNames.contains(candidate)) {
+ viableCandidates.add(candidate);
+ }
+ }
+ return viableCandidates;
+ }
+
+ private String getCurrentOwner(YangInstanceIdentifier entityId) {
+ Optional<NormalizedNode<?, ?>> optionalEntityOwner = getDataStore().readNode(entityId.node(ENTITY_OWNER_QNAME));
+ if(optionalEntityOwner.isPresent()){
+ return optionalEntityOwner.get().getValue().toString();
+ }
+ return null;
+ }
+
+ private static interface EntityWalker {
+ void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
+ }
+
+ @VisibleForTesting
+ void addEntityOwnerSelectionStrategy(String entityType,
+ Class<? extends EntityOwnerSelectionStrategy> clazz,
+ long delay){
+ EntityOwnerSelectionStrategyConfig config = EntityOwnerSelectionStrategyConfig.newBuilder()
+ .addStrategy(entityType, clazz, delay).build();
+ this.strategyConfig = config;
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ static class Builder extends Shard.AbstractBuilder<Builder, EntityOwnershipShard> {
+ private String localMemberName;
+
+ protected Builder() {
+ super(EntityOwnershipShard.class);
+ }