Refactoring of web-sockets in RESTCONF RFC-8040 43/80943/12
authorJaroslav Tóth <jtoth@frinx.io>
Sun, 17 Mar 2019 19:21:10 +0000 (20:21 +0100)
committerJaroslav Tóth <jtoth@frinx.io>
Thu, 2 May 2019 13:07:03 +0000 (15:07 +0200)
- Notificator is removed and replaced by ListenersBroker; mods.:
  A. One stream-name can be associated only with one YANG
     notification listener, because it is sufficient (XML/JSON
     format is distinguished directly in stream name).
  B. YANG notification streams are divided from data-change event
     streams; refactored method signatures.
  C. Added synchronization (R/W locking that also differs between
     access to data-change and YANG notification listeners).
- Exposing of public subsriber methods to BaseListenerInterface.
- Refactoring of CreateStream utilities and RPC processors.
- Refactoring of SubsribeToStream utilities and RPC processors.

Change-Id: I521f3b893a5cfa53d59a438e871924c54d6f4d7d
Signed-off-by: Jaroslav Tóth <jtoth@frinx.io>
21 files changed:
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfDataServiceImpl.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfInvokeOperationsServiceImpl.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImpl.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/CreateStreamUtil.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/ReadDataTransactionUtil.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/RestconfStreamsConstants.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/rests/utils/SubscribeToStreamUtil.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractCommonSubscriber.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/AbstractQueryParams.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/BaseListenerInterface.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/EventBusChangeRecorder.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapter.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java [new file with mode: 0644]
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerAdapter.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/Notificator.java [deleted file]
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServer.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServerHandler.java
restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/websockets/WebSocketServerInitializer.java
restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/rests/services/impl/RestconfStreamsSubscriptionServiceImplTest.java
restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenerAdapterTest.java
restconf/restconf-nb-rfc8040/src/test/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/NotificationListenerTest.java

