Move streams support classes
[netconf.git] / restconf / restconf-nb-rfc8040 / src / main / java / org / opendaylight / restconf / nb / rfc8040 / streams / listeners / ListenerAdapter.java
index 8e3de8fde7a0a70cae8dc6b7c441d6dc024ee795..b438f36b0223c50b501916933b8b4a412c40d7c3 100644 (file)
@@ -7,43 +7,33 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
 import java.time.Instant;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
-import javax.xml.xpath.XPathExpressionException;
+import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
-import org.opendaylight.restconf.common.formatters.DataTreeCandidateFormatter;
-import org.opendaylight.restconf.common.formatters.DataTreeCandidateFormatterFactory;
-import org.opendaylight.restconf.common.formatters.JSONDataTreeCandidateFormatter;
-import org.opendaylight.restconf.common.formatters.XMLDataTreeCandidateFormatter;
+import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.codec.gson.JSONCodecFactorySupplier;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
  */
-public class ListenerAdapter extends AbstractCommonSubscriber implements ClusteredDOMDataTreeChangeListener {
+public class ListenerAdapter extends AbstractCommonSubscriber<YangInstanceIdentifier, Collection<DataTreeCandidate>>
+        implements ClusteredDOMDataTreeChangeListener {
     private static final Logger LOG = LoggerFactory.getLogger(ListenerAdapter.class);
-    private static final String PATH = "path";
     private static final DataTreeCandidateFormatterFactory JSON_FORMATTER_FACTORY =
             JSONDataTreeCandidateFormatter.createFactory(JSONCodecFactorySupplier.RFC7951);
 
-    private final YangInstanceIdentifier path;
-    private final String streamName;
-    private final NotificationOutputType outputType;
-
-    @VisibleForTesting DataTreeCandidateFormatter formatter;
-
     /**
      * Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing.
      *
@@ -51,19 +41,13 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
      * @param streamName The name of the stream.
      * @param outputType Type of output on notification (JSON, XML).
      */
-    ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
+    @VisibleForTesting
+    public ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
             final NotificationOutputType outputType) {
-        setLocalNameOfPath(path.getLastPathArgument().getNodeType().getLocalName());
-
-        this.outputType = requireNonNull(outputType);
-        this.path = requireNonNull(path);
-        this.streamName = requireNonNull(streamName);
-        checkArgument(!streamName.isEmpty());
-
-        formatter = getFormatterFactory().getFormatter();
+        super(path.getLastPathArgument().getNodeType(), streamName, path, outputType, getFormatterFactory(outputType));
     }
 
-    private DataTreeCandidateFormatterFactory getFormatterFactory() {
+    private static DataTreeCandidateFormatterFactory getFormatterFactory(final NotificationOutputType outputType) {
         switch (outputType) {
             case JSON:
                 return JSON_FORMATTER_FACTORY;
@@ -74,33 +58,22 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
         }
     }
 
-    private DataTreeCandidateFormatter getFormatter(final String filter) throws XPathExpressionException {
-        final DataTreeCandidateFormatterFactory factory = getFormatterFactory();
-        return filter == null || filter.isEmpty() ? factory.getFormatter() : factory.getFormatter(filter);
-    }
-
     @Override
-    public void setQueryParams(final Instant start, final Instant stop, final String filter,
-                               final boolean leafNodesOnly, final boolean skipNotificationData) {
-        super.setQueryParams(start, stop, filter, leafNodesOnly, skipNotificationData);
-        try {
-            this.formatter = getFormatter(filter);
-        } catch (final XPathExpressionException e) {
-            throw new IllegalArgumentException("Failed to get filter", e);
-        }
+    public void onInitialData() {
+        // No-op
     }
 
     @Override
     @SuppressWarnings("checkstyle:IllegalCatch")
-    public void onDataTreeChanged(final Collection<DataTreeCandidate> dataTreeCandidates) {
+    public void onDataTreeChanged(final List<DataTreeCandidate> dataTreeCandidates) {
         final Instant now = Instant.now();
-        if (!checkStartStop(now, this)) {
+        if (!checkStartStop(now)) {
             return;
         }
 
         final Optional<String> maybeData;
         try {
-            maybeData = formatter.eventData(schemaHandler.get(), dataTreeCandidates, now, getLeafNodesOnly(),
+            maybeData = formatter().eventData(schemaHandler.get(), dataTreeCandidates, now, getLeafNodesOnly(),
                     isSkipNotificationData());
         } catch (final Exception e) {
             LOG.error("Failed to process notification {}",
@@ -113,36 +86,31 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
         }
     }
 
-    /**
-     * Gets the name of the stream.
-     *
-     * @return The name of the stream.
-     */
-    @Override
-    public String getStreamName() {
-        return this.streamName;
-    }
-
-    @Override
-    public String getOutputType() {
-        return this.outputType.getName();
-    }
-
     /**
      * Get path pointed to data in data store.
      *
      * @return Path pointed to data in data store.
      */
     public YangInstanceIdentifier getPath() {
-        return this.path;
+        return path();
     }
 
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-                .add(PATH, path)
-                .add("stream-name", streamName)
-                .add("output-type", outputType)
-                .toString();
+    /**
+     * Register data change listener in DOM data broker and set it to listener on stream.
+     *
+     * @param domDataBroker data broker for register data change listener
+     * @param datastore     {@link LogicalDatastoreType}
+     */
+    public final synchronized void listen(final DOMDataBroker domDataBroker, final LogicalDatastoreType datastore) {
+        if (!isListening()) {
+            final DOMDataTreeChangeService changeService = domDataBroker.getExtensions()
+                .getInstance(DOMDataTreeChangeService.class);
+            if (changeService == null) {
+                throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
+            }
+
+            setRegistration(changeService.registerDataTreeChangeListener(
+                new DOMDataTreeIdentifier(datastore, getPath()), this));
+        }
     }
 }