import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.spi.NotificationInvokerFactory.NotificationInvoker;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
private final ListenerRegistry<NotificationInterestListener> interestListeners =
ListenerRegistry.create();
- private final GenerationalListenerMap listeners = new GenerationalListenerMap();
+ private final AtomicReference<ListenerMapGeneration> listeners = new AtomicReference<>(new ListenerMapGeneration());
private final ExecutorService executor;
public NotificationBrokerImpl(final ExecutorService executor) {
@Override
public void publish(final Notification notification, final ExecutorService service) {
- for (NotificationListenerRegistration<?> r : listeners.listenersFor(notification)) {
+ for (NotificationListenerRegistration<?> r : listeners.get().listenersFor(notification)) {
service.submit(new NotifyTask(r, notification));
}
}
+ @GuardedBy("this")
+ private Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> mutableListeners() {
+ return HashMultimap.create(listeners.get().getListeners());
+ }
+
private final void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
- listeners.addRegistrations(registrations);
+ synchronized (this) {
+ final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
+ mutableListeners();
+ for (NotificationListenerRegistration<?> reg : registrations) {
+ newListeners.put(reg.getType(), reg);
+ }
+
+ listeners.set(new ListenerMapGeneration(newListeners));
+ }
+
+ // Notifications are dispatched out of lock...
for (NotificationListenerRegistration<?> reg : registrations) {
announceNotificationSubscription(reg.getType());
}
}
+ private synchronized void removeRegistrations(final NotificationListenerRegistration<?>... registrations) {
+ final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
+ mutableListeners();
+
+ for (NotificationListenerRegistration<?> reg : registrations) {
+ newListeners.remove(reg.getType(), reg);
+ }
+
+ listeners.set(new ListenerMapGeneration(newListeners));
+ }
+
private void announceNotificationSubscription(final Class<? extends Notification> notification) {
for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
try {
@Override
public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
- for (final Class<? extends Notification> notification : listeners.getKnownTypes()) {
+
+ for (final Class<? extends Notification> notification : listeners.get().getKnownTypes()) {
interestListener.onNotificationSubscribtion(notification);
}
return registration;
final NotificationListenerRegistration<T> reg = new AbstractNotificationListenerRegistration<T>(notificationType, listener) {
@Override
protected void removeRegistration() {
- listeners.removeRegistrations(this);
+ removeRegistrations(this);
}
};
return new AbstractListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener>(listener) {
@Override
protected void removeRegistration() {
- listeners.removeRegistrations(regs);
+ removeRegistrations(regs);
for (ListenerRegistration<?> reg : regs) {
reg.close();
}