Bug 4105: Implement EntityOwnershipListener registration/notification
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShard.java
index a71c86dd9f3f165b8e254e3918a0f9dfacdda811..4dfbc87eb9a2b1b7bcb3545e1e67dc2c03304c3a 100644 (file)
@@ -9,11 +9,13 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
@@ -34,7 +36,9 @@ import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
@@ -111,6 +115,10 @@ class EntityOwnershipShard extends Shard {
             onPeerDown((PeerDown) message);
         } else if(message instanceof PeerUp) {
             onPeerUp((PeerUp) message);
+        } if(message instanceof RegisterListenerLocal) {
+            onRegisterListenerLocal((RegisterListenerLocal)message);
+        } if(message instanceof UnregisterListenerLocal) {
+            onUnregisterListenerLocal((UnregisterListenerLocal)message);
         } else if(!commitCoordinator.handleMessage(message, this)) {
             super.onReceiveCommand(message);
         }
@@ -140,6 +148,36 @@ class EntityOwnershipShard extends Shard {
         getSender().tell(SuccessReply.INSTANCE, getSelf());
     }
 
+    private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) {
+        LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener);
+
+        listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
+
+        getSender().tell(SuccessReply.INSTANCE, getSelf());
+
+        searchForEntitiesOwnedBy(localMemberName, new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                Optional<DataContainerChild<? extends PathArgument, ?>> possibleType =
+                        entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
+                String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
+                if(registerListener.getEntityType().equals(entityType)) {
+                    Entity entity = new Entity(entityType,
+                            (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
+                    listenerSupport.notifyEntityOwnershipListener(entity, false, true, registerListener.getListener());
+                }
+            }
+        });
+    }
+
+    private void onUnregisterListenerLocal(UnregisterListenerLocal unregisterListener) {
+        LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener);
+
+        listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(), unregisterListener.getListener());
+
+        getSender().tell(SuccessReply.INSTANCE, getSelf());
+    }
+
     void tryCommitModifications(final BatchedModifications modifications) {
         if(isLeader()) {
             LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(), modifications.getTransactionID());
@@ -247,6 +285,26 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void selectNewOwnerForEntitiesOwnedBy(String owner) {
+        final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        searchForEntitiesOwnedBy(owner, new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                Object newOwner = newOwner(getCandidateNames(entityNode));
+                YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
+                        node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
+                        node(ENTITY_OWNER_NODE_ID).build();
+
+                LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
+
+                modifications.addModification(new WriteModification(entityPath,
+                        ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+            }
+        });
+
+        commitCoordinator.commitModifications(modifications, this);
+    }
+
+    private void searchForEntitiesOwnedBy(String owner, EntityWalker walker) {
         DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
         Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
         if(!possibleEntityTypes.isPresent()) {
@@ -255,7 +313,6 @@ class EntityOwnershipShard extends Shard {
 
         LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
 
-        BatchedModifications modifications = commitCoordinator.newBatchedModifications();
         for(MapEntryNode entityType:  ((MapNode) possibleEntityTypes.get()).getValue()) {
             Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
                     entityType.getChild(ENTITY_NODE_ID);
@@ -267,20 +324,10 @@ class EntityOwnershipShard extends Shard {
                 Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
                         entity.getChild(ENTITY_OWNER_NODE_ID);
                 if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
-                    Object newOwner = newOwner(getCandidateNames(entity));
-                    YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
-                            node(entityType.getIdentifier()).node(ENTITY_NODE_ID).node(entity.getIdentifier()).
-                                    node(ENTITY_OWNER_NODE_ID).build();
-
-                    LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
-
-                    modifications.addModification(new WriteModification(entityPath,
-                            ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+                    walker.onEntity(entityType, entity);
                 }
             }
         }
-
-        commitCoordinator.commitModifications(modifications, this);
     }
 
     private Collection<String> getCandidateNames(MapEntryNode entity) {
@@ -341,4 +388,8 @@ class EntityOwnershipShard extends Shard {
             return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
         }
     }
+
+    private static interface EntityWalker {
+        void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
+    }
 }