index 4cf01c5992fd2c0670ef47cfd35063f666d4cba3..5e21c2c3cf2e36ee4034391de576cd5a51af0d4a 100644 (file)
@@ -153,8 +153,7 @@ public class RestconfDataServiceImpl implements RestconfDataService {
                 && identifier.contains(STREAM_LOCATION_PATH_PART)) {
             final String value = (String) node.getValue();
             final String streamName = value.substring(
-                    value.indexOf(CREATE_NOTIFICATION_STREAM.toString() + RestconfConstants.SLASH),
-                    value.length());
+                    value.indexOf(CREATE_NOTIFICATION_STREAM + RestconfConstants.SLASH));
             this.delegRestconfSubscrService.subscribeToStream(streamName, uriInfo);
         }
         if (node == null) {
index 55c6be3d8dc0c735e6529f35228a87193caf7d79..58942fbca199dc9cacc2a950339efaf497a97974 100644 (file)
@@ -60,7 +60,7 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
 
     @Override
     public NormalizedNodeContext invokeRpc(final String identifier, final NormalizedNodeContext payload,
-                                           final UriInfo uriInfo) {
+            final UriInfo uriInfo) {
         final SchemaContextRef refSchemaCtx = new SchemaContextRef(this.schemaContextHandler.get());
         final SchemaPath schemaPath = payload.getInstanceIdentifierContext().getSchemaNode().getPath();
         final DOMMountPoint mountPoint = payload.getInstanceIdentifierContext().getMountPoint();
@@ -70,8 +70,8 @@ public class RestconfInvokeOperationsServiceImpl implements RestconfInvokeOperat
         SchemaContextRef schemaContextRef;
 
         if (mountPoint == null) {
-            if (namespace.toString().equals(RestconfStreamsConstants.SAL_REMOTE_NAMESPACE)) {
-                if (identifier.contains(RestconfStreamsConstants.CREATE_DATA_SUBSCR)) {
+            if (namespace.equals(RestconfStreamsConstants.SAL_REMOTE_NAMESPACE.getNamespace())) {
+                if (identifier.contains(RestconfStreamsConstants.CREATE_DATA_SUBSCRIPTION)) {
                     response = CreateStreamUtil.createDataChangeNotifiStream(payload, refSchemaCtx);
                 } else {
                     throw new RestconfDocumentedException("Not supported operation", ErrorType.RPC,
index 9d4309e584ac5ab36a6d2916adc87ca2e631187f..d9a3b6773f27ff3a9a51a60ffbd926bac008532c 100644 (file)
@@ -30,7 +30,6 @@ import org.opendaylight.restconf.nb.rfc8040.handlers.TransactionChainHandler;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.api.RestconfStreamsSubscriptionService;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
 import org.opendaylight.restconf.nb.rfc8040.rests.utils.SubscribeToStreamUtil;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.NormalizedNodeBuilder;
@@ -41,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of {@link RestconfStreamsSubscriptionService}.
- *
  */
 @Path("/")
 public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSubscriptionService {
@@ -83,11 +81,11 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
         final NotificationQueryParams notificationQueryParams = NotificationQueryParams.fromUriInfo(uriInfo);
 
         URI response = null;
-        if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCR)) {
-            response = SubscribeToStreamUtil.notifiDataStream(identifier, uriInfo, notificationQueryParams,
+        if (identifier.contains(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
+            response = SubscribeToStreamUtil.subscribeToDataStream(identifier, uriInfo, notificationQueryParams,
                     this.handlersHolder);
         } else if (identifier.contains(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
-            response = SubscribeToStreamUtil.notifYangStream(identifier, uriInfo, notificationQueryParams,
+            response = SubscribeToStreamUtil.subscribeToYangStream(identifier, uriInfo, notificationQueryParams,
                     this.handlersHolder);
         }
 
@@ -97,8 +95,7 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
                     SubscribeToStreamUtil.prepareIIDSubsStreamOutput(this.handlersHolder.getSchemaHandler());
             final NormalizedNodeBuilder<NodeIdentifier, Object, LeafNode<Object>> builder =
                     ImmutableLeafNodeBuilder.create().withValue(response.toString());
-            builder.withNodeIdentifier(
-                    NodeIdentifier.create(QName.create("subscribe:to:notification", "2016-10-28", "location")));
+            builder.withNodeIdentifier(NodeIdentifier.create(RestconfStreamsConstants.LOCATION_QNAME));
 
             // prepare new header with location
             final Map<String, Object> headers = new HashMap<>();
@@ -170,7 +167,6 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
 
     /**
      * Parser and holder of query paramteres from uriInfo for notifications.
-     *
      */
     public static final class NotificationQueryParams {
 
@@ -255,5 +251,4 @@ public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSu
             return Optional.ofNullable(filter);
         }
     }
-
-}
+}
\ No newline at end of file
index 97cfb02d79426abc486c3de165f8cae831aa380d..e6cab92554540dbb85f6cb1cf65d584b08287b91 100644 (file)
@@ -7,9 +7,8 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.rests.utils;
 
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.List;
+import static java.util.Objects.requireNonNull;
+
 import java.util.Optional;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.mdsal.dom.api.DOMRpcResult;
@@ -20,8 +19,8 @@ import org.opendaylight.restconf.common.errors.RestconfError.ErrorTag;
 import org.opendaylight.restconf.common.errors.RestconfError.ErrorType;
 import org.opendaylight.restconf.common.util.DataChangeScope;
 import org.opendaylight.restconf.nb.rfc8040.references.SchemaContextRef;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.ParserIdentifier;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -36,119 +35,153 @@ import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableCo
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Util class for streams.
- *
- * <ul>
- * <li>create stream
- * <li>subscribe
- * </ul>
- *
+ * Utility class for creation of data-change-event or YANG notification streams.
  */
 public final class CreateStreamUtil {
 
     private static final Logger LOG = LoggerFactory.getLogger(CreateStreamUtil.class);
-    private static final String OUTPUT_TYPE_PARAM_NAME = "notification-output-type";
 
     private CreateStreamUtil() {
-        throw new UnsupportedOperationException("Util class");
+        throw new UnsupportedOperationException("Utility class");
     }
 
     /**
-     * Create stream with POST operation via RPC.
-     *
-     * @param payload
-     *             input of rpc - example in JSON:
-     *
-     *            <pre>
-     *            {@code
-     *            {
-     *                "input": {
-     *                    "path": "/toaster:toaster/toaster:toasterStatus",
-     *                    "sal-remote-augment:datastore": "OPERATIONAL",
-     *                    "sal-remote-augment:scope": "ONE"
-     *                }
-     *            }
-     *            }
-     *            </pre>
-     *
-     * @param refSchemaCtx
-     *             reference to {@link SchemaContext} -
-     *            {@link SchemaContextRef}
-     * @return {@link DOMRpcResult} - This means output of RPC - example in JSON:
+     * Create data-change-event or notification stream with POST operation via RPC.
      *
-     *         <pre>
-     *         {@code
+     * @param payload      Input of RPC - example in JSON (data-change-event stream):
+     *                     <pre>
+     *                     {@code
+     *                         {
+     *                             "input": {
+     *                                 "path": "/toaster:toaster/toaster:toasterStatus",
+     *                                 "sal-remote-augment:datastore": "OPERATIONAL",
+     *                                 "sal-remote-augment:scope": "ONE"
+     *                             }
+     *                         }
+     *                     }
+     *                     </pre>
+     * @param refSchemaCtx Reference to {@link SchemaContext} - {@link SchemaContextRef}.
+     * @return {@link DOMRpcResult} - Output of RPC - example in JSON:
+     *     <pre>
+     *     {@code
      *         {
      *             "output": {
      *                 "stream-name": "toaster:toaster/toaster:toasterStatus/datastore=OPERATIONAL/scope=ONE"
      *             }
      *         }
-     *         }
-     *         </pre>
-     *
+     *     }
+     *     </pre>
      */
     public static DOMRpcResult createDataChangeNotifiStream(final NormalizedNodeContext payload,
             final SchemaContextRef refSchemaCtx) {
-        final ContainerNode data = (ContainerNode) payload.getData();
+        // parsing out of container with settings and path
+        final ContainerNode data = (ContainerNode) requireNonNull(payload).getData();
         final QName qname = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
         final YangInstanceIdentifier path = preparePath(data, qname);
-        String streamName = prepareDataChangeNotifiStreamName(path, refSchemaCtx.get(), data);
-
-        final QName outputQname = QName.create(qname, "output");
-        final QName streamNameQname = QName.create(qname, "stream-name");
 
+        // building of stream name
+        final StringBuilder streamNameBuilder = new StringBuilder(
+                prepareDataChangeNotifiStreamName(path, requireNonNull(refSchemaCtx).get(), data));
         final NotificationOutputType outputType = prepareOutputType(data);
         if (outputType.equals(NotificationOutputType.JSON)) {
-            streamName = streamName + "/JSON";
+            streamNameBuilder.append('/').append(outputType.getName());
         }
+        final String streamName = streamNameBuilder.toString();
 
-        if (!Notificator.existListenerFor(streamName)) {
-            Notificator.createListener(path, streamName, outputType);
-        }
+        // registration of the listener
+        ListenersBroker.getInstance().registerDataChangeListener(path, streamName, outputType);
+
+        // building of output
+        final QName outputQname = QName.create(qname, RestconfStreamsConstants.OUTPUT_CONTAINER_NAME);
+        final QName streamNameQname = QName.create(qname, RestconfStreamsConstants.OUTPUT_STREAM_NAME);
 
-        final ContainerNode output =
-                ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(outputQname))
-                        .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+        final ContainerNode output = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(outputQname))
+                .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
         return new DefaultDOMRpcResult(output);
     }
 
     /**
-     * Prepare {@code NotificationOutputType}.
+     * Prepare {@link NotificationOutputType}.
      *
-     * @param data
-     *             data of notification
-     * @return output type fo notification
+     * @param data Container with stream settings (RPC create-stream).
+     * @return Parsed {@link NotificationOutputType}.
      */
     private static NotificationOutputType prepareOutputType(final ContainerNode data) {
-        NotificationOutputType outputType = parseEnum(data, NotificationOutputType.class, OUTPUT_TYPE_PARAM_NAME);
+        NotificationOutputType outputType = parseEnum(
+                data, NotificationOutputType.class, RestconfStreamsConstants.OUTPUT_TYPE_PARAM_NAME);
         return outputType == null ? NotificationOutputType.XML : outputType;
     }
 
+    /**
+     * Prepare stream name.
+     *
+     * @param path          Path of element from which data-change-event notifications are going to be generated.
+     * @param schemaContext Schema context.
+     * @param data          Container with stream settings (RPC create-stream).
+     * @return Parsed stream name.
+     */
     private static String prepareDataChangeNotifiStreamName(final YangInstanceIdentifier path,
-                                                            final SchemaContext schemaContext,
-            final ContainerNode data) {
-        LogicalDatastoreType ds = parseEnum(data, LogicalDatastoreType.class,
-                RestconfStreamsConstants.DATASTORE_PARAM_NAME);
-        ds = ds == null ? RestconfStreamsConstants.DEFAULT_DS : ds;
+            final SchemaContext schemaContext, final ContainerNode data) {
+        LogicalDatastoreType datastoreType = parseEnum(
+                data, LogicalDatastoreType.class, RestconfStreamsConstants.DATASTORE_PARAM_NAME);
+        datastoreType = datastoreType == null ? RestconfStreamsConstants.DEFAULT_DS : datastoreType;
 
         DataChangeScope scope = parseEnum(data, DataChangeScope.class, RestconfStreamsConstants.SCOPE_PARAM_NAME);
         scope = scope == null ? RestconfStreamsConstants.DEFAULT_SCOPE : scope;
 
-        final String streamName = RestconfStreamsConstants.DATA_SUBSCR + "/"
-                + Notificator
-                .createStreamNameFromUri(ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
-                + RestconfStreamsConstants.DS_URI + ds + RestconfStreamsConstants.SCOPE_URI + scope);
-        return streamName;
+        return RestconfStreamsConstants.DATA_SUBSCRIPTION
+                + "/"
+                + ListenersBroker.createStreamNameFromUri(
+                ParserIdentifier.stringFromYangInstanceIdentifier(path, schemaContext)
+                        + RestconfStreamsConstants.DS_URI
+                        + datastoreType
+                        + RestconfStreamsConstants.SCOPE_URI
+                        + scope);
+    }
+
+    /**
+     * Prepare {@link YangInstanceIdentifier} of stream source.
+     *
+     * @param data          Container with stream settings (RPC create-stream).
+     * @param qualifiedName QName of the input RPC context (used only in debugging).
+     * @return Parsed {@link YangInstanceIdentifier} of data element from which the data-change-event notifications
+     *     are going to be generated.
+     */
+    private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
+        final Optional<DataContainerChild<? extends PathArgument, ?>> path = data.getChild(
+                new YangInstanceIdentifier.NodeIdentifier(QName.create(
+                        qualifiedName,
+                        RestconfStreamsConstants.STREAM_PATH_PARAM_NAME)));
+        Object pathValue = null;
+        if (path.isPresent()) {
+            pathValue = path.get().getValue();
+        }
+        if (!(pathValue instanceof YangInstanceIdentifier)) {
+            LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
+            throw new RestconfDocumentedException(
+                    "Instance identifier was not normalized correctly",
+                    ErrorType.APPLICATION,
+                    ErrorTag.OPERATION_FAILED);
+        }
+        return (YangInstanceIdentifier) pathValue;
     }
 
+    /**
+     * Parsing out of enumeration from RPC create-stream body.
+     *
+     * @param data      Container with stream settings (RPC create-stream).
+     * @param clazz     Enum type to be parsed out from input container.
+     * @param paramName Local name of the enum element.
+     * @return Parsed enumeration.
+     */
     private static <T> T parseEnum(final ContainerNode data, final Class<T> clazz, final String paramName) {
         final Optional<DataContainerChild<? extends PathArgument, ?>> optAugNode = data.getChild(
-            RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
+                RestconfStreamsConstants.SAL_REMOTE_AUG_IDENTIFIER);
         if (!optAugNode.isPresent()) {
             return null;
         }
@@ -157,7 +190,7 @@ public final class CreateStreamUtil {
             return null;
         }
         final Optional<DataContainerChild<? extends PathArgument, ?>> enumNode = ((AugmentationNode) augNode).getChild(
-            new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
+                new NodeIdentifier(QName.create(RestconfStreamsConstants.SAL_REMOTE_AUGMENT, paramName)));
         if (!enumNode.isPresent()) {
             return null;
         }
@@ -169,65 +202,43 @@ public final class CreateStreamUtil {
         return ResolveEnumUtil.resolveEnum(clazz, (String) value);
     }
 
-    private static YangInstanceIdentifier preparePath(final ContainerNode data, final QName qualifiedName) {
-        final Optional<DataContainerChild<? extends PathArgument, ?>> path = data
-                .getChild(new YangInstanceIdentifier.NodeIdentifier(QName.create(qualifiedName, "path")));
-        Object pathValue = null;
-        if (path.isPresent()) {
-            pathValue = path.get().getValue();
-        }
-        if (!(pathValue instanceof YangInstanceIdentifier)) {
-            LOG.debug("Instance identifier {} was not normalized correctly", qualifiedName);
-            throw new RestconfDocumentedException("Instance identifier was not normalized correctly",
-                ErrorType.APPLICATION, ErrorTag.OPERATION_FAILED);
-        }
-        return (YangInstanceIdentifier) pathValue;
-    }
-
     /**
-     * Create stream with POST operation via RPC.
+     * Create YANG notification stream using notification definition in YANG schema.
      *
-     * @param notificatinoDefinition
-     *             input of RPC
-     * @param refSchemaCtx
-     *             schemaContext
-     * @param outputType
-     *              output type
-     * @return {@link DOMRpcResult}
+     * @param notificationDefinition YANG notification definition.
+     * @param refSchemaCtx           Reference to {@link SchemaContext} - {@link SchemaContextRef}.
+     * @param outputType             Output type (XML or JSON).
+     * @return {@link NotificationListenerAdapter}
      */
-    public static List<NotificationListenerAdapter> createYangNotifiStream(
-            final NotificationDefinition notificatinoDefinition, final SchemaContextRef refSchemaCtx,
-            final String outputType) {
-        final List<SchemaPath> paths = new ArrayList<>();
-        final QName notificatinoDefinitionQName = notificatinoDefinition.getQName();
-        final Module module =
-                refSchemaCtx.findModuleByNamespaceAndRevision(notificatinoDefinitionQName.getModule().getNamespace(),
-                        notificatinoDefinitionQName.getModule().getRevision());
-        Preconditions.checkNotNull(module,
-                "Module for namespace " + notificatinoDefinitionQName.getModule().getNamespace() + " does not exist");
-        NotificationDefinition notifiDef = null;
-        for (final NotificationDefinition notification : module.getNotifications()) {
-            if (notification.getQName().equals(notificatinoDefinitionQName)) {
-                notifiDef = notification;
-                break;
-            }
-        }
-        final String moduleName = module.getName();
-        Preconditions.checkNotNull(notifiDef,
-                "Notification " + notificatinoDefinitionQName + "doesn't exist in module " + moduleName);
-        paths.add(notifiDef.getPath());
-        String streamName = RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM + "/";
-        streamName = streamName + moduleName + ":" + notificatinoDefinitionQName.getLocalName();
-        if (outputType.equals("JSON")) {
-            streamName = streamName + "/JSON";
-        }
+    public static NotificationListenerAdapter createYangNotifiStream(
+            final NotificationDefinition notificationDefinition, final SchemaContextRef refSchemaCtx,
+            final NotificationOutputType outputType) {
+        final String streamName = parseNotificationStreamName(requireNonNull(notificationDefinition),
+                requireNonNull(refSchemaCtx), requireNonNull(outputType.getName()));
+        final Optional<NotificationListenerAdapter> listenerForStreamName = ListenersBroker.getInstance()
+                .getNotificationListenerFor(streamName);
+        return listenerForStreamName.orElseGet(() -> ListenersBroker.getInstance().registerNotificationListener(
+                notificationDefinition.getPath(), streamName, outputType));
+    }
 
-        if (Notificator.existNotificationListenerFor(streamName)) {
-            final List<NotificationListenerAdapter> notificationListenerFor =
-                    Notificator.getNotificationListenerFor(streamName);
-            return SubscribeToStreamUtil.pickSpecificListenerByOutput(notificationListenerFor, outputType);
+    private static String parseNotificationStreamName(final NotificationDefinition notificationDefinition,
+            final SchemaContextRef refSchemaCtx, final String outputType) {
+        final QName notificationDefinitionQName = notificationDefinition.getQName();
+        final Module module = refSchemaCtx.findModuleByNamespaceAndRevision(
+                notificationDefinitionQName.getModule().getNamespace(),
+                notificationDefinitionQName.getModule().getRevision());
+        requireNonNull(module, String.format("Module for namespace %s does not exist.",
+                notificationDefinitionQName.getModule().getNamespace()));
+
+        final StringBuilder streamNameBuilder = new StringBuilder();
+        streamNameBuilder.append(RestconfStreamsConstants.CREATE_NOTIFICATION_STREAM)
+                .append('/')
+                .append(module.getName())
+                .append(':')
+                .append(notificationDefinitionQName.getLocalName());
+        if (outputType.equals(NotificationOutputType.JSON.getName())) {
+            streamNameBuilder.append(NotificationOutputType.JSON.getName());
         }
-
-        return Notificator.createNotificationListener(paths, streamName, outputType);
+        return streamNameBuilder.toString();
     }
-}
+}
\ No newline at end of file
index 8d265d33f38fe85068ba0712eac2311b38c9c8ab..a2f2c55e7bbbf968f0b2e2f6c930d478ccc30e3a 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.restconf.nb.rfc8040.rests.utils;
 import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAMS_PATH;
 import static org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants.STREAM_PATH_PART;
 
-import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.FluentFuture;
 import java.net.URI;
@@ -272,33 +271,41 @@ public final class ReadDataTransactionUtil {
                                                 final SchemaContextRef schemaContextRef, final UriInfo uriInfo) {
         final SchemaContext schemaContext = schemaContextRef.get();
         if (identifier.contains(STREAMS_PATH) && !identifier.contains(STREAM_PATH_PART)) {
-            final DOMDataTreeReadWriteTransaction wTx = transactionNode.getTransactionChain().newReadWriteTransaction();
-            final boolean exist = SubscribeToStreamUtil.checkExist(schemaContext, wTx);
-
-            for (final NotificationDefinition notificationDefinition : schemaContextRef.get().getNotifications()) {
-                final List<NotificationListenerAdapter> notifiStreamXML =
-                        CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef,
-                                NotificationOutputType.XML.getName());
-                final List<NotificationListenerAdapter> notifiStreamJSON =
-                        CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef,
-                                NotificationOutputType.JSON.getName());
-                for (final NotificationListenerAdapter listener : Iterables.concat(notifiStreamXML, notifiStreamJSON)) {
-                    final URI uri = SubscribeToStreamUtil.prepareUriByStreamName(uriInfo, listener.getStreamName());
-                    final NormalizedNode mapToStreams =
-                            RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
-                                    listener.getSchemaPath().getLastComponent(), schemaContext.getNotifications(),
-                                    null, listener.getOutputType(), uri,
-                                    SubscribeToStreamUtil.getMonitoringModule(schemaContext), exist);
-                    SubscribeToStreamUtil.writeDataToDS(schemaContext,
-                            listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
-                            mapToStreams);
-                }
-            }
-            SubscribeToStreamUtil.submitData(wTx);
+            createAllYangNotificationStreams(transactionNode, schemaContextRef, uriInfo);
         }
         return readData(content, transactionNode, withDefa, schemaContext);
     }
 
+    private static void createAllYangNotificationStreams(final TransactionVarsWrapper transactionNode,
+            final SchemaContextRef schemaContextRef, final UriInfo uriInfo) {
+        final DOMDataTreeReadWriteTransaction wTx = transactionNode.getTransactionChain().newReadWriteTransaction();
+        final boolean exist = SubscribeToStreamUtil.checkExist(schemaContextRef.get(), wTx);
+
+        for (final NotificationDefinition notificationDefinition : schemaContextRef.get().getNotifications()) {
+            final NotificationListenerAdapter notifiStreamXML =
+                    CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef,
+                            NotificationOutputType.XML);
+            final NotificationListenerAdapter notifiStreamJSON =
+                    CreateStreamUtil.createYangNotifiStream(notificationDefinition, schemaContextRef,
+                            NotificationOutputType.JSON);
+            writeNotificationStreamToDatastore(schemaContextRef, uriInfo, wTx, exist, notifiStreamXML);
+            writeNotificationStreamToDatastore(schemaContextRef, uriInfo, wTx, exist, notifiStreamJSON);
+        }
+        SubscribeToStreamUtil.submitData(wTx);
+    }
+
+    private static void writeNotificationStreamToDatastore(final SchemaContextRef schemaContextRef,
+            final UriInfo uriInfo, final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist,
+            final NotificationListenerAdapter listener) {
+        final URI uri = SubscribeToStreamUtil.prepareUriByStreamName(uriInfo, listener.getStreamName());
+        final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
+                listener.getSchemaPath().getLastComponent(), schemaContextRef.get().getNotifications(), null,
+                listener.getOutputType(), uri,
+                SubscribeToStreamUtil.getMonitoringModule(schemaContextRef.get()), exist);
+        SubscribeToStreamUtil.writeDataToDS(schemaContextRef.get(),
+                listener.getSchemaPath().getLastComponent().getLocalName(), readWriteTransaction, exist, mapToStreams);
+    }
+
     private static NormalizedNode<?, ?> prepareDataByParamWithDef(final NormalizedNode<?, ?> result,
             final YangInstanceIdentifier path, final String withDefa, final SchemaContext ctx) {
         boolean trim;
index 94aab8f84fb4d35d7b244e2f2a3abfaa87b6a3eb..0b4d14ec3434598fd9a2b14b5080750fce7f5d26 100644 (file)
@@ -20,43 +20,52 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.Augmentat
 
 /**
  * Constants for streams.
- *
  */
 public final class RestconfStreamsConstants {
-    public static final String SAL_REMOTE_NAMESPACE = "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote";
 
+    public static final QNameModule SAL_REMOTE_AUGMENT = QNameModule.create(
+            URI.create("urn:sal:restconf:event:subscription"),
+            Revision.of("2014-07-08"));
+    public static final QNameModule SUBSCRIBE_TO_NOTIFICATION = QNameModule.create(
+            URI.create("subscribe:to:notification"),
+            Revision.of("2016-10-28"));
+
+    public static final QName SAL_REMOTE_NAMESPACE = QName.create(
+            "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
+            "2014-01-14",
+            "sal-remote");
+
+    public static final String STREAM_PATH_PARAM_NAME = "path";
     public static final String DATASTORE_PARAM_NAME = "datastore";
+    public static final String SCOPE_PARAM_NAME = "scope";
+    public static final String OUTPUT_TYPE_PARAM_NAME = "notification-output-type";
+    public static final String OUTPUT_CONTAINER_NAME = "output";
+    public static final String OUTPUT_STREAM_NAME = "stream-name";
 
-    private static final URI NAMESPACE_EVENT_SUBSCRIPTION_AUGMENT = URI.create("urn:sal:restconf:event:subscription");
 
-    public static final QNameModule SAL_REMOTE_AUGMENT = QNameModule.create(NAMESPACE_EVENT_SUBSCRIPTION_AUGMENT,
-        Revision.of("2014-07-08")).intern();
+    public static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(ImmutableSet.of(
+            QName.create(SAL_REMOTE_AUGMENT, SCOPE_PARAM_NAME),
+            QName.create(SAL_REMOTE_AUGMENT, DATASTORE_PARAM_NAME),
+            QName.create(SAL_REMOTE_AUGMENT, OUTPUT_TYPE_PARAM_NAME)));
 
-    public static final AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER = new AugmentationIdentifier(
-        ImmutableSet.of(QName.create(SAL_REMOTE_AUGMENT, "scope"), QName.create(SAL_REMOTE_AUGMENT, "datastore"),
-            QName.create(SAL_REMOTE_AUGMENT, "notification-output-type")));
+    public static final QName LOCATION_QNAME = QName.create(SUBSCRIBE_TO_NOTIFICATION, "location");
+    public static final QName NOTIFI_QNAME = QName.create(SUBSCRIBE_TO_NOTIFICATION, "notifi");
 
     public static final DataChangeScope DEFAULT_SCOPE = DataChangeScope.BASE;
-
     public static final LogicalDatastoreType DEFAULT_DS = LogicalDatastoreType.CONFIGURATION;
 
-    public static final String SCOPE_PARAM_NAME = "scope";
-
     public static final char EQUAL = ParserBuilderConstants.Deserializer.EQUAL;
-
     public static final String DS_URI = RestconfConstants.SLASH + DATASTORE_PARAM_NAME + EQUAL;
-
     public static final String SCOPE_URI = RestconfConstants.SLASH + SCOPE_PARAM_NAME + EQUAL;
+    public static final String SCHEMA_SUBSCRIBE_URI = "ws";
+    public static final String SCHEMA_SUBSCRIBE_SECURED_URI = "wss";
+    public static final String SCHEMA_UPGRADE_URI = "http";
+    public static final String SCHEMA_UPGRADE_SECURED_URI = "https";
 
-    public static final int NOTIFICATION_PORT = 8181;
-
-    public static final String SCHEMA_SUBSCIBRE_URI = "ws";
-
-    public static final CharSequence DATA_SUBSCR = "data-change-event-subscription";
-    public static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR;
-
-    public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
-    public static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+    public static final String DATA_SUBSCRIPTION = "data-change-event-subscription";
+    public static final String CREATE_DATA_SUBSCRIPTION = "create-" + DATA_SUBSCRIPTION;
+    public static final String NOTIFICATION_STREAM = "notification-stream";
+    public static final String CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
 
     public static final String STREAMS_PATH = "ietf-restconf-monitoring:restconf-state/streams";
     public static final String STREAM_PATH_PART = "/stream=";
@@ -67,5 +76,4 @@ public final class RestconfStreamsConstants {
     private RestconfStreamsConstants() {
         throw new UnsupportedOperationException("Util class.");
     }
-
-}
+}
\ No newline at end of file
index 28fe9458fa3635e1b09b8e66be847f9b10798359..eee8e432accb1d6757102e54cb00ce3926831f99 100644 (file)
@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
@@ -42,16 +43,13 @@ import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.HandlersHolder;
 import org.opendaylight.restconf.nb.rfc8040.rests.services.impl.RestconfStreamsSubscriptionServiceImpl.NotificationQueryParams;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
-import org.opendaylight.restconf.nb.rfc8040.streams.websockets.WebSocketServer;
 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
 import org.opendaylight.restconf.nb.rfc8040.utils.mapping.RestconfMappingNodeUtil;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
@@ -82,199 +80,184 @@ public final class SubscribeToStreamUtil {
             .appendOffset("+HH:MM", "Z").toFormatter();
 
     private SubscribeToStreamUtil() {
-        throw new UnsupportedOperationException("Util class");
+        throw new UnsupportedOperationException("Utility class");
     }
 
     /**
-     * Register listeners by streamName in identifier to listen to yang
-     * notifications, put or delete info about listener to DS according to
-     * ietf-restconf-monitoring.
+     * Register listener by streamName in identifier to listen to yang notifications, and put or delete information
+     * about listener to DS according to ietf-restconf-monitoring.
      *
-     * @param identifier
-     *             identifier as stream name
-     * @param uriInfo
-     *             for getting base URI information
-     * @param notificationQueryParams
-     *             query parameters of notification
-     * @param handlersHolder
-     *             holder of handlers for notifications
-     * @return location for listening
+     * @param identifier              Name of the stream.
+     * @param uriInfo                 URI information.
+     * @param notificationQueryParams Query parameters of notification.
+     * @param handlersHolder          Holder of handlers for notifications.
+     * @return Stream location for listening.
      */
     @SuppressWarnings("rawtypes")
-    public static URI notifYangStream(final String identifier, final UriInfo uriInfo,
+    public static URI subscribeToYangStream(final String identifier, final UriInfo uriInfo,
             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
-        final String streamName = Notificator.createStreamNameFromUri(identifier);
+        final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
         if (Strings.isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
         }
-        List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
-        if (identifier.contains(RestconfConstants.SLASH + NotificationOutputType.JSON.getName())) {
-            listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.JSON.getName());
-        } else {
-            listeners = pickSpecificListenerByOutput(listeners, NotificationOutputType.XML.getName());
-        }
-        if (listeners.isEmpty()) {
-            throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+        final Optional<NotificationListenerAdapter> notificationListenerAdapter =
+                ListenersBroker.getInstance().getNotificationListenerFor(streamName);
+
+        if (!notificationListenerAdapter.isPresent()) {
+            throw new RestconfDocumentedException(String.format(
+                    "Stream with name %s was not found.", streamName),
+                    ErrorType.PROTOCOL,
                     ErrorTag.UNKNOWN_ELEMENT);
         }
 
-        final DOMDataTreeReadWriteTransaction wTx =
-                handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+        final DOMDataTreeReadWriteTransaction writeTransaction = handlersHolder
+                .getTransactionChainHandler()
+                .get()
+                .newReadWriteTransaction();
         final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
-        final boolean exist = checkExist(schemaContext, wTx);
-
+        final boolean exist = checkExist(schemaContext, writeTransaction);
         final URI uri = prepareUriByStreamName(uriInfo, streamName);
-        for (final NotificationListenerAdapter listener : listeners) {
-            registerToListenNotification(listener, handlersHolder.getNotificationServiceHandler());
-            listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
-                    notificationQueryParams.getFilter(), false);
-            listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
-            final NormalizedNode mapToStreams = RestconfMappingNodeUtil
-                    .mapYangNotificationStreamByIetfRestconfMonitoring(listener.getSchemaPath().getLastComponent(),
-                            schemaContext.getNotifications(), notificationQueryParams.getStart(),
-                            listener.getOutputType(), uri, getMonitoringModule(schemaContext), exist);
-            writeDataToDS(schemaContext, listener.getSchemaPath().getLastComponent().getLocalName(), wTx, exist,
-                    mapToStreams);
-        }
-        submitData(wTx);
 
+        registerToListenNotification(
+                notificationListenerAdapter.get(), handlersHolder.getNotificationServiceHandler());
+        notificationListenerAdapter.get().setQueryParams(
+                notificationQueryParams.getStart(),
+                notificationQueryParams.getStop().orElse(null),
+                notificationQueryParams.getFilter().orElse(null),
+                false);
+        notificationListenerAdapter.get().setCloseVars(
+                handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+        final NormalizedNode mapToStreams = RestconfMappingNodeUtil.mapYangNotificationStreamByIetfRestconfMonitoring(
+                notificationListenerAdapter.get().getSchemaPath().getLastComponent(),
+                schemaContext.getNotifications(), notificationQueryParams.getStart(),
+                notificationListenerAdapter.get().getOutputType(), uri, getMonitoringModule(schemaContext),
+                exist);
+        writeDataToDS(schemaContext,
+                notificationListenerAdapter.get().getSchemaPath().getLastComponent().getLocalName(), writeTransaction,
+                exist, mapToStreams);
+        submitData(writeTransaction);
         return uri;
     }
 
-    static List<NotificationListenerAdapter>
-            pickSpecificListenerByOutput(final List<NotificationListenerAdapter> listeners, final String outputType) {
-        for (final NotificationListenerAdapter notificationListenerAdapter : listeners) {
-            if (notificationListenerAdapter.getOutputType().equals(outputType)) {
-                final List<NotificationListenerAdapter> list = new ArrayList<>();
-                list.add(notificationListenerAdapter);
-                return list;
-            }
-        }
-        return listeners;
-    }
-
     /**
      * Prepare InstanceIdentifierContext for Location leaf.
      *
-     * @param schemaHandler
-     *             schemaContext handler
-     * @return InstanceIdentifier of Location leaf
+     * @param schemaHandler Schema context handler.
+     * @return InstanceIdentifier of Location leaf.
      */
     public static InstanceIdentifierContext<?> prepareIIDSubsStreamOutput(final SchemaContextHandler schemaHandler) {
-        final QName qnameBase = QName.create("subscribe:to:notification", "2016-10-28", "notifi");
-        final DataSchemaNode location = ((ContainerSchemaNode) schemaHandler.get()
-                .findModule(qnameBase.getModule()).get()
-                .getDataChildByName(qnameBase)).getDataChildByName(QName.create(qnameBase, "location"));
-        final List<PathArgument> path = new ArrayList<>();
-        path.add(NodeIdentifier.create(qnameBase));
-        path.add(NodeIdentifier.create(QName.create(qnameBase, "location")));
+        final Optional<Module> module = schemaHandler.get()
+                .findModule(RestconfStreamsConstants.NOTIFI_QNAME.getModule());
+        Preconditions.checkState(module.isPresent());
+        final Optional<DataSchemaNode> notify = module.get()
+                .findDataChildByName(RestconfStreamsConstants.NOTIFI_QNAME);
+        Preconditions.checkState(notify.isPresent());
+        final Optional<DataSchemaNode> location = ((ContainerSchemaNode) notify.get())
+                .findDataChildByName(RestconfStreamsConstants.LOCATION_QNAME);
+        Preconditions.checkState(location.isPresent());
 
-        return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location, null,
-                schemaHandler.get());
+        final List<PathArgument> path = new ArrayList<>();
+        path.add(NodeIdentifier.create(RestconfStreamsConstants.NOTIFI_QNAME));
+        path.add(NodeIdentifier.create(RestconfStreamsConstants.LOCATION_QNAME));
+        return new InstanceIdentifierContext<SchemaNode>(YangInstanceIdentifier.create(path), location.get(),
+                null, schemaHandler.get());
     }
 
     /**
-     * Register listener by streamName in identifier to listen to data change
-     * notifications, put or delete info about listener to DS according to
-     * ietf-restconf-monitoring.
+     * Register listener by streamName in identifier to listen to data change notifications, and put or delete
+     * information about listener to DS according to ietf-restconf-monitoring.
      *
-     * @param identifier
-     *             identifier as stream name
-     * @param uriInfo
-     *             for getting base URI information
-     * @param notificationQueryParams
-     *             query parameters of notification
-     * @param handlersHolder
-     *             holder of handlers for notifications
-     * @return location for listening
+     * @param identifier              Identifier as stream name.
+     * @param uriInfo                 Base URI information.
+     * @param notificationQueryParams Query parameters of notification.
+     * @param handlersHolder          Holder of handlers for notifications.
+     * @return Location for listening.
      */
     @SuppressWarnings("rawtypes")
-    public static URI notifiDataStream(final String identifier, final UriInfo uriInfo,
+    public static URI subscribeToDataStream(final String identifier, final UriInfo uriInfo,
             final NotificationQueryParams notificationQueryParams, final HandlersHolder handlersHolder) {
-        final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
-
-        final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
+        final Map<String, String> mapOfValues = mapValuesFromUri(identifier);
+        final LogicalDatastoreType datastoreType = parseURIEnum(
+                LogicalDatastoreType.class,
                 mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
-        if (ds == null) {
-            final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
-            LOG.debug(msg);
-            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+        if (datastoreType == null) {
+            final String message = "Stream name doesn't contain datastore value (pattern /datastore=)";
+            LOG.debug(message);
+            throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
         }
 
-        final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
+        final DataChangeScope scope = parseURIEnum(
+                DataChangeScope.class,
                 mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
         if (scope == null) {
-            final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
-            LOG.warn(msg);
-            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+            final String message = "Stream name doesn't contains datastore value (pattern /scope=)";
+            LOG.warn(message);
+            throw new RestconfDocumentedException(message, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
         }
 
-        final String streamName = Notificator.createStreamNameFromUri(identifier);
-
-        final ListenerAdapter listener = Notificator.getListenerFor(streamName);
-        Preconditions.checkNotNull(listener, "Listener doesn't exist : " + streamName);
-
-        listener.setQueryParams(notificationQueryParams.getStart(), notificationQueryParams.getStop(),
-                notificationQueryParams.getFilter(), false);
-        listener.setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+        final String streamName = ListenersBroker.createStreamNameFromUri(identifier);
+        final Optional<ListenerAdapter> listener = ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
+        Preconditions.checkArgument(listener.isPresent(), "Listener doesn't exist : " + streamName);
 
-        registration(ds, scope, listener, handlersHolder.getDomDataBrokerHandler().get());
+        listener.get().setQueryParams(
+                notificationQueryParams.getStart(),
+                notificationQueryParams.getStop().orElse(null),
+                notificationQueryParams.getFilter().orElse(null),
+                false);
+        listener.get().setCloseVars(handlersHolder.getTransactionChainHandler(), handlersHolder.getSchemaHandler());
+        registration(datastoreType, listener.get(), handlersHolder.getDomDataBrokerHandler().get());
 
         final URI uri = prepareUriByStreamName(uriInfo, streamName);
-
-        final DOMDataTreeReadWriteTransaction wTx =
-                handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
+        final DOMDataTreeReadWriteTransaction writeTransaction
+                = handlersHolder.getTransactionChainHandler().get().newReadWriteTransaction();
         final SchemaContext schemaContext = handlersHolder.getSchemaHandler().get();
-        final boolean exist = checkExist(schemaContext, wTx);
+        final boolean exist = checkExist(schemaContext, writeTransaction);
 
         final NormalizedNode mapToStreams = RestconfMappingNodeUtil
-                .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.getPath(),
-                        notificationQueryParams.getStart(), listener.getOutputType(), uri,
+                .mapDataChangeNotificationStreamByIetfRestconfMonitoring(listener.get().getPath(),
+                        notificationQueryParams.getStart(), listener.get().getOutputType(), uri,
                         getMonitoringModule(schemaContext), exist, schemaContext);
-        writeDataToDS(schemaContext, listener.getPath().getLastPathArgument().getNodeType().getLocalName(), wTx, exist,
-                mapToStreams);
-        submitData(wTx);
+        writeDataToDS(schemaContext, listener.get().getPath().getLastPathArgument().getNodeType().getLocalName(),
+                writeTransaction, exist, mapToStreams);
+        submitData(writeTransaction);
         return uri;
     }
 
-    public static Module getMonitoringModule(final SchemaContext schemaContext) {
+    static Module getMonitoringModule(final SchemaContext schemaContext) {
         return schemaContext.findModule(MonitoringModule.MODULE_QNAME).orElse(null);
     }
 
     /**
-     * Parse input of query parameters - start-time or stop-time - from
-     * {@link DateAndTime} format to {@link Instant} format.
+     * Parse input of query parameters - start-time or stop-time - from {@link DateAndTime} format
+     * to {@link Instant} format.
      *
-     * @param entry
-     *             start-time or stop-time as string in {@link DateAndTime}
-     *            format
-     * @return parsed {@link Instant} by entry
+     * @param entry Start-time or stop-time as string in {@link DateAndTime} format.
+     * @return Parsed {@link Instant} by entry.
      */
     public static Instant parseDateFromQueryParam(final Entry<String, List<String>> entry) {
         final DateAndTime event = new DateAndTime(entry.getValue().iterator().next());
         final String value = event.getValue();
-        final TemporalAccessor p;
+        final TemporalAccessor accessor;
         try {
-            p = FORMATTER.parse(value);
+            accessor = FORMATTER.parse(value);
         } catch (final DateTimeParseException e) {
             throw new RestconfDocumentedException("Cannot parse of value in date: " + value, e);
         }
-        return Instant.from(p);
-
+        return Instant.from(accessor);
     }
 
     @SuppressWarnings("rawtypes")
-    static void writeDataToDS(final SchemaContext schemaContext,
-                              final String name, final DOMDataTreeReadWriteTransaction readWriteTransaction,
-                              final boolean exist, final NormalizedNode mapToStreams) {
-        String pathId = "";
+    static void writeDataToDS(final SchemaContext schemaContext, final String name,
+            final DOMDataTreeReadWriteTransaction readWriteTransaction, final boolean exist,
+            final NormalizedNode mapToStreams) {
+        String pathId;
         if (exist) {
             pathId = MonitoringModule.PATH_TO_STREAM_WITHOUT_KEY + name;
         } else {
             pathId = MonitoringModule.PATH_TO_STREAMS;
         }
-        readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL, IdentifierCodec.deserialize(pathId, schemaContext),
-                mapToStreams);
+        readWriteTransaction.merge(LogicalDatastoreType.OPERATIONAL,
+                IdentifierCodec.deserialize(pathId, schemaContext), mapToStreams);
     }
 
     static void submitData(final DOMDataTreeReadWriteTransaction readWriteTransaction) {
@@ -286,13 +269,12 @@ public final class SubscribeToStreamUtil {
     }
 
     /**
-     * Prepare map of values from URI.
+     * Prepare map of URI parameter-values.
      *
-     * @param identifier
-     *             URI
-     * @return {@link Map}
+     * @param identifier String identification of URI.
+     * @return Map od URI parameters and values.
      */
-    public static Map<String, String> mapValuesFromUri(final String identifier) {
+    private static Map<String, String> mapValuesFromUri(final String identifier) {
         final HashMap<String, String> result = new HashMap<>();
         for (final String token : RestconfConstants.SLASH_SPLITTER.split(identifier)) {
             final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
@@ -304,28 +286,28 @@ public final class SubscribeToStreamUtil {
     }
 
     static URI prepareUriByStreamName(final UriInfo uriInfo, final String streamName) {
+        final String scheme = uriInfo.getAbsolutePath().getScheme();
         final UriBuilder uriBuilder = uriInfo.getBaseUriBuilder();
-
-        prepareNotificationPort(uriInfo.getBaseUri().getPort());
-        uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+        switch (scheme) {
+            case RestconfStreamsConstants.SCHEMA_UPGRADE_SECURED_URI:
+                uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_SECURED_URI);
+                break;
+            case RestconfStreamsConstants.SCHEMA_UPGRADE_URI:
+            default:
+                uriBuilder.scheme(RestconfStreamsConstants.SCHEMA_SUBSCRIBE_URI);
+        }
         return uriBuilder.replacePath(streamName).build();
     }
 
     /**
-     * Register data change listener in dom data broker and set it to listener
-     * on stream.
+     * Register data change listener in DOM data broker and set it to listener on stream.
      *
-     * @param ds
-     *             {@link LogicalDatastoreType}
-     * @param scope
-     *             {@link DataChangeScope}
-     * @param listener
-     *             listener on specific stream
-     * @param domDataBroker
-     *             data broker for register data change listener
+     * @param datastore     {@link LogicalDatastoreType}
+     * @param listener      listener on specific stream
+     * @param domDataBroker data broker for register data change listener
      */
-    private static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
-            final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
+    private static void registration(final LogicalDatastoreType datastore, final ListenerAdapter listener,
+            final DOMDataBroker domDataBroker) {
         if (listener.isListening()) {
             return;
         }
@@ -336,37 +318,21 @@ public final class SubscribeToStreamUtil {
             throw new UnsupportedOperationException("DOMDataBroker does not support the DOMDataTreeChangeService");
         }
 
-        final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(ds, listener.getPath());
+        final DOMDataTreeIdentifier root = new DOMDataTreeIdentifier(datastore, listener.getPath());
         final ListenerRegistration<ListenerAdapter> registration =
-                                    changeService.registerDataTreeChangeListener(root, listener);
-
+                changeService.registerDataTreeChangeListener(root, listener);
         listener.setRegistration(registration);
     }
 
-    /**
-     * Get port from web socket server. If doesn't exit, create it.
-     *
-     * @param port
-     *            - port
-     */
-    private static void prepareNotificationPort(final int port) {
-        try {
-            WebSocketServer.getInstance();
-        } catch (final NullPointerException e) {
-            WebSocketServer.createInstance(port);
-        }
-    }
-
     static boolean checkExist(final SchemaContext schemaContext,
                               final DOMDataTreeReadOperations readWriteTransaction) {
         boolean exist;
         try {
-            exist = readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL,
+            return readWriteTransaction.exists(LogicalDatastoreType.OPERATIONAL,
                     IdentifierCodec.deserialize(MonitoringModule.PATH_TO_STREAMS, schemaContext)).get();
-        } catch (final InterruptedException | ExecutionException e) {
-            throw new RestconfDocumentedException("Problem while checking data if exists", e);
+        } catch (final InterruptedException | ExecutionException exception) {
+            throw new RestconfDocumentedException("Problem while checking data if exists", exception);
         }
-        return exist;
     }
 
     private static void registerToListenNotification(final NotificationListenerAdapter listener,
@@ -378,18 +344,15 @@ public final class SubscribeToStreamUtil {
         final SchemaPath path = listener.getSchemaPath();
         final ListenerRegistration<DOMNotificationListener> registration =
                 notificationServiceHandler.get().registerNotificationListener(listener, path);
-
         listener.setRegistration(registration);
     }
 
     /**
-     * Parse enum from URI.
+     * Parse out enumeration from URI.
      *
-     * @param clazz
-     *             enum type
-     * @param value
-     *             string of enum value
-     * @return enum
+     * @param clazz Target enumeration type.
+     * @param value String representation of enumeration value.
+     * @return Parsed enumeration type.
      */
     private static <T> T parseURIEnum(final Class<T> clazz, final String value) {
         if (value == null || value.equals("")) {
index 06dec11304dfe3ed0a256143ced8250f6254b6cd..7c65f5ca4f311da85591ba48b8e4e47750b41a7a 100644 (file)
@@ -27,7 +27,6 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
     private final Set<Channel> subscribers = ConcurrentHashMap.newKeySet();
     private final EventBus eventBus;
 
-    @SuppressWarnings("rawtypes")
     private EventBusChangeRecorder eventBusChangeRecorder;
 
     private volatile ListenerRegistration<?> registration;
@@ -35,7 +34,7 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
     /**
      * Creating {@link EventBus}.
      */
-    protected AbstractCommonSubscriber() {
+    AbstractCommonSubscriber() {
         this.eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
     }
 
@@ -55,18 +54,11 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
             this.registration.close();
             this.registration = null;
         }
-
         deleteDataInDS();
         unregister();
     }
 
-    /**
-     * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
-     * subscriber to the event and post event into event bus.
-     *
-     * @param subscriber
-     *            Channel
-     */
+    @Override
     public void addSubscriber(final Channel subscriber) {
         if (!subscriber.isActive()) {
             LOG.debug("Channel is not active between websocket server and subscriber {}", subscriber.remoteAddress());
@@ -76,12 +68,7 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
         this.eventBus.post(event);
     }
 
-    /**
-     * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
-     * subscriber to the event and posts event into event bus.
-     *
-     * @param subscriber subscriber channel
-     */
+    @Override
     public void removeSubscriber(final Channel subscriber) {
         LOG.debug("Subscriber {} is removed.", subscriber.remoteAddress());
         final Event event = new Event(EventType.DEREGISTER);
@@ -89,21 +76,12 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
         this.eventBus.post(event);
     }
 
-    /**
-     * Sets {@link ListenerRegistration} registration.
-     *
-     * @param registration
-     *            DOMDataChangeListener registration
-     */
+    @Override
     public void setRegistration(final ListenerRegistration<?> registration) {
         this.registration = registration;
     }
 
-    /**
-     * Checks if {@link ListenerRegistration} registration exist.
-     *
-     * @return True if exist, false otherwise.
-     */
+    @Override
     public boolean isListening() {
         return this.registration != null;
     }
@@ -112,11 +90,10 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
      * Creating and registering {@link EventBusChangeRecorder} of specific
      * listener on {@link EventBus}.
      *
-     * @param listener
-     *            specific listener of notifications
+     * @param listener Specific listener of notifications.
      */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    protected <T extends BaseListenerInterface> void register(final T listener) {
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    <T extends BaseListenerInterface> void register(final T listener) {
         this.eventBusChangeRecorder = new EventBusChangeRecorder(listener);
         this.eventBus.register(this.eventBusChangeRecorder);
     }
@@ -124,18 +101,16 @@ abstract class AbstractCommonSubscriber extends AbstractQueryParams implements B
     /**
      * Post event to event bus.
      *
-     * @param event
-     *            data of incoming notifications
+     * @param event Data of incoming notifications.
      */
     protected void post(final Event event) {
         this.eventBus.post(event);
     }
 
     /**
-     * Removes all subscribers and unregisters event bus change recorder form
-     * event bus.
+     * Removes all subscribers and unregisters event bus change recorder form event bus.
      */
-    protected void unregister() {
+    private void unregister() {
         this.subscribers.clear();
         this.eventBus.unregister(this.eventBusChangeRecorder);
     }
index 1854a1dfc549a03a45c6a6f1890c647564f1177e..feb60142b4f6fbc6556bae582987f547a76922e5 100644 (file)
@@ -11,7 +11,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.StringReader;
 import java.time.Instant;
-import java.util.Optional;
 import javax.xml.XMLConstants;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.parsers.ParserConfigurationException;
@@ -24,7 +23,6 @@ import org.xml.sax.InputSource;
 
 /**
  * Features of query parameters part of both notifications.
- *
  */
 abstract class AbstractQueryParams extends AbstractNotificationsData {
     // FIXME: BUG-7956: switch to using UntrustedXML
@@ -62,21 +60,17 @@ abstract class AbstractQueryParams extends AbstractNotificationsData {
     /**
      * Set query parameters for listener.
      *
-     * @param start
-     *            start-time of getting notification
-     * @param stop
-     *            stop-time of getting notification
-     * @param filter
-     *            indicate which subset of all possible events are of interest
-     * @param leafNodesOnly
-     *            if true, notifications will contain changes to leaf nodes only
+     * @param start         Start-time of getting notification.
+     * @param stop          Stop-time of getting notification.
+     * @param filter        Indicates which subset of all possible events are of interest.
+     * @param leafNodesOnly If TRUE, notifications will contain changes of leaf nodes only.
      */
     @SuppressWarnings("checkstyle:hiddenField")
-    public void setQueryParams(final Instant start, final Optional<Instant> stop, final Optional<String> filter,
-                               final boolean leafNodesOnly) {
+    public void setQueryParams(final Instant start, final Instant stop, final String filter,
+            final boolean leafNodesOnly) {
         this.start = Preconditions.checkNotNull(start);
-        this.stop = stop.orElse(null);
-        this.filter = filter.orElse(null);
+        this.stop = stop;
+        this.filter = filter;
         this.leafNodesOnly = leafNodesOnly;
     }
 
@@ -85,7 +79,7 @@ abstract class AbstractQueryParams extends AbstractNotificationsData {
      *
      * @return true if this query should only notify about leaf node changes
      */
-    public boolean getLeafNodesOnly() {
+    boolean getLeafNodesOnly() {
         return leafNodesOnly;
     }
 
@@ -116,14 +110,13 @@ abstract class AbstractQueryParams extends AbstractNotificationsData {
     /**
      * Check if is filter used and then prepare and post data do client.
      *
-     * @param xml   data of notification
+     * @param xml XML data of notification.
      */
     @SuppressWarnings("checkstyle:IllegalCatch")
     boolean checkFilter(final String xml) {
         if (this.filter == null) {
             return true;
         }
-
         try {
             return parseFilterParam(xml);
         } catch (final Exception e) {
@@ -132,11 +125,10 @@ abstract class AbstractQueryParams extends AbstractNotificationsData {
     }
 
     /**
-     * Parse and evaluate filter value by xml.
+     * Parse and evaluate filter statement by XML format.
      *
-     * @return true or false - depends on filter expression and data of
-     *         notifiaction
-     * @throws Exception if operation fails
+     * @return {@code true} or {@code false} depending on filter expression and data of notification.
+     * @throws Exception If operation fails.
      */
     private boolean parseFilterParam(final String xml) throws Exception {
         final Document docOfXml = DBF.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
@@ -144,4 +136,4 @@ abstract class AbstractQueryParams extends AbstractNotificationsData {
         // FIXME: BUG-7956: xPath.setNamespaceContext(nsContext);
         return (boolean) xPath.compile(this.filter).evaluate(docOfXml, XPathConstants.BOOLEAN);
     }
-}
+}
\ No newline at end of file
index fdecd7dc618385051f9387b304510ffc00ffd827..fda095b9a16fa89f6108a2a98e03e743a9b118d4 100644 (file)
@@ -9,39 +9,68 @@ package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
 import io.netty.channel.Channel;
 import java.util.Set;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 
 /**
- * Base interface for both listeners({@link ListenerAdapter},
- * {@link NotificationListenerAdapter}).
+ * Base interface for both listeners({@link ListenerAdapter}, {@link NotificationListenerAdapter}).
  */
-interface BaseListenerInterface extends AutoCloseable {
+public interface BaseListenerInterface extends AutoCloseable {
 
     /**
      * Return all subscribers of listener.
      *
-     * @return set of subscribers
+     * @return Set of all subscribers.
      */
     Set<Channel> getSubscribers();
 
     /**
      * Checks if exists at least one {@link Channel} subscriber.
      *
-     * @return True if exist at least one {@link Channel} subscriber, false
-     *         otherwise.
+     * @return {@code true} if exist at least one {@link Channel} subscriber, {@code false} otherwise.
      */
     boolean hasSubscribers();
 
     /**
      * Get name of stream.
      *
-     * @return stream name
+     * @return Stream name.
      */
     String getStreamName();
 
     /**
      * Get output type.
      *
-     * @return outputType
+     * @return Output type (JSON or XML).
      */
     String getOutputType();
+
+    /**
+     * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
+     * subscriber to the event and post event into event bus.
+     *
+     * @param subscriber Web-socket channel.
+     */
+    void addSubscriber(Channel subscriber);
+
+    /**
+     * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
+     * subscriber to the event and posts event into event bus.
+     *
+     * @param subscriber Subscriber channel.
+     */
+    void removeSubscriber(Channel subscriber);
+
+    /**
+     * Sets {@link ListenerRegistration} registration.
+     *
+     * @param registration DOMDataChangeListener registration.
+     */
+    void setRegistration(ListenerRegistration<?> registration);
+
+    /**
+     * Checks if {@link ListenerRegistration} registration exists.
+     *
+     * @return {@code true} if exists, {@code false} otherwise.
+     */
+    boolean isListening();
 }
index c43acae9e9a020aeb6278f332c6e6f305a749842..944631e3fac9152ef649bc34f5a64b3892e0bdb3 100644 (file)
@@ -21,8 +21,7 @@ class EventBusChangeRecorder<T extends BaseListenerInterface> {
     /**
      * Event bus change recorder of specific listener of notifications.
      *
-     * @param listener
-     *             specific listener
+     * @param listener Specific listener.
      */
     EventBusChangeRecorder(final T listener) {
         this.listener = listener;
@@ -32,12 +31,12 @@ class EventBusChangeRecorder<T extends BaseListenerInterface> {
     public void recordCustomerChange(final Event event) {
         if (event.getType() == EventType.REGISTER) {
             final Channel subscriber = event.getSubscriber();
-            if (!this.listener.getSubscribers().contains(subscriber)) {
-                this.listener.getSubscribers().add(subscriber);
-            }
+            this.listener.getSubscribers().add(subscriber);
         } else if (event.getType() == EventType.DEREGISTER) {
             this.listener.getSubscribers().remove(event.getSubscriber());
-            Notificator.removeListenerIfNoSubscriberExists(this.listener);
+            if (!this.listener.hasSubscribers()) {
+                ListenersBroker.getInstance().removeAndCloseListener(this.listener);
+            }
         } else if (event.getType() == EventType.NOTIFY) {
             for (final Channel subscriber : this.listener.getSubscribers()) {
                 if (subscriber.isActive()) {
@@ -50,4 +49,4 @@ class EventBusChangeRecorder<T extends BaseListenerInterface> {
             }
         }
     }
-}
+}
\ No newline at end of file
index 386f4d07af329ad94f3f03fdf4828a151a337fbf..12a6ae4904dd63b6a2c66b4e1e0c86c12833b313 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.time.Instant;
@@ -30,6 +31,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.UnkeyedListEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.util.DataSchemaContextNode;
 import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -41,8 +43,7 @@ import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
 /**
- * {@link ListenerAdapter} is responsible to track events, which occurred by
- * changing data in data source.
+ * {@link ListenerAdapter} is responsible to track events, which occurred by changing data in data source.
  */
 public class ListenerAdapter extends AbstractCommonSubscriber implements ClusteredDOMDataTreeChangeListener {
 
@@ -53,15 +54,11 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
     private final NotificationOutputType outputType;
 
     /**
-     * Creates new {@link ListenerAdapter} listener specified by path and stream
-     * name and register for subscribing.
+     * Creates new {@link ListenerAdapter} listener specified by path and stream name and register for subscribing.
      *
-     * @param path
-     *            Path to data in data store.
-     * @param streamName
-     *            The name of the stream.
-     * @param outputType
-     *            Type of output on notification (JSON, XML)
+     * @param path       Path to data in data store.
+     * @param streamName The name of the stream.
+     * @param outputType Type of output on notification (JSON, XML).
      */
     ListenerAdapter(final YangInstanceIdentifier path, final String streamName,
             final NotificationOutputType outputType) {
@@ -114,7 +111,7 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
     /**
      * Prepare data of notification and data to client.
      *
-     * @param xml   data
+     * @param xml XML-formatted data.
      */
     private void prepareAndPostData(final String xml) {
         final Event event = new Event(EventType.NOTIFY);
@@ -126,15 +123,10 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
         post(event);
     }
 
-    /**
-     * Tracks events of data change by customer.
-     */
-
     /**
      * Prepare data in printable form and transform it to String.
      *
-     * @param dataTreeCandidates the DataTreeCandidates to transform
-     *
+     * @param dataTreeCandidates Data-tree candidates to be transformed.
      * @return Data in printable form.
      */
     private String prepareXml(final Collection<DataTreeCandidate> dataTreeCandidates) {
@@ -157,8 +149,7 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
      */
     @SuppressWarnings("checkstyle:hiddenField")
     private void addValuesToDataChangedNotificationEventElement(final Document doc,
-            final Element dataChangedNotificationEventElement,
-            final Collection<DataTreeCandidate> dataTreeCandidates,
+            final Element dataChangedNotificationEventElement, final Collection<DataTreeCandidate> dataTreeCandidates,
             final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
 
         for (DataTreeCandidate dataTreeCandidate : dataTreeCandidates) {
@@ -177,7 +168,7 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
             final YangInstanceIdentifier parentYiid, final SchemaContext schemaContext,
             final DataSchemaContextTree dataSchemaContextTree) {
 
-        Optional<NormalizedNode<?,?>> optionalNormalizedNode = Optional.empty();
+        Optional<NormalizedNode<?, ?>> optionalNormalizedNode = Optional.empty();
         switch (candidateNode.getModificationType()) {
             case APPEARED:
             case SUBTREE_MODIFIED:
@@ -198,11 +189,13 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
             return;
         }
 
-        NormalizedNode<?,?> normalizedNode = optionalNormalizedNode.get();
+        NormalizedNode<?, ?> normalizedNode = optionalNormalizedNode.get();
         YangInstanceIdentifier yiid = YangInstanceIdentifier.builder(parentYiid)
-                                                            .append(normalizedNode.getIdentifier()).build();
+                .append(normalizedNode.getIdentifier()).build();
 
-        boolean isNodeMixin = dataSchemaContextTree.getChild(yiid).isMixin();
+        final Optional<DataSchemaContextNode<?>> childrenSchemaNode = dataSchemaContextTree.findChild(yiid);
+        Preconditions.checkState(childrenSchemaNode.isPresent());
+        boolean isNodeMixin = childrenSchemaNode.get().isMixin();
         boolean isSkippedNonLeaf = getLeafNodesOnly() && !(normalizedNode instanceof LeafNode);
         if (!isNodeMixin && !isSkippedNonLeaf) {
             Node node = null;
@@ -216,7 +209,7 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
                     break;
                 case DELETE:
                 case DISAPPEARED:
-                    node = createDataChangeEventElement(doc, yiid, Operation.DELETED, schemaContext);
+                    node = createDataChangeEventElement(doc, yiid, schemaContext);
                     break;
                 case UNMODIFIED:
                 default:
@@ -228,41 +221,35 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
         }
 
         for (DataTreeCandidateNode childNode : candidateNode.getChildNodes()) {
-            addNodeToDataChangeNotificationEventElement(doc, dataChangedNotificationEventElement, childNode,
-                                                                        yiid, schemaContext, dataSchemaContextTree);
+            addNodeToDataChangeNotificationEventElement(
+                    doc, dataChangedNotificationEventElement, childNode, yiid, schemaContext, dataSchemaContextTree);
         }
     }
 
     /**
-     * Creates changed event element from data.
+     * Creates data-changed event element from data.
      *
-     * @param doc
-     *            {@link Document}
-     * @param path
-     *            Path to data in data store.
-     * @param operation
-     *            {@link Operation}
-     * @param schemaContext
-     *            schema context
-     * @return {@link Node} node represented by changed event element.
+     * @param doc           {@link Document}
+     * @param schemaContext Schema context.
+     * @return {@link Node} represented by changed event element.
      */
     private Node createDataChangeEventElement(final Document doc, final YangInstanceIdentifier eventPath,
-            final Operation operation, final SchemaContext schemaContext) {
+            final SchemaContext schemaContext) {
         final Element dataChangeEventElement = doc.createElement("data-change-event");
         final Element pathElement = doc.createElement("path");
         addPathAsValueToElement(eventPath, pathElement, schemaContext);
         dataChangeEventElement.appendChild(pathElement);
 
         final Element operationElement = doc.createElement("operation");
-        operationElement.setTextContent(operation.value);
+        operationElement.setTextContent(Operation.DELETED.value);
         dataChangeEventElement.appendChild(operationElement);
 
         return dataChangeEventElement;
     }
 
-    private Node createCreatedChangedDataChangeEventElement(final Document doc,
-            final YangInstanceIdentifier eventPath, final NormalizedNode<?, ?> normalized, final Operation operation,
-            final SchemaContext schemaContext, final DataSchemaContextTree dataSchemaContextTree) {
+    private Node createCreatedChangedDataChangeEventElement(final Document doc, final YangInstanceIdentifier eventPath,
+            final NormalizedNode<?, ?> normalized, final Operation operation, final SchemaContext schemaContext,
+            final DataSchemaContextTree dataSchemaContextTree) {
         final Element dataChangeEventElement = doc.createElement("data-change-event");
         final Element pathElement = doc.createElement("path");
         addPathAsValueToElement(eventPath, pathElement, schemaContext);
@@ -274,10 +261,12 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
 
         try {
             SchemaPath nodePath;
+            final Optional<DataSchemaContextNode<?>> childrenSchemaNode = dataSchemaContextTree.findChild(eventPath);
+            Preconditions.checkState(childrenSchemaNode.isPresent());
             if (normalized instanceof MapEntryNode || normalized instanceof UnkeyedListEntryNode) {
-                nodePath = dataSchemaContextTree.getChild(eventPath).getDataSchemaNode().getPath();
+                nodePath = childrenSchemaNode.get().getDataSchemaNode().getPath();
             } else {
-                nodePath = dataSchemaContextTree.getChild(eventPath).getDataSchemaNode().getPath().getParent();
+                nodePath = childrenSchemaNode.get().getDataSchemaNode().getPath().getParent();
             }
             final DOMResult domResult = writeNormalizedNode(normalized, schemaContext, nodePath);
             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
@@ -296,12 +285,9 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
     /**
      * Adds path as value to element.
      *
-     * @param eventPath
-     *            Path to data in data store.
-     * @param element
-     *            {@link Element}
-     * @param schemaContext
-     *            schema context
+     * @param eventPath     Path to data in data store.
+     * @param element       {@link Element}
+     * @param schemaContext Schema context.
      */
     @SuppressWarnings("rawtypes")
     private void addPathAsValueToElement(final YangInstanceIdentifier eventPath, final Element element,
@@ -313,14 +299,14 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
                 continue;
             }
             textContent.append("/");
-            writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), schemaContext);
+            writeIdentifierWithNamespacePrefix(textContent, pathArgument.getNodeType(), schemaContext);
             if (pathArgument instanceof NodeIdentifierWithPredicates) {
                 final Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
                 for (final Entry<QName, Object> entry : predicates.entrySet()) {
                     final QName keyValue = entry.getKey();
                     final String predicateValue = String.valueOf(entry.getValue());
                     textContent.append("[");
-                    writeIdentifierWithNamespacePrefix(element, textContent, keyValue, schemaContext);
+                    writeIdentifierWithNamespacePrefix(textContent, keyValue, schemaContext);
                     textContent.append("='");
                     textContent.append(predicateValue);
                     textContent.append("'");
@@ -339,30 +325,32 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
     /**
      * Writes identifier that consists of prefix and QName.
      *
-     * @param element
-     *            {@link Element}
-     * @param textContent
-     *            StringBuilder
-     * @param qualifiedName
-     *            QName
-     * @param schemaContext
-     *            schema context
+     * @param textContent   Text builder that should be supplemented by QName and its modules name.
+     * @param qualifiedName QName of the element.
+     * @param schemaContext Schema context that holds modules which should contain module specified in QName.
      */
-    private static void writeIdentifierWithNamespacePrefix(final Element element, final StringBuilder textContent,
-            final QName qualifiedName, final SchemaContext schemaContext) {
-        final Module module = schemaContext.findModule(qualifiedName.getModule()).get();
-
-        textContent.append(module.getName());
-        textContent.append(":");
-        textContent.append(qualifiedName.getLocalName());
+    private static void writeIdentifierWithNamespacePrefix(final StringBuilder textContent, final QName qualifiedName,
+            final SchemaContext schemaContext) {
+        final Optional<Module> module = schemaContext.findModule(qualifiedName.getModule());
+        if (module.isPresent()) {
+            textContent.append(module.get().getName());
+            textContent.append(":");
+            textContent.append(qualifiedName.getLocalName());
+        } else {
+            LOG.error("Cannot write identifier with namespace prefix in data-change listener adapter: "
+                    + "Cannot find module in schema context for input QName {}.", qualifiedName);
+            throw new IllegalStateException(String.format("Cannot find module in schema context for input QName %s.",
+                    qualifiedName));
+        }
     }
 
     /**
-     * Consists of three types {@link Operation#CREATED},
-     * {@link Operation#UPDATED} and {@link Operation#DELETED}.
+     * Consists of three types {@link Operation#CREATED}, {@link Operation#UPDATED} and {@link Operation#DELETED}.
      */
     private enum Operation {
-        CREATED("created"), UPDATED("updated"), DELETED("deleted");
+        CREATED("created"),
+        UPDATED("updated"),
+        DELETED("deleted");
 
         private final String value;
 
@@ -370,4 +358,13 @@ public class ListenerAdapter extends AbstractCommonSubscriber implements Cluster
             this.value = value;
         }
     }
-}
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("path", path)
+                .add("stream-name", streamName)
+                .add("output-type", outputType)
+                .toString();
+    }
+}
\ No newline at end of file
diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/ListenersBroker.java
new file mode 100644 (file)
index 0000000..85a6569
--- /dev/null
@@ -0,0 +1,363 @@
+/*
+ * Copyright © 2019 FRINX s.r.o. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.StampedLock;
+import java.util.function.Function;
+import org.opendaylight.restconf.nb.rfc8040.rests.utils.RestconfStreamsConstants;
+import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This singleton class is responsible for creation, removal and searching for {@link ListenerAdapter} or
+ * {@link NotificationListenerAdapter} listeners.
+ */
+public final class ListenersBroker {
+    private static final Logger LOG = LoggerFactory.getLogger(ListenersBroker.class);
+    private static ListenersBroker listenersBroker;
+
+    private final StampedLock dataChangeListenersLock = new StampedLock();
+    private final StampedLock notificationListenersLock = new StampedLock();
+    private final BiMap<String, ListenerAdapter> dataChangeListeners = HashBiMap.create();
+    private final BiMap<String, NotificationListenerAdapter> notificationListeners = HashBiMap.create();
+
+    private ListenersBroker() {
+    }
+
+    /**
+     * Creation of the singleton listeners broker.
+     *
+     * @return Reusable instance of {@link ListenersBroker}.
+     */
+    public static synchronized ListenersBroker getInstance() {
+        if (listenersBroker == null) {
+            listenersBroker = new ListenersBroker();
+        }
+        return listenersBroker;
+    }
+
+    /**
+     * Returns set of all data-change-event streams.
+     */
+    public Set<String> getDataChangeStreams() {
+        final long stamp = dataChangeListenersLock.readLock();
+        try {
+            return ImmutableSet.copyOf(dataChangeListeners.keySet());
+        } finally {
+            dataChangeListenersLock.unlockRead(stamp);
+        }
+    }
+
+    /**
+     * Returns set of all notification streams.
+     */
+    public Set<String> getNotificationStreams() {
+        final long stamp = notificationListenersLock.readLock();
+        try {
+            return ImmutableSet.copyOf(notificationListeners.keySet());
+        } finally {
+            notificationListenersLock.unlockRead(stamp);
+        }
+    }
+
+    /**
+     * Gets {@link ListenerAdapter} specified by stream identification.
+     *
+     * @param streamName Stream name.
+     * @return {@link ListenerAdapter} specified by stream name wrapped in {@link Optional} or {@link Optional#empty()}
+     *     if listener with specified stream name doesn't exist.
+     */
+    public Optional<ListenerAdapter> getDataChangeListenerFor(final String streamName) {
+        final long stamp = dataChangeListenersLock.readLock();
+        try {
+            final ListenerAdapter listenerAdapter = dataChangeListeners.get(requireNonNull(streamName));
+            return Optional.ofNullable(listenerAdapter);
+        } finally {
+            dataChangeListenersLock.unlockRead(stamp);
+        }
+    }
+
+    /**
+     * Gets {@link NotificationListenerAdapter} specified by stream name.
+     *
+     * @param streamName Stream name.
+     * @return {@link NotificationListenerAdapter} specified by stream name wrapped in {@link Optional}
+     *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
+     */
+    public Optional<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
+        final long stamp = notificationListenersLock.readLock();
+        try {
+            final NotificationListenerAdapter listenerAdapter = notificationListeners.get(requireNonNull(streamName));
+            return Optional.ofNullable(listenerAdapter);
+        } finally {
+            notificationListenersLock.unlockRead(stamp);
+        }
+    }
+
+    /**
+     * Get listener for stream-name.
+     *
+     * @param streamName Stream name.
+     * @return {@link NotificationListenerAdapter} or {@link ListenerAdapter} object wrapped in {@link Optional}
+     *     or {@link Optional#empty()} if listener with specified stream name doesn't exist.
+     */
+    public Optional<BaseListenerInterface> getListenerFor(final String streamName) {
+        if (streamName.startsWith(RestconfStreamsConstants.NOTIFICATION_STREAM)) {
+            return getNotificationListenerFor(streamName).map(Function.identity());
+        } else if (streamName.startsWith(RestconfStreamsConstants.DATA_SUBSCRIPTION)) {
+            return getDataChangeListenerFor(streamName).map(Function.identity());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Creates new {@link ListenerAdapter} listener using input stream name and path if such listener
+     * hasn't been created yet.
+     *
+     * @param path       Path to data in data repository.
+     * @param streamName Stream name.
+     * @param outputType Specific type of output for notifications - XML or JSON.
+     * @return Created or existing data-change listener adapter.
+     */
+    public ListenerAdapter registerDataChangeListener(final YangInstanceIdentifier path, final String streamName,
+            final NotificationOutputType outputType) {
+        requireNonNull(path);
+        requireNonNull(streamName);
+        requireNonNull(outputType);
+
+        final long stamp = dataChangeListenersLock.writeLock();
+        try {
+            return dataChangeListeners.computeIfAbsent(streamName, stream -> new ListenerAdapter(
+                    path, stream, outputType));
+        } finally {
+            dataChangeListenersLock.unlockWrite(stamp);
+        }
+    }
+
+    /**
+     * Creates new {@link NotificationDefinition} listener using input stream name and schema path
+     * if such listener haven't been created yet.
+     *
+     * @param schemaPath Schema path of YANG notification structure.
+     * @param streamName Stream name.
+     * @param outputType Specific type of output for notifications - XML or JSON.
+     * @return Created or existing notification listener adapter.
+     */
+    public NotificationListenerAdapter registerNotificationListener(final SchemaPath schemaPath,
+            final String streamName, final NotificationOutputType outputType) {
+        requireNonNull(schemaPath);
+        requireNonNull(streamName);
+        requireNonNull(outputType);
+
+        final long stamp = notificationListenersLock.writeLock();
+        try {
+            return notificationListeners.computeIfAbsent(streamName, stream -> new NotificationListenerAdapter(
+                    schemaPath, stream, outputType.getName()));
+        } finally {
+            notificationListenersLock.unlockWrite(stamp);
+        }
+    }
+
+    /**
+     * Removal and closing of all data-change-event and notification listeners.
+     */
+    public synchronized void removeAndCloseAllListeners() {
+        final long stampNotifications = notificationListenersLock.writeLock();
+        final long stampDataChanges = dataChangeListenersLock.writeLock();
+        try {
+            removeAndCloseAllDataChangeListenersTemplate();
+            removeAndCloseAllNotificationListenersTemplate();
+        } finally {
+            dataChangeListenersLock.unlockWrite(stampDataChanges);
+            notificationListenersLock.unlockWrite(stampNotifications);
+        }
+    }
+
+    /**
+     * Closes and removes all data-change listeners.
+     */
+    public void removeAndCloseAllDataChangeListeners() {
+        final long stamp = dataChangeListenersLock.writeLock();
+        try {
+            removeAndCloseAllDataChangeListenersTemplate();
+        } finally {
+            dataChangeListenersLock.unlockWrite(stamp);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void removeAndCloseAllDataChangeListenersTemplate() {
+        dataChangeListeners.values()
+                .forEach(listenerAdapter -> {
+                    try {
+                        listenerAdapter.close();
+                    } catch (final Exception exception) {
+                        LOG.error("Failed to close data-change listener {}.", listenerAdapter, exception);
+                        throw new IllegalStateException(String.format("Failed to close data-change listener %s.",
+                                listenerAdapter), exception);
+                    }
+                });
+        dataChangeListeners.clear();
+    }
+
+    /**
+     * Closes and removes all notification listeners.
+     */
+    public void removeAndCloseAllNotificationListeners() {
+        final long stamp = notificationListenersLock.writeLock();
+        try {
+            removeAndCloseAllNotificationListenersTemplate();
+        } finally {
+            notificationListenersLock.unlockWrite(stamp);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void removeAndCloseAllNotificationListenersTemplate() {
+        notificationListeners.values()
+                .forEach(listenerAdapter -> {
+                    try {
+                        listenerAdapter.close();
+                    } catch (final Exception exception) {
+                        LOG.error("Failed to close notification listener {}.", listenerAdapter, exception);
+                        throw new IllegalStateException(String.format("Failed to close notification listener %s.",
+                                listenerAdapter), exception);
+                    }
+                });
+        notificationListeners.clear();
+    }
+
+    /**
+     * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
+     *
+     * @param listener Listener to be closed and removed.
+     */
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void removeAndCloseDataChangeListener(final ListenerAdapter listener) {
+        final long stamp = dataChangeListenersLock.writeLock();
+        try {
+            removeAndCloseDataChangeListenerTemplate(listener);
+        } catch (final Exception exception) {
+            LOG.error("Data-change listener {} cannot be closed.", listener, exception);
+        } finally {
+            dataChangeListenersLock.unlockWrite(stamp);
+        }
+    }
+
+    /**
+     * Removes and closes data-change listener of type {@link ListenerAdapter} specified in parameter.
+     *
+     * @param listener Listener to be closed and removed.
+     */
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private void removeAndCloseDataChangeListenerTemplate(final ListenerAdapter listener) {
+        final long stamp = dataChangeListenersLock.writeLock();
+        try {
+            requireNonNull(listener).close();
+            if (dataChangeListeners.inverse().remove(listener) == null) {
+                LOG.warn("There isn't any data-change event stream that would match listener adapter {}.", listener);
+            }
+        } catch (final Exception exception) {
+            LOG.error("Data-change listener {} cannot be closed.", listener, exception);
+            throw new IllegalStateException(String.format(
+                    "Data-change listener %s cannot be closed.",
+                    listener), exception);
+        } finally {
+            dataChangeListenersLock.unlockWrite(stamp);
+        }
+    }
+
+    /**
+     * Removes and closes notification listener of type {@link NotificationListenerAdapter} specified in parameter.
+     *
+     * @param listener Listener to be closed and removed.
+     */
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void removeAndCloseNotificationListener(final NotificationListenerAdapter listener) {
+        final long stamp = notificationListenersLock.writeLock();
+        try {
+            removeAndCloseNotificationListenerTemplate(listener);
+        } catch (final Exception exception) {
+            LOG.error("Notification listener {} cannot be closed.", listener, exception);
+        } finally {
+            notificationListenersLock.unlockWrite(stamp);
+        }
+    }
+
+    @SuppressWarnings({"checkstyle:IllegalCatch"})
+    private void removeAndCloseNotificationListenerTemplate(NotificationListenerAdapter listener) {
+        try {
+            requireNonNull(listener).close();
+            if (notificationListeners.inverse().remove(listener) == null) {
+                LOG.warn("There isn't any notification stream that would match listener adapter {}.", listener);
+            }
+        } catch (final Exception exception) {
+            LOG.error("Notification listener {} cannot be closed.", listener, exception);
+            throw new IllegalStateException(String.format(
+                    "Notification listener %s cannot be closed.", listener),
+                    exception);
+        }
+    }
+
+    /**
+     * Removal and closing of general listener (data-change or notification listener).
+     *
+     * @param listener Listener to be closed and removed from cache.
+     */
+    void removeAndCloseListener(final BaseListenerInterface listener) {
+        requireNonNull(listener);
+        if (listener instanceof ListenerAdapter) {
+            removeAndCloseDataChangeListener((ListenerAdapter) listener);
+        } else if (listener instanceof NotificationListenerAdapter) {
+            removeAndCloseNotificationListener((NotificationListenerAdapter) listener);
+        }
+    }
+
+    /**
+     * Creates string representation of stream name from URI. Removes slash from URI in start and end positions.
+     *
+     * @param uri URI for creation of stream name.
+     * @return String representation of stream name.
+     */
+    public static String createStreamNameFromUri(final String uri) {
+        String result = requireNonNull(uri);
+        if (result.startsWith("/")) {
+            result = result.substring(1);
+        }
+        if (result.endsWith("/")) {
+            result = result.substring(0, result.length() - 1);
+        }
+        return result;
+    }
+
+    @VisibleForTesting
+    public synchronized void setDataChangeListeners(final Map<String, ListenerAdapter> listenerAdapterCollection) {
+        final long stamp = dataChangeListenersLock.writeLock();
+        try {
+            dataChangeListeners.clear();
+            dataChangeListeners.putAll(listenerAdapterCollection);
+        } finally {
+            dataChangeListenersLock.unlockWrite(stamp);
+        }
+    }
+}
\ No newline at end of file
index 1814f07113dca03db5ff617736684868bc29e2ed..32a8c66eeb2888d621f1b8df55c540f754d7c45d 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
@@ -34,9 +35,7 @@ import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
 /**
- * {@link NotificationListenerAdapter} is responsible to track events on
- * notifications.
- *
+ * {@link NotificationListenerAdapter} is responsible to track events on notifications.
  */
 public class NotificationListenerAdapter extends AbstractCommonSubscriber implements DOMNotificationListener {
 
@@ -49,12 +48,9 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem
     /**
      * Set path of listener and stream name, register event bus.
      *
-     * @param path
-     *             path of notification
-     * @param streamName
-     *             stream name of listener
-     * @param outputType
-     *             type of output on notification (JSON, XML)
+     * @param path       Schema path of YANG notification.
+     * @param streamName Name of the stream.
+     * @param outputType Type of output on notification (JSON or XML).
      */
     NotificationListenerAdapter(final SchemaPath path, final String streamName, final String outputType) {
         register(this);
@@ -67,9 +63,9 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem
     }
 
     /**
-     * Get outputType of listener.
+     * Get output type of this listener.
      *
-     * @return the outputType
+     * @return The configured output type (JSON or XML).
      */
     @Override
     public String getOutputType() {
@@ -93,7 +89,7 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem
     /**
      * Get stream name of this listener.
      *
-     * @return {@link String}
+     * @return The configured stream name.
      */
     @Override
     public String getStreamName() {
@@ -103,7 +99,7 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem
     /**
      * Get schema path of notification.
      *
-     * @return {@link SchemaPath}
+     * @return The configured schema path that points to observing YANG notification schema node.
      */
     public SchemaPath getSchemaPath() {
         return this.path;
@@ -112,7 +108,7 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem
     /**
      * Prepare data of notification and data to client.
      *
-     * @param data   data
+     * @param data JSON or XML data that holds notification data.
      */
     private void prepareAndPostData(final String data) {
         final Event event = new Event(EventType.NOTIFY);
@@ -121,9 +117,9 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem
     }
 
     /**
-     * Prepare json from notification data.
+     * Creation of JSON from notification data.
      *
-     * @return json as {@link String}
+     * @return Transformed notification data in JSON format.
      */
     @VisibleForTesting
     String prepareJson(final SchemaContext schemaContext, final DOMNotification notification) {
@@ -137,8 +133,8 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem
     private static String writeBodyToString(final SchemaContext schemaContext, final DOMNotification notification) {
         final Writer writer = new StringWriter();
         final NormalizedNodeStreamWriter jsonStream = JSONNormalizedNodeStreamWriter.createExclusiveWriter(
-            JSONCodecFactorySupplier.DRAFT_LHOTKA_NETMOD_YANG_JSON_02.getShared(schemaContext), notification.getType(),
-            null, JsonWriterFactory.createJsonWriter(writer));
+                JSONCodecFactorySupplier.DRAFT_LHOTKA_NETMOD_YANG_JSON_02.getShared(schemaContext),
+                notification.getType(), null, JsonWriterFactory.createJsonWriter(writer));
         final NormalizedNodeWriter nodeWriter = NormalizedNodeWriter.forStreamWriter(jsonStream);
         try {
             nodeWriter.write(notification.getBody());
@@ -149,6 +145,11 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem
         return writer.toString();
     }
 
+    /**
+     * Creation of XML from notification data.
+     *
+     * @return Transformed notification data in XML format.
+     */
     private String prepareXml(final SchemaContext schemaContext, final DOMNotification notification) {
         final Document doc = createDocument();
         final Element notificationElement = basePartDoc(doc);
@@ -164,7 +165,6 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem
     private void addValuesToNotificationEventElement(final Document doc, final Element element,
             final SchemaContext schemaContext, final DOMNotification notification) {
         try {
-
             final DOMResult domResult = writeNormalizedNode(notification.getBody(), schemaContext, this.path);
             final Node result = doc.importNode(domResult.getNode().getFirstChild(), true);
             final Element dataElement = doc.createElement("notification");
@@ -176,4 +176,13 @@ public class NotificationListenerAdapter extends AbstractCommonSubscriber implem
             LOG.error("Error processing stream", e);
         }
     }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .add("path", path)
+                .add("stream-name", streamName)
+                .add("output-type", outputType)
+                .toString();
+    }
 }
diff --git a/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/Notificator.java b/restconf/restconf-nb-rfc8040/src/main/java/org/opendaylight/restconf/nb/rfc8040/streams/listeners/Notificator.java
deleted file mode 100644 (file)
index d26fca4..0000000
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.restconf.nb.rfc8040.streams.listeners;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
-import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link Notificator} is responsible to create, remove and find
- * {@link ListenerAdapter} listener.
- */
-public final class Notificator {
-
-    private static Map<String, ListenerAdapter> dataChangeListener = new ConcurrentHashMap<>();
-    private static Map<String, List<NotificationListenerAdapter>> notificationListenersByStreamName =
-            new ConcurrentHashMap<>();
-
-    private static final Logger LOG = LoggerFactory.getLogger(Notificator.class);
-    private static final Lock LOCK = new ReentrantLock();
-
-    private Notificator() {
-    }
-
-    /**
-     * Returns list of all stream names.
-     */
-    public static Set<String> getStreamNames() {
-        return dataChangeListener.keySet();
-    }
-
-    /**
-     * Gets {@link ListenerAdapter} specified by stream name.
-     *
-     * @param streamName
-     *            The name of the stream.
-     * @return {@link ListenerAdapter} specified by stream name.
-     */
-    public static ListenerAdapter getListenerFor(final String streamName) {
-        return dataChangeListener.get(streamName);
-    }
-
-    /**
-     * Checks if the listener specified by {@link YangInstanceIdentifier} path exist.
-     *
-     * @param streamName    name of the stream
-     * @return True if the listener exist, false otherwise.
-     */
-    public static boolean existListenerFor(final String streamName) {
-        return dataChangeListener.containsKey(streamName);
-    }
-
-    /**
-     * Creates new {@link ListenerAdapter} listener from
-     * {@link YangInstanceIdentifier} path and stream name.
-     *
-     * @param path
-     *            Path to data in data repository.
-     * @param streamName
-     *            The name of the stream.
-     * @param outputType
-     *             Spcific type of output for notifications - XML or JSON
-     * @return New {@link ListenerAdapter} listener from
-     *         {@link YangInstanceIdentifier} path and stream name.
-     */
-    public static ListenerAdapter createListener(final YangInstanceIdentifier path, final String streamName,
-            final NotificationOutputType outputType) {
-        final ListenerAdapter listener = new ListenerAdapter(path, streamName, outputType);
-        try {
-            LOCK.lock();
-            dataChangeListener.put(streamName, listener);
-        } finally {
-            LOCK.unlock();
-        }
-        return listener;
-    }
-
-    /**
-     * Looks for listener determined by {@link YangInstanceIdentifier} path and removes it.
-     * Creates String representation of stream name from URI. Removes slash from URI in start and end position.
-     *
-     * @param uri
-     *            URI for creation stream name.
-     * @return String representation of stream name.
-     */
-    public static String createStreamNameFromUri(final String uri) {
-        if (uri == null) {
-            return null;
-        }
-        String result = uri;
-        if (result.startsWith("/")) {
-            result = result.substring(1);
-        }
-        if (result.endsWith("/")) {
-            result = result.substring(0, result.length() - 1);
-        }
-        return result;
-    }
-
-    /**
-     * Removes all listeners.
-     */
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    public static void removeAllListeners() {
-        for (final ListenerAdapter listener : dataChangeListener.values()) {
-            try {
-                listener.close();
-            } catch (final Exception e) {
-                LOG.error("Failed to close listener", e);
-            }
-        }
-        try {
-            LOCK.lock();
-            dataChangeListener = new ConcurrentHashMap<>();
-        } finally {
-            LOCK.unlock();
-        }
-    }
-
-    /**
-     * Delete {@link ListenerAdapter} listener specified in parameter.
-     *
-     * @param <T>
-     *
-     * @param listener
-     *            ListenerAdapter
-     */
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private static <T extends BaseListenerInterface> void deleteListener(final T listener) {
-        if (listener != null) {
-            try {
-                listener.close();
-            } catch (final Exception e) {
-                LOG.error("Failed to close listener", e);
-            }
-            try {
-                LOCK.lock();
-                dataChangeListener.remove(listener.getStreamName());
-            } finally {
-                LOCK.unlock();
-            }
-        }
-    }
-
-    /**
-     * Check if the listener specified by qnames of request exist.
-     *
-     * @param streamName
-     *             name of stream
-     * @return True if the listener exist, false otherwise.
-     */
-    public static boolean existNotificationListenerFor(final String streamName) {
-        return notificationListenersByStreamName.containsKey(streamName);
-    }
-
-    /**
-     * Prepare listener for notification ({@link NotificationDefinition}).
-     *
-     * @param paths
-     *             paths of notifications
-     * @param streamName
-     *             name of stream (generated by paths)
-     * @param outputType
-     *             type of output for onNotification - XML or JSON
-     * @return List of {@link NotificationListenerAdapter} by paths
-     */
-    public static List<NotificationListenerAdapter> createNotificationListener(final List<SchemaPath> paths,
-            final String streamName, final String outputType) {
-        final List<NotificationListenerAdapter> listListeners = new ArrayList<>();
-        for (final SchemaPath path : paths) {
-            final NotificationListenerAdapter listener = new NotificationListenerAdapter(path, streamName, outputType);
-            listListeners.add(listener);
-        }
-        try {
-            LOCK.lock();
-            notificationListenersByStreamName.put(streamName, listListeners);
-        } finally {
-            LOCK.unlock();
-        }
-        return listListeners;
-    }
-
-    public static <T extends BaseListenerInterface> void removeListenerIfNoSubscriberExists(final T listener) {
-        if (!listener.hasSubscribers()) {
-            if (listener instanceof NotificationListenerAdapter) {
-                deleteNotificationListener(listener);
-            } else {
-                deleteListener(listener);
-            }
-        }
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private static <T extends BaseListenerInterface> void deleteNotificationListener(final T listener) {
-        if (listener != null) {
-            try {
-                listener.close();
-            } catch (final Exception e) {
-                LOG.error("Failed to close listener", e);
-            }
-            try {
-                LOCK.lock();
-                notificationListenersByStreamName.remove(listener.getStreamName());
-            } finally {
-                LOCK.unlock();
-            }
-        }
-    }
-
-    public static List<NotificationListenerAdapter> getNotificationListenerFor(final String streamName) {
-        return notificationListenersByStreamName.get(streamName);
-    }
-}
index 54b11e040e4a1c1102447a3b2f7713de4d2e1c6b..c13d2f591c1391e7626ee8b8129d53ef7504f129 100644 (file)
@@ -14,18 +14,17 @@ import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link WebSocketServer} is the singleton responsible for starting and stopping the
- * web socket server.
+ * {@link WebSocketServer} is the class that is responsible for starting and stopping of web-socket server with
+ * specified listening TCP port.
  */
 public final class WebSocketServer implements Runnable {
 
     private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
-
     private static WebSocketServer instance = null;
 
     private final int port;
@@ -41,8 +40,7 @@ public final class WebSocketServer implements Runnable {
     /**
      * Create singleton instance of {@link WebSocketServer}.
      *
-     * @param port TCP port used for this server
-     * @return instance of {@link WebSocketServer}
+     * @param port TCP port used for this server.
      */
     public static WebSocketServer createInstance(final int port) {
         Preconditions.checkState(instance == null, "createInstance() has already been called");
@@ -53,9 +51,9 @@ public final class WebSocketServer implements Runnable {
     }
 
     /**
-     * Get the websocket of TCP port.
+     * Get the TCP port of websocket server.
      *
-     * @return websocket TCP port
+     * @return TCP port number.
      */
     public int getPort() {
         return port;
@@ -64,7 +62,7 @@ public final class WebSocketServer implements Runnable {
     /**
      * Get instance of {@link WebSocketServer} created by {@link #createInstance(int)}.
      *
-     * @return instance of {@link WebSocketServer}
+     * @return Instance of {@link WebSocketServer}.
      */
     public static WebSocketServer getInstance() {
         Preconditions.checkNotNull(instance, "createInstance() must be called prior to getInstance()");
@@ -96,10 +94,10 @@ public final class WebSocketServer implements Runnable {
 
             channel.closeFuture().sync();
         } catch (final InterruptedException e) {
-            LOG.error("Web socket server encountered an error during startup attempt on port {}", port, e);
+            LOG.error("Web socket server encountered an error during startup attempt on port {}.", port, e);
         } catch (Throwable throwable) {
             // sync() re-throws exceptions declared as Throwable, so the compiler doesn't see them
-            LOG.error("Error while binding to port {}", port, throwable);
+            LOG.error("Error while binding to port {}.", port, throwable);
             throw throwable;
         } finally {
             stop();
@@ -110,8 +108,8 @@ public final class WebSocketServer implements Runnable {
      * Stops the web socket server and removes all listeners.
      */
     private void stop() {
-        LOG.debug("Stopping the web socket server instance on port {}", port);
-        Notificator.removeAllListeners();
+        LOG.debug("Stopping the web socket server instance on port {}.", port);
+        ListenersBroker.getInstance().removeAndCloseAllListeners();
         if (bossGroup != null) {
             bossGroup.shutdownGracefully();
             bossGroup = null;
index 903b0dbf56d5a247453aaec886d39edb2415acb5..880c59a5260b410ea8a3230249bae3d7c89245a4 100755 (executable)
@@ -8,16 +8,6 @@
 
 package org.opendaylight.restconf.nb.rfc8040.streams.websockets;
 
-import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
-import static io.netty.handler.codec.http.HttpMethod.GET;
-import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
-import static io.netty.handler.codec.http.HttpUtil.setContentLength;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
@@ -25,7 +15,12 @@ import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
 import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
@@ -33,10 +28,10 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
 import io.netty.util.CharsetUtil;
-import java.util.List;
+import java.util.Optional;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.NotificationListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
 import org.opendaylight.restconf.nb.rfc8040.utils.RestconfConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,43 +63,47 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
     private void handleHttpRequest(final ChannelHandlerContext ctx, final FullHttpRequest req) {
         // Handle a bad request.
         if (!req.decoderResult().isSuccess()) {
-            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
+            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
+                    HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
             return;
         }
 
         // Allow only GET methods.
-        if (req.method() != GET) {
-            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
+        if (req.method() != HttpMethod.GET) {
+            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
+                    HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN));
             return;
         }
 
-        final String streamName = Notificator.createStreamNameFromUri(req.uri());
+        final String streamName = ListenersBroker.createStreamNameFromUri(req.uri());
         if (streamName.contains(RestconfConstants.DATA_SUBSCR)) {
-            final ListenerAdapter listener = Notificator.getListenerFor(streamName);
-            if (listener != null) {
-                listener.addSubscriber(ctx.channel());
+            final Optional<ListenerAdapter> listener =
+                    ListenersBroker.getInstance().getDataChangeListenerFor(streamName);
+            if (listener.isPresent()) {
+                listener.get().addSubscriber(ctx.channel());
                 LOG.debug("Subscriber successfully registered.");
             } else {
                 LOG.error("Listener for stream with name '{}' was not found.", streamName);
-                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
+                        HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
             }
         } else if (streamName.contains(RestconfConstants.NOTIFICATION_STREAM)) {
-            final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
-            if (listeners != null && !listeners.isEmpty()) {
-                for (final NotificationListenerAdapter listener : listeners) {
-                    listener.addSubscriber(ctx.channel());
-                    LOG.debug("Subscriber successfully registered.");
-                }
+            final Optional<NotificationListenerAdapter> listener =
+                    ListenersBroker.getInstance().getNotificationListenerFor(streamName);
+            if (listener.isPresent()) {
+                listener.get().addSubscriber(ctx.channel());
+                LOG.debug("Subscriber successfully registered.");
             } else {
                 LOG.error("Listener for stream with name '{}' was not found.", streamName);
-                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+                sendHttpResponse(ctx, req, new DefaultFullHttpResponse(
+                        HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
             }
         }
 
         // Handshake
         final WebSocketServerHandshakerFactory wsFactory =
                 new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
-                null, false);
+                        null, false);
         this.handshaker = wsFactory.newHandshaker(req);
         if (this.handshaker == null) {
             WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
@@ -122,17 +121,17 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
      * @param res FullHttpResponse
      */
     private static void sendHttpResponse(final ChannelHandlerContext ctx, final HttpRequest req,
-            final FullHttpResponse res) {
+                                         final FullHttpResponse res) {
         // Generate an error page if response getStatus code is not OK (200).
-        final boolean notOkay = !OK.equals(res.status());
+        final boolean notOkay = !HttpResponseStatus.OK.equals(res.status());
         if (notOkay) {
             res.content().writeCharSequence(res.status().toString(), CharsetUtil.UTF_8);
-            setContentLength(res, res.content().readableBytes());
+            HttpUtil.setContentLength(res, res.content().readableBytes());
         }
 
         // Send the response and close the connection if necessary.
         final ChannelFuture f = ctx.channel().writeAndFlush(res);
-        if (notOkay || !isKeepAlive(req)) {
+        if (notOkay || !HttpUtil.isKeepAlive(req)) {
             f.addListener(ChannelFutureListener.CLOSE);
         }
     }
@@ -146,26 +145,31 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
     private void handleWebSocketFrame(final ChannelHandlerContext ctx, final WebSocketFrame frame) {
         if (frame instanceof CloseWebSocketFrame) {
             this.handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
-            final String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
+            final String streamName = ListenersBroker.createStreamNameFromUri(
+                    ((CloseWebSocketFrame) frame).reasonText());
             if (streamName.contains(RestconfConstants.DATA_SUBSCR)) {
-                final ListenerAdapter listener = Notificator.getListenerFor(streamName);
-                if (listener != null) {
-                    listener.removeSubscriber(ctx.channel());
-                    LOG.debug("Subscriber successfully registered.");
-                    Notificator.removeListenerIfNoSubscriberExists(listener);
+                final Optional<ListenerAdapter> listener = ListenersBroker.getInstance()
+                        .getDataChangeListenerFor(streamName);
+                if (listener.isPresent()) {
+                    listener.get().removeSubscriber(ctx.channel());
+                    LOG.debug("Subscriber successfully removed.");
+                    if (!listener.get().hasSubscribers()) {
+                        ListenersBroker.getInstance().removeAndCloseDataChangeListener(listener.get());
+                    }
                 }
             } else if (streamName.contains(RestconfConstants.NOTIFICATION_STREAM)) {
-                final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
-                if (listeners != null && !listeners.isEmpty()) {
-                    for (final NotificationListenerAdapter listener : listeners) {
-                        listener.removeSubscriber(ctx.channel());
+                final Optional<NotificationListenerAdapter> listener
+                        = ListenersBroker.getInstance().getNotificationListenerFor(streamName);
+                if (listener.isPresent()) {
+                    listener.get().removeSubscriber(ctx.channel());
+                    LOG.debug("Subscriber successfully removed.");
+                    if (!listener.get().hasSubscribers()) {
+                        ListenersBroker.getInstance().removeAndCloseNotificationListener(listener.get());
                     }
                 }
             }
-            return;
         } else if (frame instanceof PingWebSocketFrame) {
             ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
-            return;
         }
     }
 
@@ -177,10 +181,10 @@ public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
     /**
      * Get web socket location from HTTP request.
      *
-     * @param req HTTP request from which the location will be returned
+     * @param req HTTP request from which the location will be returned.
      * @return String representation of web socket location.
      */
     private static String getWebSocketLocation(final HttpRequest req) {
-        return "ws://" + req.headers().get(HOST) + req.uri();
+        return "ws://" + req.headers().get(HttpHeaderNames.HOST) + req.uri();
     }
-}
+}
\ No newline at end of file
index 5ab8639b2164f4b6a2656bb55a7642a5156b5bd0..de8aaf6521b372b38c1c99269225026bdfbce6dd 100644 (file)
@@ -15,17 +15,16 @@ import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpServerCodec;
 
 /**
- * {@link WebSocketServerInitializer} is used to setup the {@link ChannelPipeline} of a {@link io.netty.channel.Channel}
- * .
+ * {@link WebSocketServerInitializer} is used to setup the {@link ChannelPipeline} of
+ * a {@link io.netty.channel.Channel}.
  */
 public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
 
     @Override
-    protected void initChannel(final SocketChannel ch) {
-        ChannelPipeline pipeline = ch.pipeline();
+    protected void initChannel(final SocketChannel channel) {
+        ChannelPipeline pipeline = channel.pipeline();
         pipeline.addLast("codec-http", new HttpServerCodec());
         pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
         pipeline.addLast("handler", new WebSocketServerHandler());
     }
-
-}
+}
\ No newline at end of file
index f28981dde174fc07528066ca68f29c4216842c3b..64751f82d56dff303b21af684d865f3b651df1f5 100644 (file)
@@ -15,15 +15,16 @@ import static org.mockito.Mockito.when;
 import static org.opendaylight.yangtools.util.concurrent.FluentFutures.immediateTrueFluentFuture;
 
 import com.google.common.collect.ImmutableClassToInstanceMap;
-import java.lang.reflect.Field;
+import java.io.FileNotFoundException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
@@ -32,6 +33,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.opendaylight.mdsal.common.api.CommitInfo;
 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
@@ -49,17 +51,18 @@ import org.opendaylight.restconf.nb.rfc8040.handlers.NotificationServiceHandler;
 import org.opendaylight.restconf.nb.rfc8040.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.nb.rfc8040.handlers.TransactionChainHandler;
 import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenerAdapter;
-import org.opendaylight.restconf.nb.rfc8040.streams.listeners.Notificator;
+import org.opendaylight.restconf.nb.rfc8040.streams.listeners.ListenersBroker;
 import org.opendaylight.restconf.nb.rfc8040.utils.parser.IdentifierCodec;
 import org.opendaylight.yang.gen.v1.urn.sal.restconf.event.subscription.rev140708.NotificationOutputTypeGrouping.NotificationOutputType;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.test.util.YangParserTestUtils;
 
 public class RestconfStreamsSubscriptionServiceImplTest {
 
     private static final String URI = "/restconf/18/data/ietf-restconf-monitoring:restconf-state/streams/stream/"
             + "toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE";
-    private static Field listenersByStreamName;
 
     @Mock
     private DOMDataBrokerHandler dataBrokerHandler;
@@ -73,7 +76,7 @@ public class RestconfStreamsSubscriptionServiceImplTest {
 
     @SuppressWarnings("unchecked")
     @Before
-    public void setUp() throws Exception {
+    public void setUp() throws FileNotFoundException, URISyntaxException {
         MockitoAnnotations.initMocks(this);
 
         final DOMTransactionChain domTx = mock(DOMTransactionChain.class);
@@ -107,6 +110,8 @@ public class RestconfStreamsSubscriptionServiceImplTest {
         final UriBuilder baseUriBuilder = new LocalUriInfo().getBaseUriBuilder();
         when(uriInfo.getBaseUri()).thenReturn(baseUriBuilder.build());
         when(uriInfo.getBaseUriBuilder()).thenReturn(baseUriBuilder);
+        final URI uri = new URI("http://127.0.0.1/" + URI);
+        when(uriInfo.getAbsolutePath()).thenReturn(uri);
         this.schemaHandler.onGlobalContextUpdated(
                 YangParserTestUtils.parseYangFiles(TestRestconfUtils.loadFiles("/notifications")));
     }
@@ -124,29 +129,31 @@ public class RestconfStreamsSubscriptionServiceImplTest {
     }
 
     @BeforeClass
-    public static void setUpBeforeTest() throws Exception {
+    public static void setUpBeforeTest() {
         final Map<String, ListenerAdapter> listenersByStreamNameSetter = new HashMap<>();
         final ListenerAdapter adapter = mock(ListenerAdapter.class);
+        final YangInstanceIdentifier yiid = mock(YangInstanceIdentifier.class);
+        final YangInstanceIdentifier.PathArgument lastPathArgument = mock(YangInstanceIdentifier.PathArgument.class);
+        final QName qname = QName.create("toaster", "2009-11-20", "toasterStatus");
+        Mockito.when(adapter.getPath()).thenReturn(yiid);
+        Mockito.when(adapter.getOutputType()).thenReturn("JSON");
+        Mockito.when(yiid.getLastPathArgument()).thenReturn(lastPathArgument);
+        Mockito.when(lastPathArgument.getNodeType()).thenReturn(qname);
         listenersByStreamNameSetter.put(
                 "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
                 adapter);
-        listenersByStreamName = Notificator.class.getDeclaredField("dataChangeListener");
-
-        listenersByStreamName.setAccessible(true);
-        listenersByStreamName.set(Notificator.class, listenersByStreamNameSetter);
+        ListenersBroker.getInstance().setDataChangeListeners(listenersByStreamNameSetter);
     }
 
     @AfterClass
-    public static void setUpAfterTest() throws Exception {
-        listenersByStreamName.set(Notificator.class, null);
-        listenersByStreamName.set(Notificator.class, new ConcurrentHashMap<>());
-        listenersByStreamName.setAccessible(false);
+    public static void setUpAfterTest() {
+        ListenersBroker.getInstance().setDataChangeListeners(Collections.emptyMap());
     }
 
     @Test
-    public void testSubscribeToStream() throws Exception {
+    public void testSubscribeToStream() {
         final UriBuilder uriBuilder = UriBuilder.fromUri(URI);
-        Notificator.createListener(
+        ListenersBroker.getInstance().registerDataChangeListener(
                 IdentifierCodec.deserialize("toaster:toaster/toasterStatus", this.schemaHandler.get()),
                 "data-change-event-subscription/toaster:toaster/toasterStatus/datastore=OPERATIONAL/scope=ONE",
                 NotificationOutputType.XML);
index 4021be2c8d66ef4e4ce82b0fc6b84127cfb9b2f7..473d6af1cec8ce6bc40dffa59c435e22574f29d7 100644 (file)
@@ -17,7 +17,6 @@ import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.json.JSONObject;
@@ -87,7 +86,7 @@ public class ListenerAdapterTest extends AbstractConcurrentDataBrokerTest {
                               final NotificationOutputTypeGrouping.NotificationOutputType outputType,
                               final boolean leafNodesOnly) {
             super(path, streamName, outputType);
-            setQueryParams(EPOCH, Optional.empty(), Optional.empty(), leafNodesOnly);
+            setQueryParams(EPOCH, null, null, leafNodesOnly);
         }
 
         @Override
index 33328bc2d11a80e0b48a98e6bf015583f6e07c32..8b1ede41166f2e2dc9d986fa36cf9d05f3150148 100644 (file)
@@ -18,7 +18,6 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.junit.Before;
@@ -215,14 +214,10 @@ public class NotificationListenerTest {
         return child;
     }
 
-    private String prepareJson(final DOMNotification notificationData, final SchemaPath schemaPathNotifi)
-            throws Exception {
-        final List<SchemaPath> paths = new ArrayList<>();
-        paths.add(schemaPathNotifi);
-        final List<NotificationListenerAdapter> listNotifi =
-                Notificator.createNotificationListener(paths, "stream-name", NotificationOutputType.JSON.toString());
-        final NotificationListenerAdapter notifi = listNotifi.get(0);
-        final String result = notifi.prepareJson(schmeaCtx, notificationData);
+    private String prepareJson(final DOMNotification notificationData, final SchemaPath schemaPathNotifi) {
+        final NotificationListenerAdapter notifiAdapter = ListenersBroker.getInstance().registerNotificationListener(
+                schemaPathNotifi, "stream-name", NotificationOutputType.JSON);
+        final String result = notifiAdapter.prepareJson(schmeaCtx, notificationData);
         return Preconditions.checkNotNull(result);
     }
 }