import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
import akka.actor.ActorRef;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
onPeerDown((PeerDown) message);
} else if(message instanceof PeerUp) {
onPeerUp((PeerUp) message);
+ } if(message instanceof RegisterListenerLocal) {
+ onRegisterListenerLocal((RegisterListenerLocal)message);
+ } if(message instanceof UnregisterListenerLocal) {
+ onUnregisterListenerLocal((UnregisterListenerLocal)message);
} else if(!commitCoordinator.handleMessage(message, this)) {
super.onReceiveCommand(message);
}
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
+ private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) {
+ LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener);
+
+ listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+
+ searchForEntitiesOwnedBy(localMemberName, new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ Optional<DataContainerChild<? extends PathArgument, ?>> possibleType =
+ entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
+ String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
+ if(registerListener.getEntityType().equals(entityType)) {
+ Entity entity = new Entity(entityType,
+ (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
+ listenerSupport.notifyEntityOwnershipListener(entity, false, true, registerListener.getListener());
+ }
+ }
+ });
+ }
+
+ private void onUnregisterListenerLocal(UnregisterListenerLocal unregisterListener) {
+ LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener);
+
+ listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(), unregisterListener.getListener());
+
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
void tryCommitModifications(final BatchedModifications modifications) {
if(isLeader()) {
LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(), modifications.getTransactionID());
}
private void selectNewOwnerForEntitiesOwnedBy(String owner) {
+ final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+ searchForEntitiesOwnedBy(owner, new EntityWalker() {
+ @Override
+ public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+ Object newOwner = newOwner(getCandidateNames(entityNode));
+ YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
+ node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
+ node(ENTITY_OWNER_NODE_ID).build();
+
+ LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
+
+ modifications.addModification(new WriteModification(entityPath,
+ ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+ }
+ });
+
+ commitCoordinator.commitModifications(modifications, this);
+ }
+
+ private void searchForEntitiesOwnedBy(String owner, EntityWalker walker) {
DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
if(!possibleEntityTypes.isPresent()) {
LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
- BatchedModifications modifications = commitCoordinator.newBatchedModifications();
for(MapEntryNode entityType: ((MapNode) possibleEntityTypes.get()).getValue()) {
Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
entityType.getChild(ENTITY_NODE_ID);
Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
entity.getChild(ENTITY_OWNER_NODE_ID);
if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
- Object newOwner = newOwner(getCandidateNames(entity));
- YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
- node(entityType.getIdentifier()).node(ENTITY_NODE_ID).node(entity.getIdentifier()).
- node(ENTITY_OWNER_NODE_ID).build();
-
- LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
-
- modifications.addModification(new WriteModification(entityPath,
- ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+ walker.onEntity(entityType, entity);
}
}
}
-
- commitCoordinator.commitModifications(modifications, this);
}
private Collection<String> getCandidateNames(MapEntryNode entity) {
return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
}
}
+
+ private static interface EntityWalker {
+ void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
+ }
}