Bug 4105: Implement EntityOwnershipListener registration/notification 07/26807/1
authorTom Pantelis <tpanteli@brocade.com>
Mon, 17 Aug 2015 09:57:53 +0000 (05:57 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 19:20:56 +0000 (15:20 -0400)
Change-Id: I49ee7f4b5f48ddde4779d37ba34c88dd776dd47b
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
16 files changed:
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipCandidateRegistration.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipListenerRegistration.java
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipService.java
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterListenerLocal.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterListenerLocal.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java

index 43bf779..c3c43d9 100644 (file)
@@ -8,10 +8,23 @@
 
 package org.opendaylight.controller.md.sal.common.api.clustering;
 
+import javax.annotation.Nonnull;
+import org.opendaylight.yangtools.concepts.ObjectRegistration;
+
 /**
  * An EntityOwnershipCandidateRegistration records a request to register a Candidate for a given Entity. Calling
  * close on the EntityOwnershipCandidateRegistration will remove the Candidate from any future ownership considerations
  * for that Entity and will also remove it as a Listener for ownership status changes.
  */
-public interface EntityOwnershipCandidateRegistration extends EntityOwnershipListenerRegistration {
+public interface EntityOwnershipCandidateRegistration extends ObjectRegistration<EntityOwnershipCandidate> {
+   /**
+    * Returns the entity that the listener was registered for
+    */
+    @Nonnull Entity getEntity();
+
+   /**
+    * Unregister the listener
+    */
+   @Override
+   void close();
 }
index ff9c851..ff7ce55 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.md.sal.common.api.clustering;
 
+import javax.annotation.Nonnull;
 import org.opendaylight.yangtools.concepts.ObjectRegistration;
 
 /**
@@ -18,13 +19,13 @@ import org.opendaylight.yangtools.concepts.ObjectRegistration;
 public interface EntityOwnershipListenerRegistration extends ObjectRegistration<EntityOwnershipListener> {
 
     /**
-     *
-     * @return the entity that the listener was registered for
+     * Return the entity type that the listener was registered for
      */
-    Entity getEntity();
+    @Nonnull String getEntityType();
 
     /**
      * Unregister the listener
      */
+    @Override
     void close();
 }
index fc15dac..52009a2 100644 (file)
@@ -8,6 +8,8 @@
 
 package org.opendaylight.controller.md.sal.common.api.clustering;
 
+import javax.annotation.Nonnull;
+
 /**
  * <p>
  * The EntityOwnershipService provides the means for a component/application to request ownership for a given
@@ -22,26 +24,33 @@ package org.opendaylight.controller.md.sal.common.api.clustering;
 public interface EntityOwnershipService {
 
     /**
-     * Registers as a Candidate that wants to own the given Entity. Only one such request can be made per process.
-     * If multiple requests for registering a Candidate for a given Entity are received in the current process a
-     * CandidateAlreadyRegisteredException will be thrown
+     * Registers a candidate for ownership of the given entity. Only one such request can be made per entity
+     * per process. If multiple requests for registering a candidate for a given entity are received in the
+     * current process a CandidateAlreadyRegisteredException will be thrown.
+     * <p>
+     * The registration is performed asynchronously and the {@link EntityOwnershipCandidate} instance is
+     * notified whenever its process instance is granted ownership of the entity and also whenever it loses
+     * ownership. Note that the {@link EntityOwnershipCandidate} is not notified when another process instance
+     * is granted ownership.
      *
      * @param entity the entity which the Candidate wants to own
      * @param candidate the Candidate that wants to own the entity
      * @return a registration object that can be used to unregister the Candidate
      * @throws org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException
      */
-    EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
+    EntityOwnershipCandidateRegistration registerCandidate(@Nonnull Entity entity, @Nonnull EntityOwnershipCandidate candidate)
             throws CandidateAlreadyRegisteredException;
 
     /**
-     * Registers a Listener that is interested in the ownership status of the given Entity. On registration the Listener
-     * will be notified of the ownership status of the given Entity at the time of registration.
+     * Registers a listener that is interested in ownership changes for entities of the given entity type. The
+     * listener is notified whenever its process instance is granted ownership of the entity and also whenever
+     * it loses ownership. On registration the listener will be notified of all entities its process instance
+     * currently owns at the time of registration.
      *
-     * @param entity the Entity whose ownership status the Listener is interested in
-     * @param listener the Listener that is interested in the entity
+     * @param entityType the type of entities whose ownership status the Listener is interested in
+     * @param listener the listener that is interested in the entities
      * @return a registration object that can be used to unregister the Listener
      */
-    EntityOwnershipListenerRegistration registerListener(Entity entity, EntityOwnershipListener listener);
+    EntityOwnershipListenerRegistration registerListener(@Nonnull String entityType, @Nonnull EntityOwnershipListener listener);
 
 }
