Bug 7746: Fix intermittent EOS test failure and synchronization
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipListenerSupport.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import akka.actor.ActorContext;
11 import akka.actor.ActorRef;
12 import akka.actor.PoisonPill;
13 import com.google.common.collect.HashMultimap;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Multimap;
16 import java.util.Collection;
17 import java.util.IdentityHashMap;
18 import java.util.Map;
19 import java.util.concurrent.locks.ReadWriteLock;
20 import java.util.concurrent.locks.ReentrantReadWriteLock;
21 import java.util.stream.Collectors;
22 import javax.annotation.concurrent.GuardedBy;
23 import javax.annotation.concurrent.ThreadSafe;
24 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipChangeState;
25 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
26 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
27 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Manages EntityOwnershipListener registrations and notifications for the EntityOwnershipShard.
33  *
34  * @author Thomas Pantelis
35  */
36 @ThreadSafe
37 class EntityOwnershipListenerSupport extends EntityOwnershipChangePublisher {
38     private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerSupport.class);
39
40     private final String logId;
41     private final ActorContext actorContext;
42     private final ReadWriteLock listenerLock = new ReentrantReadWriteLock();
43
44     @GuardedBy("listenerLock")
45     private final Map<DOMEntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
46
47     @GuardedBy("listenerLock")
48     private final Multimap<String, DOMEntityOwnershipListener> entityTypeListenerMap = HashMultimap.create();
49
50     private volatile boolean inJeopardy = false;
51
52     EntityOwnershipListenerSupport(ActorContext actorContext, String logId) {
53         this.actorContext = actorContext;
54         this.logId = logId;
55     }
56
57     String getLogId() {
58         return logId;
59     }
60
61     /**
62      * Set the in-jeopardy flag and indicate its previous state.
63      *
64      * @param inJeopardy new value of the in-jeopardy flag
65      * @return Previous value of the flag.
66      */
67     boolean setInJeopardy(final boolean inJeopardy) {
68         final boolean wasInJeopardy = this.inJeopardy;
69         this.inJeopardy = inJeopardy;
70         return wasInJeopardy;
71     }
72
73     void addEntityOwnershipListener(String entityType, DOMEntityOwnershipListener listener) {
74         LOG.debug("{}: Adding EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
75
76         listenerLock.writeLock().lock();
77         try {
78             if (entityTypeListenerMap.put(entityType, listener)) {
79                 ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
80                 if (listenerEntry == null) {
81                     listenerActorMap.put(listener, new ListenerActorRefEntry(listener));
82                 } else {
83                     listenerEntry.referenceCount++;
84                 }
85             }
86         } finally {
87             listenerLock.writeLock().unlock();
88         }
89     }
90
91     void removeEntityOwnershipListener(String entityType, DOMEntityOwnershipListener listener) {
92         LOG.debug("{}: Removing EntityOwnershipListener {} for entity type {}", logId, listener, entityType);
93
94         listenerLock.writeLock().lock();
95         try {
96             if (entityTypeListenerMap.remove(entityType, listener)) {
97                 ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
98
99                 LOG.debug("{}: Found {}", logId, listenerEntry);
100
101                 listenerEntry.referenceCount--;
102                 if (listenerEntry.referenceCount <= 0) {
103                     listenerActorMap.remove(listener);
104
105                     if (listenerEntry.actorRef != null) {
106                         LOG.debug("Killing EntityOwnershipListenerActor {}", listenerEntry.actorRef);
107                         listenerEntry.actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
108                     }
109                 }
110             }
111         } finally {
112             listenerLock.writeLock().unlock();
113         }
114     }
115
116     @Override
117     void notifyEntityOwnershipListeners(DOMEntity entity, boolean wasOwner, boolean isOwner, boolean hasOwner) {
118         listenerLock.readLock().lock();
119         try {
120             Collection<DOMEntityOwnershipListener> listeners = entityTypeListenerMap.get(entity.getType());
121             if (!listeners.isEmpty()) {
122                 notifyListeners(entity, wasOwner, isOwner, hasOwner, listeners.stream().map(
123                     listener -> listenerActorMap.get(listener)).collect(Collectors.toList()));
124             }
125         } finally {
126             listenerLock.readLock().unlock();
127         }
128     }
129
130     void notifyEntityOwnershipListener(DOMEntity entity, boolean wasOwner, boolean isOwner, boolean hasOwner,
131             DOMEntityOwnershipListener listener) {
132         listenerLock.readLock().lock();
133         try {
134             notifyListeners(entity, wasOwner, isOwner, hasOwner, ImmutableList.of(listenerActorMap.get(listener)));
135         } finally {
136             listenerLock.readLock().unlock();
137         }
138     }
139
140     @GuardedBy("listenerLock")
141     private void notifyListeners(DOMEntity entity, boolean wasOwner, boolean isOwner, boolean hasOwner,
142             Collection<ListenerActorRefEntry> listenerEntries) {
143         DOMEntityOwnershipChange changed = new DOMEntityOwnershipChange(entity,
144                 EntityOwnershipChangeState.from(wasOwner, isOwner, hasOwner), inJeopardy);
145         for (ListenerActorRefEntry entry: listenerEntries) {
146             ActorRef listenerActor = entry.actorFor();
147
148             LOG.debug("{}: Notifying EntityOwnershipListenerActor {} with {}", logId, listenerActor, changed);
149
150             listenerActor.tell(changed, ActorRef.noSender());
151         }
152     }
153
154     private class ListenerActorRefEntry {
155         final DOMEntityOwnershipListener listener;
156
157         @GuardedBy("listenerLock")
158         ActorRef actorRef;
159
160         @GuardedBy("listenerLock")
161         int referenceCount = 1;
162
163         ListenerActorRefEntry(final DOMEntityOwnershipListener listener) {
164             this.listener = listener;
165         }
166
167         ActorRef actorFor() {
168             if (actorRef == null) {
169                 actorRef = actorContext.actorOf(EntityOwnershipListenerActor.props(listener));
170
171                 LOG.debug("{}: Created EntityOwnershipListenerActor {} for listener {}", logId, actorRef, listener);
172             }
173
174             return actorRef;
175         }
176
177         @Override
178         public String toString() {
179             return "ListenerActorRefEntry [actorRef=" + actorRef + ", referenceCount=" + referenceCount + "]";
180         }
181     }
182 }