X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-notifications-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnotifications%2Fimpl%2FNetconfNotificationManager.java;fp=opendaylight%2Fnetconf%2Fnetconf-notifications-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Fnotifications%2Fimpl%2FNetconfNotificationManager.java;h=d2dbcaf4162f76b5802b509dd8bdf2ea2a06770e;hb=65cde02cd20da08f68934329711d5c28b12f6e56;hp=0000000000000000000000000000000000000000;hpb=c1f5a28943e916b6d61cbe707894b4d5af325af8;p=controller.git diff --git a/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/NetconfNotificationManager.java b/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/NetconfNotificationManager.java new file mode 100644 index 0000000000..d2dbcaf416 --- /dev/null +++ b/opendaylight/netconf/netconf-notifications-impl/src/main/java/org/opendaylight/controller/netconf/notifications/impl/NetconfNotificationManager.java @@ -0,0 +1,283 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.notifications.impl; + +import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; +import com.google.common.collect.Sets; +import java.util.Map; +import java.util.Set; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; +import org.opendaylight.controller.netconf.notifications.BaseNotificationPublisherRegistration; +import org.opendaylight.controller.netconf.notifications.NetconfNotification; +import org.opendaylight.controller.netconf.notifications.NetconfNotificationCollector; +import org.opendaylight.controller.netconf.notifications.NetconfNotificationListener; +import org.opendaylight.controller.netconf.notifications.NetconfNotificationRegistry; +import org.opendaylight.controller.netconf.notifications.NotificationListenerRegistration; +import org.opendaylight.controller.netconf.notifications.NotificationPublisherRegistration; +import org.opendaylight.controller.netconf.notifications.NotificationRegistration; +import org.opendaylight.controller.netconf.notifications.impl.ops.NotificationsTransformUtil; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.StreamNameType; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.Streams; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.StreamsBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamBuilder; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.StreamKey; +import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.netconf.notifications.rev120206.NetconfCapabilityChange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ThreadSafe +public class NetconfNotificationManager implements NetconfNotificationCollector, NetconfNotificationRegistry, NetconfNotificationListener, AutoCloseable { + + public static final StreamNameType BASE_STREAM_NAME = new StreamNameType("NETCONF"); + public static final Stream BASE_NETCONF_STREAM; + + static { + BASE_NETCONF_STREAM = new StreamBuilder() + .setName(BASE_STREAM_NAME) + .setKey(new StreamKey(BASE_STREAM_NAME)) + .setReplaySupport(false) + .setDescription("Default Event Stream") + .build(); + } + + private static final Logger LOG = LoggerFactory.getLogger(NetconfNotificationManager.class); + + // TODO excessive synchronization provides thread safety but is most likely not optimal (combination of concurrent collections might improve performance) + // And also calling callbacks from a synchronized block is dangerous since the listeners/publishers can block the whole notification processing + + @GuardedBy("this") + private final Multimap notificationListeners = HashMultimap.create(); + + @GuardedBy("this") + private final Set streamListeners = Sets.newHashSet(); + + @GuardedBy("this") + private final Map streamMetadata = Maps.newHashMap(); + + @GuardedBy("this") + private final Multiset availableStreams = HashMultiset.create(); + + @GuardedBy("this") + private final Set notificationPublishers = Sets.newHashSet(); + + @Override + public synchronized void onNotification(final StreamNameType stream, final NetconfNotification notification) { + LOG.debug("Notification of type {} detected", stream); + if(LOG.isTraceEnabled()) { + LOG.debug("Notification of type {} detected: {}", stream, notification); + } + + for (final GenericNotificationListenerReg listenerReg : notificationListeners.get(BASE_STREAM_NAME)) { + listenerReg.getListener().onNotification(BASE_STREAM_NAME, notification); + } + } + + @Override + public synchronized NotificationListenerRegistration registerNotificationListener(final StreamNameType stream, final NetconfNotificationListener listener) { + Preconditions.checkNotNull(stream); + Preconditions.checkNotNull(listener); + + LOG.trace("Notification listener registered for stream: {}", stream); + + final GenericNotificationListenerReg genericNotificationListenerReg = new GenericNotificationListenerReg(listener) { + @Override + public void close() { + synchronized (NetconfNotificationManager.this) { + LOG.trace("Notification listener unregistered for stream: {}", stream); + super.close(); + } + } + }; + + notificationListeners.put(BASE_STREAM_NAME, genericNotificationListenerReg); + return genericNotificationListenerReg; + } + + @Override + public synchronized Streams getNotificationPublishers() { + return new StreamsBuilder().setStream(Lists.newArrayList(streamMetadata.values())).build(); + } + + @Override + public synchronized boolean isStreamAvailable(final StreamNameType streamNameType) { + return availableStreams.contains(streamNameType); + } + + @Override + public synchronized NotificationRegistration registerStreamListener(final NetconfNotificationStreamListener listener) { + streamListeners.add(listener); + + // Notify about all already available + for (final Stream availableStream : streamMetadata.values()) { + listener.onStreamRegistered(availableStream); + } + + return new NotificationRegistration() { + @Override + public void close() { + synchronized(NetconfNotificationManager.this) { + streamListeners.remove(listener); + } + } + }; + } + + @Override + public synchronized void close() { + // Unregister all listeners + for (final GenericNotificationListenerReg genericNotificationListenerReg : notificationListeners.values()) { + genericNotificationListenerReg.close(); + } + notificationListeners.clear(); + + // Unregister all publishers + for (final GenericNotificationPublisherReg notificationPublisher : notificationPublishers) { + notificationPublisher.close(); + } + notificationPublishers.clear(); + + // Clear stream Listeners + streamListeners.clear(); + } + + @Override + public synchronized NotificationPublisherRegistration registerNotificationPublisher(final Stream stream) { + Preconditions.checkNotNull(stream); + final StreamNameType streamName = stream.getName(); + + LOG.debug("Notification publisher registered for stream: {}", streamName); + if(LOG.isTraceEnabled()) { + LOG.trace("Notification publisher registered for stream: {}", stream); + } + + if(streamMetadata.containsKey(streamName)) { + LOG.warn("Notification stream {} already registered as: {}. Will be reused", streamName, streamMetadata.get(streamName)); + } else { + streamMetadata.put(streamName, stream); + } + + availableStreams.add(streamName); + + final GenericNotificationPublisherReg genericNotificationPublisherReg = new GenericNotificationPublisherReg(this, streamName) { + @Override + public void close() { + synchronized (NetconfNotificationManager.this) { + super.close(); + } + } + }; + + notificationPublishers.add(genericNotificationPublisherReg); + + notifyStreamAdded(stream); + return genericNotificationPublisherReg; + } + + private void unregisterNotificationPublisher(final StreamNameType streamName, final GenericNotificationPublisherReg genericNotificationPublisherReg) { + availableStreams.remove(streamName); + notificationPublishers.remove(genericNotificationPublisherReg); + + LOG.debug("Notification publisher unregistered for stream: {}", streamName); + + // Notify stream listeners if all publishers are gone and also clear metadata for stream + if (!isStreamAvailable(streamName)) { + LOG.debug("Notification stream: {} became unavailable", streamName); + streamMetadata.remove(streamName); + notifyStreamRemoved(streamName); + } + } + + private synchronized void notifyStreamAdded(final Stream stream) { + for (final NetconfNotificationStreamListener streamListener : streamListeners) { + streamListener.onStreamRegistered(stream); + } + } + private synchronized void notifyStreamRemoved(final StreamNameType stream) { + for (final NetconfNotificationStreamListener streamListener : streamListeners) { + streamListener.onStreamUnregistered(stream); + } + } + + @Override + public BaseNotificationPublisherRegistration registerBaseNotificationPublisher() { + final NotificationPublisherRegistration notificationPublisherRegistration = registerNotificationPublisher(BASE_NETCONF_STREAM); + return new BaseNotificationPublisherReg(notificationPublisherRegistration); + } + + private static class GenericNotificationPublisherReg implements NotificationPublisherRegistration { + private NetconfNotificationManager baseListener; + private final StreamNameType registeredStream; + + public GenericNotificationPublisherReg(final NetconfNotificationManager baseListener, final StreamNameType registeredStream) { + this.baseListener = baseListener; + this.registeredStream = registeredStream; + } + + @Override + public void close() { + baseListener.unregisterNotificationPublisher(registeredStream, this); + baseListener = null; + } + + @Override + public void onNotification(final StreamNameType stream, final NetconfNotification notification) { + Preconditions.checkState(baseListener != null, "Already closed"); + Preconditions.checkArgument(stream.equals(registeredStream)); + baseListener.onNotification(stream, notification); + } + } + + private static class BaseNotificationPublisherReg implements BaseNotificationPublisherRegistration { + + private final NotificationPublisherRegistration baseRegistration; + + public BaseNotificationPublisherReg(final NotificationPublisherRegistration baseRegistration) { + this.baseRegistration = baseRegistration; + } + + @Override + public void close() { + baseRegistration.close(); + } + + @Override + public void onCapabilityChanged(final NetconfCapabilityChange capabilityChange) { + baseRegistration.onNotification(BASE_STREAM_NAME, serializeNotification(capabilityChange)); + } + + private static NetconfNotification serializeNotification(final NetconfCapabilityChange capabilityChange) { + return NotificationsTransformUtil.transform(capabilityChange); + } + } + + private class GenericNotificationListenerReg implements NotificationListenerRegistration { + private final NetconfNotificationListener listener; + + public GenericNotificationListenerReg(final NetconfNotificationListener listener) { + this.listener = listener; + } + + public NetconfNotificationListener getListener() { + return listener; + } + + @Override + public void close() { + notificationListeners.remove(BASE_STREAM_NAME, this); + } + } +}