import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
+import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.FirstCandidateSelectionStrategy;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
* @author Thomas Pantelis
*/
class EntityOwnershipShard extends Shard {
+
+ private static final EntityOwnerSelectionStrategy DEFAULT_ENTITY_OWNER_SELECTION_STRATEGY
+ = FirstCandidateSelectionStrategy.INSTANCE;
+
private final String localMemberName;
private final EntityOwnershipShardCommitCoordinator commitCoordinator;
private final EntityOwnershipListenerSupport listenerSupport;
private final Set<String> downPeerMemberNames = new HashSet<>();
private final Map<String, String> peerIdToMemberNames = new HashMap<>();
+ private final Map<String, EntityOwnerSelectionStrategy> ownerSelectionStrategies = new HashMap<>();
private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
Optional<DataContainerChild<? extends PathArgument, ?>> possibleType =
entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
- if(registerListener.getEntityType().equals(entityType)) {
+ if (registerListener.getEntityType().equals(entityType)) {
Entity entity = new Entity(entityType,
(YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
listenerSupport.notifyEntityOwnershipListener(entity, false, true, true, registerListener.getListener());
if(isLeader()) {
String currentOwner = getCurrentOwner(message.getEntityPath());
if(message.getRemovedCandidate().equals(currentOwner)){
- writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+ writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates(),
+ getEntityOwnerElectionStrategy(message.getEntityPath())));
}
} else {
// We're not the leader. If the removed candidate is our local member then check if we actually
}
}
+ private EntityOwnerSelectionStrategy getEntityOwnerElectionStrategy(YangInstanceIdentifier entityPath) {
+ String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath);
+ EntityOwnerSelectionStrategy entityOwnerSelectionStrategy = ownerSelectionStrategies.get(entityType);
+
+ if(entityOwnerSelectionStrategy == null){
+ entityOwnerSelectionStrategy = DEFAULT_ENTITY_OWNER_SELECTION_STRATEGY;
+ ownerSelectionStrategies.put(entityType, entityOwnerSelectionStrategy);
+ }
+
+ return entityOwnerSelectionStrategy;
+ }
+
private void onCandidateAdded(CandidateAdded message) {
if(!isLeader()){
return;
String currentOwner = getCurrentOwner(message.getEntityPath());
if(Strings.isNullOrEmpty(currentOwner)){
- writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
+ EntityOwnerSelectionStrategy entityOwnerSelectionStrategy
+ = getEntityOwnerElectionStrategy(message.getEntityPath());
+ if(entityOwnerSelectionStrategy.selectionDelayInMillis() == 0L) {
+ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(),
+ entityOwnerSelectionStrategy));
+ } else {
+ throw new UnsupportedOperationException("Delayed selection not implemented yet");
+ }
}
}
searchForEntitiesOwnedBy(owner, new EntityWalker() {
@Override
public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
- Object newOwner = newOwner(getCandidateNames(entityNode));
+
YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
node(ENTITY_OWNER_NODE_ID).build();
+ Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategy(entityPath));
+
LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
modifications.addModification(new WriteModification(entityPath,
ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
}
- private String newOwner(Collection<String> candidates) {
- for(String candidate: candidates) {
- if(!downPeerMemberNames.contains(candidate)) {
- return candidate;
- }
+ private String newOwner(Collection<String> candidates, EntityOwnerSelectionStrategy ownerSelectionStrategy) {
+ Collection<String> viableCandidates = getViableCandidates(candidates);
+ if(viableCandidates.size() == 0){
+ return "";
}
+ return ownerSelectionStrategy.newOwner(viableCandidates);
+ }
- return "";
+ private Collection<String> getViableCandidates(Collection<String> candidates) {
+ Collection<String> viableCandidates = new ArrayList<>();
+
+ for (String candidate : candidates) {
+ if (!downPeerMemberNames.contains(candidate)) {
+ viableCandidates.add(candidate);
+ }
+ }
+ return viableCandidates;
}
private String getCurrentOwner(YangInstanceIdentifier entityId) {