index e118458..afef3aa 100644 (file)
@@ -7,20 +7,30 @@
  */
 package org.opendaylight.controller.md.sal.common.impl.clustering;
 
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 
 /**
  * Abstract base class for an EntityOwnershipCandidateRegistration.
  *
  * @author Thomas Pantelis
  */
-public abstract class AbstractEntityOwnershipCandidateRegistration
-        extends AbstractEntityOwnershipListenerRegistration<EntityOwnershipCandidate>
+public abstract class AbstractEntityOwnershipCandidateRegistration extends AbstractObjectRegistration<EntityOwnershipCandidate>
         implements EntityOwnershipCandidateRegistration {
+    private final Entity entity;
 
-    protected AbstractEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity) {
-        super(candidate, entity);
+    protected AbstractEntityOwnershipCandidateRegistration(@Nonnull EntityOwnershipCandidate candidate,
+            @Nonnull Entity entity) {
+        super(candidate);
+        this.entity = Preconditions.checkNotNull(entity, "entity cannot be null");
+    }
+
+    @Override
+    public Entity getEntity() {
+        return entity;
     }
 }
index 881d662..77118a0 100644 (file)
@@ -7,32 +7,29 @@
  */
 package org.opendaylight.controller.md.sal.common.impl.clustering;
 
-import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
 
 /**
  * Abstract base class for an EntityOwnershipListenerRegistration.
  *
  * @author Thomas Pantelis
  */
-public abstract class AbstractEntityOwnershipListenerRegistration<T extends EntityOwnershipListener>
+public abstract class AbstractEntityOwnershipListenerRegistration extends AbstractObjectRegistration<EntityOwnershipListener>
         implements EntityOwnershipListenerRegistration {
-    private final T listener;
-    private final Entity entity;
+    private final String entityType;
 
-    protected AbstractEntityOwnershipListenerRegistration(T listener, Entity entity) {
-        this.listener = listener;
-        this.entity = entity;
+    protected AbstractEntityOwnershipListenerRegistration(@Nonnull EntityOwnershipListener listener,
+            @Nonnull String entityType) {
+        super(listener);
+        this.entityType = Preconditions.checkNotNull(entityType, "entityType cannot be null");
     }
 
     @Override
-    public T getInstance() {
-        return listener;
-    }
-
-    @Override
-    public Entity getEntity() {
-        return entity;
+    public String getEntityType() {
+        return entityType;
     }
 }
index 94c7aa0..b92a124 100644 (file)
@@ -26,7 +26,15 @@ class DistributedEntityOwnershipCandidateRegistration extends AbstractEntityOwne
     }
 
     @Override
-    public void close() {
+    protected void removeRegistration() {
         service.unregisterCandidate(getEntity(), getInstance());
     }
+
+    @Override
+    public String toString() {
+        return "DistributedEntityOwnershipCandidateRegistration [entity=" + getEntity() + ", candidate="
+                + getInstance() + "]";
+    }
+
+
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipListenerRegistration.java
new file mode 100644 (file)
index 0000000..9498e8f
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.impl.clustering.AbstractEntityOwnershipListenerRegistration;
+
+/**
+ * Implementation of EntityOwnershipListenerRegistration.
+ *
+ * @author Thomas Pantelis
+ */
+class DistributedEntityOwnershipListenerRegistration extends AbstractEntityOwnershipListenerRegistration {
+
+    private final DistributedEntityOwnershipService service;
+
+    DistributedEntityOwnershipListenerRegistration(EntityOwnershipListener listener, String entityType,
+            DistributedEntityOwnershipService service) {
+        super(listener, entityType);
+        this.service = service;
+    }
+
+    @Override
+    protected void removeRegistration() {
+        service.unregisterListener(getEntityType(), getInstance());
+    }
+
+    @Override
+    public String toString() {
+        return "DistributedEntityOwnershipListenerRegistration [entityType=" + getEntityType()
+                + ", listener=" + getInstance() + "]";
+    }
+}
index af35ebd..cd45ef5 100644 (file)
@@ -11,6 +11,7 @@ import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -19,7 +20,9 @@ import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
@@ -113,6 +116,8 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     @Override
     public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
             throws CandidateAlreadyRegisteredException {
+        Preconditions.checkNotNull(entity, "entity cannot be null");
+        Preconditions.checkNotNull(candidate, "candidate cannot be null");
 
         EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity, candidate);
         if(currentCandidate != null) {
@@ -128,16 +133,29 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     }
 
     void unregisterCandidate(Entity entity, EntityOwnershipCandidate entityOwnershipCandidate) {
-        LOG.debug("Unregistering candidate for {}", entity);
+        LOG.debug("Unregistering candidate {} for {}", entityOwnershipCandidate, entity);
 
         executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entityOwnershipCandidate, entity));
         registeredEntities.remove(entity);
     }
 
     @Override
