import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
-import java.util.Arrays;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Map;
-import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged;
-import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
-import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
+import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*
* @author Thomas Pantelis
*/
-class EntityOwnershipListenerSupport {
+@ThreadSafe
+class EntityOwnershipListenerSupport extends EntityOwnershipChangePublisher {
private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerSupport.class);
private final String logId;
private final ActorContext actorContext;
- private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
- private final Multimap<Entity, EntityOwnershipListener> entityListenerMap = HashMultimap.create();
- private final Multimap<String, EntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
+ private final ReadWriteLock listenerLock = new ReentrantReadWriteLock();
- EntityOwnershipListenerSupport(ActorContext actorContext, String logId) {
+ @GuardedBy("listenerLock")
+ private final Map<DOMEntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
+
+ @GuardedBy("listenerLock")
+ private final Multimap<String, DOMEntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
+
+ private volatile boolean inJeopardy = false;
+
+ EntityOwnershipListenerSupport(final ActorContext actorContext, final String logId) {
this.actorContext = actorContext;
this.logId = logId;
}
+ @Override
String getLogId() {
return logId;
}
- boolean hasCandidateForEntity(Entity entity) {
- for(EntityOwnershipListener listener: entityListenerMap.get(entity)) {
- if(listener instanceof EntityOwnershipCandidate) {
- return true;
- }
- }
-
- return false;
- }
-
- void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
- LOG.debug("{}: Adding EntityOwnershipListener {} for {}", logId, listener, entity);
-
- addListener(listener, entity, entityListenerMap);
+ /**
+ * Set the in-jeopardy flag and indicate its previous state.
+ *
+ * @param inJeopardy new value of the in-jeopardy flag
+ * @return Previous value of the flag.
+ */
+ @SuppressWarnings("checkstyle:hiddenField")
+ boolean setInJeopardy(final boolean inJeopardy) {
+ final boolean wasInJeopardy = this.inJeopardy;
+ this.inJeopardy = inJeopardy;
+ return wasInJeopardy;
}
- void addEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
+ void addEntityOwnershipListener(final String entityType, final DOMEntityOwnershipListener listener) {
LOG.debug("{}: Adding EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
- addListener(listener, entityType, entityTypeListenerMap);
- }
-
- void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
- LOG.debug("{}: Removing EntityOwnershipListener {} for {}", logId, listener, entity);
-
- removeListener(listener, entity, entityListenerMap);
+ listenerLock.writeLock().lock();
+ try {
+ if (entityTypeListenerMap.put(entityType, listener)) {
+ ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
+ if (listenerEntry == null) {
+ listenerActorMap.put(listener, new ListenerActorRefEntry(listener));
+ } else {
+ listenerEntry.referenceCount++;
+ }
+ }
+ } finally {
+ listenerLock.writeLock().unlock();
+ }
}
- void removeEntityOwnershipListener(String entityType, EntityOwnershipListener listener) {
+ void removeEntityOwnershipListener(final String entityType, final DOMEntityOwnershipListener listener) {
LOG.debug("{}: Removing EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
- removeListener(listener, entityType, entityTypeListenerMap);
- }
+ listenerLock.writeLock().lock();
+ try {
+ if (entityTypeListenerMap.remove(entityType, listener)) {
+ ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
- void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner) {
- notifyListeners(entity, entity, wasOwner, isOwner, entityListenerMap);
- notifyListeners(entity, entity.getType(), wasOwner, isOwner, entityTypeListenerMap);
- }
+ LOG.debug("{}: Found {}", logId, listenerEntry);
- void notifyEntityOwnershipListener(Entity entity, boolean wasOwner, boolean isOwner,
- EntityOwnershipListener listener) {
- notifyListeners(entity, wasOwner, isOwner, Arrays.asList(listener));
- }
+ listenerEntry.referenceCount--;
+ if (listenerEntry.referenceCount <= 0) {
+ listenerActorMap.remove(listener);
- private <T> void notifyListeners(Entity entity, T mapKey, boolean wasOwner, boolean isOwner,
- Multimap<T, EntityOwnershipListener> listenerMap) {
- Collection<EntityOwnershipListener> listeners = listenerMap.get(mapKey);
- if(!listeners.isEmpty()) {
- notifyListeners(entity, wasOwner, isOwner, listeners);
+ if (listenerEntry.actorRef != null) {
+ LOG.debug("Killing EntityOwnershipListenerActor {}", listenerEntry.actorRef);
+ listenerEntry.actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+ }
+ }
+ } finally {
+ listenerLock.writeLock().unlock();
}
}
- private void notifyListeners(Entity entity, boolean wasOwner, boolean isOwner,
- Collection<EntityOwnershipListener> listeners) {
- EntityOwnershipChanged changed = new EntityOwnershipChanged(entity, wasOwner, isOwner);
- for(EntityOwnershipListener listener: listeners) {
- ActorRef listenerActor = listenerActorFor(listener);
-
- LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed);
-
- listenerActor.tell(changed, ActorRef.noSender());
+ @Override
+ void notifyEntityOwnershipListeners(final DOMEntity entity, final boolean wasOwner, final boolean isOwner,
+ final boolean hasOwner) {
+ listenerLock.readLock().lock();
+ try {
+ Collection<DOMEntityOwnershipListener> listeners = entityTypeListenerMap.get(entity.getType());
+ if (!listeners.isEmpty()) {
+ notifyListeners(entity, wasOwner, isOwner, hasOwner,
+ listeners.stream().map(listenerActorMap::get).collect(Collectors.toList()));
+ }
+ } finally {
+ listenerLock.readLock().unlock();
}
}
- private <T> void addListener(EntityOwnershipListener listener, T mapKey,
- Multimap<T, EntityOwnershipListener> toListenerMap) {
- if(toListenerMap.put(mapKey, listener)) {
- ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
- if(listenerEntry == null) {
- listenerActorMap.put(listener, new ListenerActorRefEntry());
- } else {
- listenerEntry.referenceCount++;
- }
+ void notifyEntityOwnershipListener(final DOMEntity entity, final boolean wasOwner, final boolean isOwner,
+ final boolean hasOwner, final DOMEntityOwnershipListener listener) {
+ listenerLock.readLock().lock();
+ try {
+ notifyListeners(entity, wasOwner, isOwner, hasOwner, ImmutableList.of(listenerActorMap.get(listener)));
+ } finally {
+ listenerLock.readLock().unlock();
}
}
- private <T> void removeListener(EntityOwnershipListener listener, T mapKey,
- Multimap<T, EntityOwnershipListener> fromListenerMap) {
- if(fromListenerMap.remove(mapKey, listener)) {
- ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
+ @GuardedBy("listenerLock")
+ private void notifyListeners(final DOMEntity entity, final boolean wasOwner, final boolean isOwner,
+ final boolean hasOwner, final Collection<ListenerActorRefEntry> listenerEntries) {
+ DOMEntityOwnershipChange changed = new DOMEntityOwnershipChange(entity,
+ EntityOwnershipChangeState.from(wasOwner, isOwner, hasOwner), inJeopardy);
+ for (ListenerActorRefEntry entry: listenerEntries) {
+ ActorRef listenerActor = entry.actorFor();
- LOG.debug("{}: Found {}", logId, listenerEntry);
-
- listenerEntry.referenceCount--;
- if(listenerEntry.referenceCount <= 0) {
- listenerActorMap.remove(listener);
+ LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed);
- if(listenerEntry.actorRef != null) {
- LOG.debug("Killing EntityOwnershipListenerActor {}", listenerEntry.actorRef);
- listenerEntry.actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
+ listenerActor.tell(changed, ActorRef.noSender());
}
}
- private ActorRef listenerActorFor(EntityOwnershipListener listener) {
- return listenerActorMap.get(listener).actorFor(listener);
- }
-
private class ListenerActorRefEntry {
+ final DOMEntityOwnershipListener listener;
+
+ @GuardedBy("listenerLock")
ActorRef actorRef;
+
+ @GuardedBy("listenerLock")
int referenceCount = 1;
- ActorRef actorFor(EntityOwnershipListener listener) {
- if(actorRef == null) {
+ ListenerActorRefEntry(final DOMEntityOwnershipListener listener) {
+ this.listener = listener;
+ }
+
+ ActorRef actorFor() {
+ if (actorRef == null) {
actorRef = actorContext.actorOf(EntityOwnershipListenerActor.props(listener));
LOG.debug("{}: Created EntityOwnershipListenerActor {} for listener {}", logId, actorRef, listener);