+++ /dev/null
-/**
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.binding.impl;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.concurrent.GuardedBy;
-
-import org.opendaylight.yangtools.yang.binding.Notification;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * A multi-generation, isolated notification type to listener map.
- */
-final class GenerationalListenerMap {
- private final AtomicReference<ListenerMapGeneration> current = new AtomicReference<>(new ListenerMapGeneration());
-
- Iterable<NotificationListenerRegistration<?>> listenersFor(final Notification notification) {
- return current.get().listenersFor(notification);
- }
-
- Iterable<Class<? extends Notification>> getKnownTypes() {
- // Note: this relies on current having immutable listeners
- return current.get().getListeners().keySet();
- }
-
- @GuardedBy("this")
- private Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> mutableListeners() {
- return HashMultimap.create(current.get().getListeners());
- }
-
- synchronized void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
- Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> listeners =
- mutableListeners();
-
- for (NotificationListenerRegistration<?> reg : registrations) {
- listeners.put(reg.getType(), reg);
- }
-
- current.set(new ListenerMapGeneration(listeners));
- }
-
- synchronized void removeRegistrations(final NotificationListenerRegistration<?>... registrations) {
- Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> listeners =
- mutableListeners();
-
- for (NotificationListenerRegistration<?> reg : registrations) {
- listeners.remove(reg.getType(), reg);
- }
-
- current.set(new ListenerMapGeneration(listeners));
- }
-}
import java.util.Set;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.sal.binding.api.NotificationListener;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
public class NotificationBrokerImpl implements NotificationProviderService, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(NotificationBrokerImpl.class);
private final ListenerRegistry<NotificationInterestListener> interestListeners =
ListenerRegistry.create();
- private final GenerationalListenerMap listeners = new GenerationalListenerMap();
+ private final AtomicReference<ListenerMapGeneration> listeners = new AtomicReference<>(new ListenerMapGeneration());
private final ExecutorService executor;
public NotificationBrokerImpl(final ExecutorService executor) {
@Override
public void publish(final Notification notification, final ExecutorService service) {
- for (NotificationListenerRegistration<?> r : listeners.listenersFor(notification)) {
+ for (NotificationListenerRegistration<?> r : listeners.get().listenersFor(notification)) {
service.submit(new NotifyTask(r, notification));
}
}
+ @GuardedBy("this")
+ private Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> mutableListeners() {
+ return HashMultimap.create(listeners.get().getListeners());
+ }
+
private final void addRegistrations(final NotificationListenerRegistration<?>... registrations) {
- listeners.addRegistrations(registrations);
+ synchronized (this) {
+ final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
+ mutableListeners();
+ for (NotificationListenerRegistration<?> reg : registrations) {
+ newListeners.put(reg.getType(), reg);
+ }
+
+ listeners.set(new ListenerMapGeneration(newListeners));
+ }
+
+ // Notifications are dispatched out of lock...
for (NotificationListenerRegistration<?> reg : registrations) {
announceNotificationSubscription(reg.getType());
}
}
+ private synchronized void removeRegistrations(final NotificationListenerRegistration<?>... registrations) {
+ final Multimap<Class<? extends Notification>, NotificationListenerRegistration<?>> newListeners =
+ mutableListeners();
+
+ for (NotificationListenerRegistration<?> reg : registrations) {
+ newListeners.remove(reg.getType(), reg);
+ }
+
+ listeners.set(new ListenerMapGeneration(newListeners));
+ }
+
private void announceNotificationSubscription(final Class<? extends Notification> notification) {
for (final ListenerRegistration<NotificationInterestListener> listener : interestListeners) {
try {
@Override
public ListenerRegistration<NotificationInterestListener> registerInterestListener(final NotificationInterestListener interestListener) {
final ListenerRegistration<NotificationInterestListener> registration = this.interestListeners.register(interestListener);
- for (final Class<? extends Notification> notification : listeners.getKnownTypes()) {
+
+ for (final Class<? extends Notification> notification : listeners.get().getKnownTypes()) {
interestListener.onNotificationSubscribtion(notification);
}
return registration;
final NotificationListenerRegistration<T> reg = new AbstractNotificationListenerRegistration<T>(notificationType, listener) {
@Override
protected void removeRegistration() {
- listeners.removeRegistrations(this);
+ removeRegistrations(this);
}
};
return new AbstractListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener>(listener) {
@Override
protected void removeRegistration() {
- listeners.removeRegistrations(regs);
+ removeRegistrations(regs);
for (ListenerRegistration<?> reg : regs) {
reg.close();
}