-    public EntityOwnershipListenerRegistration registerListener(Entity entity, EntityOwnershipListener listener) {
-        // TODO Auto-generated method stub
-        return null;
+    public EntityOwnershipListenerRegistration registerListener(String entityType, EntityOwnershipListener listener) {
+        Preconditions.checkNotNull(entityType, "entityType cannot be null");
+        Preconditions.checkNotNull(listener, "listener cannot be null");
+
+        RegisterListenerLocal registerListener = new RegisterListenerLocal(listener, entityType);
+
+        LOG.debug("Registering listener with message: {}", registerListener);
+
+        executeLocalEntityOwnershipShardOperation(registerListener);
+        return new DistributedEntityOwnershipListenerRegistration(listener, entityType, this);
+    }
+
+    void unregisterListener(String entityType, EntityOwnershipListener listener) {
+        LOG.debug("Unregistering listener {} for entity type {}", listener, entityType);
+
+        executeLocalEntityOwnershipShardOperation(new UnregisterListenerLocal(listener, entityType));
     }
 
     @Override
index 59d2844..385bb70 100644 (file)
@@ -37,6 +37,8 @@ final class EntityOwnersModel {
     static final NodeIdentifier ENTITY_OWNERS_NODE_ID = new NodeIdentifier(EntityOwners.QNAME);
     static final NodeIdentifier ENTITY_OWNER_NODE_ID = new NodeIdentifier(ENTITY_OWNER_QNAME);
     static final NodeIdentifier ENTITY_NODE_ID = new NodeIdentifier(ENTITY_QNAME);
+    static final NodeIdentifier ENTITY_ID_NODE_ID = new NodeIdentifier(ENTITY_ID_QNAME);
+    static final NodeIdentifier ENTITY_TYPE_NODE_ID = new NodeIdentifier(ENTITY_TYPE_QNAME);
     static final NodeIdentifier CANDIDATE_NODE_ID = new NodeIdentifier(Candidate.QNAME);
     static final NodeIdentifier CANDIDATE_NAME_NODE_ID = new NodeIdentifier(CANDIDATE_NAME_QNAME);
     static final YangInstanceIdentifier ENTITY_OWNERS_PATH = YangInstanceIdentifier.of(EntityOwners.QNAME);
index 5220ea2..7941bc0 100644 (file)
@@ -12,6 +12,7 @@ import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.Map;
@@ -32,6 +33,7 @@ class EntityOwnershipListenerSupport {
     private final ActorContext actorContext;
     private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
     private final Multimap<Entity, EntityOwnershipListener> entityListenerMap = HashMultimap.create();
+    private final Multimap<String, EntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
 
     EntityOwnershipListenerSupport(ActorContext actorContext) {
         this.actorContext = actorContext;
@@ -40,7 +42,60 @@ class EntityOwnershipListenerSupport {
     void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
         LOG.debug("Adding EntityOwnershipListener {} for {}", listener, entity);
 
-        if(entityListenerMap.put(entity, listener)) {
+        addListener(listener, entity, entityListenerMap);
+    }
+
+    void addEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
+        LOG.debug("Adding EntityOwnershipListener {} for entity type {}", listener, entityType);
+
+        addListener(listener, entityType, entityTypeListenerMap);
+    }
+
+    void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
+        LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity);
+
+        removeListener(listener, entity, entityListenerMap);
+    }
+
+    void removeEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
+        LOG.debug("Removing EntityOwnershipListener {} for entity type {}", listener, entityType);
+
+        removeListener(listener, entityType, entityTypeListenerMap);
+    }
+
+    void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner) {
+        notifyListeners(entity, entity, wasOwner, isOwner, entityListenerMap);
+        notifyListeners(entity, entity.getType(), wasOwner, isOwner, entityTypeListenerMap);
+    }
+
+    void notifyEntityOwnershipListener(Entity entity, boolean wasOwner, boolean isOwner,
+            EntityOwnershipListener listener) {
+        notifyListeners(entity, wasOwner, isOwner, Arrays.asList(listener));
+    }
+
+    private <T> void notifyListeners(Entity entity, T mapKey, boolean wasOwner, boolean isOwner,
+            Multimap<T, EntityOwnershipListener> listenerMap) {
+        Collection<EntityOwnershipListener> listeners = listenerMap.get(mapKey);
+        if(!listeners.isEmpty()) {
+            notifyListeners(entity, wasOwner, isOwner, listeners);
+        }
+    }
+
+    private void notifyListeners(Entity entity, boolean wasOwner, boolean isOwner,
+            Collection<EntityOwnershipListener> listeners) {
+        EntityOwnershipChanged changed = new EntityOwnershipChanged(entity, wasOwner, isOwner);
+        for(EntityOwnershipListener listener: listeners) {
+            ActorRef listenerActor = listenerActorFor(listener);
+
+            LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor, changed);
+
+            listenerActor.tell(changed, ActorRef.noSender());
+        }
+    }
+
+    private <T> void addListener(EntityOwnershipListener listener, T mapKey,
+            Multimap<T, EntityOwnershipListener> toListenerMap) {
+        if(toListenerMap.put(mapKey, listener)) {
             ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
             if(listenerEntry == null) {
                 listenerActorMap.put(listener, new ListenerActorRefEntry());
@@ -50,10 +105,9 @@ class EntityOwnershipListenerSupport {
         }
     }
 
-    void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
-        LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity);
-
-        if(entityListenerMap.remove(entity, listener)) {
+    private <T> void removeListener(EntityOwnershipListener listener, T mapKey,
+            Multimap<T, EntityOwnershipListener> fromListenerMap) {
+        if(fromListenerMap.remove(mapKey, listener)) {
             ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
 
             LOG.debug("Found {}", listenerEntry);
@@ -70,22 +124,6 @@ class EntityOwnershipListenerSupport {
         }
     }
 
-    void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner) {
-        Collection<EntityOwnershipListener> listeners = entityListenerMap.get(entity);
-        if(listeners.isEmpty()) {
-            return;
-        }
-
-        EntityOwnershipChanged changed = new EntityOwnershipChanged(entity, wasOwner, isOwner);
-        for(EntityOwnershipListener listener: listeners) {
-            ActorRef listenerActor = listenerActorFor(listener);
-
-            LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor,changed);
-
-            listenerActor.tell(changed, ActorRef.noSender());
-        }
-    }
-
     private ActorRef listenerActorFor(EntityOwnershipListener listener) {
         return listenerActorMap.get(listener).actorFor(listener);
     }
index a71c86d..4dfbc87 100644 (file)
@@ -9,11 +9,13 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH;
+import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
 import akka.actor.ActorRef;
@@ -34,7 +36,9 @@ import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
@@ -111,6 +115,10 @@ class EntityOwnershipShard extends Shard {
             onPeerDown((PeerDown) message);
         } else if(message instanceof PeerUp) {
             onPeerUp((PeerUp) message);
+        } if(message instanceof RegisterListenerLocal) {
+            onRegisterListenerLocal((RegisterListenerLocal)message);
+        } if(message instanceof UnregisterListenerLocal) {
+            onUnregisterListenerLocal((UnregisterListenerLocal)message);
         } else if(!commitCoordinator.handleMessage(message, this)) {
             super.onReceiveCommand(message);
         }
@@ -140,6 +148,36 @@ class EntityOwnershipShard extends Shard {
         getSender().tell(SuccessReply.INSTANCE, getSelf());
     }
 
+    private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) {
+        LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener);
+
+        listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener());
+
+        getSender().tell(SuccessReply.INSTANCE, getSelf());
+
+        searchForEntitiesOwnedBy(localMemberName, new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                Optional<DataContainerChild<? extends PathArgument, ?>> possibleType =
+                        entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
+                String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
+                if(registerListener.getEntityType().equals(entityType)) {
+                    Entity entity = new Entity(entityType,
+                            (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
+                    listenerSupport.notifyEntityOwnershipListener(entity, false, true, registerListener.getListener());
+                }
+            }
+        });
+    }
+
+    private void onUnregisterListenerLocal(UnregisterListenerLocal unregisterListener) {
+        LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener);
+
+        listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(), unregisterListener.getListener());
+
+        getSender().tell(SuccessReply.INSTANCE, getSelf());
+    }
+
     void tryCommitModifications(final BatchedModifications modifications) {
         if(isLeader()) {
             LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(), modifications.getTransactionID());
@@ -247,6 +285,26 @@ class EntityOwnershipShard extends Shard {
     }
 
     private void selectNewOwnerForEntitiesOwnedBy(String owner) {
+        final BatchedModifications modifications = commitCoordinator.newBatchedModifications();
+        searchForEntitiesOwnedBy(owner, new EntityWalker() {
+            @Override
+            public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
+                Object newOwner = newOwner(getCandidateNames(entityNode));
+                YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
+                        node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
+                        node(ENTITY_OWNER_NODE_ID).build();
+
+                LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
+
+                modifications.addModification(new WriteModification(entityPath,
+                        ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+            }
+        });
+
+        commitCoordinator.commitModifications(modifications, this);
+    }
+
+    private void searchForEntitiesOwnedBy(String owner, EntityWalker walker) {
         DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
         Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
         if(!possibleEntityTypes.isPresent()) {
@@ -255,7 +313,6 @@ class EntityOwnershipShard extends Shard {
 
         LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner);
 
-        BatchedModifications modifications = commitCoordinator.newBatchedModifications();
         for(MapEntryNode entityType:  ((MapNode) possibleEntityTypes.get()).getValue()) {
             Optional<DataContainerChild<? extends PathArgument, ?>> possibleEntities =
                     entityType.getChild(ENTITY_NODE_ID);
@@ -267,20 +324,10 @@ class EntityOwnershipShard extends Shard {
                 Optional<DataContainerChild<? extends PathArgument, ?>> possibleOwner =
                         entity.getChild(ENTITY_OWNER_NODE_ID);
                 if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) {
-                    Object newOwner = newOwner(getCandidateNames(entity));
-                    YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
-                            node(entityType.getIdentifier()).node(ENTITY_NODE_ID).node(entity.getIdentifier()).
-                                    node(ENTITY_OWNER_NODE_ID).build();
-
-                    LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
-
-                    modifications.addModification(new WriteModification(entityPath,
-                            ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)));
+                    walker.onEntity(entityType, entity);
                 }
             }
         }
-
-        commitCoordinator.commitModifications(modifications, this);
     }
 
     private Collection<String> getCandidateNames(MapEntryNode entity) {
@@ -341,4 +388,8 @@ class EntityOwnershipShard extends Shard {
             return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
         }
     }
+
+    private static interface EntityWalker {
+        void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterListenerLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterListenerLocal.java
new file mode 100644 (file)
index 0000000..6c64fb5
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+
+/**
+ * Message sent to the local EntityOwnershipShard to register an EntityOwnershipListener.
+ *
+ * @author Thomas Pantelis
+ */
+public class RegisterListenerLocal {
+    private final EntityOwnershipListener listener;
+    private final String entityType;
+
+    public RegisterListenerLocal(EntityOwnershipListener listener, String entityType) {
+        this.listener = listener;
+        this.entityType = entityType;
+    }
+
+    public EntityOwnershipListener getListener() {
+        return listener;
+    }
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+    @Override
+    public String toString() {
+        return "RegisterListenerLocal [entityType=" + entityType + ", listener=" + listener + "]";
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterListenerLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterListenerLocal.java
new file mode 100644 (file)
index 0000000..2bfa81a
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+
+/**
+ * Message sent to the local EntityOwnershipShard to unregister an EntityOwnershipListener.
+ *
+ * @author Thomas Pantelis
+ */
+public class UnregisterListenerLocal {
+    private final EntityOwnershipListener listener;
+    private final String entityType;
+
+    public UnregisterListenerLocal(EntityOwnershipListener listener, String entityType) {
+        this.listener = listener;
+        this.entityType = entityType;
+    }
+
+    public EntityOwnershipListener getListener() {
+        return listener;
+    }
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+    @Override
+    public String toString() {
+        return "UnregisterListenerLocal [entityType=" + entityType + ", listener=" + listener + "]";
+    }
+}
index 0db6ef6..51f042d 100644 (file)
@@ -33,7 +33,9 @@ import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.cluster.datastore.config.ModuleConfig;
 import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
@@ -41,6 +43,8 @@ import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlready
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -192,8 +196,42 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh
     }
 
     @Test
-    public void testRegisterListener() {
-        // TODO
+    public void testListenerRegistration() {
+        final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
+        DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
+            @Override
+            protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
+                return shardPropsCreator;
+            }
+        };
+
+        service.start();
+
+        shardPropsCreator.expectShardMessage(RegisterListenerLocal.class);
+
+        YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+        Entity entity = new Entity(ENTITY_TYPE, entityId);
+        EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
+
+        EntityOwnershipListenerRegistration reg = service.registerListener(entity.getType(), listener);
+
+        assertNotNull("EntityOwnershipListenerRegistration null", reg);
+        assertEquals("getEntityType", entity.getType(), reg.getEntityType());
+        assertEquals("getInstance", listener, reg.getInstance());
+
+        RegisterListenerLocal regListener = shardPropsCreator.waitForShardMessage();
+        assertSame("getListener", listener, regListener.getListener());
+        assertEquals("getEntityType", entity.getType(), regListener.getEntityType());
+
+        shardPropsCreator.expectShardMessage(UnregisterListenerLocal.class);
+
+        reg.close();
+
+        UnregisterListenerLocal unregListener = shardPropsCreator.waitForShardMessage();
+        assertEquals("getEntityType", entity.getType(), unregListener.getEntityType());
+        assertSame("getListener", listener, unregListener.getListener());
+
+        service.close();
     }
 
     private void verifyEntityCandidate(ActorRef entityOwnershipShard, String entityType,
index dbc5c2c..8ddc0b4 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -19,6 +21,8 @@ import akka.actor.UntypedActorContext;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Test;
@@ -55,72 +59,112 @@ public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
 
         EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1");
         EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2");
-        Entity entity1 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1")));
-        Entity entity2 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id2")));
+        EntityOwnershipListener mockListener3 = mock(EntityOwnershipListener.class, "EntityOwnershipListener3");
+        Entity entity1 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id1")));
+        Entity entity2 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id2")));
+        Entity entity3 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id3")));
+        Entity entity4 = new Entity("type2", YangInstanceIdentifier.of(QName.create("test", "id4")));
+        Entity entity5 = new Entity("noListener", YangInstanceIdentifier.of(QName.create("test", "id5")));
 
         // Add EntityOwnershipListener registrations.
 
         support.addEntityOwnershipListener(entity1, mockListener1);
+        support.addEntityOwnershipListener(entity1, mockListener1); // register again - should be noop
         support.addEntityOwnershipListener(entity2, mockListener1);
         support.addEntityOwnershipListener(entity1, mockListener2);
+        support.addEntityOwnershipListener(entity1.getType(), mockListener3);
 
-        // Notify entity1 changed and verify both listeners are notified.
+        // Notify entity1 changed and verify listeners are notified.
 
         support.notifyEntityOwnershipListeners(entity1, false, true);
 
         verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true);
         verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true);
-        assertEquals("# of listener actors", 2, actorContext.children().size());
+        verify(mockListener3, timeout(5000)).ownershipChanged(entity1, false, true);
+        assertEquals("# of listener actors", 3, actorContext.children().size());
 
-        // Notify entity2 changed and verify only mockListener1 is notified.
+        // Notify entity2 changed and verify only mockListener1 and mockListener3 are notified.
 
         support.notifyEntityOwnershipListeners(entity2, false, true);
 
         verify(mockListener1, timeout(5000)).ownershipChanged(entity2, false, true);
-        verify(mockListener2, never()).ownershipChanged(entity2, false, true);
-        assertEquals("# of listener actors", 2, actorContext.children().size());
+        verify(mockListener3, timeout(5000)).ownershipChanged(entity2, false, true);
+        Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+        verify(mockListener2, never()).ownershipChanged(eq(entity2), anyBoolean(), anyBoolean());
+        assertEquals("# of listener actors", 3, actorContext.children().size());
 
-        // Notify entity3 changed and verify neither listener is notified.
+        // Notify entity3 changed and verify only mockListener3 is notified.
 
-        Entity entity3 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id3")));
         support.notifyEntityOwnershipListeners(entity3, false, true);
 
-        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verify(mockListener3, timeout(5000)).ownershipChanged(entity3, false, true);
+        Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+        verify(mockListener1, never()).ownershipChanged(eq(entity3), anyBoolean(), anyBoolean());
+        verify(mockListener2, never()).ownershipChanged(eq(entity3), anyBoolean(), anyBoolean());
 
-        verify(mockListener1, never()).ownershipChanged(entity3, false, true);
-        verify(mockListener2, never()).ownershipChanged(entity3, false, true);
+        // Notify entity4 changed and verify no listeners are notified.
 
-        reset(mockListener1, mockListener2);
+        support.notifyEntityOwnershipListeners(entity4, false, true);
+
+        Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+        verify(mockListener1, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+        verify(mockListener2, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+        verify(mockListener3, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+
+        // Notify entity5 changed and verify no listener is notified.
+
+        support.notifyEntityOwnershipListeners(entity5, false, true);
 
-        // Unregister mockListener1 for entity1, issue a change and verify only mockListener2 is notified.
+        Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+        verify(mockListener1, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+        verify(mockListener2, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+        verify(mockListener3, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+
+        reset(mockListener1, mockListener2, mockListener3);
+
+        // Unregister mockListener1 for entity1, issue a change and verify only mockListeners 2 & 3 are notified.
 
         support.removeEntityOwnershipListener(entity1, mockListener1);
+        support.notifyEntityOwnershipListeners(entity1, false, true);
+
+        verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true);
+        verify(mockListener3, timeout(5000)).ownershipChanged(entity1, false, true);
+        Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+        verify(mockListener1, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
+
+        // Unregister mockListener3, issue a change for entity1 and verify only mockListeners2 is notified.
 
+        reset(mockListener1, mockListener2, mockListener3);
+
+        support.removeEntityOwnershipListener(entity1.getType(), mockListener3);
         support.notifyEntityOwnershipListeners(entity1, false, true);
 
         verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true);
-        verify(mockListener1, never()).ownershipChanged(entity1, false, true);
+        Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+        verify(mockListener1, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
+        verify(mockListener3, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
 
-        // Completely unregister both listeners and verify their listener actors are destroyed.
+        // Completely unregister all listeners and verify their listener actors are destroyed.
 
         Iterable<ActorRef> listenerActors = actorContext.children();
         assertEquals("# of listener actors", 2, listenerActors.size());
 
-        Iterator<ActorRef> iter = listenerActors.iterator();
-        ActorRef listenerActor1 = iter.next();
-        ActorRef listenerActor2 = iter.next();
-
-        JavaTestKit kit1 = new JavaTestKit(getSystem());
-        kit1.watch(listenerActor1);
-
-        JavaTestKit kit2 = new JavaTestKit(getSystem());
-        kit2.watch(listenerActor2);
+        List<JavaTestKit> watchers = new ArrayList<>();
+        for(Iterator<ActorRef> iter = listenerActors.iterator(); iter.hasNext();) {
+            JavaTestKit kit = new JavaTestKit(getSystem());
+            kit.watch(iter.next());
+            watchers.add(kit);
+        }
 
         support.removeEntityOwnershipListener(entity2, mockListener1);
+        support.removeEntityOwnershipListener(entity2, mockListener1); // un-register again - shoild be noop
         support.removeEntityOwnershipListener(entity1, mockListener2);
 
-        kit1.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor1);
-        kit2.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor2);
+        Iterator<ActorRef> iter = listenerActors.iterator();
+        for(JavaTestKit kit: watchers) {
+            kit.expectTerminated(JavaTestKit.duration("3 seconds"), iter.next());
+        }
+
         assertEquals("# of listener actors", 0, actorContext.children().size());
 
         // Re-register mockListener1 for entity1 and verify it is notified.
@@ -132,7 +176,8 @@ public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
         support.notifyEntityOwnershipListeners(entity1, false, true);
 
         verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true);
-        verify(mockListener2, never()).ownershipChanged(entity2, false, true);
+        verify(mockListener2, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
+        verify(mockListener3, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
 
         // Quickly register and unregister mockListener2 - expecting no exceptions.
 
index 8ba17c0..146916a 100644 (file)
@@ -7,9 +7,13 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import static org.hamcrest.CoreMatchers.either;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -36,6 +40,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.hamcrest.Matcher;
 import org.junit.After;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
@@ -45,7 +50,9 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
@@ -65,6 +72,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -234,16 +242,17 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
         ShardTestKit kit = new ShardTestKit(getSystem());
 
-        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
                 shardBatchedModificationCount(5);
 
         String peerId = newShardId("leader").toString();
         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
 
-        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
-                ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
-                withDispatcher(Dispatchers.DefaultDispatcherId()));
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
+                TestEntityOwnershipShard.class, newShardId(LOCAL_MEMBER_NAME),
+                        ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build(),
+                        dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
 
         shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
                 DataStoreVersions.CURRENT_VERSION), peer);
@@ -259,8 +268,6 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
                 LOCAL_MEMBER_NAME);
 
-        shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
-
         // Test with initial commit timeout and subsequent retry.
 
         leader.modificationsReceived = new CountDownLatch(1);
@@ -588,6 +595,90 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
     }
 
+    @Test
+    public void testListenerRegistration() throws Exception {
+        ShardTestKit kit = new ShardTestKit(getSystem());
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+        kit.waitUntilLeader(shard);
+        ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+
+        String otherEntityType = "otherEntityType";
+        Entity entity1 = new Entity(ENTITY_TYPE, ENTITY_ID1);
+        Entity entity2 = new Entity(ENTITY_TYPE, ENTITY_ID2);
+        Entity entity3 = new Entity(ENTITY_TYPE, ENTITY_ID3);
+        Entity entity4 = new Entity(otherEntityType, ENTITY_ID3);
+        EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        // Register listener
+
+        shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        // Register a couple candidates for the desired entity type and verify listener is notified.
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity1), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verify(listener, timeout(5000)).ownershipChanged(entity1, false, true);
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity2), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verify(listener, timeout(5000)).ownershipChanged(entity2, false, true);
+        reset(listener);
+
+        // Register another candidate for another entity type and verify listener is not notified.
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity4), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verify(listener, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+
+        // Register remote candidate for entity1
+
+        String remoteMemberName = "remoteMember";
+        writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, entity1.getId(), remoteMemberName),
+                shardDataTree);
+        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getId(), remoteMemberName);
+
+        // Unregister the local candidate for entity1 and verify listener is notified
+
+        shard.tell(new UnregisterCandidateLocal(candidate, entity1), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verify(listener, timeout(5000)).ownershipChanged(entity1, true, false);
+        reset(listener);
+
+        // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
+
+        shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity3), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verifyOwner(shard, ENTITY_TYPE, entity3.getId(), LOCAL_MEMBER_NAME);
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+        verify(listener, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+        // Re-register the listener and verify it gets notified of current locally owned entities
+
+        reset(listener, candidate);
+
+        shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        Matcher<Entity> entityMatcher = either(equalTo(entity2)).or(equalTo(entity3));
+        verify(listener, timeout(5000).times(2)).ownershipChanged(argThat(entityMatcher), eq(false), eq(true));
+        Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
+        verify(listener, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean());
+        verify(listener, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean());
+        verify(candidate, never()).ownershipChanged(eq(entity2), anyBoolean(), anyBoolean());
+        verify(candidate, never()).ownershipChanged(eq(entity3), anyBoolean(), anyBoolean());
+    }
+
     private void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
             JavaTestKit sender) {
         BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
@@ -674,6 +765,23 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
                 type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
     }
 
+    public static class TestEntityOwnershipShard extends EntityOwnershipShard {
+
+        TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
+                DatastoreContext datastoreContext) {
+            super(name, peerAddresses, datastoreContext, SCHEMA_CONTEXT, LOCAL_MEMBER_NAME);
+        }
+
+        @Override
+        public void onReceiveCommand(Object message) throws Exception {
+            if(!(message instanceof ElectionTimeout)) {
+                super.onReceiveCommand(message);
+            }
+        }
+
+
+    }
+
     public static class MockFollower extends UntypedActor {
         volatile boolean grantVote;
         volatile boolean dropAppendEntries;

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.