From: Martin Sunal Date: Sun, 2 Feb 2014 23:26:58 +0000 (+0100) Subject: Added resource /streams/stream/ X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~528^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=273a60e4a5c67c9d6dab1889f86e15092ddbefbd Added resource /streams/stream/ - added netty web socket server - port 8181 Subscriber can be registered for notifications in this flow: 1. Subscriber invokes RPC create-data-change-event-subscription with input element regarding to module sal-remote revision "2014-01-14" 2. Subscriber gets sream name as a response from server 3. Subscriber calls GET operation on /streams/stream/ 4. Subscriber gets response with status code 200 OK and header field Location contains URI of web socket server 5. Subscriber creates web socket http connection to obtained URI in step 4. 6. Subscriber can be unregistered from receiving notifications by sending CloseWebSocketFrame with reasonText containing stream name Change-Id: I92925f5a50b2ceadf2cc8f0eaee2c99dd6a63d8d Signed-off-by: Martin Sunal --- diff --git a/opendaylight/commons/opendaylight/pom.xml b/opendaylight/commons/opendaylight/pom.xml index 9d4f838992..d2e9048027 100644 --- a/opendaylight/commons/opendaylight/pom.xml +++ b/opendaylight/commons/opendaylight/pom.xml @@ -722,6 +722,11 @@ netty-common ${netty.version} + + io.netty + netty-codec-http + ${netty.version} + diff --git a/opendaylight/distribution/opendaylight/pom.xml b/opendaylight/distribution/opendaylight/pom.xml index 28637bcf46..5aba5ca5cb 100644 --- a/opendaylight/distribution/opendaylight/pom.xml +++ b/opendaylight/distribution/opendaylight/pom.xml @@ -1248,6 +1248,10 @@ io.netty netty-common + + io.netty + netty-codec-http + diff --git a/opendaylight/md-sal/sal-rest-connector/pom.xml b/opendaylight/md-sal/sal-rest-connector/pom.xml index ce3b05c53b..a4d2f4ca25 100644 --- a/opendaylight/md-sal/sal-rest-connector/pom.xml +++ b/opendaylight/md-sal/sal-rest-connector/pom.xml @@ -51,8 +51,13 @@ 2.2.4 - org.opendaylight.yangtools - yang-parser-impl + org.opendaylight.yangtools + yang-parser-impl + + + io.netty + netty-codec-http + 4.0.10.Final diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/api/RestconfService.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/api/RestconfService.java index 347b09cf25..cef5df9e57 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/api/RestconfService.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/api/RestconfService.java @@ -16,8 +16,10 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; import org.opendaylight.controller.sal.restconf.impl.StructuredData; import org.opendaylight.yangtools.yang.data.api.CompositeNode; @@ -134,4 +136,8 @@ public interface RestconfService { @Path("/config/{identifier:.+}") public Response deleteConfigurationData(@PathParam("identifier") String identifier); + @GET + @Path("/streams/stream/{identifier:.+}") + public Response subscribeToStream(@PathParam("identifier") String identifier, @Context UriInfo uriInfo); + } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonMapper.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonMapper.java index 226eff44e1..d1441d7b9d 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonMapper.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/JsonMapper.java @@ -203,7 +203,6 @@ class JsonMapper { + baseType.getClass().getSimpleName() + "."); } - // TODO check InstanceIdentifierTypeDefinition if (baseType instanceof IdentityrefTypeDefinition) { if (node.getValue() instanceof QName) { IdentityValuesDTO valueDTO = (IdentityValuesDTO) RestCodec.from(baseType, mountPoint).serialize( diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProvider.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProvider.java index ea5108ee43..1870bdf0bf 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProvider.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/rest/impl/RestconfProvider.java @@ -18,6 +18,7 @@ import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.controller.sal.core.api.mount.MountService; import org.opendaylight.controller.sal.restconf.impl.BrokerFacade; import org.opendaylight.controller.sal.restconf.impl.ControllerContext; +import org.opendaylight.controller.sal.streams.websockets.WebSocketServer; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.model.api.SchemaServiceListener; import org.osgi.framework.BundleActivator; @@ -34,6 +35,7 @@ public class RestconfProvider implements BundleActivator, Provider, ServiceTrack private ServiceTracker brokerServiceTrancker; private BundleContext bundleContext; private ProviderSession session; + private Thread webSocketServerThread; @Override public void onSessionInitiated(ProviderSession session) { @@ -53,6 +55,9 @@ public class RestconfProvider implements BundleActivator, Provider, ServiceTrack bundleContext = context; brokerServiceTrancker = new ServiceTracker<>(context, Broker.class, this); brokerServiceTrancker.open(); + webSocketServerThread = new Thread(new WebSocketServer()); + webSocketServerThread.setName("Web socket server"); + webSocketServerThread.start(); } @Override @@ -64,6 +69,7 @@ public class RestconfProvider implements BundleActivator, Provider, ServiceTrack e.printStackTrace(); } } + webSocketServerThread.interrupt(); session.close(); brokerServiceTrancker.close(); } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.xtend b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.xtend index 67429f3759..d3050061da 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.xtend +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/BrokerFacade.xtend @@ -11,13 +11,14 @@ import javax.ws.rs.core.Response import org.opendaylight.controller.md.sal.common.api.data.DataReader import org.opendaylight.controller.sal.core.api.Broker.ConsumerSession import org.opendaylight.controller.sal.core.api.data.DataBrokerService +import org.opendaylight.controller.sal.core.api.mount.MountInstance import org.opendaylight.controller.sal.rest.impl.RestconfProvider +import org.opendaylight.controller.sal.streams.listeners.ListenerAdapter import org.opendaylight.yangtools.yang.common.QName import org.opendaylight.yangtools.yang.common.RpcResult import org.opendaylight.yangtools.yang.data.api.CompositeNode import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier import org.slf4j.LoggerFactory -import org.opendaylight.controller.sal.core.api.mount.MountInstance class BrokerFacade implements DataReader { @@ -133,4 +134,13 @@ class BrokerFacade implements DataReader { return transaction.commit } + def registerToListenDataChanges(ListenerAdapter listener) { + checkPreconditions + if (listener.listening) { + return; + } + val registration = dataService.registerDataChangeListener(listener.path, listener) + listener.setRegistration(registration) + } + } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.xtend b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.xtend index d8a03e5d4f..c2b82eae63 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.xtend +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/ControllerContext.xtend @@ -8,9 +8,11 @@ package org.opendaylight.controller.sal.restconf.impl import com.google.common.base.Preconditions +import com.google.common.base.Splitter import com.google.common.collect.BiMap import com.google.common.collect.FluentIterable import com.google.common.collect.HashBiMap +import com.google.common.collect.Lists import java.net.URI import java.net.URLDecoder import java.net.URLEncoder @@ -96,13 +98,11 @@ class ControllerContext implements SchemaServiceListener { private def InstanceIdWithSchemaNode toIdentifier(String restconfInstance, boolean toMountPointIdentifier) { checkPreconditions - val pathArgs = restconfInstance.split("/"); + val pathArgs = Lists.newArrayList(Splitter.on("/").split(restconfInstance)) + pathArgs.omitFirstAndLastEmptyString if (pathArgs.empty) { return null; } - if (pathArgs.head.empty) { - pathArgs.remove(0) - } val startModule = pathArgs.head.toModuleName(); if (startModule === null) { throw new ResponseException(BAD_REQUEST, "First node in URI has to be in format \"moduleName:nodeName\"") @@ -121,6 +121,22 @@ class ControllerContext implements SchemaServiceListener { return iiWithSchemaNode } + private def omitFirstAndLastEmptyString(List list) { + if (list.empty) { + return list; + } + if (list.head.empty) { + list.remove(0) + } + if (list.empty) { + return list; + } + if (list.last.empty) { + list.remove(list.indexOf(list.last)) + } + return list; + } + private def getLatestModule(SchemaContext schema, String moduleName) { checkArgument(schema !== null); checkArgument(moduleName !== null && !moduleName.empty) @@ -176,31 +192,51 @@ class ControllerContext implements SchemaServiceListener { return mountPoint.schemaContext?.findModuleByName(module.localName, module.revision) } + def getDataNodeContainerFor(InstanceIdentifier path) { + checkPreconditions + val elements = path.path; + val startQName = elements.head.nodeType; + val initialModule = globalSchema.findModuleByNamespaceAndRevision(startQName.namespace, startQName.revision) + var node = initialModule as DataNodeContainer; + for (element : elements) { + val potentialNode = node.childByQName(element.nodeType); + if (potentialNode === null || !potentialNode.listOrContainer) { + return null + } + node = potentialNode as DataNodeContainer + } + return node + } + def String toFullRestconfIdentifier(InstanceIdentifier path) { checkPreconditions val elements = path.path; val ret = new StringBuilder(); - val startQName = elements.get(0).nodeType; + val startQName = elements.head.nodeType; val initialModule = globalSchema.findModuleByNamespaceAndRevision(startQName.namespace, startQName.revision) - var node = initialModule as DataSchemaNode; + var node = initialModule as DataNodeContainer; for (element : elements) { - node = node.childByQName(element.nodeType); - ret.append(element.toRestconfIdentifier(node)); + val potentialNode = node.childByQName(element.nodeType); + if (!potentialNode.listOrContainer) { + return null + } + node = potentialNode as DataNodeContainer + ret.append(element.convertToRestconfIdentifier(node)); } return ret.toString } - private def dispatch CharSequence toRestconfIdentifier(NodeIdentifier argument, DataSchemaNode node) { + private def dispatch CharSequence convertToRestconfIdentifier(NodeIdentifier argument, ContainerSchemaNode node) { '''/«argument.nodeType.toRestconfIdentifier()»''' } - private def dispatch CharSequence toRestconfIdentifier(NodeIdentifierWithPredicates argument, ListSchemaNode node) { + private def dispatch CharSequence convertToRestconfIdentifier(NodeIdentifierWithPredicates argument, ListSchemaNode node) { val nodeIdentifier = argument.nodeType.toRestconfIdentifier(); val keyValues = argument.keyValues; return '''/«nodeIdentifier»/«FOR key : node.keyDefinition SEPARATOR "/"»«keyValues.get(key).toUriString»«ENDFOR»''' } - private def dispatch CharSequence toRestconfIdentifier(PathArgument argument, DataSchemaNode node) { + private def dispatch CharSequence convertToRestconfIdentifier(PathArgument argument, DataNodeContainer node) { throw new IllegalArgumentException("Conversion of generic path argument is not supported"); } @@ -289,6 +325,10 @@ class ControllerContext implements SchemaServiceListener { return container.dataNodeChildByQName(name); } + private static dispatch def DataSchemaNode childByQName(Module container, QName name) { + return container.dataNodeChildByQName(name); + } + private static dispatch def DataSchemaNode childByQName(DataSchemaNode container, QName name) { return null; } @@ -415,7 +455,7 @@ class ControllerContext implements SchemaServiceListener { } } - if (!(targetNode instanceof ListSchemaNode) && !(targetNode instanceof ContainerSchemaNode)) { + if (!targetNode.isListOrContainer) { throw new ResponseException(BAD_REQUEST,"URI has bad format. Node \"" + strings.head + "\" must be Container or List yang type.") } // Number of consumed elements @@ -538,14 +578,24 @@ class ControllerContext implements SchemaServiceListener { private def QName toQName(String name) { val module = name.toModuleName; val node = name.toNodeName; - val namespace = FluentIterable.from(globalSchema.modules.sort[o1,o2 | o1.revision.compareTo(o2.revision)]) // + val namespace = FluentIterable.from(globalSchema.modules.sort[o1,o2 | o1.revision.compareTo(o2.revision)]) .transform[QName.create(namespace,revision,it.name)].findFirst[module == localName] - ; - return QName.create(namespace,node); + if (namespace === null) { + return null + } + return QName.create(namespace, node); + } + + private def boolean isListOrContainer(DataSchemaNode node) { + return ((node instanceof ListSchemaNode) || (node instanceof ContainerSchemaNode)) } def getRpcDefinition(String name) { - return qnameToRpc.get(name.toQName) + val validName = name.toQName + if (validName === null) { + return null + } + return qnameToRpc.get(validName) } override onGlobalContextUpdated(SchemaContext context) { diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.xtend b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.xtend index be170c75b0..cfbce736fb 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.xtend +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/restconf/impl/RestconfImpl.xtend @@ -8,6 +8,8 @@ package org.opendaylight.controller.sal.restconf.impl import com.google.common.base.Preconditions +import com.google.common.base.Splitter +import com.google.common.collect.Lists import java.net.URI import java.text.ParseException import java.text.SimpleDateFormat @@ -16,9 +18,12 @@ import java.util.HashMap import java.util.List import java.util.Set import javax.ws.rs.core.Response +import javax.ws.rs.core.UriInfo import org.opendaylight.controller.md.sal.common.api.TransactionStatus import org.opendaylight.controller.sal.core.api.mount.MountInstance import org.opendaylight.controller.sal.rest.api.RestconfService +import org.opendaylight.controller.sal.streams.listeners.Notificator +import org.opendaylight.controller.sal.streams.websockets.WebSocketServer import org.opendaylight.yangtools.yang.common.QName import org.opendaylight.yangtools.yang.common.RpcResult import org.opendaylight.yangtools.yang.data.api.CompositeNode @@ -37,13 +42,11 @@ import org.opendaylight.yangtools.yang.model.api.RpcDefinition import org.opendaylight.yangtools.yang.model.api.SchemaContext import org.opendaylight.yangtools.yang.model.api.TypeDefinition import org.opendaylight.yangtools.yang.model.api.type.IdentityrefTypeDefinition +import org.opendaylight.yangtools.yang.model.util.EmptyType +import org.opendaylight.yangtools.yang.parser.builder.impl.ContainerSchemaNodeBuilder +import org.opendaylight.yangtools.yang.parser.builder.impl.LeafSchemaNodeBuilder import static javax.ws.rs.core.Response.Status.* -import org.opendaylight.yangtools.yang.parser.builder.impl.LeafSchemaNodeBuilder -import org.opendaylight.yangtools.yang.parser.builder.impl.ContainerSchemaNodeBuilder -import org.opendaylight.yangtools.yang.model.util.EmptyType -import com.google.common.base.Splitter -import com.google.common.collect.Lists class RestconfImpl implements RestconfService { @@ -58,6 +61,8 @@ class RestconfImpl implements RestconfService { val static RESTCONF_MODULE_DRAFT02_MODULES_CONTAINER_SCHEMA_NODE = "modules" val static RESTCONF_MODULE_DRAFT02_MODULE_LIST_SCHEMA_NODE = "module" val static RESTCONF_MODULE_DRAFT02_OPERATIONS_CONTAINER_SCHEMA_NODE = "operations" + val static SAL_REMOTE_NAMESPACE = "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote" + val static SAL_REMOTE_RPC_SUBSRCIBE = "create-data-change-event-subscription" @Property BrokerFacade broker @@ -177,7 +182,7 @@ class RestconfImpl implements RestconfService { } else ( moduleNameAndRevision = identifier ) - val pathArgs = Lists.newArrayList(Splitter.on("/").omitEmptyStrings.split(moduleNameAndRevision)) + val pathArgs = Lists.newArrayList(Splitter.on("/").omitEmptyStrings.split(moduleNameAndRevision)) if (pathArgs.length < 2) { throw new ResponseException(BAD_REQUEST, "URI has bad format. End of URI should be in format 'moduleName/yyyy-MM-dd'") @@ -225,6 +230,36 @@ class RestconfImpl implements RestconfService { } override invokeRpc(String identifier, CompositeNode payload) { + val rpc = identifier.rpcDefinition + if (rpc === null) { + throw new ResponseException(NOT_FOUND, "RPC does not exist."); + } + if (rpc.QName.namespace.toString == SAL_REMOTE_NAMESPACE && rpc.QName.localName == SAL_REMOTE_RPC_SUBSRCIBE) { + val value = normalizeNode(payload, rpc.input, null) + val pathNode = value?.getFirstSimpleByName(QName.create(rpc.QName, "path")) + val pathValue = pathNode?.value + if (pathValue === null && !(pathValue instanceof InstanceIdentifier)) { + throw new ResponseException(INTERNAL_SERVER_ERROR, "Instance identifier was not normalized correctly."); + } + val pathIdentifier = (pathValue as InstanceIdentifier) + var String streamName = null + if (!pathIdentifier.path.nullOrEmpty) { + streamName = Notificator.createStreamNameFromUri(pathIdentifier.toFullRestconfIdentifier) + } + if (streamName.nullOrEmpty) { + throw new ResponseException(BAD_REQUEST, "Path is empty or contains data node which is not Container or List build-in type."); + } + val streamNameNode = NodeFactory.createImmutableSimpleNode(QName.create(rpc.output.QName, "stream-name"), null, streamName) + val List> output = new ArrayList + output.add(streamNameNode) + val responseData = NodeFactory.createMutableCompositeNode(rpc.output.QName, null, output, null, null) + + if (!Notificator.existListenerFor(pathIdentifier)) { + Notificator.createListener(pathIdentifier, streamName) + } + + return new StructuredData(responseData, rpc.output, null) + } return callRpc(identifier.rpcDefinition, payload) } @@ -382,6 +417,21 @@ class RestconfImpl implements RestconfService { } } + override subscribeToStream(String identifier, UriInfo uriInfo) { + val streamName = Notificator.createStreamNameFromUri(identifier) + if (streamName.nullOrEmpty) { + throw new ResponseException(BAD_REQUEST, "Stream name is empty.") + } + val listener = Notificator.getListenerFor(streamName); + if (listener === null) { + throw new ResponseException(BAD_REQUEST, "Stream was not found.") + } + broker.registerToListenDataChanges(listener) + val uriBuilder = uriInfo.getAbsolutePathBuilder() + val uriToWebsocketServer = uriBuilder.port(WebSocketServer.PORT).replacePath(streamName).build() + return Response.status(OK).location(uriToWebsocketServer).build + } + private def dispatch URI namespace(CompositeNode data) { return data.nodeType.namespace } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java new file mode 100644 index 0000000000..fdd6ba0317 --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java @@ -0,0 +1,406 @@ +package org.opendaylight.controller.sal.streams.listeners; + +import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.util.internal.ConcurrentSet; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStreamWriter; +import java.io.UnsupportedEncodingException; +import java.text.SimpleDateFormat; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.Executors; + +import javax.activation.UnsupportedDataTypeException; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent; +import org.opendaylight.controller.sal.core.api.data.DataChangeListener; +import org.opendaylight.controller.sal.rest.impl.XmlMapper; +import org.opendaylight.controller.sal.restconf.impl.ControllerContext; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.model.api.DataNodeContainer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +import com.google.common.base.Preconditions; +import com.google.common.eventbus.AsyncEventBus; +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + +public class ListenerAdapter implements DataChangeListener { + + private static final Logger logger = LoggerFactory.getLogger(ListenerAdapter.class); + private final XmlMapper xmlMapper = new XmlMapper(); + private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ"); + + private final InstanceIdentifier path; + private ListenerRegistration registration; + private final String streamName; + private Set subscribers = new ConcurrentSet<>(); + private final EventBus eventBus; + private final EventBusChangeRecorder eventBusChangeRecorder; + + ListenerAdapter(InstanceIdentifier path, String streamName) { + Preconditions.checkNotNull(path); + Preconditions.checkArgument(streamName != null && !streamName.isEmpty()); + this.path = path; + this.streamName = streamName; + eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); + eventBusChangeRecorder = new EventBusChangeRecorder(); + eventBus.register(eventBusChangeRecorder); + } + + @Override + public void onDataChanged(DataChangeEvent change) { + if (!change.getCreatedConfigurationData().isEmpty() || !change.getCreatedOperationalData().isEmpty() + || !change.getUpdatedConfigurationData().isEmpty() || !change.getUpdatedOperationalData().isEmpty() + || !change.getRemovedConfigurationData().isEmpty() || !change.getRemovedOperationalData().isEmpty()) { + String xml = prepareXmlFrom(change); + Event event = new Event(EventType.NOTIFY); + event.setData(xml); + eventBus.post(event); + } + } + + private final class EventBusChangeRecorder { + @Subscribe public void recordCustomerChange(Event event) { + if (event.getType() == EventType.REGISTER) { + Channel subscriber = event.getSubscriber(); + if (!subscribers.contains(subscriber)) { + subscribers.add(subscriber); + } + } else if (event.getType() == EventType.DEREGISTER) { + subscribers.remove(event.getSubscriber()); + Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this); + } else if (event.getType() == EventType.NOTIFY) { + for (Channel subscriber : subscribers) { + if (subscriber.isActive()) { + logger.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); + subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); + } else { + logger.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); + subscribers.remove(subscriber); + } + } + } + } + } + + private final class Event { + private final EventType type; + private Channel subscriber; + private String data; + + public Event(EventType type) { + this.type = type; + } + + public Channel getSubscriber() { + return subscriber; + } + + public void setSubscriber(Channel subscriber) { + this.subscriber = subscriber; + } + + public String getData() { + return data; + } + + public void setData(String data) { + this.data = data; + } + + public EventType getType() { + return type; + } + } + + private enum EventType { + REGISTER, + DEREGISTER, + NOTIFY; + } + + private String prepareXmlFrom(DataChangeEvent change) { + Document doc = createDocument(); + Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", + "notification"); + doc.appendChild(notificationElement); + + Element eventTimeElement = doc.createElement("eventTime"); + eventTimeElement.setTextContent(toRFC3339(new Date())); + notificationElement.appendChild(eventTimeElement); + + Element dataChangedNotificationEventElement = doc.createElementNS( + "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification"); + addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change); + notificationElement.appendChild(dataChangedNotificationEventElement); + + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + TransformerFactory tf = TransformerFactory.newInstance(); + Transformer transformer = tf.newTransformer(); + transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); + transformer.setOutputProperty(OutputKeys.METHOD, "xml"); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); + transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); + transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, "UTF-8"))); + byte[] charData = out.toByteArray(); + return new String(charData, "UTF-8"); + } catch (TransformerException | UnsupportedEncodingException e) { + String msg = "Error during transformation of Document into String"; + logger.error(msg, e); + return msg; + } + } + + private String toRFC3339(Date d) { + return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2"); + } + + private Document createDocument() { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + Document doc = null; + try { + DocumentBuilder bob = dbf.newDocumentBuilder(); + doc = bob.newDocument(); + } catch (ParserConfigurationException e) { + return null; + } + return doc; + } + + private void addValuesToDataChangedNotificationEventElement(Document doc, + Element dataChangedNotificationEventElement, DataChangeEvent change) { + addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.CREATED); + addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.CREATED); + if (change.getCreatedConfigurationData().isEmpty()) { + addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.UPDATED); + } + if (change.getCreatedOperationalData().isEmpty()) { + addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.UPDATED); + } + addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.DELETED); + addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.DELETED); + } + + private void addValuesFromDataToElement(Document doc, Set data, Element element, Store store, + Operation operation) { + if (data == null || data.isEmpty()) { + return; + } + for (InstanceIdentifier path : data) { + Node node = createDataChangeEventElement(doc, path, null, store, operation); + element.appendChild(node); + } + } + + private void addValuesFromDataToElement(Document doc, Map data, Element element, Store store, + Operation operation) { + if (data == null || data.isEmpty()) { + return; + } + for (Entry entry : data.entrySet()) { + Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), store, operation); + element.appendChild(node); + } + } + + private Node createDataChangeEventElement(Document doc, InstanceIdentifier path, CompositeNode data, Store store, + Operation operation) { + Element dataChangeEventElement = doc.createElement("data-change-event"); + + Element pathElement = doc.createElement("path"); + addPathAsValueToElement(path, pathElement); + dataChangeEventElement.appendChild(pathElement); + + Element storeElement = doc.createElement("store"); + storeElement.setTextContent(store.value); + dataChangeEventElement.appendChild(storeElement); + + Element operationElement = doc.createElement("operation"); + operationElement.setTextContent(operation.value); + dataChangeEventElement.appendChild(operationElement); + + if (data != null) { + Element dataElement = doc.createElement("data"); + Node dataAnyXml = translateToXml(path, data); + Node adoptedNode = doc.adoptNode(dataAnyXml); + dataElement.appendChild(adoptedNode); + dataChangeEventElement.appendChild(dataElement); + } + + return dataChangeEventElement; + } + + private Node translateToXml(InstanceIdentifier path, CompositeNode data) { + DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path); + if (schemaNode == null) { + logger.info("Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", path); + return null; + } + try { + Document xml = xmlMapper.write(data, schemaNode); + return xml.getFirstChild(); + } catch (UnsupportedDataTypeException e) { + logger.error("Error occured during translation of notification to XML.", e); + return null; + } + } + + private void addPathAsValueToElement(InstanceIdentifier path, Element element) { + // Map< key = namespace, value = prefix> + Map prefixes = new HashMap<>(); + InstanceIdentifier instanceIdentifier = path; + StringBuilder textContent = new StringBuilder(); + for (PathArgument pathArgument : instanceIdentifier.getPath()) { + textContent.append("/"); + writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes); + if (pathArgument instanceof NodeIdentifierWithPredicates) { + Map predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues(); + for (QName keyValue : predicates.keySet()) { + String predicateValue = String.valueOf(predicates.get(keyValue)); + textContent.append("["); + writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes); + textContent.append("='"); + textContent.append(predicateValue); + textContent.append("'"); + textContent.append("]"); + } + } else if (pathArgument instanceof NodeWithValue) { + textContent.append("[.='"); + textContent.append(((NodeWithValue)pathArgument).getValue()); + textContent.append("'"); + textContent.append("]"); + } + } + element.setTextContent(textContent.toString()); + } + + private static void writeIdentifierWithNamespacePrefix(Element element, StringBuilder textContent, QName qName, + Map prefixes) { + String namespace = qName.getNamespace().toString(); + String prefix = prefixes.get(namespace); + if (prefix == null) { + prefix = qName.getPrefix(); + if (prefix == null || prefix.isEmpty() || prefixes.containsValue(prefix)) { + prefix = generateNewPrefix(prefixes.values()); + } + } + + element.setAttribute("xmlns:" + prefix, namespace.toString()); + textContent.append(prefix); + prefixes.put(namespace, prefix); + + textContent.append(":"); + textContent.append(qName.getLocalName()); + } + + private static String generateNewPrefix(Collection prefixes) { + StringBuilder result = null; + Random random = new Random(); + do { + result = new StringBuilder(); + for (int i = 0; i < 4; i++) { + int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26); + result.append(Character.toChars(randomNumber)); + } + } while (prefixes.contains(result.toString())); + + return result.toString(); + } + + public InstanceIdentifier getPath() { + return path; + } + + public void setRegistration(ListenerRegistration registration) { + this.registration = registration; + } + + public String getStreamName() { + return streamName; + } + + public void close() throws Exception { + subscribers = new ConcurrentSet<>(); + registration.close(); + registration = null; + eventBus.unregister(eventBusChangeRecorder); + } + + public boolean isListening() { + return registration == null ? false : true; + } + + public void addSubscriber(Channel subscriber) { + if (!subscriber.isActive()) { + logger.debug("Channel is not active between websocket server and subscriber {}" + + subscriber.remoteAddress()); + } + Event event = new Event(EventType.REGISTER); + event.setSubscriber(subscriber); + eventBus.post(event); + } + + public void removeSubscriber(Channel subscriber) { + logger.debug("Subscriber {} is removed.", subscriber.remoteAddress()); + Event event = new Event(EventType.DEREGISTER); + event.setSubscriber(subscriber); + eventBus.post(event); + } + + public boolean hasSubscribers() { + return !subscribers.isEmpty(); + } + + private static enum Store { + CONFIG("config"), + OPERATION("operation"); + + private final String value; + + private Store(String value) { + this.value = value; + } + } + + private static enum Operation { + CREATED("created"), + UPDATED("updated"), + DELETED("deleted"); + + private final String value; + + private Operation(String value) { + this.value = value; + } + } + +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java new file mode 100644 index 0000000000..b0140ec83a --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java @@ -0,0 +1,91 @@ +package org.opendaylight.controller.sal.streams.listeners; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; + +public class Notificator { + + private static Map listenersByStreamName = new ConcurrentHashMap<>(); + private static Map listenersByInstanceIdentifier = new ConcurrentHashMap<>(); + private static final Lock lock = new ReentrantLock(); + + private Notificator() { + } + + public static ListenerAdapter getListenerFor(String streamName) { + return listenersByStreamName.get(streamName); + } + + public static ListenerAdapter getListenerFor(InstanceIdentifier path) { + return listenersByInstanceIdentifier.get(path); + } + + public static boolean existListenerFor(InstanceIdentifier path) { + return listenersByInstanceIdentifier.containsKey(path); + } + + public static ListenerAdapter createListener(InstanceIdentifier path, String streamName) { + ListenerAdapter listener = new ListenerAdapter(path, streamName); + lock.lock(); + listenersByInstanceIdentifier.put(path, listener); + listenersByStreamName.put(streamName, listener); + lock.unlock(); + return listener; + } + + public static void removeListener(InstanceIdentifier path) { + ListenerAdapter listener = listenersByInstanceIdentifier.get(path); + deleteListener(listener); + } + + public static String createStreamNameFromUri(String uri) { + if (uri == null) { + return null; + } + String result = uri; + if (result.startsWith("/")) { + result = result.substring(1); + } + if (result.endsWith("/")) { + result = result.substring(0, result.length()); + } + return result; + } + + public static void removeAllListeners() { + for (ListenerAdapter listener : listenersByInstanceIdentifier.values()) { + try { + listener.close(); + } catch (Exception e) { + } + } + lock.lock(); + listenersByStreamName = new ConcurrentHashMap<>(); + listenersByInstanceIdentifier = new ConcurrentHashMap<>(); + lock.unlock(); + } + + public static void removeListenerIfNoSubscriberExists(ListenerAdapter listener) { + if (!listener.hasSubscribers()) { + deleteListener(listener); + } + } + + private static void deleteListener(ListenerAdapter listener) { + if (listener != null) { + try { + listener.close(); + } catch (Exception e) { + } + lock.lock(); + listenersByInstanceIdentifier.remove(listener.getPath()); + listenersByStreamName.remove(listener).getStreamName(); + lock.unlock(); + } + } + +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java new file mode 100644 index 0000000000..142cde1400 --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java @@ -0,0 +1,52 @@ +package org.opendaylight.controller.sal.streams.websockets; + +import org.opendaylight.controller.sal.streams.listeners.Notificator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +public class WebSocketServer implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class); + + public static final int PORT = 8181; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + @Override + public void run() { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new WebSocketServerInitializer()); + + Channel ch = b.bind(PORT).sync().channel(); + logger.info("Web socket server started at port {}.", PORT); + + ch.closeFuture().sync(); + } catch (InterruptedException e) { + // NOOP + } finally { + stop(); + } + } + + private void stop() { + Notificator.removeAllListeners(); + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + } + +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java new file mode 100644 index 0000000000..618ee57aba --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java @@ -0,0 +1,134 @@ +package org.opendaylight.controller.sal.streams.websockets; + +import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive; +import static io.netty.handler.codec.http.HttpHeaders.setContentLength; +import static io.netty.handler.codec.http.HttpHeaders.Names.HOST; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; +import io.netty.util.CharsetUtil; + +import java.io.IOException; + +import org.opendaylight.controller.sal.streams.listeners.ListenerAdapter; +import org.opendaylight.controller.sal.streams.listeners.Notificator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebSocketServerHandler extends SimpleChannelInboundHandler { + + private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class); + + private WebSocketServerHandshaker handshaker; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof FullHttpRequest) { + handleHttpRequest(ctx, (FullHttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + handleWebSocketFrame(ctx, (WebSocketFrame) msg); + } + } + + private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) + throws Exception { + // Handle a bad request. + if (!req.getDecoderResult().isSuccess()) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); + return; + } + + // Allow only GET methods. + if (req.getMethod() != GET) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)); + return; + } + + String streamName = Notificator.createStreamNameFromUri(req.getUri()); + ListenerAdapter listener = Notificator.getListenerFor(streamName); + if (listener != null) { + listener.addSubscriber(ctx.channel()); + logger.debug("Subscriber successfully registered."); + } else { + logger.error("Listener for stream with name '{}' was not found.", streamName); + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)); + } + + // Handshake + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( + getWebSocketLocation(req), null, false); + handshaker = wsFactory.newHandshaker(req); + if (handshaker == null) { + WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); + } else { + handshaker.handshake(ctx.channel(), req); + } + + } + + private static void sendHttpResponse(ChannelHandlerContext ctx, + HttpRequest req, FullHttpResponse res) { + // Generate an error page if response getStatus code is not OK (200). + if (res.getStatus().code() != 200) { + ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); + res.content().writeBytes(buf); + buf.release(); + setContentLength(res, res.content().readableBytes()); + } + + // Send the response and close the connection if necessary. + ChannelFuture f = ctx.channel().writeAndFlush(res); + if (!isKeepAlive(req) || res.getStatus().code() != 200) { + f.addListener(ChannelFutureListener.CLOSE); + } + } + + private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws IOException { + if (frame instanceof CloseWebSocketFrame) { + handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); + String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText()); + ListenerAdapter listener = Notificator.getListenerFor(streamName); + if (listener != null) { + listener.removeSubscriber(ctx.channel()); + logger.debug("Subscriber successfully registered."); + } + Notificator.removeListenerIfNoSubscriberExists(listener); + return; + } else if (frame instanceof PingWebSocketFrame) { + ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); + return; + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + if (cause instanceof java.nio.channels.ClosedChannelException == false) { + //cause.printStackTrace(); + } + ctx.close(); + } + + private static String getWebSocketLocation(HttpRequest req) { + return "http://" + req.headers().get(HOST) + req.getUri(); + } + +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java new file mode 100644 index 0000000000..5eb71ef491 --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java @@ -0,0 +1,19 @@ +package org.opendaylight.controller.sal.streams.websockets; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; + +public class WebSocketServerInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("codec-http", new HttpServerCodec()); + pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); + pipeline.addLast("handler", new WebSocketServerHandler()); + } + +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestOperationUtils.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestOperationUtils.java index 3175ba9821..a0e61a6fa1 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestOperationUtils.java +++ b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/RestOperationUtils.java @@ -15,13 +15,13 @@ import com.google.common.base.Charsets; public class RestOperationUtils { - static final String JSON = "+json"; - static final String XML = "+xml"; + public static final String JSON = "+json"; + public static final String XML = "+xml"; private RestOperationUtils() { } - static String createUri(String prefix, String encodedPart) throws UnsupportedEncodingException { + public static String createUri(String prefix, String encodedPart) throws UnsupportedEncodingException { return URI.create(prefix + URLEncoder.encode(encodedPart, Charsets.US_ASCII.name()).toString()).toASCIIString(); } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/IClientMessageCallback.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/IClientMessageCallback.java new file mode 100644 index 0000000000..63b8e6b4b7 --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/IClientMessageCallback.java @@ -0,0 +1,9 @@ +package org.opendaylight.controller.sal.restconf.impl.websockets.client; + +/** + * Created by mbobak on 1/22/14. + */ +public interface IClientMessageCallback { + + public void onMessageReceived(Object message); +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClient.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClient.java new file mode 100644 index 0000000000..bff63b88ad --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClient.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.restconf.impl.websockets.client; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketVersion; + +import java.net.URI; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebSocketClient { + + private final URI uri; + private Bootstrap bootstrap = new Bootstrap();; + private final WebSocketClientHandler clientHandler; + private static final Logger logger = LoggerFactory.getLogger(WebSocketClient.class); + private Channel clientChannel; + private final EventLoopGroup group = new NioEventLoopGroup(); + + public WebSocketClient(URI uri,IClientMessageCallback clientMessageCallback) { + this.uri = uri; + clientHandler = new WebSocketClientHandler( + WebSocketClientHandshakerFactory.newHandshaker( + uri, WebSocketVersion.V13, null, false,null),clientMessageCallback); // last null could be replaced with DefaultHttpHeaders + initialize(); + } + private void initialize(){ + + String protocol = uri.getScheme(); + if (!"http".equals(protocol)) { + throw new IllegalArgumentException("Unsupported protocol: " + protocol); + } + + bootstrap.group(group) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("http-codec", new HttpClientCodec()); + pipeline.addLast("aggregator", new HttpObjectAggregator(8192)); + pipeline.addLast("ws-handler", clientHandler); + } + }); + } + public void connect() throws InterruptedException{ + System.out.println("WebSocket Client connecting"); + clientChannel = bootstrap.connect(uri.getHost(), uri.getPort()).sync().channel(); + clientHandler.handshakeFuture().sync(); + } + + public void writeAndFlush(String message){ + clientChannel.writeAndFlush(new TextWebSocketFrame(message)); + } + public void writeAndFlush(Object message){ + clientChannel.writeAndFlush(message); + } + + public void ping(){ + clientChannel.writeAndFlush(new PingWebSocketFrame(Unpooled.copiedBuffer(new byte[]{1, 2, 3, 4, 5, 6}))); + } + + public void close() throws InterruptedException { + clientChannel.writeAndFlush(new CloseWebSocketFrame()); + + // WebSocketClientHandler will close the connection when the server + // responds to the CloseWebSocketFrame. + clientChannel.closeFuture().sync(); + group.shutdownGracefully(); + } + + public static void main(String[] args) throws Exception { + URI uri; + if (args.length > 0) { + uri = new URI(args[0]); + } else { + uri = new URI("http://192.168.11.1:8181/opendaylight-inventory:nodes"); + } + IClientMessageCallback messageCallback = new ClientMessageCallback(); + WebSocketClient webSocketClient = new WebSocketClient(uri, messageCallback); + webSocketClient.connect(); + } + + private static class ClientMessageCallback implements IClientMessageCallback { + @Override + public void onMessageReceived(Object message) { + logger.info("received message {}", ((TextWebSocketFrame)message).text()); + } + } + +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClientHandler.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClientHandler.java new file mode 100644 index 0000000000..02e1632057 --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/client/WebSocketClientHandler.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.restconf.impl.websockets.client; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import io.netty.util.CharsetUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebSocketClientHandler extends SimpleChannelInboundHandler { + + private static final Logger logger = LoggerFactory.getLogger(WebSocketClientHandler.class.toString()); + private final WebSocketClientHandshaker handshaker; + private ChannelPromise handshakeFuture; + private IClientMessageCallback messageListener; + + + public WebSocketClientHandler(WebSocketClientHandshaker handshaker,IClientMessageCallback listener) { + this.handshaker = handshaker; + this.messageListener = listener; + } + + public ChannelFuture handshakeFuture() { + return handshakeFuture; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + handshakeFuture = ctx.newPromise(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + handshaker.handshake(ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + logger.info("WebSocket Client disconnected!"); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + Channel ch = ctx.channel(); + if (!handshaker.isHandshakeComplete()) { + handshaker.finishHandshake(ch, (FullHttpResponse) msg); + logger.info("WebSocket Client connected!"); + handshakeFuture.setSuccess(); + return; + } + + if (msg instanceof FullHttpResponse) { + FullHttpResponse response = (FullHttpResponse) msg; + throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + + response.content().toString(CharsetUtil.UTF_8) + ')'); + } + + messageListener.onMessageReceived(msg); + WebSocketFrame frame = (WebSocketFrame) msg; + + if (frame instanceof PongWebSocketFrame) { + logger.info("WebSocket Client received pong"); + } else if (frame instanceof CloseWebSocketFrame) { + logger.info("WebSocket Client received closing"); + ch.close(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + + if (!handshakeFuture.isDone()) { + handshakeFuture.setFailure(cause); + } + + ctx.close(); + } +} + diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/test/RestStream.java b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/test/RestStream.java new file mode 100644 index 0000000000..4dcc63e89a --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/websockets/test/RestStream.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.restconf.impl.websockets.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.opendaylight.controller.sal.restconf.impl.test.RestOperationUtils.createUri; + +import java.io.FileNotFoundException; +import java.io.UnsupportedEncodingException; +import java.net.URI; + +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.test.JerseyTest; +import org.junit.BeforeClass; +import org.junit.Test; +import org.opendaylight.controller.sal.rest.impl.JsonToCompositeNodeProvider; +import org.opendaylight.controller.sal.rest.impl.StructuredDataToJsonProvider; +import org.opendaylight.controller.sal.rest.impl.StructuredDataToXmlProvider; +import org.opendaylight.controller.sal.rest.impl.XmlToCompositeNodeProvider; +import org.opendaylight.controller.sal.restconf.impl.BrokerFacade; +import org.opendaylight.controller.sal.restconf.impl.ControllerContext; +import org.opendaylight.controller.sal.restconf.impl.RestconfImpl; +import org.opendaylight.controller.sal.restconf.impl.test.TestUtils; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +public class RestStream extends JerseyTest { + + private static BrokerFacade brokerFacade; + private static RestconfImpl restconfImpl; + private static SchemaContext schemaContextYangsIetf; + + @BeforeClass + public static void init() throws FileNotFoundException { + schemaContextYangsIetf = TestUtils.loadSchemaContext("/full-versions/yangs"); + ControllerContext controllerContext = ControllerContext.getInstance(); + controllerContext.setSchemas(schemaContextYangsIetf); + brokerFacade = mock(BrokerFacade.class); + restconfImpl = RestconfImpl.getInstance(); + restconfImpl.setBroker(brokerFacade); + restconfImpl.setControllerContext(controllerContext); + } + + @Override + protected Application configure() { + /* enable/disable Jersey logs to console */ +// enable(TestProperties.LOG_TRAFFIC); +// enable(TestProperties.DUMP_ENTITY); +// enable(TestProperties.RECORD_LOG_LEVEL); +// set(TestProperties.RECORD_LOG_LEVEL, Level.ALL.intValue()); + ResourceConfig resourceConfig = new ResourceConfig(); + resourceConfig = resourceConfig.registerInstances(restconfImpl, StructuredDataToXmlProvider.INSTANCE, + StructuredDataToJsonProvider.INSTANCE, XmlToCompositeNodeProvider.INSTANCE, + JsonToCompositeNodeProvider.INSTANCE); + return resourceConfig; + } + + @Test + public void testCallRpcCallGet() throws UnsupportedEncodingException, InterruptedException { + String uri = createUri("/operations/", "sal-remote:create-data-change-event-subscription"); + Response responseWithStreamName = post(uri, MediaType.APPLICATION_XML, getRpcInput()); + String xmlResponse = responseWithStreamName.readEntity(String.class); + assertNotNull(xmlResponse); + assertTrue(xmlResponse.contains("ietf-interfaces:interfaces/ietf-interfaces:interface/eth0")); + + uri = createUri("/streams/stream/", "ietf-interfaces:interfaces/ietf-interfaces:interface/eth0"); + Response responseWithRedirectionUri = get(uri, MediaType.APPLICATION_XML); + final URI websocketServerUri = responseWithRedirectionUri.getLocation(); + assertNotNull(websocketServerUri); + assertEquals(websocketServerUri.toString(), "http://localhost:8181/ietf-interfaces:interfaces/ietf-interfaces:interface/eth0"); + } + + private Response post(String uri, String mediaType, String data) { + return target(uri).request(mediaType).post(Entity.entity(data, mediaType)); + } + + private Response get(String uri, String mediaType) { + return target(uri).request(mediaType).get(); + } + + private String getRpcInput() { + StringBuilder sb = new StringBuilder(); + sb.append(""); + sb.append("/int:interfaces/int:interface[int:name='eth0']"); + sb.append(""); + return sb.toString(); + } + +} diff --git a/opendaylight/md-sal/sal-rest-connector/src/test/resources/full-versions/yangs/sal-remote@2014-01-14.yang b/opendaylight/md-sal/sal-rest-connector/src/test/resources/full-versions/yangs/sal-remote@2014-01-14.yang new file mode 100644 index 0000000000..d12e252711 --- /dev/null +++ b/opendaylight/md-sal/sal-rest-connector/src/test/resources/full-versions/yangs/sal-remote@2014-01-14.yang @@ -0,0 +1,98 @@ +module sal-remote { + + yang-version 1; + namespace "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote"; + prefix "sal-remote"; + + + organization "Cisco Systems, Inc."; + contact "Martin Bobak "; + + description + "This module contains the definition of methods related to + sal remote model. + + Copyright (c)2013 Cisco Systems, Inc. All rights reserved. + + This program and the accompanying materials are made available + under the terms of the Eclipse Public License v1.0 which + accompanies this distribution, and is available at + http://www.eclipse.org/legal/epl-v10.html"; + + revision "2014-01-14" { + description + "Initial revision"; + } + + + typedef q-name { + type string; + reference + "http://www.w3.org/TR/2004/REC-xmlschema-2-20041028/#QName"; + } + + rpc create-data-change-event-subscription { + input { + leaf path { + type instance-identifier; + description "Subtree path. "; + } + } + output { + leaf stream-name { + type string; + description "Notification stream name."; + } + } + } + + notification data-changed-notification { + description "Data change notification."; + list data-change-event { + key path; + leaf path { + type instance-identifier; + } + leaf store { + type enumeration { + enum config; + enum operation; + } + } + leaf operation { + type enumeration { + enum created; + enum updated; + enum deleted; + } + } + anyxml data{ + description "DataObject "; + } + } + } + + rpc create-notification-stream { + input { + leaf-list notifications { + type q-name; + description "Notification QNames"; + } + } + output { + leaf notification-stream-identifier { + type string; + description "Unique notification stream identifier, in which notifications will be propagated"; + } + } + } + + rpc begin-transaction{ + output{ + anyxml data-modification-transaction{ + description "DataModificationTransaction xml"; + } + } + } + +} \ No newline at end of file