Bug 5528 - Subscribing to stream impl 00/40100/12
authorJakub Toth <jatoth@cisco.com>
Thu, 9 Jun 2016 09:48:48 +0000 (11:48 +0200)
committerJakub Toth <jatoth@cisco.com>
Tue, 28 Jun 2016 12:24:33 +0000 (12:24 +0000)
  *preparing dom data broker handler

Change-Id: Id90c13e8ebba1b60237a45808c973ccdf3a8dcda
Signed-off-by: Jakub Toth <jatoth@cisco.com>
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/RestConnectorProvider.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/handlers/DOMDataBrokerHandler.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/services/impl/RestconfStreamsSubscriptionServiceImpl.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/CreateStreamUtil.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/RestconfStreamsConstants.java
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/StreamUtil.java [new file with mode: 0644]
restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java [new file with mode: 0644]

index 9ed9fbff8cb68492d3debb81622164094ff71139..b6c14eb0f6baa4a9c1a93410d2264f8558961fd5 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.sal.core.api.Provider;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.netconf.sal.rest.api.RestConnector;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
 import org.opendaylight.restconf.handlers.DOMMountPointServiceHandler;
 import org.opendaylight.restconf.handlers.SchemaContextHandler;
 import org.opendaylight.restconf.handlers.TransactionChainHandler;
@@ -74,6 +75,8 @@ public class RestConnectorProvider implements Provider, RestConnector, AutoClose
         this.transactionChain = this.dataBroker.createTransactionChain(this.transactionListener);
         final TransactionChainHandler transactionChainHandler = new TransactionChainHandler(this.transactionChain);
 
+        final DOMDataBrokerHandler brokerHandler = new DOMDataBrokerHandler(this.dataBroker);
+
         wrapperServices.setHandlers(schemaCtxHandler, domMountPointServiceHandler);
     }
 
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/handlers/DOMDataBrokerHandler.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/handlers/DOMDataBrokerHandler.java
new file mode 100644 (file)
index 0000000..ab77740
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.handlers;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+
+/**
+ * Implementation of {@link DOMDataBrokerHandler}
+ *
+ */
+public class DOMDataBrokerHandler implements Handler<DOMDataBroker> {
+
+    private final DOMDataBroker broker;
+
+    public DOMDataBrokerHandler(final DOMDataBroker broker) {
+        this.broker = broker;
+    }
+
+    @Override
+    public DOMDataBroker get() {
+        return this.broker;
+    }
+
+}
index 9561ecf84339dd8f0c1119b3c340ae837d6ff850..3ff4e837be685f357fd66ca6747e87ea6b740f39 100644 (file)
@@ -7,15 +7,73 @@
  */
 package org.opendaylight.restconf.restful.services.impl;
 
+import com.google.common.base.Strings;
+import java.net.URI;
+import java.util.Map;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfDocumentedException;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
+import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
+import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.Notificator;
+import org.opendaylight.restconf.handlers.DOMDataBrokerHandler;
 import org.opendaylight.restconf.restful.services.api.RestconfStreamsSubscriptionService;
+import org.opendaylight.restconf.restful.utils.RestconfStreamsConstants;
+import org.opendaylight.restconf.restful.utils.SubscribeToStreamUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Implementation of {@link RestconfStreamsSubscriptionService}
+ *
+ */
 public class RestconfStreamsSubscriptionServiceImpl implements RestconfStreamsSubscriptionService {
 
+    private static final Logger LOG = LoggerFactory.getLogger(RestconfStreamsSubscriptionServiceImpl.class);
+
+    private DOMDataBrokerHandler domDataBrokerHandler;
+
     @Override
     public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
-        throw new UnsupportedOperationException("Not yet implemented");
-    }
+        final Map<String, String> mapOfValues = SubscribeToStreamUtil.mapValuesFromUri(identifier);
+
+        final LogicalDatastoreType ds = SubscribeToStreamUtil.parseURIEnum(LogicalDatastoreType.class,
+                mapOfValues.get(RestconfStreamsConstants.DATASTORE_PARAM_NAME));
+        if (ds == null) {
+            final String msg = "Stream name doesn't contains datastore value (pattern /datastore=)";
+            LOG.debug(msg);
+            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+        }
 
+        final DataChangeScope scope = SubscribeToStreamUtil.parseURIEnum(DataChangeScope.class,
+                mapOfValues.get(RestconfStreamsConstants.SCOPE_PARAM_NAME));
+        if (scope == null) {
+            final String msg = "Stream name doesn't contains datastore value (pattern /scope=)";
+            LOG.warn(msg);
+            throw new RestconfDocumentedException(msg, ErrorType.APPLICATION, ErrorTag.MISSING_ATTRIBUTE);
+        }
+
+        final String streamName = Notificator.createStreamNameFromUri(identifier);
+        if (Strings.isNullOrEmpty(streamName)) {
+            final String msg = "Stream name is empty.";
+            LOG.warn(msg);
+            throw new RestconfDocumentedException(msg, ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+        }
+
+        final ListenerAdapter listener = Notificator.getListenerFor(streamName);
+        SubscribeToStreamUtil.registration(ds, scope, listener, this.domDataBrokerHandler.get());
+
+        final int port = SubscribeToStreamUtil.prepareNotificationPort();
+
+        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+        final UriBuilder uriToWebSocketServer = uriBuilder.port(port).scheme(RestconfStreamsConstants.SCHEMA_SUBSCIBRE_URI);
+        final URI uri = uriToWebSocketServer.replacePath(streamName).build();
+
+        return Response.status(Status.OK).location(uri).build();
+    }
 }
