Bump versions to 2.0.0-SNAPSHOT
[netconf.git] / netconf / messagebus-netconf / src / main / java / org / opendaylight / netconf / messagebus / eventsources / netconf / StreamNotificationTopicRegistration.java
index b01a57e704535cd574ceb520ffb523d69d56a098..d6752179558e1000306c69293ff44c0724b8cf38 100644 (file)
@@ -7,70 +7,51 @@
  */
 package org.opendaylight.netconf.messagebus.eventsources.netconf;
 
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import org.opendaylight.controller.md.sal.dom.api.DOMMountPoint;
-import org.opendaylight.controller.md.sal.dom.api.DOMNotificationService;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
-import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import java.util.concurrent.ExecutionException;
+import org.opendaylight.mdsal.dom.api.DOMNotificationListener;
+import org.opendaylight.mdsal.dom.api.DOMRpcResult;
 import org.opendaylight.yang.gen.v1.urn.cisco.params.xml.ns.yang.messagebus.eventaggregator.rev141202.TopicId;
-import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netconf.notification._1._0.rev080714.CreateSubscriptionInput;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.netmod.notification.rev080714.netconf.streams.Stream;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.DataContainerNodeAttrBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Topic registration for notification stream.
+ * Topic registration for notification with specified namespace from stream.
  */
-public class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
+@Deprecated(forRemoval = true)
+class StreamNotificationTopicRegistration extends NotificationTopicRegistration {
 
     private static final Logger LOG = LoggerFactory.getLogger(StreamNotificationTopicRegistration.class);
-    private static final NodeIdentifier STREAM_QNAME = NodeIdentifier.create(
-        QName.create(CreateSubscriptionInput.QNAME, "stream"));
-    private static final SchemaPath CREATE_SUBSCRIPTION = SchemaPath
-        .create(true, QName.create(CreateSubscriptionInput.QNAME, "create-subscription"));
-    private static final NodeIdentifier START_TIME_SUBSCRIPTION = NodeIdentifier.create(
-        QName.create(CreateSubscriptionInput.QNAME, "startTime"));
-    private static final NodeIdentifier CREATE_SUBSCRIPTION_INPUT = NodeIdentifier.create(
-        CreateSubscriptionInput.QNAME);
-
-    final private DOMMountPoint domMountPoint;
-    final private String nodeId;
-    final private NetconfEventSource netconfEventSource;
-    final private Stream stream;
-    private Date lastEventTime;
-
-    private ConcurrentHashMap<SchemaPath, ListenerRegistration<NetconfEventSource>> notificationRegistrationMap = new ConcurrentHashMap<>();
-    private ConcurrentHashMap<SchemaPath, ArrayList<TopicId>> notificationTopicMap = new ConcurrentHashMap<>();
+
+    private final String nodeId;
+    private final NetconfEventSource netconfEventSource;
+    private final NetconfEventSourceMount mountPoint;
+    private final ConcurrentHashMap<SchemaPath, ListenerRegistration<DOMNotificationListener>>
+            notificationRegistrationMap = new ConcurrentHashMap<>();
+    private final Stream stream;
 
     /**
      * Creates registration to notification stream.
-     * @param stream stream
+     *
+     * @param stream             stream
      * @param notificationPrefix notifications namespace
      * @param netconfEventSource event source
      */
-    public StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
-        NetconfEventSource netconfEventSource) {
+    StreamNotificationTopicRegistration(final Stream stream, final String notificationPrefix,
+                                        final NetconfEventSource netconfEventSource) {
         super(NotificationSourceType.NetconfDeviceStream, stream.getName().getValue(), notificationPrefix);
-        this.domMountPoint = netconfEventSource.getDOMMountPoint();
-        this.nodeId = netconfEventSource.getNode().getNodeId().getValue().toString();
         this.netconfEventSource = netconfEventSource;
+        this.mountPoint = netconfEventSource.getMount();
+        this.nodeId = mountPoint.getNode().getNodeId().getValue();
         this.stream = stream;
-        this.lastEventTime = null;
-        setReplaySupported(this.stream.isReplaySupport());
+        setReplaySupported(stream.getReplaySupport());
         setActive(false);
         LOG.info("StreamNotificationTopicRegistration initialized for {}", getStreamName());
     }
@@ -78,21 +59,17 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
     /**
      * Subscribes to notification stream associated with this registration.
      */
+    @Override
     void activateNotificationSource() {
-        if (isActive() == false) {
+        if (!isActive()) {
             LOG.info("Stream {} is not active on node {}. Will subscribe.", this.getStreamName(), this.nodeId);
-            final ContainerNode input = Builders.containerBuilder()
-                .withNodeIdentifier(CREATE_SUBSCRIPTION_INPUT)
-                .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName())).build();
-            CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get()
-                .invokeRpc(CREATE_SUBSCRIPTION, input);
+            final ListenableFuture<? extends DOMRpcResult> result = mountPoint.invokeCreateSubscription(stream);
             try {
-                csFuture.checkedGet();
+                result.get();
                 setActive(true);
-            } catch (DOMRpcException e) {
-                LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.warn("Can not subscribe stream {} on node {}", this.getSourceName(), this.nodeId, e);
                 setActive(false);
-                return;
             }
         } else {
             LOG.info("Stream {} is now active on node {}", this.getStreamName(), this.nodeId);
@@ -100,39 +77,34 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
     }
 
     /**
-     * Subscribes to notification stream associated with this registration. If replay is supported, notifications from last
+     * Subscribes to notification stream associated with this registration. If replay is supported, notifications
+     * from last
      * received event time will be requested.
      */
+    @Override
     void reActivateNotificationSource() {
         if (isActive()) {
             LOG.info("Stream {} is reactivating on node {}.", this.getStreamName(), this.nodeId);
-            DataContainerNodeAttrBuilder<NodeIdentifier, ContainerNode> inputBuilder = Builders.containerBuilder()
-                .withNodeIdentifier(CREATE_SUBSCRIPTION_INPUT)
-                .withChild(ImmutableNodes.leafNode(STREAM_QNAME, this.getStreamName()));
-            if (isReplaySupported() && this.getLastEventTime() != null) {
-                inputBuilder.withChild(ImmutableNodes.leafNode(START_TIME_SUBSCRIPTION, this.getLastEventTime()));
-            }
-            final ContainerNode input = inputBuilder.build();
-            CheckedFuture<DOMRpcResult, DOMRpcException> csFuture = domMountPoint.getService(DOMRpcService.class).get()
-                .invokeRpc(CREATE_SUBSCRIPTION, input);
+            final ListenableFuture<? extends DOMRpcResult> result = mountPoint.invokeCreateSubscription(stream,
+                getLastEventTime());
             try {
-                csFuture.checkedGet();
+                result.get();
                 setActive(true);
-            } catch (DOMRpcException e) {
-                LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId);
+            } catch (InterruptedException | ExecutionException e) {
+                LOG.warn("Can not resubscribe stream {} on node {}", this.getSourceName(), this.nodeId, e);
                 setActive(false);
-                return;
             }
         }
     }
 
