From 4f2d31367df3b74d326d4a7c69bd20789d020198 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 17 Aug 2015 05:57:53 -0400 Subject: [PATCH] Bug 4105: Implement EntityOwnershipListener registration/notification Change-Id: I49ee7f4b5f48ddde4779d37ba34c88dd776dd47b Signed-off-by: Tom Pantelis --- .../EntityOwnershipCandidateRegistration.java | 15 ++- .../EntityOwnershipListenerRegistration.java | 7 +- .../clustering/EntityOwnershipService.java | 27 ++-- ...tEntityOwnershipCandidateRegistration.java | 18 ++- ...ctEntityOwnershipListenerRegistration.java | 25 ++-- ...dEntityOwnershipCandidateRegistration.java | 10 +- ...edEntityOwnershipListenerRegistration.java | 38 ++++++ .../DistributedEntityOwnershipService.java | 26 +++- .../entityownership/EntityOwnersModel.java | 2 + .../EntityOwnershipListenerSupport.java | 80 +++++++++--- .../entityownership/EntityOwnershipShard.java | 75 +++++++++-- .../messages/RegisterListenerLocal.java | 38 ++++++ .../messages/UnregisterListenerLocal.java | 38 ++++++ ...DistributedEntityOwnershipServiceTest.java | 42 +++++- .../EntityOwnershipListenerSupportTest.java | 101 +++++++++++---- .../EntityOwnershipShardTest.java | 120 +++++++++++++++++- 16 files changed, 557 insertions(+), 105 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipListenerRegistration.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterListenerLocal.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterListenerLocal.java diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipCandidateRegistration.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipCandidateRegistration.java index 43bf779b89..c3c43d92ba 100644 --- a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipCandidateRegistration.java +++ b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipCandidateRegistration.java @@ -8,10 +8,23 @@ package org.opendaylight.controller.md.sal.common.api.clustering; +import javax.annotation.Nonnull; +import org.opendaylight.yangtools.concepts.ObjectRegistration; + /** * An EntityOwnershipCandidateRegistration records a request to register a Candidate for a given Entity. Calling * close on the EntityOwnershipCandidateRegistration will remove the Candidate from any future ownership considerations * for that Entity and will also remove it as a Listener for ownership status changes. */ -public interface EntityOwnershipCandidateRegistration extends EntityOwnershipListenerRegistration { +public interface EntityOwnershipCandidateRegistration extends ObjectRegistration { + /** + * Returns the entity that the listener was registered for + */ + @Nonnull Entity getEntity(); + + /** + * Unregister the listener + */ + @Override + void close(); } diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipListenerRegistration.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipListenerRegistration.java index ff9c851b14..ff7ce5548e 100644 --- a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipListenerRegistration.java +++ b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipListenerRegistration.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.md.sal.common.api.clustering; +import javax.annotation.Nonnull; import org.opendaylight.yangtools.concepts.ObjectRegistration; /** @@ -18,13 +19,13 @@ import org.opendaylight.yangtools.concepts.ObjectRegistration; public interface EntityOwnershipListenerRegistration extends ObjectRegistration { /** - * - * @return the entity that the listener was registered for + * Return the entity type that the listener was registered for */ - Entity getEntity(); + @Nonnull String getEntityType(); /** * Unregister the listener */ + @Override void close(); } diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipService.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipService.java index fc15dac9d3..52009a2374 100644 --- a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipService.java +++ b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityOwnershipService.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.md.sal.common.api.clustering; +import javax.annotation.Nonnull; + /** *

* The EntityOwnershipService provides the means for a component/application to request ownership for a given @@ -22,26 +24,33 @@ package org.opendaylight.controller.md.sal.common.api.clustering; public interface EntityOwnershipService { /** - * Registers as a Candidate that wants to own the given Entity. Only one such request can be made per process. - * If multiple requests for registering a Candidate for a given Entity are received in the current process a - * CandidateAlreadyRegisteredException will be thrown + * Registers a candidate for ownership of the given entity. Only one such request can be made per entity + * per process. If multiple requests for registering a candidate for a given entity are received in the + * current process a CandidateAlreadyRegisteredException will be thrown. + *

+ * The registration is performed asynchronously and the {@link EntityOwnershipCandidate} instance is + * notified whenever its process instance is granted ownership of the entity and also whenever it loses + * ownership. Note that the {@link EntityOwnershipCandidate} is not notified when another process instance + * is granted ownership. * * @param entity the entity which the Candidate wants to own * @param candidate the Candidate that wants to own the entity * @return a registration object that can be used to unregister the Candidate * @throws org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException */ - EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate) + EntityOwnershipCandidateRegistration registerCandidate(@Nonnull Entity entity, @Nonnull EntityOwnershipCandidate candidate) throws CandidateAlreadyRegisteredException; /** - * Registers a Listener that is interested in the ownership status of the given Entity. On registration the Listener - * will be notified of the ownership status of the given Entity at the time of registration. + * Registers a listener that is interested in ownership changes for entities of the given entity type. The + * listener is notified whenever its process instance is granted ownership of the entity and also whenever + * it loses ownership. On registration the listener will be notified of all entities its process instance + * currently owns at the time of registration. * - * @param entity the Entity whose ownership status the Listener is interested in - * @param listener the Listener that is interested in the entity + * @param entityType the type of entities whose ownership status the Listener is interested in + * @param listener the listener that is interested in the entities * @return a registration object that can be used to unregister the Listener */ - EntityOwnershipListenerRegistration registerListener(Entity entity, EntityOwnershipListener listener); + EntityOwnershipListenerRegistration registerListener(@Nonnull String entityType, @Nonnull EntityOwnershipListener listener); } diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java index e1184583f1..afef3aa8b5 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java @@ -7,20 +7,30 @@ */ package org.opendaylight.controller.md.sal.common.impl.clustering; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration; +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; /** * Abstract base class for an EntityOwnershipCandidateRegistration. * * @author Thomas Pantelis */ -public abstract class AbstractEntityOwnershipCandidateRegistration - extends AbstractEntityOwnershipListenerRegistration +public abstract class AbstractEntityOwnershipCandidateRegistration extends AbstractObjectRegistration implements EntityOwnershipCandidateRegistration { + private final Entity entity; - protected AbstractEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity) { - super(candidate, entity); + protected AbstractEntityOwnershipCandidateRegistration(@Nonnull EntityOwnershipCandidate candidate, + @Nonnull Entity entity) { + super(candidate); + this.entity = Preconditions.checkNotNull(entity, "entity cannot be null"); + } + + @Override + public Entity getEntity() { + return entity; } } diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java index 881d6624d6..77118a0f43 100644 --- a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java @@ -7,32 +7,29 @@ */ package org.opendaylight.controller.md.sal.common.impl.clustering; -import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import com.google.common.base.Preconditions; +import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration; +import org.opendaylight.yangtools.concepts.AbstractObjectRegistration; /** * Abstract base class for an EntityOwnershipListenerRegistration. * * @author Thomas Pantelis */ -public abstract class AbstractEntityOwnershipListenerRegistration +public abstract class AbstractEntityOwnershipListenerRegistration extends AbstractObjectRegistration implements EntityOwnershipListenerRegistration { - private final T listener; - private final Entity entity; + private final String entityType; - protected AbstractEntityOwnershipListenerRegistration(T listener, Entity entity) { - this.listener = listener; - this.entity = entity; + protected AbstractEntityOwnershipListenerRegistration(@Nonnull EntityOwnershipListener listener, + @Nonnull String entityType) { + super(listener); + this.entityType = Preconditions.checkNotNull(entityType, "entityType cannot be null"); } @Override - public T getInstance() { - return listener; - } - - @Override - public Entity getEntity() { - return entity; + public String getEntityType() { + return entityType; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java index 94c7aa0976..b92a124b28 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java @@ -26,7 +26,15 @@ class DistributedEntityOwnershipCandidateRegistration extends AbstractEntityOwne } @Override - public void close() { + protected void removeRegistration() { service.unregisterCandidate(getEntity(), getInstance()); } + + @Override + public String toString() { + return "DistributedEntityOwnershipCandidateRegistration [entity=" + getEntity() + ", candidate=" + + getInstance() + "]"; + } + + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipListenerRegistration.java new file mode 100644 index 0000000000..9498e8fccf --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipListenerRegistration.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership; + +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; +import org.opendaylight.controller.md.sal.common.impl.clustering.AbstractEntityOwnershipListenerRegistration; + +/** + * Implementation of EntityOwnershipListenerRegistration. + * + * @author Thomas Pantelis + */ +class DistributedEntityOwnershipListenerRegistration extends AbstractEntityOwnershipListenerRegistration { + + private final DistributedEntityOwnershipService service; + + DistributedEntityOwnershipListenerRegistration(EntityOwnershipListener listener, String entityType, + DistributedEntityOwnershipService service) { + super(listener, entityType); + this.service = service; + } + + @Override + protected void removeRegistration() { + service.unregisterListener(getEntityType(), getInstance()); + } + + @Override + public String toString() { + return "DistributedEntityOwnershipListenerRegistration [entityType=" + getEntityType() + + ", listener=" + getInstance() + "]"; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java index af35ebdc05..cd45ef58a9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java @@ -11,6 +11,7 @@ import akka.actor.ActorRef; import akka.dispatch.OnComplete; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -19,7 +20,9 @@ import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.config.Configuration; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy; import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; @@ -113,6 +116,8 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService @Override public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate) throws CandidateAlreadyRegisteredException { + Preconditions.checkNotNull(entity, "entity cannot be null"); + Preconditions.checkNotNull(candidate, "candidate cannot be null"); EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity, candidate); if(currentCandidate != null) { @@ -128,16 +133,29 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService } void unregisterCandidate(Entity entity, EntityOwnershipCandidate entityOwnershipCandidate) { - LOG.debug("Unregistering candidate for {}", entity); + LOG.debug("Unregistering candidate {} for {}", entityOwnershipCandidate, entity); executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entityOwnershipCandidate, entity)); registeredEntities.remove(entity); } @Override - public EntityOwnershipListenerRegistration registerListener(Entity entity, EntityOwnershipListener listener) { - // TODO Auto-generated method stub - return null; + public EntityOwnershipListenerRegistration registerListener(String entityType, EntityOwnershipListener listener) { + Preconditions.checkNotNull(entityType, "entityType cannot be null"); + Preconditions.checkNotNull(listener, "listener cannot be null"); + + RegisterListenerLocal registerListener = new RegisterListenerLocal(listener, entityType); + + LOG.debug("Registering listener with message: {}", registerListener); + + executeLocalEntityOwnershipShardOperation(registerListener); + return new DistributedEntityOwnershipListenerRegistration(listener, entityType, this); + } + + void unregisterListener(String entityType, EntityOwnershipListener listener) { + LOG.debug("Unregistering listener {} for entity type {}", listener, entityType); + + executeLocalEntityOwnershipShardOperation(new UnregisterListenerLocal(listener, entityType)); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java index 59d2844990..385bb70649 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java @@ -37,6 +37,8 @@ final class EntityOwnersModel { static final NodeIdentifier ENTITY_OWNERS_NODE_ID = new NodeIdentifier(EntityOwners.QNAME); static final NodeIdentifier ENTITY_OWNER_NODE_ID = new NodeIdentifier(ENTITY_OWNER_QNAME); static final NodeIdentifier ENTITY_NODE_ID = new NodeIdentifier(ENTITY_QNAME); + static final NodeIdentifier ENTITY_ID_NODE_ID = new NodeIdentifier(ENTITY_ID_QNAME); + static final NodeIdentifier ENTITY_TYPE_NODE_ID = new NodeIdentifier(ENTITY_TYPE_QNAME); static final NodeIdentifier CANDIDATE_NODE_ID = new NodeIdentifier(Candidate.QNAME); static final NodeIdentifier CANDIDATE_NAME_NODE_ID = new NodeIdentifier(CANDIDATE_NAME_QNAME); static final YangInstanceIdentifier ENTITY_OWNERS_PATH = YangInstanceIdentifier.of(EntityOwners.QNAME); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java index 5220ea29e2..7941bc088c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java @@ -12,6 +12,7 @@ import akka.actor.ActorRef; import akka.actor.PoisonPill; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import java.util.Arrays; import java.util.Collection; import java.util.IdentityHashMap; import java.util.Map; @@ -32,6 +33,7 @@ class EntityOwnershipListenerSupport { private final ActorContext actorContext; private final Map listenerActorMap = new IdentityHashMap<>(); private final Multimap entityListenerMap = HashMultimap.create(); + private final Multimap entityTypeListenerMap = HashMultimap.create(); EntityOwnershipListenerSupport(ActorContext actorContext) { this.actorContext = actorContext; @@ -40,7 +42,60 @@ class EntityOwnershipListenerSupport { void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) { LOG.debug("Adding EntityOwnershipListener {} for {}", listener, entity); - if(entityListenerMap.put(entity, listener)) { + addListener(listener, entity, entityListenerMap); + } + + void addEntityOwnershipListener(String entityType, EntityOwnershipListener listener) { + LOG.debug("Adding EntityOwnershipListener {} for entity type {}", listener, entityType); + + addListener(listener, entityType, entityTypeListenerMap); + } + + void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) { + LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity); + + removeListener(listener, entity, entityListenerMap); + } + + void removeEntityOwnershipListener(String entityType, EntityOwnershipListener listener) { + LOG.debug("Removing EntityOwnershipListener {} for entity type {}", listener, entityType); + + removeListener(listener, entityType, entityTypeListenerMap); + } + + void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner) { + notifyListeners(entity, entity, wasOwner, isOwner, entityListenerMap); + notifyListeners(entity, entity.getType(), wasOwner, isOwner, entityTypeListenerMap); + } + + void notifyEntityOwnershipListener(Entity entity, boolean wasOwner, boolean isOwner, + EntityOwnershipListener listener) { + notifyListeners(entity, wasOwner, isOwner, Arrays.asList(listener)); + } + + private void notifyListeners(Entity entity, T mapKey, boolean wasOwner, boolean isOwner, + Multimap listenerMap) { + Collection listeners = listenerMap.get(mapKey); + if(!listeners.isEmpty()) { + notifyListeners(entity, wasOwner, isOwner, listeners); + } + } + + private void notifyListeners(Entity entity, boolean wasOwner, boolean isOwner, + Collection listeners) { + EntityOwnershipChanged changed = new EntityOwnershipChanged(entity, wasOwner, isOwner); + for(EntityOwnershipListener listener: listeners) { + ActorRef listenerActor = listenerActorFor(listener); + + LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor, changed); + + listenerActor.tell(changed, ActorRef.noSender()); + } + } + + private void addListener(EntityOwnershipListener listener, T mapKey, + Multimap toListenerMap) { + if(toListenerMap.put(mapKey, listener)) { ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener); if(listenerEntry == null) { listenerActorMap.put(listener, new ListenerActorRefEntry()); @@ -50,10 +105,9 @@ class EntityOwnershipListenerSupport { } } - void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) { - LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity); - - if(entityListenerMap.remove(entity, listener)) { + private void removeListener(EntityOwnershipListener listener, T mapKey, + Multimap fromListenerMap) { + if(fromListenerMap.remove(mapKey, listener)) { ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener); LOG.debug("Found {}", listenerEntry); @@ -70,22 +124,6 @@ class EntityOwnershipListenerSupport { } } - void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner) { - Collection listeners = entityListenerMap.get(entity); - if(listeners.isEmpty()) { - return; - } - - EntityOwnershipChanged changed = new EntityOwnershipChanged(entity, wasOwner, isOwner); - for(EntityOwnershipListener listener: listeners) { - ActorRef listenerActor = listenerActorFor(listener); - - LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor,changed); - - listenerActor.tell(changed, ActorRef.noSender()); - } - } - private ActorRef listenerActorFor(EntityOwnershipListener listener) { return listenerActorMap.get(listener).actorFor(listener); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index a71c86dd9f..4dfbc87eb9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -9,11 +9,13 @@ package org.opendaylight.controller.cluster.datastore.entityownership; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NODE_ID; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_ID_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNER_QNAME; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPES_PATH; +import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_TYPE_NODE_ID; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath; import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate; import akka.actor.ActorRef; @@ -34,7 +36,9 @@ import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded; import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.PeerDown; @@ -111,6 +115,10 @@ class EntityOwnershipShard extends Shard { onPeerDown((PeerDown) message); } else if(message instanceof PeerUp) { onPeerUp((PeerUp) message); + } if(message instanceof RegisterListenerLocal) { + onRegisterListenerLocal((RegisterListenerLocal)message); + } if(message instanceof UnregisterListenerLocal) { + onUnregisterListenerLocal((UnregisterListenerLocal)message); } else if(!commitCoordinator.handleMessage(message, this)) { super.onReceiveCommand(message); } @@ -140,6 +148,36 @@ class EntityOwnershipShard extends Shard { getSender().tell(SuccessReply.INSTANCE, getSelf()); } + private void onRegisterListenerLocal(final RegisterListenerLocal registerListener) { + LOG.debug("{}: onRegisterListenerLocal: {}", persistenceId(), registerListener); + + listenerSupport.addEntityOwnershipListener(registerListener.getEntityType(), registerListener.getListener()); + + getSender().tell(SuccessReply.INSTANCE, getSelf()); + + searchForEntitiesOwnedBy(localMemberName, new EntityWalker() { + @Override + public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) { + Optional> possibleType = + entityTypeNode.getChild(ENTITY_TYPE_NODE_ID); + String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null; + if(registerListener.getEntityType().equals(entityType)) { + Entity entity = new Entity(entityType, + (YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue()); + listenerSupport.notifyEntityOwnershipListener(entity, false, true, registerListener.getListener()); + } + } + }); + } + + private void onUnregisterListenerLocal(UnregisterListenerLocal unregisterListener) { + LOG.debug("{}: onUnregisterListenerLocal: {}", persistenceId(), unregisterListener); + + listenerSupport.removeEntityOwnershipListener(unregisterListener.getEntityType(), unregisterListener.getListener()); + + getSender().tell(SuccessReply.INSTANCE, getSelf()); + } + void tryCommitModifications(final BatchedModifications modifications) { if(isLeader()) { LOG.debug("{}: Committing BatchedModifications {} locally", persistenceId(), modifications.getTransactionID()); @@ -247,6 +285,26 @@ class EntityOwnershipShard extends Shard { } private void selectNewOwnerForEntitiesOwnedBy(String owner) { + final BatchedModifications modifications = commitCoordinator.newBatchedModifications(); + searchForEntitiesOwnedBy(owner, new EntityWalker() { + @Override + public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) { + Object newOwner = newOwner(getCandidateNames(entityNode)); + YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH). + node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()). + node(ENTITY_OWNER_NODE_ID).build(); + + LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner); + + modifications.addModification(new WriteModification(entityPath, + ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner))); + } + }); + + commitCoordinator.commitModifications(modifications, this); + } + + private void searchForEntitiesOwnedBy(String owner, EntityWalker walker) { DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot(); Optional> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH); if(!possibleEntityTypes.isPresent()) { @@ -255,7 +313,6 @@ class EntityOwnershipShard extends Shard { LOG.debug("{}: Searching for entities owned by {}", persistenceId(), owner); - BatchedModifications modifications = commitCoordinator.newBatchedModifications(); for(MapEntryNode entityType: ((MapNode) possibleEntityTypes.get()).getValue()) { Optional> possibleEntities = entityType.getChild(ENTITY_NODE_ID); @@ -267,20 +324,10 @@ class EntityOwnershipShard extends Shard { Optional> possibleOwner = entity.getChild(ENTITY_OWNER_NODE_ID); if(possibleOwner.isPresent() && owner.equals(possibleOwner.get().getValue().toString())) { - Object newOwner = newOwner(getCandidateNames(entity)); - YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH). - node(entityType.getIdentifier()).node(ENTITY_NODE_ID).node(entity.getIdentifier()). - node(ENTITY_OWNER_NODE_ID).build(); - - LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner); - - modifications.addModification(new WriteModification(entityPath, - ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner))); + walker.onEntity(entityType, entity); } } } - - commitCoordinator.commitModifications(modifications, this); } private Collection getCandidateNames(MapEntryNode entity) { @@ -341,4 +388,8 @@ class EntityOwnershipShard extends Shard { return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName); } } + + private static interface EntityWalker { + void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterListenerLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterListenerLocal.java new file mode 100644 index 0000000000..6c64fb5bd4 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterListenerLocal.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership.messages; + +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; + +/** + * Message sent to the local EntityOwnershipShard to register an EntityOwnershipListener. + * + * @author Thomas Pantelis + */ +public class RegisterListenerLocal { + private final EntityOwnershipListener listener; + private final String entityType; + + public RegisterListenerLocal(EntityOwnershipListener listener, String entityType) { + this.listener = listener; + this.entityType = entityType; + } + + public EntityOwnershipListener getListener() { + return listener; + } + + public String getEntityType() { + return entityType; + } + + @Override + public String toString() { + return "RegisterListenerLocal [entityType=" + entityType + ", listener=" + listener + "]"; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterListenerLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterListenerLocal.java new file mode 100644 index 0000000000..2bfa81a72b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterListenerLocal.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership.messages; + +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; + +/** + * Message sent to the local EntityOwnershipShard to unregister an EntityOwnershipListener. + * + * @author Thomas Pantelis + */ +public class UnregisterListenerLocal { + private final EntityOwnershipListener listener; + private final String entityType; + + public UnregisterListenerLocal(EntityOwnershipListener listener, String entityType) { + this.listener = listener; + this.entityType = entityType; + } + + public EntityOwnershipListener getListener() { + return listener; + } + + public String getEntityType() { + return entityType; + } + + @Override + public String toString() { + return "UnregisterListenerLocal [entityType=" + entityType + ", listener=" + listener + "]"; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java index 0db6ef6f9f..51f042ddac 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java @@ -33,7 +33,9 @@ import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl; import org.opendaylight.controller.cluster.datastore.config.ModuleConfig; import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfigProvider; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; @@ -41,6 +43,8 @@ import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlready import org.opendaylight.controller.md.sal.common.api.clustering.Entity; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -192,8 +196,42 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh } @Test - public void testRegisterListener() { - // TODO + public void testListenerRegistration() { + final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) { + @Override + protected EntityOwnershipShardPropsCreator newShardPropsCreator() { + return shardPropsCreator; + } + }; + + service.start(); + + shardPropsCreator.expectShardMessage(RegisterListenerLocal.class); + + YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME); + Entity entity = new Entity(ENTITY_TYPE, entityId); + EntityOwnershipListener listener = mock(EntityOwnershipListener.class); + + EntityOwnershipListenerRegistration reg = service.registerListener(entity.getType(), listener); + + assertNotNull("EntityOwnershipListenerRegistration null", reg); + assertEquals("getEntityType", entity.getType(), reg.getEntityType()); + assertEquals("getInstance", listener, reg.getInstance()); + + RegisterListenerLocal regListener = shardPropsCreator.waitForShardMessage(); + assertSame("getListener", listener, regListener.getListener()); + assertEquals("getEntityType", entity.getType(), regListener.getEntityType()); + + shardPropsCreator.expectShardMessage(UnregisterListenerLocal.class); + + reg.close(); + + UnregisterListenerLocal unregListener = shardPropsCreator.waitForShardMessage(); + assertEquals("getEntityType", entity.getType(), unregListener.getEntityType()); + assertSame("getListener", listener, unregListener.getListener()); + + service.close(); } private void verifyEntityCandidate(ActorRef entityOwnershipShard, String entityType, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java index dbc5c2c0c0..8ddc0b473e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java @@ -8,6 +8,8 @@ package org.opendaylight.controller.cluster.datastore.entityownership; import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -19,6 +21,8 @@ import akka.actor.UntypedActorContext; import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import com.google.common.util.concurrent.Uninterruptibles; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Test; @@ -55,72 +59,112 @@ public class EntityOwnershipListenerSupportTest extends AbstractActorTest { EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1"); EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2"); - Entity entity1 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1"))); - Entity entity2 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id2"))); + EntityOwnershipListener mockListener3 = mock(EntityOwnershipListener.class, "EntityOwnershipListener3"); + Entity entity1 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id1"))); + Entity entity2 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id2"))); + Entity entity3 = new Entity("type1", YangInstanceIdentifier.of(QName.create("test", "id3"))); + Entity entity4 = new Entity("type2", YangInstanceIdentifier.of(QName.create("test", "id4"))); + Entity entity5 = new Entity("noListener", YangInstanceIdentifier.of(QName.create("test", "id5"))); // Add EntityOwnershipListener registrations. support.addEntityOwnershipListener(entity1, mockListener1); + support.addEntityOwnershipListener(entity1, mockListener1); // register again - should be noop support.addEntityOwnershipListener(entity2, mockListener1); support.addEntityOwnershipListener(entity1, mockListener2); + support.addEntityOwnershipListener(entity1.getType(), mockListener3); - // Notify entity1 changed and verify both listeners are notified. + // Notify entity1 changed and verify listeners are notified. support.notifyEntityOwnershipListeners(entity1, false, true); verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true); verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true); - assertEquals("# of listener actors", 2, actorContext.children().size()); + verify(mockListener3, timeout(5000)).ownershipChanged(entity1, false, true); + assertEquals("# of listener actors", 3, actorContext.children().size()); - // Notify entity2 changed and verify only mockListener1 is notified. + // Notify entity2 changed and verify only mockListener1 and mockListener3 are notified. support.notifyEntityOwnershipListeners(entity2, false, true); verify(mockListener1, timeout(5000)).ownershipChanged(entity2, false, true); - verify(mockListener2, never()).ownershipChanged(entity2, false, true); - assertEquals("# of listener actors", 2, actorContext.children().size()); + verify(mockListener3, timeout(5000)).ownershipChanged(entity2, false, true); + Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); + verify(mockListener2, never()).ownershipChanged(eq(entity2), anyBoolean(), anyBoolean()); + assertEquals("# of listener actors", 3, actorContext.children().size()); - // Notify entity3 changed and verify neither listener is notified. + // Notify entity3 changed and verify only mockListener3 is notified. - Entity entity3 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id3"))); support.notifyEntityOwnershipListeners(entity3, false, true); - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verify(mockListener3, timeout(5000)).ownershipChanged(entity3, false, true); + Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); + verify(mockListener1, never()).ownershipChanged(eq(entity3), anyBoolean(), anyBoolean()); + verify(mockListener2, never()).ownershipChanged(eq(entity3), anyBoolean(), anyBoolean()); - verify(mockListener1, never()).ownershipChanged(entity3, false, true); - verify(mockListener2, never()).ownershipChanged(entity3, false, true); + // Notify entity4 changed and verify no listeners are notified. - reset(mockListener1, mockListener2); + support.notifyEntityOwnershipListeners(entity4, false, true); + + Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); + verify(mockListener1, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean()); + verify(mockListener2, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean()); + verify(mockListener3, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean()); + + // Notify entity5 changed and verify no listener is notified. + + support.notifyEntityOwnershipListeners(entity5, false, true); - // Unregister mockListener1 for entity1, issue a change and verify only mockListener2 is notified. + Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); + verify(mockListener1, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean()); + verify(mockListener2, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean()); + verify(mockListener3, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean()); + + reset(mockListener1, mockListener2, mockListener3); + + // Unregister mockListener1 for entity1, issue a change and verify only mockListeners 2 & 3 are notified. support.removeEntityOwnershipListener(entity1, mockListener1); + support.notifyEntityOwnershipListeners(entity1, false, true); + + verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true); + verify(mockListener3, timeout(5000)).ownershipChanged(entity1, false, true); + Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); + verify(mockListener1, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean()); + + // Unregister mockListener3, issue a change for entity1 and verify only mockListeners2 is notified. + reset(mockListener1, mockListener2, mockListener3); + + support.removeEntityOwnershipListener(entity1.getType(), mockListener3); support.notifyEntityOwnershipListeners(entity1, false, true); verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true); - verify(mockListener1, never()).ownershipChanged(entity1, false, true); + Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); + verify(mockListener1, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean()); + verify(mockListener3, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean()); - // Completely unregister both listeners and verify their listener actors are destroyed. + // Completely unregister all listeners and verify their listener actors are destroyed. Iterable listenerActors = actorContext.children(); assertEquals("# of listener actors", 2, listenerActors.size()); - Iterator iter = listenerActors.iterator(); - ActorRef listenerActor1 = iter.next(); - ActorRef listenerActor2 = iter.next(); - - JavaTestKit kit1 = new JavaTestKit(getSystem()); - kit1.watch(listenerActor1); - - JavaTestKit kit2 = new JavaTestKit(getSystem()); - kit2.watch(listenerActor2); + List watchers = new ArrayList<>(); + for(Iterator iter = listenerActors.iterator(); iter.hasNext();) { + JavaTestKit kit = new JavaTestKit(getSystem()); + kit.watch(iter.next()); + watchers.add(kit); + } support.removeEntityOwnershipListener(entity2, mockListener1); + support.removeEntityOwnershipListener(entity2, mockListener1); // un-register again - shoild be noop support.removeEntityOwnershipListener(entity1, mockListener2); - kit1.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor1); - kit2.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor2); + Iterator iter = listenerActors.iterator(); + for(JavaTestKit kit: watchers) { + kit.expectTerminated(JavaTestKit.duration("3 seconds"), iter.next()); + } + assertEquals("# of listener actors", 0, actorContext.children().size()); // Re-register mockListener1 for entity1 and verify it is notified. @@ -132,7 +176,8 @@ public class EntityOwnershipListenerSupportTest extends AbstractActorTest { support.notifyEntityOwnershipListeners(entity1, false, true); verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true); - verify(mockListener2, never()).ownershipChanged(entity2, false, true); + verify(mockListener2, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean()); + verify(mockListener3, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean()); // Quickly register and unregister mockListener2 - expecting no exceptions. diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index 8ba17c0cc8..146916a962 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -7,9 +7,13 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import static org.hamcrest.CoreMatchers.either; +import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -36,6 +40,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.hamcrest.Matcher; import org.junit.After; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractShardTest; @@ -45,7 +50,9 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.cluster.datastore.ShardTestKit; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -65,6 +72,7 @@ import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; @@ -234,16 +242,17 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception { ShardTestKit kit = new ShardTestKit(getSystem()); - dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100). + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). shardBatchedModificationCount(5); String peerId = newShardId("leader").toString(); TestActorRef peer = actorFactory.createTestActor(Props.create(MockLeader.class). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); - TestActorRef shard = actorFactory.createTestActor(newShardProps( - ImmutableMap.builder().put(peerId, peer.path().toString()).build()). - withDispatcher(Dispatchers.DefaultDispatcherId())); + TestActorRef shard = actorFactory.createTestActor(Props.create( + TestEntityOwnershipShard.class, newShardId(LOCAL_MEMBER_NAME), + ImmutableMap.builder().put(peerId, peer.path().toString()).build(), + dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId())); shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.emptyList(), -1L, -1L, DataStoreVersions.CURRENT_VERSION), peer); @@ -259,8 +268,6 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); - shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender()); - // Test with initial commit timeout and subsequent retry. leader.modificationsReceived = new CountDownLatch(1); @@ -588,6 +595,90 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2); } + @Test + public void testListenerRegistration() throws Exception { + ShardTestKit kit = new ShardTestKit(getSystem()); + TestActorRef shard = actorFactory.createTestActor(newShardProps()); + kit.waitUntilLeader(shard); + ShardDataTree shardDataTree = shard.underlyingActor().getDataStore(); + + String otherEntityType = "otherEntityType"; + Entity entity1 = new Entity(ENTITY_TYPE, ENTITY_ID1); + Entity entity2 = new Entity(ENTITY_TYPE, ENTITY_ID2); + Entity entity3 = new Entity(ENTITY_TYPE, ENTITY_ID3); + Entity entity4 = new Entity(otherEntityType, ENTITY_ID3); + EntityOwnershipListener listener = mock(EntityOwnershipListener.class); + EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); + + // Register listener + + shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + // Register a couple candidates for the desired entity type and verify listener is notified. + + shard.tell(new RegisterCandidateLocal(candidate, entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + verify(listener, timeout(5000)).ownershipChanged(entity1, false, true); + + shard.tell(new RegisterCandidateLocal(candidate, entity2), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + verify(listener, timeout(5000)).ownershipChanged(entity2, false, true); + reset(listener); + + // Register another candidate for another entity type and verify listener is not notified. + + shard.tell(new RegisterCandidateLocal(candidate, entity4), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verify(listener, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean()); + + // Register remote candidate for entity1 + + String remoteMemberName = "remoteMember"; + writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, entity1.getId(), remoteMemberName), + shardDataTree); + verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getId(), remoteMemberName); + + // Unregister the local candidate for entity1 and verify listener is notified + + shard.tell(new UnregisterCandidateLocal(candidate, entity1), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + verify(listener, timeout(5000)).ownershipChanged(entity1, true, false); + reset(listener); + + // Unregister the listener, add a candidate for entity3 and verify listener isn't notified + + shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + shard.tell(new RegisterCandidateLocal(candidate, entity3), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + verifyOwner(shard, ENTITY_TYPE, entity3.getId(), LOCAL_MEMBER_NAME); + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + verify(listener, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean()); + + // Re-register the listener and verify it gets notified of current locally owned entities + + reset(listener, candidate); + + shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + Matcher entityMatcher = either(equalTo(entity2)).or(equalTo(entity3)); + verify(listener, timeout(5000).times(2)).ownershipChanged(argThat(entityMatcher), eq(false), eq(true)); + Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS); + verify(listener, never()).ownershipChanged(eq(entity4), anyBoolean(), anyBoolean()); + verify(listener, never()).ownershipChanged(eq(entity1), anyBoolean(), anyBoolean()); + verify(candidate, never()).ownershipChanged(eq(entity2), anyBoolean(), anyBoolean()); + verify(candidate, never()).ownershipChanged(eq(entity3), anyBoolean(), anyBoolean()); + } + private void commitModification(TestActorRef shard, NormalizedNode node, JavaTestKit sender) { BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, ""); @@ -674,6 +765,23 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build(); } + public static class TestEntityOwnershipShard extends EntityOwnershipShard { + + TestEntityOwnershipShard(ShardIdentifier name, Map peerAddresses, + DatastoreContext datastoreContext) { + super(name, peerAddresses, datastoreContext, SCHEMA_CONTEXT, LOCAL_MEMBER_NAME); + } + + @Override + public void onReceiveCommand(Object message) throws Exception { + if(!(message instanceof ElectionTimeout)) { + super.onReceiveCommand(message); + } + } + + + } + public static class MockFollower extends UntypedActor { volatile boolean grantVote; volatile boolean dropAppendEntries; -- 2.36.6