import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.YangLibraryChange;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.library.rev190104.YangLibraryUpdate;
import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
+import org.opendaylight.yangtools.concepts.AbstractRegistration;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.opendaylight.yangtools.yang.model.api.stmt.SchemaNodeIdentifier.Absolute;
private final Multiset<StreamNameType> availableStreams = HashMultiset.create();
@GuardedBy("this")
- private final Set<GenericNotificationPublisherReg> notificationPublishers = new HashSet<>();
+ private final Set<PublisherReg> notificationPublishers = new HashSet<>();
private final NotificationsTransformUtil transformUtil;
@Inject
notificationListeners.clear();
// Unregister all publishers
- // Use new list to avoid ConcurrentModificationException
- for (final GenericNotificationPublisherReg notificationPublisher : new ArrayList<>(notificationPublishers)) {
- notificationPublisher.close();
- }
+ // Use new list to avoid ConcurrentModificationException when the registration removes itself
+ List.copyOf(notificationPublishers).forEach(PublisherReg::close);
notificationPublishers.clear();
// Clear stream Listeners
@Override
public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) {
- final StreamNameType streamName = requireNonNull(stream).getName();
+ final var streamName = requireNonNull(stream).getName();
+ final var reg = new PublisherReg(this, streamName);
LOG.debug("Notification publisher registered for stream: {}", streamName);
if (LOG.isTraceEnabled()) {
LOG.trace("Notification publisher registered for stream: {}", stream);
}
- if (streamMetadata.containsKey(streamName)) {
- LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName,
- streamMetadata.get(streamName));
- } else {
- streamMetadata.put(streamName, stream);
+ final var prev = streamMetadata.putIfAbsent(streamName, stream);
+ if (prev != null) {
+ LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName, prev);
}
availableStreams.add(streamName);
- final GenericNotificationPublisherReg reg = new GenericNotificationPublisherReg(this, streamName) {
- @Override
- public void close() {
- synchronized (NetconfNotificationManager.this) {
- super.close();
- }
- }
- };
-
notificationPublishers.add(reg);
notifyStreamAdded(stream);
return reg;
}
- private void unregisterNotificationPublisher(
- final StreamNameType streamName,
- final GenericNotificationPublisherReg genericNotificationPublisherReg) {
+ private synchronized void unregisterNotificationPublisher(final StreamNameType streamName, final PublisherReg reg) {
availableStreams.remove(streamName);
- notificationPublishers.remove(genericNotificationPublisherReg);
+ notificationPublishers.remove(reg);
LOG.debug("Notification publisher unregistered for stream: {}", streamName);
return new YangLibraryPublisherReg(transformUtil, notificationPublisherRegistration);
}
- private static class GenericNotificationPublisherReg implements NotificationPublisherRegistration {
- private NetconfNotificationManager baseListener;
+ private static final class PublisherReg extends AbstractRegistration implements NotificationPublisherRegistration {
private final StreamNameType registeredStream;
- GenericNotificationPublisherReg(final NetconfNotificationManager baseListener,
- final StreamNameType registeredStream) {
- this.baseListener = baseListener;
- this.registeredStream = registeredStream;
+ private NetconfNotificationManager manager;
+
+ PublisherReg(final NetconfNotificationManager manager, final StreamNameType registeredStream) {
+ this.manager = requireNonNull(manager);
+ this.registeredStream = requireNonNull(registeredStream);
}
@Override
- public void close() {
- baseListener.unregisterNotificationPublisher(registeredStream, this);
- baseListener = null;
+ public void onNotification(final StreamNameType stream, final NotificationMessage notification) {
+ checkArgument(stream.equals(registeredStream),
+ "Registered on %s, cannot publish to %s", registeredStream, stream);
+ checkState(notClosed(), "Already closed");
+ manager.onNotification(stream, notification);
}
@Override
- public void onNotification(final StreamNameType stream, final NotificationMessage notification) {
- checkState(baseListener != null, "Already closed");
- checkArgument(stream.equals(registeredStream));
- baseListener.onNotification(stream, notification);
+ protected void removeRegistration() {
+ manager.unregisterNotificationPublisher(registeredStream, this);
+ manager = null;
}
}