-    @Override void deActivateNotificationSource() {
+    @Override
+    void deActivateNotificationSource() {
         // no operations need
     }
 
     private void closeStream() {
         if (isActive()) {
-            for (ListenerRegistration<NetconfEventSource> reg : notificationRegistrationMap.values()) {
+            for (ListenerRegistration<DOMNotificationListener> reg : notificationRegistrationMap.values()) {
                 reg.close();
             }
             notificationRegistrationMap.clear();
@@ -145,51 +117,35 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
         return getSourceName();
     }
 
-    @Override ArrayList<TopicId> getNotificationTopicIds(SchemaPath notificationPath) {
-        return notificationTopicMap.get(notificationPath);
-    }
-
-    @Override boolean registerNotificationTopic(SchemaPath notificationPath, TopicId topicId) {
-
-        if (checkNotificationPath(notificationPath) == false) {
+    @Override
+    boolean registerNotificationTopic(final SchemaPath notificationPath, final TopicId topicId) {
+        if (!checkNotificationPath(notificationPath)) {
             LOG.debug("Bad SchemaPath for notification try to register");
             return false;
         }
 
-        final Optional<DOMNotificationService> notifyService = domMountPoint.getService(DOMNotificationService.class);
-        if (notifyService.isPresent() == false) {
-            LOG.debug("DOMNotificationService is not present");
-            return false;
-        }
-
         activateNotificationSource();
-        if (isActive() == false) {
+        if (!isActive()) {
             LOG.warn("Stream {} is not active, listener for notification {} is not registered.", getStreamName(),
-                notificationPath.toString());
+                    notificationPath);
             return false;
         }
 
-        ListenerRegistration<NetconfEventSource> registration = notifyService.get()
-            .registerNotificationListener(this.netconfEventSource, notificationPath);
+        ListenerRegistration<DOMNotificationListener> registration =
+                mountPoint.registerNotificationListener(netconfEventSource, notificationPath);
         notificationRegistrationMap.put(notificationPath, registration);
-        ArrayList<TopicId> topicIds = getNotificationTopicIds(notificationPath);
-        if (topicIds == null) {
-            topicIds = new ArrayList<>();
-            topicIds.add(topicId);
-        } else {
-            if (topicIds.contains(topicId) == false) {
-                topicIds.add(topicId);
-            }
-        }
+        Set<TopicId> topicIds = getTopicsForNotification(notificationPath);
+        topicIds.add(topicId);
 
         notificationTopicMap.put(notificationPath, topicIds);
         return true;
     }
 
-    @Override synchronized void unRegisterNotificationTopic(TopicId topicId) {
+    @Override
+    synchronized void unRegisterNotificationTopic(final TopicId topicId) {
         List<SchemaPath> notificationPathToRemove = new ArrayList<>();
         for (SchemaPath notifKey : notificationTopicMap.keySet()) {
-            ArrayList<TopicId> topicList = notificationTopicMap.get(notifKey);
+            Set<TopicId> topicList = notificationTopicMap.get(notifKey);
             if (topicList != null) {
                 topicList.remove(topicId);
                 if (topicList.isEmpty()) {
@@ -199,22 +155,15 @@ public class StreamNotificationTopicRegistration extends NotificationTopicRegist
         }
         for (SchemaPath notifKey : notificationPathToRemove) {
             notificationTopicMap.remove(notifKey);
-            ListenerRegistration<NetconfEventSource> reg = notificationRegistrationMap.remove(notifKey);
+            ListenerRegistration<DOMNotificationListener> reg = notificationRegistrationMap.remove(notifKey);
             if (reg != null) {
                 reg.close();
             }
         }
     }
 
-    Optional<Date> getLastEventTime() {
-        return Optional.fromNullable(lastEventTime);
-    }
-
-    void setLastEventTime(Date lastEventTime) {
-        this.lastEventTime = lastEventTime;
-    }
-
-    @Override public void close() throws Exception {
+    @Override
+    public void close() {
         closeStream();
     }