Added resource /streams/stream/<streamName> 94/5094/9
authorMartin Sunal <msunal@cisco.com>
Sun, 2 Feb 2014 23:26:58 +0000 (00:26 +0100)
committerMartin Sunal <msunal@cisco.com>
Tue, 4 Feb 2014 20:35:05 +0000 (21:35 +0100)
- added netty web socket server - port 8181

Subscriber can be registered for notifications in this flow:
1. Subscriber invokes RPC create-data-change-event-subscription with input element regarding to module sal-remote revision "2014-01-14"
2. Subscriber gets sream name as a response from server
3. Subscriber calls GET operation on /streams/stream/<streamName>
4. Subscriber gets response with status code 200 OK and header field Location contains URI of web socket server
5. Subscriber creates web socket http connection to obtained URI in step 4.
6. Subscriber can be unregistered from receiving notifications by sending CloseWebSocketFrame with reasonText containing stream name

Change-Id: I92925f5a50b2ceadf2cc8f0eaee2c99dd6a63d8d
Signed-off-by: Martin Sunal <msunal@cisco.com>
20 files changed:
opendaylight/commons/opendaylight/pom.xml
opendaylight/distribution/opendaylight/pom.xml
opendaylight/md-sal/sal-rest-connector/pom.xml
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/api/RestconfService.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonMapper.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProvider.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.xtend
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.xtend
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.xtend
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestOperationUtils.java
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/IClientMessageCallback.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClient.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClientHandler.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/test/RestStream.java [new file with mode: 0644]
opendaylight/md-sal/sal-rest-connector/src/test/resources/full-versions/yangs/sal-remote@2014-01-14.yang [new file with mode: 0644]

index 9d4f838..d2e9048 100644 (file)
             <artifactId>netty-common</artifactId>
             <version>${netty.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-codec-http</artifactId>
+            <version>${netty.version}</version>
+        </dependency>
 
       <!-- yangtools dependencies -->
       <dependency>
index 28637bc..5aba5ca 100644 (file)
       <groupId>io.netty</groupId>
       <artifactId>netty-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http</artifactId>
+    </dependency>
 
     <!-- testing dependencies I'm pretty sure we should trim -->
     <dependency>
index ce3b05c..a4d2f4c 100644 (file)
       <version>2.2.4</version>
     </dependency>
     <dependency>
-     <groupId>org.opendaylight.yangtools</groupId>
-     <artifactId>yang-parser-impl</artifactId>
+      <groupId>org.opendaylight.yangtools</groupId>
+      <artifactId>yang-parser-impl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http</artifactId>
+      <version>4.0.10.Final</version>
     </dependency>
     
     <!-- Testing Dependencies -->
index 347b09c..cef5df9 100644 (file)
@@ -16,8 +16,10 @@ import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
 
 import org.opendaylight.controller.sal.restconf.impl.StructuredData;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
@@ -134,4 +136,8 @@ public interface RestconfService {
     @Path("/config/{identifier:.+}")
     public Response deleteConfigurationData(@PathParam("identifier") String identifier);
 
+    @GET
+    @Path("/streams/stream/{identifier:.+}")
+    public Response subscribeToStream(@PathParam("identifier") String identifier, @Context UriInfo uriInfo);
+
 }
index 226eff4..d1441d7 100644 (file)
@@ -203,7 +203,6 @@ class JsonMapper {
                     + baseType.getClass().getSimpleName() + ".");
         }
 
-        // TODO check InstanceIdentifierTypeDefinition
         if (baseType instanceof IdentityrefTypeDefinition) {
             if (node.getValue() instanceof QName) {
                 IdentityValuesDTO valueDTO = (IdentityValuesDTO) RestCodec.from(baseType, mountPoint).serialize(
index ea5108e..1870bdf 100644 (file)
@@ -18,6 +18,7 @@ import org.opendaylight.controller.sal.core.api.model.SchemaService;
 import org.opendaylight.controller.sal.core.api.mount.MountService;
 import org.opendaylight.controller.sal.restconf.impl.BrokerFacade;
 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
+import org.opendaylight.controller.sal.streams.websockets.WebSocketServer;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.model.api.SchemaServiceListener;
 import org.osgi.framework.BundleActivator;
@@ -34,6 +35,7 @@ public class RestconfProvider implements BundleActivator, Provider, ServiceTrack
     private ServiceTracker<Broker, Broker> brokerServiceTrancker;
     private BundleContext bundleContext;
     private ProviderSession session;
+    private Thread webSocketServerThread;
 
     @Override
     public void onSessionInitiated(ProviderSession session) {
@@ -53,6 +55,9 @@ public class RestconfProvider implements BundleActivator, Provider, ServiceTrack
         bundleContext = context;
         brokerServiceTrancker = new ServiceTracker<>(context, Broker.class, this);
         brokerServiceTrancker.open();
+        webSocketServerThread = new Thread(new WebSocketServer());
+        webSocketServerThread.setName("Web socket server");
+        webSocketServerThread.start();
     }
 
     @Override
@@ -64,6 +69,7 @@ public class RestconfProvider implements BundleActivator, Provider, ServiceTrack
                 e.printStackTrace();
             }
         }
+        webSocketServerThread.interrupt();
         session.close();
         brokerServiceTrancker.close();
     }
index 67429f3..d305006 100644 (file)
@@ -11,13 +11,14 @@ import javax.ws.rs.core.Response
 import org.opendaylight.controller.md.sal.common.api.data.DataReader
 import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession
 import org.opendaylight.controller.sal.core.api.data.DataBrokerService
+import org.opendaylight.controller.sal.core.api.mount.MountInstance
 import org.opendaylight.controller.sal.rest.impl.RestconfProvider
+import org.opendaylight.controller.sal.streams.listeners.ListenerAdapter
 import org.opendaylight.yangtools.yang.common.QName
 import org.opendaylight.yangtools.yang.common.RpcResult
 import org.opendaylight.yangtools.yang.data.api.CompositeNode
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
 import org.slf4j.LoggerFactory
-import org.opendaylight.controller.sal.core.api.mount.MountInstance
 
 class BrokerFacade implements DataReader<InstanceIdentifier, CompositeNode> {
 
@@ -133,4 +134,13 @@ class BrokerFacade implements DataReader<InstanceIdentifier, CompositeNode> {
         return transaction.commit
     }
 
+    def registerToListenDataChanges(ListenerAdapter listener) {
+        checkPreconditions
+        if (listener.listening) {
+            return;
+        }
+        val registration = dataService.registerDataChangeListener(listener.path, listener)
+        listener.setRegistration(registration)
+    }
+
 }
