X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fentityownership%2FEntityOwnershipShard.java;h=ba7512830ea1b8b65a5343946c6098095bf363f0;hb=a623206f49e3c376e1a8494ba584ea0018468f12;hp=334e093d7f4babfafef4174275d7c32092decbe1;hpb=e9cceae4b6689c7c3880b441c13d35cbe4b13ce0;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 334e093d7f..ba7512830e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -25,8 +25,8 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate; import akka.actor.ActorRef; import akka.actor.ActorSelection; +import akka.actor.Cancellable; import akka.pattern.Patterns; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -47,8 +47,7 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Se import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy; -import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyWrapper; -import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.FirstCandidateSelectionStrategy; +import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.PeerDown; @@ -66,6 +65,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * Special Shard for EntityOwnership. @@ -78,8 +78,8 @@ class EntityOwnershipShard extends Shard { private final EntityOwnershipListenerSupport listenerSupport; private final Set downPeerMemberNames = new HashSet<>(); private final Map peerIdToMemberNames = new HashMap<>(); - private final Map ownerSelectionStrategies = new HashMap<>(); - private final EntityOwnerSelectionStrategyWrapper defaultEntityOwnerSelectionStrategy; + private final EntityOwnerSelectionStrategyConfig strategyConfig; + private final Map entityToScheduledOwnershipTask = new HashMap<>(); private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) { return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build(); @@ -90,8 +90,7 @@ class EntityOwnershipShard extends Shard { this.localMemberName = builder.localMemberName; this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(builder.localMemberName, LOG); this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId()); - this.defaultEntityOwnerSelectionStrategy = - createEntityOwnerSelectionStrategyWrapper(FirstCandidateSelectionStrategy.INSTANCE); + this.strategyConfig = builder.ownerSelectionStrategyConfig; for(String peerId: getRaftActorContext().getPeerIds()) { ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build(); @@ -142,6 +141,14 @@ class EntityOwnershipShard extends Shard { if(Strings.isNullOrEmpty(currentOwner)) { writeNewOwner(selectOwner.getEntityPath(), newOwner(selectOwner.getAllCandidates(), selectOwner.getOwnerSelectionStrategy())); + + Cancellable cancellable = entityToScheduledOwnershipTask.get(selectOwner.getEntityPath()); + if(cancellable != null){ + if(!cancellable.isCancelled()){ + cancellable.cancel(); + } + entityToScheduledOwnershipTask.remove(selectOwner.getEntityPath()); + } } } @@ -263,7 +270,7 @@ class EntityOwnershipShard extends Shard { String currentOwner = getCurrentOwner(message.getEntityPath()); if(message.getRemovedCandidate().equals(currentOwner)){ writeNewOwner(message.getEntityPath(), - newOwner(message.getRemainingCandidates(), getEntityOwnerElectionStrategyWrapper(message.getEntityPath()))); + newOwner(message.getRemainingCandidates(), getEntityOwnerElectionStrategy(message.getEntityPath()))); } } else { // We're not the leader. If the removed candidate is our local member then check if we actually @@ -283,16 +290,9 @@ class EntityOwnershipShard extends Shard { } } - private EntityOwnerSelectionStrategyWrapper getEntityOwnerElectionStrategyWrapper(YangInstanceIdentifier entityPath) { - String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath); - EntityOwnerSelectionStrategyWrapper entityOwnerSelectionStrategy = ownerSelectionStrategies.get(entityType); - - if(entityOwnerSelectionStrategy == null){ - entityOwnerSelectionStrategy = defaultEntityOwnerSelectionStrategy; - ownerSelectionStrategies.put(entityType, entityOwnerSelectionStrategy); - } - - return entityOwnerSelectionStrategy; + private EntityOwnerSelectionStrategy getEntityOwnerElectionStrategy(YangInstanceIdentifier entityPath) { + final String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath); + return strategyConfig.createStrategy(entityType); } private void onCandidateAdded(CandidateAdded message) { @@ -307,12 +307,12 @@ class EntityOwnershipShard extends Shard { downPeerMemberNames.remove(message.getNewCandidate()); String currentOwner = getCurrentOwner(message.getEntityPath()); + EntityOwnerSelectionStrategy strategy = getEntityOwnerElectionStrategy(message.getEntityPath()); if(Strings.isNullOrEmpty(currentOwner)){ - EntityOwnerSelectionStrategyWrapper strategy = getEntityOwnerElectionStrategyWrapper(message.getEntityPath()); - if(strategy.selectionDelayInMillis() == 0L) { + if(strategy.getSelectionDelayInMillis() == 0L) { writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(), strategy)); } else { - strategy.scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates()); + scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates(), strategy); } } } @@ -344,7 +344,7 @@ class EntityOwnershipShard extends Shard { node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()). node(ENTITY_OWNER_NODE_ID).build(); - Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategyWrapper(entityPath)); + Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategy(entityPath)); LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner); @@ -396,7 +396,7 @@ class EntityOwnershipShard extends Shard { public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) { Optional> possibleOwner = entityNode.getChild(ENTITY_OWNER_NODE_ID); - if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) { + if (possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) { walker.onEntity(entityTypeNode, entityNode); } } @@ -422,8 +422,8 @@ class EntityOwnershipShard extends Shard { } } - private static Collection getCandidateNames(MapEntryNode entity) { - Collection candidates = ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getValue(); + private Collection getCandidateNames(MapEntryNode entity) { + Collection candidates = ((MapNode) entity.getChild(CANDIDATE_NODE_ID).get()).getValue(); Collection candidateNames = new ArrayList<>(candidates.size()); for(MapEntryNode candidate: candidates) { candidateNames.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString()); @@ -439,6 +439,26 @@ class EntityOwnershipShard extends Shard { ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this); } + /** + * Schedule a new owner selection job. Cancelling any outstanding job if it has not been cancelled. + * + * @param entityPath + * @param allCandidates + */ + public void scheduleOwnerSelection(YangInstanceIdentifier entityPath, Collection allCandidates, + EntityOwnerSelectionStrategy strategy){ + Cancellable lastScheduledTask = entityToScheduledOwnershipTask.get(entityPath); + if(lastScheduledTask != null && !lastScheduledTask.isCancelled()){ + lastScheduledTask.cancel(); + } + lastScheduledTask = context().system().scheduler().scheduleOnce( + FiniteDuration.apply(strategy.getSelectionDelayInMillis(), TimeUnit.MILLISECONDS) + , self(), new SelectOwner(entityPath, allCandidates, strategy) + , context().system().dispatcher(), self()); + + entityToScheduledOwnershipTask.put(entityPath, lastScheduledTask); + } + private String newOwner(Collection candidates, EntityOwnerSelectionStrategy ownerSelectionStrategy) { Collection viableCandidates = getViableCandidates(candidates); if(viableCandidates.size() == 0){ @@ -470,28 +490,13 @@ class EntityOwnershipShard extends Shard { void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode); } - private EntityOwnerSelectionStrategyWrapper createEntityOwnerSelectionStrategyWrapper(EntityOwnerSelectionStrategy entityOwnerSelectionStrategy){ - return new EntityOwnerSelectionStrategyWrapper(context().system().scheduler(), self(), - context().system().dispatcher(), entityOwnerSelectionStrategy); - } - - @VisibleForTesting - void addEntityOwnerSelectionStrategy(String entityType, Class ownerSelectionStrategyClass){ - try { - EntityOwnerSelectionStrategyWrapper strategy = - createEntityOwnerSelectionStrategyWrapper(ownerSelectionStrategyClass.newInstance()); - ownerSelectionStrategies.put(entityType, strategy); - } catch (InstantiationException | IllegalAccessException e) { - LOG.error("Exception occurred when adding election strategy", e); - } - } - public static Builder newBuilder() { return new Builder(); } static class Builder extends Shard.AbstractBuilder { private String localMemberName; + private EntityOwnerSelectionStrategyConfig ownerSelectionStrategyConfig; protected Builder() { super(EntityOwnershipShard.class); @@ -503,10 +508,17 @@ class EntityOwnershipShard extends Shard { return this; } + Builder ownerSelectionStrategyConfig(EntityOwnerSelectionStrategyConfig ownerSelectionStrategyConfig){ + checkSealed(); + this.ownerSelectionStrategyConfig = ownerSelectionStrategyConfig; + return this; + } + @Override protected void verify() { super.verify(); Preconditions.checkNotNull(localMemberName, "localMemberName should not be null"); + Preconditions.checkNotNull(ownerSelectionStrategyConfig, "ownerSelectionStrategyConfig should not be null"); } } }