+ private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
+ Optional<NormalizedNode<?, ?>> possibleEntityTypes = getDataStore().readNode(ENTITY_TYPES_PATH);
+ if(!possibleEntityTypes.isPresent()) {
+ return;
+ }
+
+ LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
+
+ searchForEntities(new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
+ entityNode.getChild(ENTITY_OWNER_NODE_ID);
+ if (possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
+ walker.onEntity(entityTypeNode, entityNode);
+ }
+ }
+ });
+ }
+
+ private void searchForEntities(EntityWalker walker) {
+ Optional<NormalizedNode<?, ?>> possibleEntityTypes = getDataStore().readNode(ENTITY_TYPES_PATH);
+ if(!possibleEntityTypes.isPresent()) {
+ return;
+ }
+
+ for(MapEntryNode entityType: ((MapNode) possibleEntityTypes.get()).getValue()) {
+ Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
+ entityType.getChild(ENTITY_NODE_ID);
+ if(!possibleEntities.isPresent()) {
+ continue; // shouldn't happen but handle anyway
+ }
+
+ for(MapEntryNode entity: ((MapNode) possibleEntities.get()).getValue()) {
+ walker.onEntity(entityType, entity);
+ }
+ }
+ }
+
+ private Collection<String> getCandidateNames(MapEntryNode entity) {
+ Collection<MapEntryNode> candidates = ((MapNode) entity.getChild(CANDIDATE_NODE_ID).get()).getValue();
+ Collection<String> candidateNames = new ArrayList<>(candidates.size());
+ for(MapEntryNode candidate: candidates) {
+ candidateNames.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
+ }
+
+ return candidateNames;
+ }
+
+ private void writeNewOwner(YangInstanceIdentifier entityPath, String newOwner) {
+ LOG.debug("{}: Writing new owner {} for entity {}", persistenceId(), newOwner, entityPath);
+
+ commitCoordinator.commitModification(new WriteModification(entityPath.node(ENTITY_OWNER_QNAME),
+ ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
+ }
+
+ /**
+ * Schedule a new owner selection job. Cancelling any outstanding job if it has not been cancelled.
+ *
+ * @param entityPath
+ * @param allCandidates
+ */
+ public void scheduleOwnerSelection(YangInstanceIdentifier entityPath, Collection<String> allCandidates,
+ EntityOwnerSelectionStrategy strategy){
+ Cancellable lastScheduledTask = entityToScheduledOwnershipTask.get(entityPath);
+ if(lastScheduledTask != null && !lastScheduledTask.isCancelled()){
+ lastScheduledTask.cancel();
+ }
+ lastScheduledTask = context().system().scheduler().scheduleOnce(
+ FiniteDuration.apply(strategy.getSelectionDelayInMillis(), TimeUnit.MILLISECONDS)
+ , self(), new SelectOwner(entityPath, allCandidates, strategy)
+ , context().system().dispatcher(), self());
+
+ entityToScheduledOwnershipTask.put(entityPath, lastScheduledTask);
+ }
+
+ private String newOwner(Collection<String> candidates, EntityOwnerSelectionStrategy ownerSelectionStrategy) {
+ Collection<String> viableCandidates = getViableCandidates(candidates);
+ if(viableCandidates.size() == 0){
+ return "";
+ }
+ return ownerSelectionStrategy.newOwner(viableCandidates);
+ }
+
+ private Collection<String> getViableCandidates(Collection<String> candidates) {
+ Collection<String> viableCandidates = new ArrayList<>();
+
+ for (String candidate : candidates) {
+ if (!downPeerMemberNames.contains(candidate)) {
+ viableCandidates.add(candidate);
+ }
+ }
+ return viableCandidates;
+ }
+
+ private String getCurrentOwner(YangInstanceIdentifier entityId) {
+ Optional<NormalizedNode<?, ?>> optionalEntityOwner = getDataStore().readNode(entityId.node(ENTITY_OWNER_QNAME));
+ if(optionalEntityOwner.isPresent()){
+ return optionalEntityOwner.get().getValue().toString();
+ }
+ return null;
+ }
+
+ private static interface EntityWalker {
+ void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ static class Builder extends Shard.AbstractBuilder<Builder, EntityOwnershipShard> {
+ private String localMemberName;
+ private EntityOwnerSelectionStrategyConfig ownerSelectionStrategyConfig;
+
+ protected Builder() {
+ super(EntityOwnershipShard.class);
+ }
+
+ Builder localMemberName(String localMemberName) {
+ checkSealed();