index d8a03e5..c2b82ea 100644 (file)
@@ -8,9 +8,11 @@
 package org.opendaylight.controller.sal.restconf.impl
 
 import com.google.common.base.Preconditions
+import com.google.common.base.Splitter
 import com.google.common.collect.BiMap
 import com.google.common.collect.FluentIterable
 import com.google.common.collect.HashBiMap
+import com.google.common.collect.Lists
 import java.net.URI
 import java.net.URLDecoder
 import java.net.URLEncoder
@@ -96,13 +98,11 @@ class ControllerContext implements SchemaServiceListener {
 
     private def InstanceIdWithSchemaNode toIdentifier(String restconfInstance, boolean toMountPointIdentifier) {
         checkPreconditions
-        val pathArgs = restconfInstance.split("/");
+        val pathArgs = Lists.newArrayList(Splitter.on("/").split(restconfInstance))
+        pathArgs.omitFirstAndLastEmptyString
         if (pathArgs.empty) {
             return null;
         }
-        if (pathArgs.head.empty) {
-            pathArgs.remove(0)
-        }
         val startModule = pathArgs.head.toModuleName();
         if (startModule === null) {
             throw new ResponseException(BAD_REQUEST, "First node in URI has to be in format \"moduleName:nodeName\"")
@@ -121,6 +121,22 @@ class ControllerContext implements SchemaServiceListener {
         return iiWithSchemaNode
     }
 
+    private def omitFirstAndLastEmptyString(List<String> list) {
+        if (list.empty) {
+            return list;
+        }
+        if (list.head.empty) {
+            list.remove(0)
+        }
+        if (list.empty) {
+            return list;
+        }
+        if (list.last.empty) {
+            list.remove(list.indexOf(list.last))
+        }
+        return list;
+    }
+
     private def getLatestModule(SchemaContext schema, String moduleName) {
         checkArgument(schema !== null);
         checkArgument(moduleName !== null && !moduleName.empty)
@@ -176,31 +192,51 @@ class ControllerContext implements SchemaServiceListener {
         return mountPoint.schemaContext?.findModuleByName(module.localName, module.revision)
     }
 
+    def getDataNodeContainerFor(InstanceIdentifier path) {
+        checkPreconditions
+        val elements = path.path;
+        val startQName = elements.head.nodeType;
+        val initialModule = globalSchema.findModuleByNamespaceAndRevision(startQName.namespace, startQName.revision)
+        var node = initialModule as DataNodeContainer;
+        for (element : elements) {
+            val potentialNode = node.childByQName(element.nodeType);
+            if (potentialNode === null || !potentialNode.listOrContainer) {
+                return null
+            }
+            node = potentialNode as DataNodeContainer
+        }
+        return node
+    }
+
     def String toFullRestconfIdentifier(InstanceIdentifier path) {
         checkPreconditions
         val elements = path.path;
         val ret = new StringBuilder();
-        val startQName = elements.get(0).nodeType;
+        val startQName = elements.head.nodeType;
         val initialModule = globalSchema.findModuleByNamespaceAndRevision(startQName.namespace, startQName.revision)
-        var node = initialModule as DataSchemaNode;
+        var node = initialModule as DataNodeContainer;
         for (element : elements) {
-            node = node.childByQName(element.nodeType);
-            ret.append(element.toRestconfIdentifier(node));
+            val potentialNode = node.childByQName(element.nodeType);
+            if (!potentialNode.listOrContainer) {
+                return null
+            }
+            node = potentialNode as DataNodeContainer
+            ret.append(element.convertToRestconfIdentifier(node));
         }
         return ret.toString
     }
 
-    private def dispatch CharSequence toRestconfIdentifier(NodeIdentifier argument, DataSchemaNode node) {
+    private def dispatch CharSequence convertToRestconfIdentifier(NodeIdentifier argument, ContainerSchemaNode node) {
         '''/«argument.nodeType.toRestconfIdentifier()»'''
     }
 
-    private def dispatch CharSequence toRestconfIdentifier(NodeIdentifierWithPredicates argument, ListSchemaNode node) {
+    private def dispatch CharSequence convertToRestconfIdentifier(NodeIdentifierWithPredicates argument, ListSchemaNode node) {
         val nodeIdentifier = argument.nodeType.toRestconfIdentifier();
         val keyValues = argument.keyValues;
         return '''/«nodeIdentifier»/«FOR key : node.keyDefinition SEPARATOR "/"»«keyValues.get(key).toUriString»«ENDFOR»'''
     }
 
-    private def dispatch CharSequence toRestconfIdentifier(PathArgument argument, DataSchemaNode node) {
+    private def dispatch CharSequence convertToRestconfIdentifier(PathArgument argument, DataNodeContainer node) {
         throw new IllegalArgumentException("Conversion of generic path argument is not supported");
     }
 
@@ -289,6 +325,10 @@ class ControllerContext implements SchemaServiceListener {
         return container.dataNodeChildByQName(name);
     }
 
+    private static dispatch def DataSchemaNode childByQName(Module container, QName name) {
+        return container.dataNodeChildByQName(name);
+    }
+
     private static dispatch def DataSchemaNode childByQName(DataSchemaNode container, QName name) {
         return null;
     }
@@ -415,7 +455,7 @@ class ControllerContext implements SchemaServiceListener {
             }
         }
         
-        if (!(targetNode instanceof ListSchemaNode) && !(targetNode instanceof ContainerSchemaNode)) {
+        if (!targetNode.isListOrContainer) {
             throw new ResponseException(BAD_REQUEST,"URI has bad format. Node \"" + strings.head + "\" must be Container or List yang type.")
         }
         // Number of consumed elements
@@ -538,14 +578,24 @@ class ControllerContext implements SchemaServiceListener {
     private def QName toQName(String name) {
         val module = name.toModuleName;
         val node = name.toNodeName;
-        val namespace = FluentIterable.from(globalSchema.modules.sort[o1,o2 | o1.revision.compareTo(o2.revision)]) //
+        val namespace = FluentIterable.from(globalSchema.modules.sort[o1,o2 | o1.revision.compareTo(o2.revision)])
             .transform[QName.create(namespace,revision,it.name)].findFirst[module == localName]
-        ;
-        return QName.create(namespace,node);
+        if (namespace === null) {
+            return null
+        }
+        return QName.create(namespace, node);
+    }
+
+    private def boolean isListOrContainer(DataSchemaNode node) {
+        return ((node instanceof ListSchemaNode) || (node instanceof ContainerSchemaNode))
     }
 
     def getRpcDefinition(String name) {
-        return qnameToRpc.get(name.toQName)
+        val validName = name.toQName
+        if (validName === null) {
+            return null
+        }
+        return qnameToRpc.get(validName)
     }
 
     override onGlobalContextUpdated(SchemaContext context) {
index be170c7..cfbce73 100644 (file)
@@ -8,6 +8,8 @@
 package org.opendaylight.controller.sal.restconf.impl
 
 import com.google.common.base.Preconditions
+import com.google.common.base.Splitter
+import com.google.common.collect.Lists
 import java.net.URI
 import java.text.ParseException
 import java.text.SimpleDateFormat
@@ -16,9 +18,12 @@ import java.util.HashMap
 import java.util.List
 import java.util.Set
 import javax.ws.rs.core.Response
+import javax.ws.rs.core.UriInfo
 import org.opendaylight.controller.md.sal.common.api.TransactionStatus
 import org.opendaylight.controller.sal.core.api.mount.MountInstance
 import org.opendaylight.controller.sal.rest.api.RestconfService
+import org.opendaylight.controller.sal.streams.listeners.Notificator
+import org.opendaylight.controller.sal.streams.websockets.WebSocketServer
 import org.opendaylight.yangtools.yang.common.QName
 import org.opendaylight.yangtools.yang.common.RpcResult
 import org.opendaylight.yangtools.yang.data.api.CompositeNode
@@ -37,13 +42,11 @@ import org.opendaylight.yangtools.yang.model.api.RpcDefinition
 import org.opendaylight.yangtools.yang.model.api.SchemaContext
 import org.opendaylight.yangtools.yang.model.api.TypeDefinition
 import org.opendaylight.yangtools.yang.model.api.type.IdentityrefTypeDefinition
+import org.opendaylight.yangtools.yang.model.util.EmptyType
+import org.opendaylight.yangtools.yang.parser.builder.impl.ContainerSchemaNodeBuilder
+import org.opendaylight.yangtools.yang.parser.builder.impl.LeafSchemaNodeBuilder
 
 import static javax.ws.rs.core.Response.Status.*
-import org.opendaylight.yangtools.yang.parser.builder.impl.LeafSchemaNodeBuilder
-import org.opendaylight.yangtools.yang.parser.builder.impl.ContainerSchemaNodeBuilder
-import org.opendaylight.yangtools.yang.model.util.EmptyType
-import com.google.common.base.Splitter
-import com.google.common.collect.Lists
 
 class RestconfImpl implements RestconfService {
 
@@ -58,6 +61,8 @@ class RestconfImpl implements RestconfService {
     val static RESTCONF_MODULE_DRAFT02_MODULES_CONTAINER_SCHEMA_NODE = "modules"
     val static RESTCONF_MODULE_DRAFT02_MODULE_LIST_SCHEMA_NODE = "module"
     val static RESTCONF_MODULE_DRAFT02_OPERATIONS_CONTAINER_SCHEMA_NODE = "operations"
+    val static SAL_REMOTE_NAMESPACE = "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote"
+    val static SAL_REMOTE_RPC_SUBSRCIBE = "create-data-change-event-subscription"
 
     @Property
     BrokerFacade broker
@@ -177,7 +182,7 @@ class RestconfImpl implements RestconfService {
         } else (
             moduleNameAndRevision = identifier
         )
-        val  pathArgs = Lists.newArrayList(Splitter.on("/").omitEmptyStrings.split(moduleNameAndRevision))
+        val pathArgs = Lists.newArrayList(Splitter.on("/").omitEmptyStrings.split(moduleNameAndRevision))
         if (pathArgs.length < 2) {
             throw new ResponseException(BAD_REQUEST,
                 "URI has bad format. End of URI should be in format 'moduleName/yyyy-MM-dd'")
@@ -225,6 +230,36 @@ class RestconfImpl implements RestconfService {
     }
 
     override invokeRpc(String identifier, CompositeNode payload) {
+        val rpc = identifier.rpcDefinition
+        if (rpc === null) {
+            throw new ResponseException(NOT_FOUND, "RPC does not exist.");
+        }
+        if (rpc.QName.namespace.toString == SAL_REMOTE_NAMESPACE && rpc.QName.localName == SAL_REMOTE_RPC_SUBSRCIBE) {
+            val value = normalizeNode(payload, rpc.input, null)
+            val pathNode = value?.getFirstSimpleByName(QName.create(rpc.QName, "path"))
+            val pathValue = pathNode?.value
+            if (pathValue === null && !(pathValue instanceof InstanceIdentifier)) {
+                throw new ResponseException(INTERNAL_SERVER_ERROR, "Instance identifier was not normalized correctly.");
+            }
+            val pathIdentifier = (pathValue as InstanceIdentifier)
+            var String streamName = null
+            if (!pathIdentifier.path.nullOrEmpty) {
+                streamName = Notificator.createStreamNameFromUri(pathIdentifier.toFullRestconfIdentifier)
+            }
+            if (streamName.nullOrEmpty) {
+                throw new ResponseException(BAD_REQUEST, "Path is empty or contains data node which is not Container or List build-in type.");
+            }
+            val streamNameNode = NodeFactory.createImmutableSimpleNode(QName.create(rpc.output.QName, "stream-name"), null, streamName)
+            val List<Node<?>> output = new ArrayList
+            output.add(streamNameNode)
+            val responseData = NodeFactory.createMutableCompositeNode(rpc.output.QName, null, output, null, null)
+
+            if (!Notificator.existListenerFor(pathIdentifier)) {
+                Notificator.createListener(pathIdentifier, streamName)
+            }
+
+            return new StructuredData(responseData, rpc.output, null)
+        }
         return callRpc(identifier.rpcDefinition, payload)
     }
 
@@ -382,6 +417,21 @@ class RestconfImpl implements RestconfService {
         }
     }
 
+    override subscribeToStream(String identifier, UriInfo uriInfo) {
+        val streamName = Notificator.createStreamNameFromUri(identifier)
+        if (streamName.nullOrEmpty) {
+            throw new ResponseException(BAD_REQUEST, "Stream name is empty.")
+        }
+        val listener = Notificator.getListenerFor(streamName);
+        if (listener === null) {
+            throw new ResponseException(BAD_REQUEST, "Stream was not found.")
+        }
+        broker.registerToListenDataChanges(listener)
+        val uriBuilder = uriInfo.getAbsolutePathBuilder()
+        val uriToWebsocketServer = uriBuilder.port(WebSocketServer.PORT).replacePath(streamName).build()
+        return Response.status(OK).location(uriToWebsocketServer).build
+    }
+
     private def dispatch URI namespace(CompositeNode data) {
         return data.nodeType.namespace
     }
diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java
new file mode 100644 (file)
index 0000000..fdd6ba0
--- /dev/null
@@ -0,0 +1,406 @@
+package org.opendaylight.controller.sal.streams.listeners;
+
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.util.internal.ConcurrentSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Executors;
+
+import javax.activation.UnsupportedDataTypeException;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
+import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
+import org.opendaylight.controller.sal.rest.impl.XmlMapper;
+import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import com.google.common.base.Preconditions;
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+public class ListenerAdapter implements DataChangeListener {
+
+    private static final Logger logger = LoggerFactory.getLogger(ListenerAdapter.class);
+    private final XmlMapper xmlMapper = new XmlMapper();
+    private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
+
+    private final InstanceIdentifier path;
+    private ListenerRegistration<DataChangeListener> registration;
+    private final String streamName;
+    private Set<Channel> subscribers = new ConcurrentSet<>();
+    private final EventBus eventBus;
+    private final EventBusChangeRecorder eventBusChangeRecorder;
+
+    ListenerAdapter(InstanceIdentifier path, String streamName) {
+        Preconditions.checkNotNull(path);
+        Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
+        this.path = path;
+        this.streamName = streamName;
+        eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+        eventBusChangeRecorder = new EventBusChangeRecorder();
+        eventBus.register(eventBusChangeRecorder);
+    }
+
+    @Override
+    public void onDataChanged(DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
+        if (!change.getCreatedConfigurationData().isEmpty() || !change.getCreatedOperationalData().isEmpty()
+                || !change.getUpdatedConfigurationData().isEmpty() || !change.getUpdatedOperationalData().isEmpty()
+                || !change.getRemovedConfigurationData().isEmpty() || !change.getRemovedOperationalData().isEmpty()) {
+            String xml = prepareXmlFrom(change);
+            Event event = new Event(EventType.NOTIFY);
+            event.setData(xml);
+            eventBus.post(event);
+        }
+    }
+
+    private final class EventBusChangeRecorder {
+        @Subscribe public void recordCustomerChange(Event event) {
+            if (event.getType() == EventType.REGISTER) {
+                Channel subscriber = event.getSubscriber();
+                if (!subscribers.contains(subscriber)) {
+                    subscribers.add(subscriber);
+                }
+            } else if (event.getType() == EventType.DEREGISTER) {
+                subscribers.remove(event.getSubscriber());
+                Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
+            } else if (event.getType() == EventType.NOTIFY) {
+                for (Channel subscriber : subscribers) {
+                    if (subscriber.isActive()) {
+                        logger.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
+                        subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
+                    } else {
+                        logger.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
+                        subscribers.remove(subscriber);
+                    }
+                }
+            }
+        }
+    }
+
+    private final class Event {
+        private final EventType type;
+        private Channel subscriber;
+        private String data;
+
+        public Event(EventType type) {
+            this.type = type;
+        }
+
+        public Channel getSubscriber() {
+            return subscriber;
+        }
+
+        public void setSubscriber(Channel subscriber) {
+            this.subscriber = subscriber;
+        }
+
+        public String getData() {
+            return data;
+        }
+
+        public void setData(String data) {
+            this.data = data;
+        }
+
+        public EventType getType() {
+            return type;
+        }
+    }
+
+    private enum EventType {
+        REGISTER,
+        DEREGISTER,
+        NOTIFY;
+    }
+
+    private String prepareXmlFrom(DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
+        Document doc = createDocument();
+        Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
+                "notification");
+        doc.appendChild(notificationElement);
+
+        Element eventTimeElement = doc.createElement("eventTime");
+        eventTimeElement.setTextContent(toRFC3339(new Date()));
+        notificationElement.appendChild(eventTimeElement);
+
+        Element dataChangedNotificationEventElement = doc.createElementNS(
+                "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
+        addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change);
+        notificationElement.appendChild(dataChangedNotificationEventElement);
+
+        try {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            TransformerFactory tf = TransformerFactory.newInstance();
+            Transformer transformer = tf.newTransformer();
+            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+            transformer.setOutputProperty(OutputKeys.METHOD, "xml");
+            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+            transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
+            transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, "UTF-8")));
+            byte[] charData = out.toByteArray();
+            return new String(charData, "UTF-8");
+        } catch (TransformerException | UnsupportedEncodingException e) {
+            String msg = "Error during transformation of Document into String";
+            logger.error(msg, e);
+            return msg;
+        }
+    }
+
+    private String toRFC3339(Date d) {
+        return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2");
+    }
+
+    private Document createDocument() {
+        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+        Document doc = null;
+        try {
+            DocumentBuilder bob = dbf.newDocumentBuilder();
+            doc = bob.newDocument();
+        } catch (ParserConfigurationException e) {
+            return null;
+        }
+        return doc;
+    }
+
+    private void addValuesToDataChangedNotificationEventElement(Document doc,
+            Element dataChangedNotificationEventElement, DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
+        addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.CREATED);
+        addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.CREATED);
+        if (change.getCreatedConfigurationData().isEmpty()) {
+            addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.UPDATED);
+        }
+        if (change.getCreatedOperationalData().isEmpty()) {
+            addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.UPDATED);
+        }
+        addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.DELETED);
+        addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.DELETED);
+    }
+
+    private void addValuesFromDataToElement(Document doc, Set<InstanceIdentifier> data, Element element, Store store,
+            Operation operation) {
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+        for (InstanceIdentifier path : data) {
+            Node node = createDataChangeEventElement(doc, path, null, store, operation);
+            element.appendChild(node);
+        }
+    }
+
+    private void addValuesFromDataToElement(Document doc, Map<InstanceIdentifier, CompositeNode> data, Element element, Store store,
+            Operation operation) {
+        if (data == null || data.isEmpty()) {
+            return;
+        }
+        for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
+            Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), store, operation);
+            element.appendChild(node);
+        }
+    }
+
+    private Node createDataChangeEventElement(Document doc, InstanceIdentifier path, CompositeNode data, Store store,
+            Operation operation) {
+        Element dataChangeEventElement = doc.createElement("data-change-event");
+
+        Element pathElement = doc.createElement("path");
+        addPathAsValueToElement(path, pathElement);
+        dataChangeEventElement.appendChild(pathElement);
+
+        Element storeElement = doc.createElement("store");
+        storeElement.setTextContent(store.value);
+        dataChangeEventElement.appendChild(storeElement);
+
+        Element operationElement = doc.createElement("operation");
+        operationElement.setTextContent(operation.value);
+        dataChangeEventElement.appendChild(operationElement);
+
+        if (data != null) {
+            Element dataElement = doc.createElement("data");
+            Node dataAnyXml = translateToXml(path, data);
+            Node adoptedNode = doc.adoptNode(dataAnyXml);
+            dataElement.appendChild(adoptedNode);
+            dataChangeEventElement.appendChild(dataElement);
+        }
+
+        return dataChangeEventElement;
+    }
+
+    private Node translateToXml(InstanceIdentifier path, CompositeNode data) {
+        DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path);
+        if (schemaNode == null) {
+            logger.info("Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", path);
+            return null;
+        }
+        try {
+            Document xml = xmlMapper.write(data, schemaNode);
+            return xml.getFirstChild();
+        } catch (UnsupportedDataTypeException e) {
+            logger.error("Error occured during translation of notification to XML.", e);
+            return null;
+        }
+    }
+
+    private void addPathAsValueToElement(InstanceIdentifier path, Element element) {
+        // Map< key = namespace, value = prefix>
+        Map<String, String> prefixes = new HashMap<>();
+        InstanceIdentifier instanceIdentifier = path;
+        StringBuilder textContent = new StringBuilder();
+        for (PathArgument pathArgument : instanceIdentifier.getPath()) {
+            textContent.append("/");
+            writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
+            if (pathArgument instanceof NodeIdentifierWithPredicates) {
+                Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
+                for (QName keyValue : predicates.keySet()) {
+                    String predicateValue = String.valueOf(predicates.get(keyValue));
+                    textContent.append("[");
+                    writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
+                    textContent.append("='");
+                    textContent.append(predicateValue);
+                    textContent.append("'");
+                    textContent.append("]");
+                }
+            } else if (pathArgument instanceof NodeWithValue) {
+                textContent.append("[.='");
+                textContent.append(((NodeWithValue)pathArgument).getValue());
+                textContent.append("'");
+                textContent.append("]");
+            }
+        }
+        element.setTextContent(textContent.toString());
+    }
+
+    private static void writeIdentifierWithNamespacePrefix(Element element, StringBuilder textContent, QName qName,
+            Map<String, String> prefixes) {
+        String namespace = qName.getNamespace().toString();
+        String prefix = prefixes.get(namespace);
+        if (prefix == null) {
+            prefix = qName.getPrefix();
+            if (prefix == null || prefix.isEmpty() || prefixes.containsValue(prefix)) {
+                prefix = generateNewPrefix(prefixes.values());
+            }
+        }
+
+        element.setAttribute("xmlns:" + prefix, namespace.toString());
+        textContent.append(prefix);
+        prefixes.put(namespace, prefix);
+
+        textContent.append(":");
+        textContent.append(qName.getLocalName());
+    }
+
+    private static String generateNewPrefix(Collection<String> prefixes) {
+        StringBuilder result = null;
+        Random random = new Random();
+        do {
+            result = new StringBuilder();
+            for (int i = 0; i < 4; i++) {
+                int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
+                result.append(Character.toChars(randomNumber));
+            }
+        } while (prefixes.contains(result.toString()));
+
+        return result.toString();
+    }
+
+    public InstanceIdentifier getPath() {
+        return path;
+    }
+
+    public void setRegistration(ListenerRegistration<DataChangeListener> registration) {
+        this.registration = registration;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public void close() throws Exception {
+        subscribers = new ConcurrentSet<>();
+        registration.close();
+        registration = null;
+        eventBus.unregister(eventBusChangeRecorder);
+    }
+
+    public boolean isListening() {
+        return registration == null ? false : true;
+    }
+
+    public void addSubscriber(Channel subscriber) {
+        if (!subscriber.isActive()) {
+            logger.debug("Channel is not active between websocket server and subscriber {}"
+                    + subscriber.remoteAddress());
+        }
+        Event event = new Event(EventType.REGISTER);
+        event.setSubscriber(subscriber);
+        eventBus.post(event);
+    }
+
+    public void removeSubscriber(Channel subscriber) {
+        logger.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+        Event event = new Event(EventType.DEREGISTER);
+        event.setSubscriber(subscriber);
+        eventBus.post(event);
+    }
+
+    public boolean hasSubscribers() {
+        return !subscribers.isEmpty();
+    }
+
+    private static enum Store {
+        CONFIG("config"),
+        OPERATION("operation");
+
+        private final String value;
+
+        private Store(String value) {
+            this.value = value;
+        }
+    }
+
+    private static enum Operation {
+        CREATED("created"),
+        UPDATED("updated"),
+        DELETED("deleted");
+
+        private final String value;
+
+        private Operation(String value) {
+            this.value = value;
+        }
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java
new file mode 100644 (file)
index 0000000..b0140ec
--- /dev/null
@@ -0,0 +1,91 @@
+package org.opendaylight.controller.sal.streams.listeners;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public class Notificator {
+
+    private static Map<String, ListenerAdapter> listenersByStreamName = new ConcurrentHashMap<>();
+    private static Map<InstanceIdentifier, ListenerAdapter> listenersByInstanceIdentifier = new ConcurrentHashMap<>();
+    private static final Lock lock = new ReentrantLock();
+
+    private Notificator() {
+    }
+
+    public static ListenerAdapter getListenerFor(String streamName) {
+        return listenersByStreamName.get(streamName);
+    }
+
+    public static ListenerAdapter getListenerFor(InstanceIdentifier path) {
+        return listenersByInstanceIdentifier.get(path);
+    }
+
+    public static boolean existListenerFor(InstanceIdentifier path) {
+        return listenersByInstanceIdentifier.containsKey(path);
+    }
+
+    public static ListenerAdapter createListener(InstanceIdentifier path, String streamName) {
+        ListenerAdapter listener = new ListenerAdapter(path, streamName);
+        lock.lock();
+        listenersByInstanceIdentifier.put(path, listener);
+        listenersByStreamName.put(streamName, listener);
+        lock.unlock();
+        return listener;
+    }
+
+    public static void removeListener(InstanceIdentifier path) {
+        ListenerAdapter listener = listenersByInstanceIdentifier.get(path);
+        deleteListener(listener);
+    }
+
+    public static String createStreamNameFromUri(String uri) {
+        if (uri == null) {
+            return null;
+        }
+        String result = uri;
+        if (result.startsWith("/")) {
+            result = result.substring(1);
+        }
+        if (result.endsWith("/")) {
+            result = result.substring(0, result.length());
+        }
+        return result;
+    }
+
+    public static void removeAllListeners() {
+        for (ListenerAdapter listener : listenersByInstanceIdentifier.values()) {
+            try {
+                listener.close();
+            } catch (Exception e) {
+            }
+        }
+        lock.lock();
+        listenersByStreamName = new ConcurrentHashMap<>();
+        listenersByInstanceIdentifier = new ConcurrentHashMap<>();
+        lock.unlock();
+    }
+
+    public static void removeListenerIfNoSubscriberExists(ListenerAdapter listener) {
+        if (!listener.hasSubscribers()) {
+            deleteListener(listener);
+        }
+    }
+
+    private static void deleteListener(ListenerAdapter listener) {
+        if (listener != null) {
+            try {
+                listener.close();
+            } catch (Exception e) {
+            }
+            lock.lock();
+            listenersByInstanceIdentifier.remove(listener.getPath());
+            listenersByStreamName.remove(listener).getStreamName();
+            lock.unlock();
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java
new file mode 100644 (file)
index 0000000..142cde1
--- /dev/null
@@ -0,0 +1,52 @@
+package org.opendaylight.controller.sal.streams.websockets;
+
+import org.opendaylight.controller.sal.streams.listeners.Notificator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+public class WebSocketServer implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
+
+    public static final int PORT = 8181;
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+
+    @Override
+    public void run() {
+        bossGroup = new NioEventLoopGroup();
+        workerGroup = new NioEventLoopGroup();
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup, workerGroup)
+                .channel(NioServerSocketChannel.class)
+                .childHandler(new WebSocketServerInitializer());
+
+            Channel ch = b.bind(PORT).sync().channel();
+            logger.info("Web socket server started at port {}.", PORT);
+
+            ch.closeFuture().sync();
+        } catch (InterruptedException e) {
+            // NOOP
+        } finally {
+            stop();
+        }
+    }
+
+    private void stop() {
+        Notificator.removeAllListeners();
+        if (bossGroup != null) {
+            bossGroup.shutdownGracefully();
+        }
+        if (workerGroup != null) {
+            workerGroup.shutdownGracefully();
+        }
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java
new file mode 100644 (file)
index 0000000..618ee57
--- /dev/null
@@ -0,0 +1,134 @@
+package org.opendaylight.controller.sal.streams.websockets;
+
+import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
+import io.netty.util.CharsetUtil;
+
+import java.io.IOException;
+
+import org.opendaylight.controller.sal.streams.listeners.ListenerAdapter;
+import org.opendaylight.controller.sal.streams.listeners.Notificator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
+
+    private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class);
+
+    private WebSocketServerHandshaker handshaker;
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof FullHttpRequest) {
+            handleHttpRequest(ctx, (FullHttpRequest) msg);
+        } else if (msg instanceof WebSocketFrame) {
+            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
+        }
+    }
+
+    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)
+            throws Exception {
+        // Handle a bad request.
+        if (!req.getDecoderResult().isSuccess()) {
+            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
+            return;
+        }
+
+        // Allow only GET methods.
+        if (req.getMethod() != GET) {
+            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
+            return;
+        }
+
+        String streamName = Notificator.createStreamNameFromUri(req.getUri());
+        ListenerAdapter listener = Notificator.getListenerFor(streamName);
+        if (listener != null) {
+            listener.addSubscriber(ctx.channel());
+            logger.debug("Subscriber successfully registered.");
+        } else {
+            logger.error("Listener for stream with name '{}' was not found.", streamName);
+            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
+        }
+
+        // Handshake
+        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
+                getWebSocketLocation(req), null, false);
+        handshaker = wsFactory.newHandshaker(req);
+        if (handshaker == null) {
+            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
+        } else {
+            handshaker.handshake(ctx.channel(), req);
+        }
+
+    }
+
+    private static void sendHttpResponse(ChannelHandlerContext ctx,
+            HttpRequest req, FullHttpResponse res) {
+        // Generate an error page if response getStatus code is not OK (200).
+        if (res.getStatus().code() != 200) {
+            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
+            res.content().writeBytes(buf);
+            buf.release();
+            setContentLength(res, res.content().readableBytes());
+        }
+
+        // Send the response and close the connection if necessary.
+        ChannelFuture f = ctx.channel().writeAndFlush(res);
+        if (!isKeepAlive(req) || res.getStatus().code() != 200) {
+            f.addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+
+    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws IOException {
+        if (frame instanceof CloseWebSocketFrame) {
+            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
+            String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
+            ListenerAdapter listener = Notificator.getListenerFor(streamName);
+            if (listener != null) {
+                listener.removeSubscriber(ctx.channel());
+                logger.debug("Subscriber successfully registered.");
+            }
+            Notificator.removeListenerIfNoSubscriberExists(listener);
+            return;
+        } else if (frame instanceof PingWebSocketFrame) {
+            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
+            return;
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+            throws Exception {
+        if (cause instanceof java.nio.channels.ClosedChannelException == false) {
+            //cause.printStackTrace();
+        }
+        ctx.close();
+    }
+
+    private static String getWebSocketLocation(HttpRequest req) {
+        return "http://" + req.headers().get(HOST) + req.getUri();
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java
new file mode 100644 (file)
index 0000000..5eb71ef
--- /dev/null
@@ -0,0 +1,19 @@
+package org.opendaylight.controller.sal.streams.websockets;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+
+public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
+
+    @Override
+    protected void initChannel(SocketChannel ch) throws Exception {
+        ChannelPipeline pipeline = ch.pipeline();
+        pipeline.addLast("codec-http", new HttpServerCodec());
+        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
+        pipeline.addLast("handler", new WebSocketServerHandler());
+    }
+
+}
index 3175ba9..a0e61a6 100644 (file)
@@ -15,13 +15,13 @@ import com.google.common.base.Charsets;
 
 public class RestOperationUtils {
 
-    static final String JSON = "+json";
-    static final String XML = "+xml";
+    public static final String JSON = "+json";
+    public static final String XML = "+xml";
 
     private RestOperationUtils() {
     }
 
-    static String createUri(String prefix, String encodedPart) throws UnsupportedEncodingException {
+    public static String createUri(String prefix, String encodedPart) throws UnsupportedEncodingException {
         return URI.create(prefix + URLEncoder.encode(encodedPart, Charsets.US_ASCII.name()).toString()).toASCIIString();
     }
 }
diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/IClientMessageCallback.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/IClientMessageCallback.java
new file mode 100644 (file)
index 0000000..63b8e6b
--- /dev/null
@@ -0,0 +1,9 @@
+package org.opendaylight.controller.sal.restconf.impl.websockets.client;
+
+/**
+ * Created by mbobak on 1/22/14.
+ */
+public interface IClientMessageCallback {
+
+    public void onMessageReceived(Object message);
+}
diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClient.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClient.java
new file mode 100644 (file)
index 0000000..bff63b8
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.impl.websockets.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
+import io.netty.handler.codec.http.websocketx.WebSocketVersion;
+
+import java.net.URI;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WebSocketClient  {
+
+    private final URI uri;
+    private Bootstrap bootstrap = new Bootstrap();;
+    private final WebSocketClientHandler clientHandler;
+    private static final Logger logger = LoggerFactory.getLogger(WebSocketClient.class);
+    private Channel clientChannel;
+    private final EventLoopGroup group = new NioEventLoopGroup();
+
+    public WebSocketClient(URI uri,IClientMessageCallback clientMessageCallback) {
+        this.uri = uri;
+        clientHandler = new WebSocketClientHandler(
+                WebSocketClientHandshakerFactory.newHandshaker(
+                        uri, WebSocketVersion.V13, null, false,null),clientMessageCallback); // last null could be replaced with DefaultHttpHeaders
+        initialize();
+    }
+    private void initialize(){
+
+        String protocol = uri.getScheme();
+        if (!"http".equals(protocol)) {
+            throw new IllegalArgumentException("Unsupported protocol: " + protocol);
+        }
+
+        bootstrap.group(group)
+                .channel(NioSocketChannel.class)
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(SocketChannel ch) throws Exception {
+                        ChannelPipeline pipeline = ch.pipeline();
+                        pipeline.addLast("http-codec", new HttpClientCodec());
+                        pipeline.addLast("aggregator", new HttpObjectAggregator(8192));
+                        pipeline.addLast("ws-handler", clientHandler);
+                    }
+                });
+    }
+    public void connect() throws InterruptedException{
+        System.out.println("WebSocket Client connecting");
+        clientChannel  = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel();
+        clientHandler.handshakeFuture().sync();
+    }
+
+    public void writeAndFlush(String message){
+        clientChannel.writeAndFlush(new TextWebSocketFrame(message));
+    }
+    public void writeAndFlush(Object message){
+        clientChannel.writeAndFlush(message);
+    }
+
+    public void ping(){
+        clientChannel.writeAndFlush(new PingWebSocketFrame(Unpooled.copiedBuffer(new byte[]{1, 2, 3, 4, 5, 6})));
+    }
+
+    public void close() throws InterruptedException {
+        clientChannel.writeAndFlush(new CloseWebSocketFrame());
+
+        // WebSocketClientHandler will close the connection when the server
+        // responds to the CloseWebSocketFrame.
+        clientChannel.closeFuture().sync();
+        group.shutdownGracefully();
+    }
+
+    public static void main(String[] args) throws Exception {
+        URI uri;
+        if (args.length > 0) {
+            uri = new URI(args[0]);
+        } else {
+            uri = new URI("http://192.168.11.1:8181/opendaylight-inventory:nodes");
+        }
+        IClientMessageCallback messageCallback = new ClientMessageCallback();
+        WebSocketClient webSocketClient = new WebSocketClient(uri, messageCallback);
+        webSocketClient.connect();
+    }
+
+    private static class ClientMessageCallback implements IClientMessageCallback {
+        @Override
+        public void onMessageReceived(Object message) {
+            logger.info("received message {}", ((TextWebSocketFrame)message).text());
+        }
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClientHandler.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClientHandler.java
new file mode 100644 (file)
index 0000000..02e1632
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.impl.websockets.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.util.CharsetUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
+
+    private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class.toString());
+    private final WebSocketClientHandshaker handshaker;
+    private ChannelPromise handshakeFuture;
+    private IClientMessageCallback messageListener;
+
+
+    public WebSocketClientHandler(WebSocketClientHandshaker handshaker,IClientMessageCallback listener) {
+        this.handshaker = handshaker;
+        this.messageListener = listener;
+    }
+
+    public ChannelFuture handshakeFuture() {
+        return handshakeFuture;
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        handshakeFuture = ctx.newPromise();
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        handshaker.handshake(ctx.channel());
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        logger.info("WebSocket Client disconnected!");
+    }
+
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+        Channel ch = ctx.channel();
+        if (!handshaker.isHandshakeComplete()) {
+            handshaker.finishHandshake(ch, (FullHttpResponse) msg);
+            logger.info("WebSocket Client connected!");
+            handshakeFuture.setSuccess();
+            return;
+        }
+
+        if (msg instanceof FullHttpResponse) {
+            FullHttpResponse response = (FullHttpResponse) msg;
+            throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content="
+                    + response.content().toString(CharsetUtil.UTF_8) + ')');
+        }
+
+        messageListener.onMessageReceived(msg);
+        WebSocketFrame frame = (WebSocketFrame) msg;
+
+        if (frame instanceof PongWebSocketFrame) {
+            logger.info("WebSocket Client received pong");
+        } else if (frame instanceof CloseWebSocketFrame) {
+            logger.info("WebSocket Client received closing");
+            ch.close();
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+
+        if (!handshakeFuture.isDone()) {
+            handshakeFuture.setFailure(cause);
+        }
+
+        ctx.close();
+    }
+}
+
diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/test/RestStream.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/test/RestStream.java
new file mode 100644 (file)
index 0000000..4dcc63e
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.restconf.impl.websockets.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.opendaylight.controller.sal.restconf.impl.test.RestOperationUtils.createUri;
+
+import java.io.FileNotFoundException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.opendaylight.controller.sal.rest.impl.JsonToCompositeNodeProvider;
+import org.opendaylight.controller.sal.rest.impl.StructuredDataToJsonProvider;
+import org.opendaylight.controller.sal.rest.impl.StructuredDataToXmlProvider;
+import org.opendaylight.controller.sal.rest.impl.XmlToCompositeNodeProvider;
+import org.opendaylight.controller.sal.restconf.impl.BrokerFacade;
+import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
+import org.opendaylight.controller.sal.restconf.impl.RestconfImpl;
+import org.opendaylight.controller.sal.restconf.impl.test.TestUtils;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+public class RestStream extends JerseyTest {
+
+    private static BrokerFacade brokerFacade;
+    private static RestconfImpl restconfImpl;
+    private static SchemaContext schemaContextYangsIetf;
+
+    @BeforeClass
+    public static void init() throws FileNotFoundException {
+        schemaContextYangsIetf = TestUtils.loadSchemaContext("/full-versions/yangs");
+        ControllerContext controllerContext = ControllerContext.getInstance();
+        controllerContext.setSchemas(schemaContextYangsIetf);
+        brokerFacade = mock(BrokerFacade.class);
+        restconfImpl = RestconfImpl.getInstance();
+        restconfImpl.setBroker(brokerFacade);
+        restconfImpl.setControllerContext(controllerContext);
+    }
+
+    @Override
+    protected Application configure() {
+        /* enable/disable Jersey logs to console */
+//         enable(TestProperties.LOG_TRAFFIC);
+//         enable(TestProperties.DUMP_ENTITY);
+//         enable(TestProperties.RECORD_LOG_LEVEL);
+//         set(TestProperties.RECORD_LOG_LEVEL, Level.ALL.intValue());
+        ResourceConfig resourceConfig = new ResourceConfig();
+        resourceConfig = resourceConfig.registerInstances(restconfImpl, StructuredDataToXmlProvider.INSTANCE,
+                StructuredDataToJsonProvider.INSTANCE, XmlToCompositeNodeProvider.INSTANCE,
+                JsonToCompositeNodeProvider.INSTANCE);
+        return resourceConfig;
+    }
+
+    @Test
+    public void testCallRpcCallGet() throws UnsupportedEncodingException, InterruptedException {
+        String uri = createUri("/operations/", "sal-remote:create-data-change-event-subscription");
+        Response responseWithStreamName = post(uri, MediaType.APPLICATION_XML, getRpcInput());
+        String xmlResponse = responseWithStreamName.readEntity(String.class);
+        assertNotNull(xmlResponse);
+        assertTrue(xmlResponse.contains("<stream-name>ietf-interfaces:interfaces/ietf-interfaces:interface/eth0</stream-name>"));
+
+        uri = createUri("/streams/stream/", "ietf-interfaces:interfaces/ietf-interfaces:interface/eth0");
+        Response responseWithRedirectionUri = get(uri, MediaType.APPLICATION_XML);
+        final URI websocketServerUri = responseWithRedirectionUri.getLocation();
+        assertNotNull(websocketServerUri);
+        assertEquals(websocketServerUri.toString(), "http://localhost:8181/ietf-interfaces:interfaces/ietf-interfaces:interface/eth0");
+    }
+
+    private Response post(String uri, String mediaType, String data) {
+        return target(uri).request(mediaType).post(Entity.entity(data, mediaType));
+    }
+
+    private Response get(String uri, String mediaType) {
+        return target(uri).request(mediaType).get();
+    }
+
+    private String getRpcInput() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("<input xmlns=\"urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote\">");
+        sb.append("<path xmlns:int=\"urn:ietf:params:xml:ns:yang:ietf-interfaces\">/int:interfaces/int:interface[int:name='eth0']</path>");
+        sb.append("</input>");
+        return sb.toString();
+    }
+
+}
diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/resources/full-versions/yangs/sal-remote@2014-01-14.yang b/opendaylight/md-sal/sal-rest-connector/src/test/resources/full-versions/yangs/sal-remote@2014-01-14.yang
new file mode 100644 (file)
index 0000000..d12e252
--- /dev/null
@@ -0,0 +1,98 @@
+module sal-remote {
+
+       yang-version 1;
+    namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote";
+    prefix "sal-remote";
+    
+
+    organization "Cisco Systems, Inc.";
+    contact "Martin Bobak <mbobak@cisco.com>";
+
+    description
+          "This module contains the definition of methods related to
+           sal remote model.
+
+           Copyright (c)2013 Cisco Systems, Inc. All rights reserved.
+
+           This program and the accompanying materials are made available
+           under the terms of the Eclipse Public License v1.0 which
+           accompanies this distribution, and is available at
+           http://www.eclipse.org/legal/epl-v10.html";
+
+    revision "2014-01-14" {
+        description
+            "Initial revision";
+    }
+
+
+     typedef q-name {
+       type string;
+       reference
+         "http://www.w3.org/TR/2004/REC-xmlschema-2-20041028/#QName";
+     }
+
+    rpc create-data-change-event-subscription {
+        input {
+            leaf path {
+                type instance-identifier;
+                description "Subtree path. ";
+            }
+         }
+         output {
+            leaf stream-name {
+                type string;
+                description "Notification stream name.";
+            }
+         }
+    }
+
+    notification data-changed-notification {
+        description "Data change notification.";
+        list data-change-event {
+            key path;
+            leaf path {
+                type instance-identifier;
+            }
+            leaf store {
+                type enumeration {
+                    enum config;
+                    enum operation;
+                }
+            }
+            leaf operation {
+                type enumeration {
+                    enum created;
+                    enum updated;
+                    enum deleted;
+                }
+            }
+            anyxml data{
+                description "DataObject ";
+            }
+         }
+    }
+
+    rpc create-notification-stream {
+        input {
+            leaf-list notifications {
+                type q-name;
+                description "Notification QNames";
+            }
+         }
+        output {
+            leaf notification-stream-identifier {
+                type string;
+                description "Unique notification stream identifier, in which notifications will be propagated";
+            }
+        }
+    }
+
+    rpc begin-transaction{
+        output{
+            anyxml data-modification-transaction{
+                description "DataModificationTransaction xml";
+            }
+        }
+    }
+
+}
\ No newline at end of file