Bug 7746: Fix intermittent EOS test failure and synchronization
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipListenerSupport.java
index be82582e06d2e356b4446e0e51d86c4f7a125f7b..d2b4dd3077d60b80d1112534246e7c0cded97c99 100644 (file)
@@ -11,13 +11,16 @@ import akka.actor.ActorContext;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimap;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
@@ -30,14 +33,20 @@ import org.slf4j.LoggerFactory;
  *
  * @author Thomas Pantelis
  */
-class EntityOwnershipListenerSupport {
+@ThreadSafe
+class EntityOwnershipListenerSupport extends EntityOwnershipChangePublisher {
     private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerSupport.class);
 
     private final String logId;
     private final ActorContext actorContext;
+    private final ReadWriteLock listenerLock = new ReentrantReadWriteLock();
+
+    @GuardedBy("listenerLock")
     private final Map<DOMEntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
-    private final Set<DOMEntity> entitiesWithCandidateSet = new HashSet<>();
+
+    @GuardedBy("listenerLock")
     private final Multimap<String, DOMEntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
+
     private volatile boolean inJeopardy = false;
 
     EntityOwnershipListenerSupport(ActorContext actorContext, String logId) {
@@ -61,52 +70,80 @@ class EntityOwnershipListenerSupport {
         return wasInJeopardy;
     }
 
-    boolean hasCandidateForEntity(DOMEntity entity) {
-        return entitiesWithCandidateSet.contains(entity);
-    }
-
-    void setHasCandidateForEntity(DOMEntity entity) {
-        entitiesWithCandidateSet.add(entity);
-    }
-
-    void unsetHasCandidateForEntity(DOMEntity entity) {
-        entitiesWithCandidateSet.remove(entity);
-    }
-
     void addEntityOwnershipListener(String entityType, DOMEntityOwnershipListener listener) {
         LOG.debug("{}: Adding EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
 
-        addListener(listener, entityType);
+        listenerLock.writeLock().lock();
+        try {
+            if (entityTypeListenerMap.put(entityType, listener)) {
+                ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
+                if (listenerEntry == null) {
+                    listenerActorMap.put(listener, new ListenerActorRefEntry(listener));
+                } else {
+                    listenerEntry.referenceCount++;
+                }
+            }
+        } finally {
+            listenerLock.writeLock().unlock();
+        }
     }
 
     void removeEntityOwnershipListener(String entityType, DOMEntityOwnershipListener listener) {
         LOG.debug("{}: Removing EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
 
-        removeListener(listener, entityType);
+        listenerLock.writeLock().lock();
+        try {
+            if (entityTypeListenerMap.remove(entityType, listener)) {
+                ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
+
+                LOG.debug("{}: Found {}", logId, listenerEntry);
+
+                listenerEntry.referenceCount--;
+                if (listenerEntry.referenceCount <= 0) {
+                    listenerActorMap.remove(listener);
+
+                    if (listenerEntry.actorRef != null) {
+                        LOG.debug("Killing EntityOwnershipListenerActor {}", listenerEntry.actorRef);
+                        listenerEntry.actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+                    }
+                }
+            }
+        } finally {
+            listenerLock.writeLock().unlock();
+        }
     }
 
+    @Override
     void notifyEntityOwnershipListeners(DOMEntity entity, boolean wasOwner, boolean isOwner, boolean hasOwner) {
-        notifyListeners(entity, entity.getType(), wasOwner, isOwner, hasOwner);
+        listenerLock.readLock().lock();
+        try {
+            Collection<DOMEntityOwnershipListener> listeners = entityTypeListenerMap.get(entity.getType());
+            if (!listeners.isEmpty()) {
+                notifyListeners(entity, wasOwner, isOwner, hasOwner, listeners.stream().map(
+                    listener -> listenerActorMap.get(listener)).collect(Collectors.toList()));
+            }
+        } finally {
+            listenerLock.readLock().unlock();
+        }
     }
 
     void notifyEntityOwnershipListener(DOMEntity entity, boolean wasOwner, boolean isOwner, boolean hasOwner,
             DOMEntityOwnershipListener listener) {
-        notifyListeners(entity, wasOwner, isOwner, hasOwner, Collections.singleton(listener));
-    }
-
-    private void notifyListeners(DOMEntity entity, String mapKey, boolean wasOwner, boolean isOwner, boolean hasOwner) {
-        Collection<DOMEntityOwnershipListener> listeners = entityTypeListenerMap.get(mapKey);
-        if (!listeners.isEmpty()) {
-            notifyListeners(entity, wasOwner, isOwner, hasOwner, listeners);
+        listenerLock.readLock().lock();
+        try {
+            notifyListeners(entity, wasOwner, isOwner, hasOwner, ImmutableList.of(listenerActorMap.get(listener)));
+        } finally {
+            listenerLock.readLock().unlock();
         }
     }
 
+    @GuardedBy("listenerLock")
     private void notifyListeners(DOMEntity entity, boolean wasOwner, boolean isOwner, boolean hasOwner,
-            Collection<DOMEntityOwnershipListener> listeners) {
+            Collection<ListenerActorRefEntry> listenerEntries) {
         DOMEntityOwnershipChange changed = new DOMEntityOwnershipChange(entity,
                 EntityOwnershipChangeState.from(wasOwner, isOwner, hasOwner), inJeopardy);
-        for (DOMEntityOwnershipListener listener: listeners) {
-            ActorRef listenerActor = listenerActorFor(listener);
+        for (ListenerActorRefEntry entry: listenerEntries) {
+            ActorRef listenerActor = entry.actorFor();
 
             LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed);
 
@@ -114,44 +151,20 @@ class EntityOwnershipListenerSupport {
         }
     }
 
-    private void addListener(DOMEntityOwnershipListener listener, String mapKey) {
-        if (entityTypeListenerMap.put(mapKey, listener)) {
-            ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
-            if (listenerEntry == null) {
-                listenerActorMap.put(listener, new ListenerActorRefEntry());
-            } else {
-                listenerEntry.referenceCount++;
-            }
-        }
-    }
-
-    private void removeListener(DOMEntityOwnershipListener listener, String mapKey) {
-        if (entityTypeListenerMap.remove(mapKey, listener)) {
-            ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
+    private class ListenerActorRefEntry {
+        final DOMEntityOwnershipListener listener;
 
-            LOG.debug("{}: Found {}", logId, listenerEntry);
+        @GuardedBy("listenerLock")
+        ActorRef actorRef;
 
-            listenerEntry.referenceCount--;
-            if (listenerEntry.referenceCount <= 0) {
-                listenerActorMap.remove(listener);
+        @GuardedBy("listenerLock")
+        int referenceCount = 1;
 
-                if (listenerEntry.actorRef != null) {
-                    LOG.debug("Killing EntityOwnershipListenerActor {}", listenerEntry.actorRef);
-                    listenerEntry.actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
-                }
-            }
+        ListenerActorRefEntry(final DOMEntityOwnershipListener listener) {
+            this.listener = listener;
         }
-    }
-
-    private ActorRef listenerActorFor(DOMEntityOwnershipListener listener) {
-        return listenerActorMap.get(listener).actorFor(listener);
-    }
-
-    private class ListenerActorRefEntry {
-        ActorRef actorRef;
-        int referenceCount = 1;
 
-        ActorRef actorFor(DOMEntityOwnershipListener listener) {
+        ActorRef actorFor() {
             if (actorRef == null) {
                 actorRef = actorContext.actorOf(EntityOwnershipListenerActor.props(listener));