index 2a6cd37d2fd6bd3a3ff5e94ae62505971d3cddd4..03b04182471fdf85202d26a81729d6d74af3fc5e 100644 (file)
@@ -143,16 +143,7 @@ public final class CreateStreamUtil {
             return null;
         }
 
-        return resolveEnum(clazz, (String) value);
-    }
-
-    private static <T> T resolveEnum(final Class<T> clazz, final String value) {
-        for (final T t : clazz.getEnumConstants()) {
-            if (((Enum<?>) t).name().equals(value)) {
-                return t;
-            }
-        }
-        return null;
+        return StreamUtil.resolveEnum(clazz, (String) value);
     }
 
     private static YangInstanceIdentifier preparePath(final NormalizedNodeContext payload, final ContainerNode data,
index 3cab59b69d3fb322551528474b9a0913a96ceb36..c5dc5d251334c8cf953427671dcb218aabd7b206 100644 (file)
@@ -49,11 +49,15 @@ public final class RestconfStreamsConstants {
 
     public static final String SCOPE_PARAM_NAME = "scope";
 
-    public static final String DS_URI = RestconfConstants.SLASH + DATASTORE_PARAM_NAME
-            + ParserBuilderConstants.Deserializer.EQUAL;
+    public static final char EQUAL = ParserBuilderConstants.Deserializer.EQUAL;
 
-    public static final String SCOPE_URI = RestconfConstants.SLASH + SCOPE_PARAM_NAME
-            + ParserBuilderConstants.Deserializer.EQUAL;
+    public static final String DS_URI = RestconfConstants.SLASH + DATASTORE_PARAM_NAME + EQUAL;
+
+    public static final String SCOPE_URI = RestconfConstants.SLASH + SCOPE_PARAM_NAME + EQUAL;
+
+    public static final int NOTIFICATION_PORT = 8181;
+
+    public static final String SCHEMA_SUBSCIBRE_URI = "ws";
 
     static {
         Date eventSubscriptionAugRevision;
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/StreamUtil.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/StreamUtil.java
new file mode 100644 (file)
index 0000000..38cfc32
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.restful.utils;
+
+/**
+ * Common util class for stream
+ * <ul>
+ * <li>{@link SubscribeToStreamUtil}
+ * <li>{@link CreateStreamUtil}
+ * </ul>
+ *
+ */
+public final class StreamUtil {
+
+    private StreamUtil() {
+        throw new UnsupportedOperationException("Util class");
+    }
+
+    /**
+     * Resolve specific type of enum by value.
+     *
+     * @param clazz
+     *            - enum
+     * @param value
+     *            - string of enum
+     * @return - enum
+     */
+    public static <T> T resolveEnum(final Class<T> clazz, final String value) {
+        for (final T t : clazz.getEnumConstants()) {
+            if (((Enum<?>) t).name().equals(value)) {
+                return t;
+            }
+        }
+        return null;
+    }
+}
diff --git a/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java b/restconf/sal-rest-connector/src/main/java/org/opendaylight/restconf/restful/utils/SubscribeToStreamUtil.java
new file mode 100644 (file)
index 0000000..c07551c
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.restconf.restful.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
+import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
+import org.opendaylight.restconf.utils.RestconfConstants;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Subscribe to stream util class
+ *
+ */
+public final class SubscribeToStreamUtil {
+
+    private SubscribeToStreamUtil() {
+        throw new UnsupportedOperationException("Util class");
+    }
+
+    /**
+     * Parse enum from URI
+     *
+     * @param clazz
+     *            - enum type
+     * @param value
+     *            - string of enum value
+     * @return enum
+     */
+    public static <T> T parseURIEnum(final Class<T> clazz, final String value) {
+        if ((value == null) || value.equals("")) {
+            return null;
+        }
+        return StreamUtil.resolveEnum(clazz, value);
+    }
+
+    /**
+     * Prepare map of values from URI
+     *
+     * @param identifier
+     *            - URI
+     * @return {@link Map}
+     */
+    public static Map<String, String> mapValuesFromUri(final String identifier) {
+        final HashMap<String, String> result = new HashMap<>();
+        final String[] tokens = identifier.split(String.valueOf(RestconfConstants.SLASH));
+        for (final String token : tokens) {
+            final String[] paramToken = token.split(String.valueOf(RestconfStreamsConstants.EQUAL));
+            if (paramToken.length == 2) {
+                result.put(paramToken[0], paramToken[1]);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Register data change listener in dom data broker and set it to listener
+     * on stream
+     *
+     * @param ds
+     *            - {@link LogicalDatastoreType}
+     * @param scope
+     *            - {@link DataChangeScope}
+     * @param listener
+     *            - listener on specific stream
+     * @param domDataBroker
+     *            - data broker for register data change listener
+     */
+    public static void registration(final LogicalDatastoreType ds, final DataChangeScope scope,
+            final ListenerAdapter listener, final DOMDataBroker domDataBroker) {
+        if (listener.isListening()) {
+            return;
+        }
+
+        final YangInstanceIdentifier path = listener.getPath();
+        final ListenerRegistration<DOMDataChangeListener> registration = domDataBroker.registerDataChangeListener(ds,
+                path, listener, scope);
+
+        listener.setRegistration(registration);
+    }
+
+    /**
+     * Get port from web socket server. If doesn't exit, create it.
+     *
+     * @return port
+     */
+    public static int prepareNotificationPort() {
+        int port = RestconfStreamsConstants.NOTIFICATION_PORT;
+        try {
+            final WebSocketServer webSocketServer = WebSocketServer.getInstance();
+            port = webSocketServer.getPort();
+        } catch (final NullPointerException e) {
+            WebSocketServer.createInstance(RestconfStreamsConstants.NOTIFICATION_PORT);
+        }
+        return port;
+    }
+
+}