Bug 3959 - support netconf notification
[netconf.git] / restconf / sal-rest-connector / src / main / java / org / opendaylight / netconf / sal / restconf / impl / RestconfImpl.java
index 493dbb99c433f2f1d510f5221f12f25b518d1fae..7226a0abb22b357d4da42ab386cc66f46664296f 100644 (file)
@@ -26,9 +26,12 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -56,6 +59,7 @@ import org.opendaylight.netconf.sal.rest.api.RestconfService;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorTag;
 import org.opendaylight.netconf.sal.restconf.impl.RestconfError.ErrorType;
 import org.opendaylight.netconf.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.netconf.sal.streams.listeners.NotificationListenerAdapter;
 import org.opendaylight.netconf.sal.streams.listeners.Notificator;
 import org.opendaylight.netconf.sal.streams.websockets.WebSocketServer;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -68,6 +72,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.AugmentationNode;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -86,6 +91,7 @@ import org.opendaylight.yangtools.yang.model.api.LeafListSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.LeafSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.ListSchemaNode;
 import org.opendaylight.yangtools.yang.model.api.Module;
+import org.opendaylight.yangtools.yang.model.api.NotificationDefinition;
 import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.opendaylight.yangtools.yang.model.api.SchemaNode;
@@ -134,6 +140,12 @@ public class RestconfImpl implements RestconfService {
 
     private static final YangInstanceIdentifier.AugmentationIdentifier SAL_REMOTE_AUG_IDENTIFIER;
 
+    public static final CharSequence DATA_SUBSCR = "data-change-event-subscription";
+    private static final CharSequence CREATE_DATA_SUBSCR = "create-" + DATA_SUBSCR;
+
+    public static final CharSequence NOTIFICATION_STREAM = "notification-stream";
+    private static final CharSequence CREATE_NOTIFICATION_STREAM = "create-" + NOTIFICATION_STREAM;
+
     static {
         try {
             final Date eventSubscriptionAugRevision = new SimpleDateFormat("yyyy-MM-dd").parse("2014-07-08");
@@ -378,7 +390,15 @@ public class RestconfImpl implements RestconfService {
             response = mountRpcServices.get().invokeRpc(type, payload.getData());
         } else {
             if (namespace.toString().equals(SAL_REMOTE_NAMESPACE)) {
-                response = invokeSalRemoteRpcSubscribeRPC(payload);
+                if (identifier.contains(CREATE_DATA_SUBSCR)) {
+                    response = invokeSalRemoteRpcSubscribeRPC(payload);
+                } else if (identifier.contains(CREATE_NOTIFICATION_STREAM)) {
+                    response = invokeSalRemoteRpcNotifiStrRPC(payload);
+                } else {
+                    final String msg = "Not supported operation";
+                    LOG.warn(msg);
+                    throw new RestconfDocumentedException(msg, ErrorType.RPC, ErrorTag.OPERATION_NOT_SUPPORTED);
+                }
             } else {
                 response = this.broker.invokeRpc(type, payload.getData());
             }
@@ -468,9 +488,10 @@ public class RestconfImpl implements RestconfService {
         }
 
         final YangInstanceIdentifier pathIdentifier = ((YangInstanceIdentifier) pathValue);
-        String streamName = null;
+        String streamName = (String) CREATE_DATA_SUBSCR;
         if (!pathIdentifier.isEmpty()) {
-            final String fullRestconfIdentifier = this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
+            final String fullRestconfIdentifier = DATA_SUBSCR
+                    + this.controllerContext.toFullRestconfIdentifier(pathIdentifier, null);
 
             LogicalDatastoreType datastore = parseEnumTypeParameter(value, LogicalDatastoreType.class, DATASTORE_PARAM_NAME);
             datastore = datastore == null ? DEFAULT_DATASTORE : datastore;
@@ -877,6 +898,64 @@ public class RestconfImpl implements RestconfService {
      */
     @Override
     public Response subscribeToStream(final String identifier, final UriInfo uriInfo) {
+        if (identifier.contains(DATA_SUBSCR)) {
+            return dataSubs(identifier, uriInfo);
+        } else if (identifier.contains(NOTIFICATION_STREAM)) {
+            return notifStream(identifier, uriInfo);
+        }
+        final String msg = "Bad type of notification of sal-remote";
+        LOG.warn(msg);
+        throw new RestconfDocumentedException(msg);
+    }
+
+    /**
+     * Register notification listener by stream name
+     *
+     * @param identifier
+     *            - stream name
+     * @param uriInfo
+     *            - uriInfo
+     * @return {@link Response}
+     */
+    private Response notifStream(final String identifier, final UriInfo uriInfo) {
+        final String streamName = Notificator.createStreamNameFromUri(identifier);
+        if (Strings.isNullOrEmpty(streamName)) {
+            throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
+        }
+        final List<NotificationListenerAdapter> listeners = Notificator.getNotificationListenerFor(streamName);
+        if ((listeners == null) || listeners.isEmpty()) {
+            throw new RestconfDocumentedException("Stream was not found.", ErrorType.PROTOCOL,
+                    ErrorTag.UNKNOWN_ELEMENT);
+        }
+
+        for (final NotificationListenerAdapter listener : listeners) {
+            this.broker.registerToListenNotification(listener);
+        }
+
+        final UriBuilder uriBuilder = uriInfo.getAbsolutePathBuilder();
+        int notificationPort = NOTIFICATION_PORT;
+        try {
+            final WebSocketServer webSocketServerInstance = WebSocketServer.getInstance();
+            notificationPort = webSocketServerInstance.getPort();
+        } catch (final NullPointerException e) {
+            WebSocketServer.createInstance(NOTIFICATION_PORT);
+        }
+        final UriBuilder uriToWebsocketServerBuilder = uriBuilder.port(notificationPort).scheme("ws");
+        final URI uriToWebsocketServer = uriToWebsocketServerBuilder.replacePath(streamName).build();
+
+        return Response.status(Status.OK).location(uriToWebsocketServer).build();
+    }
+
+    /**
+     * Register data change listener by stream name
+     *
+     * @param identifier
+     *            - stream name
+     * @param uriInfo
+     *            - uri info
+     * @return {@link Response}
+     */
+    private Response dataSubs(final String identifier, final UriInfo uriInfo) {
         final String streamName = Notificator.createStreamNameFromUri(identifier);
         if (Strings.isNullOrEmpty(streamName)) {
             throw new RestconfDocumentedException("Stream name is empty.", ErrorType.PROTOCOL, ErrorTag.INVALID_VALUE);
@@ -935,7 +1014,7 @@ public class RestconfImpl implements RestconfService {
     /**
      * Load parameter for subscribing to stream from input composite node
      *
-     * @param compNode
+     * @param value
      *            contains value
      * @return enum object if its string value is equal to {@code paramName}. In other cases null.
      */
@@ -1104,4 +1183,69 @@ public class RestconfImpl implements RestconfService {
 
         return streamNodeValues.build();
     }
+
+    /**
+     * Prepare stream for notification
+     *
+     * @param payload
+     *            - contains list of qnames of notifications
+     * @return - checked future object
+     */
+    private CheckedFuture<DOMRpcResult, DOMRpcException> invokeSalRemoteRpcNotifiStrRPC(
+            final NormalizedNodeContext payload) {
+        final ContainerNode data = (ContainerNode) payload.getData();
+        LeafSetNode leafSet = null;
+        String outputType = "XML";
+        for (final DataContainerChild<? extends PathArgument, ?> dataChild : data.getValue()) {
+            if (dataChild instanceof LeafSetNode) {
+                leafSet = (LeafSetNode) dataChild;
+            } else if (dataChild instanceof AugmentationNode) {
+                outputType = (String) (((AugmentationNode) dataChild).getValue()).iterator().next().getValue();
+            }
+        }
+
+        final Collection<LeafSetEntryNode> entryNodes = leafSet.getValue();
+        final List<SchemaPath> paths = new ArrayList<>();
+        String streamName = CREATE_NOTIFICATION_STREAM + "/";
+
+        final Iterator<LeafSetEntryNode> iterator = entryNodes.iterator();
+        while (iterator.hasNext()) {
+            final QName valueQName = QName.create((String) iterator.next().getValue());
+            final Module module = ControllerContext.getInstance()
+                    .findModuleByNamespace(valueQName.getModule().getNamespace());
+            Preconditions.checkNotNull(module, "Module for namespace " + valueQName.getModule().getNamespace()
+                    + " does not exist");
+            NotificationDefinition notifiDef = null;
+            for (final NotificationDefinition notification : module.getNotifications()) {
+                if (notification.getQName().equals(valueQName)) {
+                    notifiDef = notification;
+                    break;
+                }
+            }
+            final String moduleName = module.getName();
+            Preconditions.checkNotNull(notifiDef,
+                    "Notification " + valueQName + "doesn't exist in module " + moduleName);
+            paths.add(notifiDef.getPath());
+            streamName = streamName + moduleName + ":" + valueQName.getLocalName();
+            if (iterator.hasNext()) {
+                streamName = streamName + ",";
+            }
+        }
+
+        final QName rpcQName = payload.getInstanceIdentifierContext().getSchemaNode().getQName();
+        final QName outputQname = QName.create(rpcQName, "output");
+        final QName streamNameQname = QName.create(rpcQName, "notification-stream-identifier");
+
+        final ContainerNode output = ImmutableContainerNodeBuilder.create()
+                .withNodeIdentifier(new NodeIdentifier(outputQname))
+                .withChild(ImmutableNodes.leafNode(streamNameQname, streamName)).build();
+
+        if (!Notificator.existNotificationListenerFor(streamName)) {
+            Notificator.createNotificationListener(paths, streamName, outputType);
+        }
+
+        final DOMRpcResult defaultDOMRpcResult = new DefaultDOMRpcResult(output);
+
+        return Futures.immediateCheckedFuture(defaultDOMRpcResult);
+    }
 }