Merge "Modify config-api exceptions, bump config and netconf to 0.2.5-SNAPSHOT."
authorTony Tkacik <ttkacik@cisco.com>
Mon, 24 Feb 2014 17:22:10 +0000 (17:22 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Mon, 24 Feb 2014 17:22:10 +0000 (17:22 +0000)
opendaylight/md-sal/sal-netconf-connector/src/main/java/org/opendaylight/controller/sal/connect/netconf/NetconfDevice.xtend
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java
opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java
opendaylight/md-sal/sal-restconf-broker/src/main/java/org/opendaylight/controller/sal/restconf/broker/impl/DataBrokerServiceImpl.java
opendaylight/usermanager/api/src/main/java/org/opendaylight/controller/usermanager/UserConfig.java

index c9fb1fc0b895ffabcd609eed14424e7411856413..4ae84c7d310c4c4d7b54c7dffa403ec2f8fff5ce 100644 (file)
@@ -127,12 +127,8 @@ AutoCloseable {
 
         val listener = new NetconfDeviceListener(this);
         val task = startClientTask(dispatcher, listener)
-        if (mountInstance != null) {
-            commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this)
-        }
         return processingExecutor.submit(task) as Future<Void>;
 
-    //commitHandlerReg = mountInstance.registerCommitHandler(path,this);
     }
 
     def Optional<SchemaContext> getSchemaContext() {
@@ -162,11 +158,16 @@ AutoCloseable {
                 deviceContextProvider.createContextFromCapabilities(initialCapabilities);
                 if (mountInstance != null && schemaContext.isPresent) {
                     mountInstance.schemaContext = schemaContext.get();
+                    val operations = schemaContext.get().operations;
+                    for (rpc : operations) {
+                        mountInstance.addRpcImplementation(rpc.QName, this);
+                    }
                 }
                 updateDeviceState()
                 if (mountInstance != null && confReaderReg == null && operReaderReg == null) {
                     confReaderReg = mountInstance.registerConfigurationReader(ROOT_PATH, this);
                     operReaderReg = mountInstance.registerOperationalReader(ROOT_PATH, this);
+                    commitHandlerReg = mountInstance.registerCommitHandler(ROOT_PATH, this);
                 }
             } catch (Exception e) {
                 logger.error("Netconf client NOT started. ", e)
index fdd6ba0317d597dd1483c4a54dcd647e4734b6cc..5fd76de98bb51c4e0b322f6c16f0bd17c5d417d2 100644 (file)
@@ -51,356 +51,617 @@ import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 
+/**
+ * {@link ListenerAdapter} is responsible to track events, which occurred by
+ * changing data in data source.
+ */
 public class ListenerAdapter implements DataChangeListener {
 
-    private static final Logger logger = LoggerFactory.getLogger(ListenerAdapter.class);
-    private final XmlMapper xmlMapper = new XmlMapper();
-    private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
-
-    private final InstanceIdentifier path;
-    private ListenerRegistration<DataChangeListener> registration;
-    private final String streamName;
-    private Set<Channel> subscribers = new ConcurrentSet<>();
-    private final EventBus eventBus;
-    private final EventBusChangeRecorder eventBusChangeRecorder;
-
-    ListenerAdapter(InstanceIdentifier path, String streamName) {
-        Preconditions.checkNotNull(path);
-        Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
-        this.path = path;
-        this.streamName = streamName;
-        eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
-        eventBusChangeRecorder = new EventBusChangeRecorder();
-        eventBus.register(eventBusChangeRecorder);
-    }
-
-    @Override
-    public void onDataChanged(DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
-        if (!change.getCreatedConfigurationData().isEmpty() || !change.getCreatedOperationalData().isEmpty()
-                || !change.getUpdatedConfigurationData().isEmpty() || !change.getUpdatedOperationalData().isEmpty()
-                || !change.getRemovedConfigurationData().isEmpty() || !change.getRemovedOperationalData().isEmpty()) {
-            String xml = prepareXmlFrom(change);
-            Event event = new Event(EventType.NOTIFY);
-            event.setData(xml);
-            eventBus.post(event);
-        }
-    }
-
-    private final class EventBusChangeRecorder {
-        @Subscribe public void recordCustomerChange(Event event) {
-            if (event.getType() == EventType.REGISTER) {
-                Channel subscriber = event.getSubscriber();
-                if (!subscribers.contains(subscriber)) {
-                    subscribers.add(subscriber);
-                }
-            } else if (event.getType() == EventType.DEREGISTER) {
-                subscribers.remove(event.getSubscriber());
-                Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
-            } else if (event.getType() == EventType.NOTIFY) {
-                for (Channel subscriber : subscribers) {
-                    if (subscriber.isActive()) {
-                        logger.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
-                        subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
-                    } else {
-                        logger.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
-                        subscribers.remove(subscriber);
-                    }
-                }
-            }
-        }
-    }
-
-    private final class Event {
-        private final EventType type;
-        private Channel subscriber;
-        private String data;
-
-        public Event(EventType type) {
-            this.type = type;
-        }
-
-        public Channel getSubscriber() {
-            return subscriber;
-        }
-
-        public void setSubscriber(Channel subscriber) {
-            this.subscriber = subscriber;
-        }
-
-        public String getData() {
-            return data;
-        }
-
-        public void setData(String data) {
-            this.data = data;
-        }
-
-        public EventType getType() {
-            return type;
-        }
-    }
-
-    private enum EventType {
-        REGISTER,
-        DEREGISTER,
-        NOTIFY;
-    }
-
-    private String prepareXmlFrom(DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
-        Document doc = createDocument();
-        Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
-                "notification");
-        doc.appendChild(notificationElement);
-
-        Element eventTimeElement = doc.createElement("eventTime");
-        eventTimeElement.setTextContent(toRFC3339(new Date()));
-        notificationElement.appendChild(eventTimeElement);
-
-        Element dataChangedNotificationEventElement = doc.createElementNS(
-                "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
-        addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change);
-        notificationElement.appendChild(dataChangedNotificationEventElement);
-
-        try {
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            TransformerFactory tf = TransformerFactory.newInstance();
-            Transformer transformer = tf.newTransformer();
-            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
-            transformer.setOutputProperty(OutputKeys.METHOD, "xml");
-            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-            transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
-            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
-            transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, "UTF-8")));
-            byte[] charData = out.toByteArray();
-            return new String(charData, "UTF-8");
-        } catch (TransformerException | UnsupportedEncodingException e) {
-            String msg = "Error during transformation of Document into String";
-            logger.error(msg, e);
-            return msg;
-        }
-    }
-
-    private String toRFC3339(Date d) {
-        return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2");
-    }
-
-    private Document createDocument() {
-        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
-        Document doc = null;
-        try {
-            DocumentBuilder bob = dbf.newDocumentBuilder();
-            doc = bob.newDocument();
-        } catch (ParserConfigurationException e) {
-            return null;
-        }
-        return doc;
-    }
-
-    private void addValuesToDataChangedNotificationEventElement(Document doc,
-            Element dataChangedNotificationEventElement, DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
-        addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.CREATED);
-        addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.CREATED);
-        if (change.getCreatedConfigurationData().isEmpty()) {
-            addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.UPDATED);
-        }
-        if (change.getCreatedOperationalData().isEmpty()) {
-            addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.UPDATED);
-        }
-        addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.DELETED);
-        addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.DELETED);
-    }
-
-    private void addValuesFromDataToElement(Document doc, Set<InstanceIdentifier> data, Element element, Store store,
-            Operation operation) {
-        if (data == null || data.isEmpty()) {
-            return;
-        }
-        for (InstanceIdentifier path : data) {
-            Node node = createDataChangeEventElement(doc, path, null, store, operation);
-            element.appendChild(node);
-        }
-    }
-
-    private void addValuesFromDataToElement(Document doc, Map<InstanceIdentifier, CompositeNode> data, Element element, Store store,
-            Operation operation) {
-        if (data == null || data.isEmpty()) {
-            return;
-        }
-        for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
-            Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), store, operation);
-            element.appendChild(node);
-        }
-    }
-
-    private Node createDataChangeEventElement(Document doc, InstanceIdentifier path, CompositeNode data, Store store,
-            Operation operation) {
-        Element dataChangeEventElement = doc.createElement("data-change-event");
-
-        Element pathElement = doc.createElement("path");
-        addPathAsValueToElement(path, pathElement);
-        dataChangeEventElement.appendChild(pathElement);
-
-        Element storeElement = doc.createElement("store");
-        storeElement.setTextContent(store.value);
-        dataChangeEventElement.appendChild(storeElement);
-
-        Element operationElement = doc.createElement("operation");
-        operationElement.setTextContent(operation.value);
-        dataChangeEventElement.appendChild(operationElement);
-
-        if (data != null) {
-            Element dataElement = doc.createElement("data");
-            Node dataAnyXml = translateToXml(path, data);
-            Node adoptedNode = doc.adoptNode(dataAnyXml);
-            dataElement.appendChild(adoptedNode);
-            dataChangeEventElement.appendChild(dataElement);
-        }
-
-        return dataChangeEventElement;
-    }
-
-    private Node translateToXml(InstanceIdentifier path, CompositeNode data) {
-        DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path);
-        if (schemaNode == null) {
-            logger.info("Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", path);
-            return null;
-        }
-        try {
-            Document xml = xmlMapper.write(data, schemaNode);
-            return xml.getFirstChild();
-        } catch (UnsupportedDataTypeException e) {
-            logger.error("Error occured during translation of notification to XML.", e);
-            return null;
-        }
-    }
-
-    private void addPathAsValueToElement(InstanceIdentifier path, Element element) {
-        // Map< key = namespace, value = prefix>
-        Map<String, String> prefixes = new HashMap<>();
-        InstanceIdentifier instanceIdentifier = path;
-        StringBuilder textContent = new StringBuilder();
-        for (PathArgument pathArgument : instanceIdentifier.getPath()) {
-            textContent.append("/");
-            writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
-            if (pathArgument instanceof NodeIdentifierWithPredicates) {
-                Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
-                for (QName keyValue : predicates.keySet()) {
-                    String predicateValue = String.valueOf(predicates.get(keyValue));
-                    textContent.append("[");
-                    writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
-                    textContent.append("='");
-                    textContent.append(predicateValue);
-                    textContent.append("'");
-                    textContent.append("]");
-                }
-            } else if (pathArgument instanceof NodeWithValue) {
-                textContent.append("[.='");
-                textContent.append(((NodeWithValue)pathArgument).getValue());
-                textContent.append("'");
-                textContent.append("]");
-            }
-        }
-        element.setTextContent(textContent.toString());
-    }
-
-    private static void writeIdentifierWithNamespacePrefix(Element element, StringBuilder textContent, QName qName,
-            Map<String, String> prefixes) {
-        String namespace = qName.getNamespace().toString();
-        String prefix = prefixes.get(namespace);
-        if (prefix == null) {
-            prefix = qName.getPrefix();
-            if (prefix == null || prefix.isEmpty() || prefixes.containsValue(prefix)) {
-                prefix = generateNewPrefix(prefixes.values());
-            }
-        }
-
-        element.setAttribute("xmlns:" + prefix, namespace.toString());
-        textContent.append(prefix);
-        prefixes.put(namespace, prefix);
-
-        textContent.append(":");
-        textContent.append(qName.getLocalName());
-    }
-
-    private static String generateNewPrefix(Collection<String> prefixes) {
-        StringBuilder result = null;
-        Random random = new Random();
-        do {
-            result = new StringBuilder();
-            for (int i = 0; i < 4; i++) {
-                int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
-                result.append(Character.toChars(randomNumber));
-            }
-        } while (prefixes.contains(result.toString()));
-
-        return result.toString();
-    }
-
-    public InstanceIdentifier getPath() {
-        return path;
-    }
-
-    public void setRegistration(ListenerRegistration<DataChangeListener> registration) {
-        this.registration = registration;
-    }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    public void close() throws Exception {
-        subscribers = new ConcurrentSet<>();
-        registration.close();
-        registration = null;
-        eventBus.unregister(eventBusChangeRecorder);
-    }
-
-    public boolean isListening() {
-        return registration == null ? false : true;
-    }
-
-    public void addSubscriber(Channel subscriber) {
-        if (!subscriber.isActive()) {
-            logger.debug("Channel is not active between websocket server and subscriber {}"
-                    + subscriber.remoteAddress());
-        }
-        Event event = new Event(EventType.REGISTER);
-        event.setSubscriber(subscriber);
-        eventBus.post(event);
-    }
-
-    public void removeSubscriber(Channel subscriber) {
-        logger.debug("Subscriber {} is removed.", subscriber.remoteAddress());
-        Event event = new Event(EventType.DEREGISTER);
-        event.setSubscriber(subscriber);
-        eventBus.post(event);
-    }
-
-    public boolean hasSubscribers() {
-        return !subscribers.isEmpty();
-    }
-
-    private static enum Store {
-        CONFIG("config"),
-        OPERATION("operation");
-
-        private final String value;
-
-        private Store(String value) {
-            this.value = value;
-        }
-    }
-
-    private static enum Operation {
-        CREATED("created"),
-        UPDATED("updated"),
-        DELETED("deleted");
-
-        private final String value;
-
-        private Operation(String value) {
-            this.value = value;
-        }
-    }
+       private static final Logger logger = LoggerFactory
+                       .getLogger(ListenerAdapter.class);
+       private final XmlMapper xmlMapper = new XmlMapper();
+       private final SimpleDateFormat rfc3339 = new SimpleDateFormat(
+                       "yyyy-MM-dd'T'hh:mm:ssZ");
+
+       private final InstanceIdentifier path;
+       private ListenerRegistration<DataChangeListener> registration;
+       private final String streamName;
+       private Set<Channel> subscribers = new ConcurrentSet<>();
+       private final EventBus eventBus;
+       private final EventBusChangeRecorder eventBusChangeRecorder;
+
+       /**
+        * Creates new {@link ListenerAdapter} listener specified by path and stream
+        * name.
+        * 
+        * @param path
+        *            Path to data in data store.
+        * @param streamName
+        *            The name of the stream.
+        */
+       ListenerAdapter(InstanceIdentifier path, String streamName) {
+               Preconditions.checkNotNull(path);
+               Preconditions
+                               .checkArgument(streamName != null && !streamName.isEmpty());
+               this.path = path;
+               this.streamName = streamName;
+               eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
+               eventBusChangeRecorder = new EventBusChangeRecorder();
+               eventBus.register(eventBusChangeRecorder);
+       }
+
+       @Override
+       public void onDataChanged(
+                       DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
+               if (!change.getCreatedConfigurationData().isEmpty()
+                               || !change.getCreatedOperationalData().isEmpty()
+                               || !change.getUpdatedConfigurationData().isEmpty()
+                               || !change.getUpdatedOperationalData().isEmpty()
+                               || !change.getRemovedConfigurationData().isEmpty()
+                               || !change.getRemovedOperationalData().isEmpty()) {
+                       String xml = prepareXmlFrom(change);
+                       Event event = new Event(EventType.NOTIFY);
+                       event.setData(xml);
+                       eventBus.post(event);
+               }
+       }
+
+       /**
+        * Tracks events of data change by customer.
+        */
+       private final class EventBusChangeRecorder {
+               @Subscribe
+               public void recordCustomerChange(Event event) {
+                       if (event.getType() == EventType.REGISTER) {
+                               Channel subscriber = event.getSubscriber();
+                               if (!subscribers.contains(subscriber)) {
+                                       subscribers.add(subscriber);
+                               }
+                       } else if (event.getType() == EventType.DEREGISTER) {
+                               subscribers.remove(event.getSubscriber());
+                               Notificator
+                                               .removeListenerIfNoSubscriberExists(ListenerAdapter.this);
+                       } else if (event.getType() == EventType.NOTIFY) {
+                               for (Channel subscriber : subscribers) {
+                                       if (subscriber.isActive()) {
+                                               logger.debug("Data are sent to subscriber {}:",
+                                                               subscriber.remoteAddress());
+                                               subscriber.writeAndFlush(new TextWebSocketFrame(event
+                                                               .getData()));
+                                       } else {
+                                               logger.debug(
+                                                               "Subscriber {} is removed - channel is not active yet.",
+                                                               subscriber.remoteAddress());
+                                               subscribers.remove(subscriber);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Represents event of specific {@link EventType} type, holds data and
+        * {@link Channel} subscriber.
+        */
+       private final class Event {
+               private final EventType type;
+               private Channel subscriber;
+               private String data;
+
+               /**
+                * Creates new event specified by {@link EventType} type.
+                * 
+                * @param type
+                *            EventType
+                */
+               public Event(EventType type) {
+                       this.type = type;
+               }
+
+               /**
+                * Gets the {@link Channel} subscriber.
+                * 
+                * @return Channel
+                */
+               public Channel getSubscriber() {
+                       return subscriber;
+               }
+
+               /**
+                * Sets subscriber for event.
+                * 
+                * @param subscriber
+                *            Channel
+                */
+               public void setSubscriber(Channel subscriber) {
+                       this.subscriber = subscriber;
+               }
+
+               /**
+                * Gets event data.
+                * 
+                * @return String representation of event data.
+                */
+               public String getData() {
+                       return data;
+               }
+
+               /**
+                * Sets event data.
+                * 
+                * @param String
+                *            data.
+                */
+               public void setData(String data) {
+                       this.data = data;
+               }
+
+               /**
+                * Gets event type.
+                * 
+                * @return The type of the event.
+                */
+               public EventType getType() {
+                       return type;
+               }
+       }
+
+       /**
+        * Type of the event.
+        */
+       private enum EventType {
+               REGISTER, DEREGISTER, NOTIFY;
+       }
+
+       /**
+        * Prepare data in printable form and transform it to String.
+        * 
+        * @param change
+        *            DataChangeEvent
+        * @return Data in printable form.
+        */
+       private String prepareXmlFrom(
+                       DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
+               Document doc = createDocument();
+               Element notificationElement = doc.createElementNS(
+                               "urn:ietf:params:xml:ns:netconf:notification:1.0",
+                               "notification");
+               doc.appendChild(notificationElement);
+
+               Element eventTimeElement = doc.createElement("eventTime");
+               eventTimeElement.setTextContent(toRFC3339(new Date()));
+               notificationElement.appendChild(eventTimeElement);
+
+               Element dataChangedNotificationEventElement = doc.createElementNS(
+                               "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote",
+                               "data-changed-notification");
+               addValuesToDataChangedNotificationEventElement(doc,
+                               dataChangedNotificationEventElement, change);
+               notificationElement.appendChild(dataChangedNotificationEventElement);
+
+               try {
+                       ByteArrayOutputStream out = new ByteArrayOutputStream();
+                       TransformerFactory tf = TransformerFactory.newInstance();
+                       Transformer transformer = tf.newTransformer();
+                       transformer
+                                       .setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
+                       transformer.setOutputProperty(OutputKeys.METHOD, "xml");
+                       transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+                       transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+                       transformer.setOutputProperty(
+                                       "{http://xml.apache.org/xslt}indent-amount", "4");
+                       transformer.transform(new DOMSource(doc), new StreamResult(
+                                       new OutputStreamWriter(out, "UTF-8")));
+                       byte[] charData = out.toByteArray();
+                       return new String(charData, "UTF-8");
+               } catch (TransformerException | UnsupportedEncodingException e) {
+                       String msg = "Error during transformation of Document into String";
+                       logger.error(msg, e);
+                       return msg;
+               }
+       }
+
+       /**
+        * Formats data specified by RFC3339.
+        * 
+        * @param d
+        *            Date
+        * @return Data specified by RFC3339.
+        */
+       private String toRFC3339(Date d) {
+               return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2");
+       }
+
+       /**
+        * Creates {@link Document} document.
+        * 
+        * @return {@link Document} document.
+        */
+       private Document createDocument() {
+               DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+               Document doc = null;
+               try {
+                       DocumentBuilder bob = dbf.newDocumentBuilder();
+                       doc = bob.newDocument();
+               } catch (ParserConfigurationException e) {
+                       return null;
+               }
+               return doc;
+       }
+
+       /**
+        * Adds values to data changed notification event element.
+        * 
+        * @param doc
+        *            {@link Document}
+        * @param dataChangedNotificationEventElement
+        *            {@link Element}
+        * @param change
+        *            {@link DataChangeEvent}
+        */
+       private void addValuesToDataChangedNotificationEventElement(Document doc,
+                       Element dataChangedNotificationEventElement,
+                       DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
+               addValuesFromDataToElement(doc, change.getCreatedConfigurationData(),
+                               dataChangedNotificationEventElement, Store.CONFIG,
+                               Operation.CREATED);
+               addValuesFromDataToElement(doc, change.getCreatedOperationalData(),
+                               dataChangedNotificationEventElement, Store.OPERATION,
+                               Operation.CREATED);
+               if (change.getCreatedConfigurationData().isEmpty()) {
+                       addValuesFromDataToElement(doc,
+                                       change.getUpdatedConfigurationData(),
+                                       dataChangedNotificationEventElement, Store.CONFIG,
+                                       Operation.UPDATED);
+               }
+               if (change.getCreatedOperationalData().isEmpty()) {
+                       addValuesFromDataToElement(doc, change.getUpdatedOperationalData(),
+                                       dataChangedNotificationEventElement, Store.OPERATION,
+                                       Operation.UPDATED);
+               }
+               addValuesFromDataToElement(doc, change.getRemovedConfigurationData(),
+                               dataChangedNotificationEventElement, Store.CONFIG,
+                               Operation.DELETED);
+               addValuesFromDataToElement(doc, change.getRemovedOperationalData(),
+                               dataChangedNotificationEventElement, Store.OPERATION,
+                               Operation.DELETED);
+       }
+
+       /**
+        * Adds values from data to element.
+        * 
+        * @param doc
+        *            {@link Document}
+        * @param data
+        *            Set of {@link InstanceIdentifier}.
+        * @param element
+        *            {@link Element}
+        * @param store
+        *            {@link Store}
+        * @param operation
+        *            {@link Operation}
+        */
+       private void addValuesFromDataToElement(Document doc,
+                       Set<InstanceIdentifier> data, Element element, Store store,
+                       Operation operation) {
+               if (data == null || data.isEmpty()) {
+                       return;
+               }
+               for (InstanceIdentifier path : data) {
+                       Node node = createDataChangeEventElement(doc, path, null, store,
+                                       operation);
+                       element.appendChild(node);
+               }
+       }
+
+       /**
+        * Adds values from data to element.
+        * 
+        * @param doc
+        *            {@link Document}
+        * @param data
+        *            Map of {@link InstanceIdentifier} and {@link CompositeNode}.
+        * @param element
+        *            {@link Element}
+        * @param store
+        *            {@link Store}
+        * @param operation
+        *            {@link Operation}
+        */
+       private void addValuesFromDataToElement(Document doc,
+                       Map<InstanceIdentifier, CompositeNode> data, Element element,
+                       Store store, Operation operation) {
+               if (data == null || data.isEmpty()) {
+                       return;
+               }
+               for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
+                       Node node = createDataChangeEventElement(doc, entry.getKey(),
+                                       entry.getValue(), store, operation);
+                       element.appendChild(node);
+               }
+       }
+
+       /**
+        * Creates changed event element from data.
+        * 
+        * @param doc
+        *            {@link Document}
+        * @param path
+        *            Path to data in data store.
+        * @param data
+        *            {@link CompositeNode}
+        * @param store
+        *            {@link Store}
+        * @param operation
+        *            {@link Operation}
+        * @return {@link Node} node represented by changed event element.
+        */
+       private Node createDataChangeEventElement(Document doc,
+                       InstanceIdentifier path, CompositeNode data, Store store,
+                       Operation operation) {
+               Element dataChangeEventElement = doc.createElement("data-change-event");
+
+               Element pathElement = doc.createElement("path");
+               addPathAsValueToElement(path, pathElement);
+               dataChangeEventElement.appendChild(pathElement);
+
+               Element storeElement = doc.createElement("store");
+               storeElement.setTextContent(store.value);
+               dataChangeEventElement.appendChild(storeElement);
+
+               Element operationElement = doc.createElement("operation");
+               operationElement.setTextContent(operation.value);
+               dataChangeEventElement.appendChild(operationElement);
+
+               if (data != null) {
+                       Element dataElement = doc.createElement("data");
+                       Node dataAnyXml = translateToXml(path, data);
+                       Node adoptedNode = doc.adoptNode(dataAnyXml);
+                       dataElement.appendChild(adoptedNode);
+                       dataChangeEventElement.appendChild(dataElement);
+               }
+
+               return dataChangeEventElement;
+       }
+
+       /**
+        * Translates {@link CompositeNode} data to XML format.
+        * 
+        * @param path
+        *            Path to data in data store.
+        * @param data
+        *            {@link CompositeNode}
+        * @return Data in XML format.
+        */
+       private Node translateToXml(InstanceIdentifier path, CompositeNode data) {
+               DataNodeContainer schemaNode = ControllerContext.getInstance()
+                               .getDataNodeContainerFor(path);
+               if (schemaNode == null) {
+                       logger.info(
+                                       "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.",
+                                       path);
+                       return null;
+               }
+               try {
+                       Document xml = xmlMapper.write(data, schemaNode);
+                       return xml.getFirstChild();
+               } catch (UnsupportedDataTypeException e) {
+                       logger.error(
+                                       "Error occured during translation of notification to XML.",
+                                       e);
+                       return null;
+               }
+       }
+
+       /**
+        * Adds path as value to element.
+        * 
+        * @param path
+        *            Path to data in data store.
+        * @param element
+        *            {@link Element}
+        */
+       private void addPathAsValueToElement(InstanceIdentifier path,
+                       Element element) {
+               // Map< key = namespace, value = prefix>
+               Map<String, String> prefixes = new HashMap<>();
+               InstanceIdentifier instanceIdentifier = path;
+               StringBuilder textContent = new StringBuilder();
+               for (PathArgument pathArgument : instanceIdentifier.getPath()) {
+                       textContent.append("/");
+                       writeIdentifierWithNamespacePrefix(element, textContent,
+                                       pathArgument.getNodeType(), prefixes);
+                       if (pathArgument instanceof NodeIdentifierWithPredicates) {
+                               Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument)
+                                               .getKeyValues();
+                               for (QName keyValue : predicates.keySet()) {
+                                       String predicateValue = String.valueOf(predicates
+                                                       .get(keyValue));
+                                       textContent.append("[");
+                                       writeIdentifierWithNamespacePrefix(element, textContent,
+                                                       keyValue, prefixes);
+                                       textContent.append("='");
+                                       textContent.append(predicateValue);
+                                       textContent.append("'");
+                                       textContent.append("]");
+                               }
+                       } else if (pathArgument instanceof NodeWithValue) {
+                               textContent.append("[.='");
+                               textContent.append(((NodeWithValue) pathArgument).getValue());
+                               textContent.append("'");
+                               textContent.append("]");
+                       }
+               }
+               element.setTextContent(textContent.toString());
+       }
+
+       /**
+        * Writes identifier that consists of prefix and QName.
+        * 
+        * @param element
+        *            {@link Element}
+        * @param textContent
+        *            StringBuilder
+        * @param qName
+        *            QName
+        * @param prefixes
+        *            Map of namespaces and prefixes.
+        */
+       private static void writeIdentifierWithNamespacePrefix(Element element,
+                       StringBuilder textContent, QName qName, Map<String, String> prefixes) {
+               String namespace = qName.getNamespace().toString();
+               String prefix = prefixes.get(namespace);
+               if (prefix == null) {
+                       prefix = qName.getPrefix();
+                       if (prefix == null || prefix.isEmpty()
+                                       || prefixes.containsValue(prefix)) {
+                               prefix = generateNewPrefix(prefixes.values());
+                       }
+               }
+
+               element.setAttribute("xmlns:" + prefix, namespace.toString());
+               textContent.append(prefix);
+               prefixes.put(namespace, prefix);
+
+               textContent.append(":");
+               textContent.append(qName.getLocalName());
+       }
+
+       /**
+        * Generates new prefix which consists of four random characters <a-z>.
+        * 
+        * @param prefixes
+        *            Collection of prefixes.
+        * @return New prefix which consists of four random characters <a-z>.
+        */
+       private static String generateNewPrefix(Collection<String> prefixes) {
+               StringBuilder result = null;
+               Random random = new Random();
+               do {
+                       result = new StringBuilder();
+                       for (int i = 0; i < 4; i++) {
+                               int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
+                               result.append(Character.toChars(randomNumber));
+                       }
+               } while (prefixes.contains(result.toString()));
+
+               return result.toString();
+       }
+
+       /**
+        * Gets path pointed to data in data store.
+        * 
+        * @return Path pointed to data in data store.
+        */
+       public InstanceIdentifier getPath() {
+               return path;
+       }
+
+       /**
+        * Sets {@link ListenerRegistration} registration.
+        * 
+        * @param registration
+        *            ListenerRegistration<DataChangeListener>
+        */
+       public void setRegistration(
+                       ListenerRegistration<DataChangeListener> registration) {
+               this.registration = registration;
+       }
+
+       /**
+        * Gets the name of the stream.
+        * 
+        * @return The name of the stream.
+        */
+       public String getStreamName() {
+               return streamName;
+       }
+
+       /**
+        * Removes all subscribers and unregisters event bus change recorder form
+        * event bus.
+        */
+       public void close() throws Exception {
+               subscribers = new ConcurrentSet<>();
+               registration.close();
+               registration = null;
+               eventBus.unregister(eventBusChangeRecorder);
+       }
+
+       /**
+        * Checks if {@link ListenerRegistration} registration exist.
+        * 
+        * @return True if exist, false otherwise.
+        */
+       public boolean isListening() {
+               return registration == null ? false : true;
+       }
+
+       /**
+        * Creates event of type {@link EventType#REGISTER}, set {@link Channel}
+        * subscriber to the event and post event into event bus.
+        * 
+        * @param subscriber
+        *            Channel
+        */
+       public void addSubscriber(Channel subscriber) {
+               if (!subscriber.isActive()) {
+                       logger.debug("Channel is not active between websocket server and subscriber {}"
+                                       + subscriber.remoteAddress());
+               }
+               Event event = new Event(EventType.REGISTER);
+               event.setSubscriber(subscriber);
+               eventBus.post(event);
+       }
+
+       /**
+        * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel}
+        * subscriber to the event and posts event into event bus.
+        * 
+        * @param subscriber
+        */
+       public void removeSubscriber(Channel subscriber) {
+               logger.debug("Subscriber {} is removed.", subscriber.remoteAddress());
+               Event event = new Event(EventType.DEREGISTER);
+               event.setSubscriber(subscriber);
+               eventBus.post(event);
+       }
+
+       /**
+        * Checks if exists at least one {@link Channel} subscriber.
+        * 
+        * @return True if exist at least one {@link Channel} subscriber, false
+        *         otherwise.
+        */
+       public boolean hasSubscribers() {
+               return !subscribers.isEmpty();
+       }
+
+       /**
+        * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}.
+        */
+       private static enum Store {
+               CONFIG("config"), OPERATION("operation");
+
+               private final String value;
+
+               private Store(String value) {
+                       this.value = value;
+               }
+       }
+
+       /**
+        * Consists of three types {@link Operation#CREATED},
+        * {@link Operation#UPDATED} and {@link Operation#DELETED}.
+        */
+       private static enum Operation {
+               CREATED("created"), UPDATED("updated"), DELETED("deleted");
+
+               private final String value;
+
+               private Operation(String value) {
+                       this.value = value;
+               }
+       }
 
 }
index d1cb25861ae0496c299cc9f4c67ebf5314a871e4..36c9c67ffcd3281db279c858176291f4da571ef7 100644 (file)
@@ -7,94 +7,164 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
+/**
+ * {@link Notificator} is responsible to create, remove and find {@link ListenerAdapter} listener.
+ */
 public class Notificator {
 
-    private static Map<String, ListenerAdapter> listenersByStreamName = new ConcurrentHashMap<>();
-    private static Map<InstanceIdentifier, ListenerAdapter> listenersByInstanceIdentifier = new ConcurrentHashMap<>();
-    private static final Lock lock = new ReentrantLock();
+       private static Map<String, ListenerAdapter> listenersByStreamName = new ConcurrentHashMap<>();
+       private static Map<InstanceIdentifier, ListenerAdapter> listenersByInstanceIdentifier = new ConcurrentHashMap<>();
+       private static final Lock lock = new ReentrantLock();
 
-    private Notificator() {
-    }
+       private Notificator() {
+       }
 
-    public static ListenerAdapter getListenerFor(String streamName) {
-        return listenersByStreamName.get(streamName);
-    }
+       /**
+        * Gets {@link ListenerAdapter} specified by stream name.
+        * 
+        * @param streamName
+        *            The name of the stream.
+        * @return {@link ListenerAdapter} specified by stream name.
+        */
+       public static ListenerAdapter getListenerFor(String streamName) {
+               return listenersByStreamName.get(streamName);
+       }
 
-    public static ListenerAdapter getListenerFor(InstanceIdentifier path) {
-        return listenersByInstanceIdentifier.get(path);
-    }
+       /**
+        * Gets {@link ListenerAdapter} listener specified by
+        * {@link InstanceIdentifier} path.
+        * 
+        * @param path
+        *            Path to data in data repository.
+        * @return ListenerAdapter
+        */
+       public static ListenerAdapter getListenerFor(InstanceIdentifier path) {
+               return listenersByInstanceIdentifier.get(path);
+       }
 
-    public static boolean existListenerFor(InstanceIdentifier path) {
-        return listenersByInstanceIdentifier.containsKey(path);
-    }
+       /**
+        * Checks if the listener specified by {@link InstanceIdentifier} path
+        * exist.
+        * 
+        * @param path
+        *            Path to data in data repository.
+        * @return True if the listener exist, false otherwise.
+        */
+       public static boolean existListenerFor(InstanceIdentifier path) {
+               return listenersByInstanceIdentifier.containsKey(path);
+       }
 
-    public static ListenerAdapter createListener(InstanceIdentifier path, String streamName) {
-        ListenerAdapter listener = new ListenerAdapter(path, streamName);
-        try {
-            lock.lock();
-            listenersByInstanceIdentifier.put(path, listener);
-            listenersByStreamName.put(streamName, listener);
-        } finally {
-            lock.unlock();
-        }
-        return listener;
-    }
+       /**
+        * Creates new {@link ListenerAdapter} listener from
+        * {@link InstanceIdentifier} path and stream name.
+        * 
+        * @param path
+        *            Path to data in data repository.
+        * @param streamName
+        *            The name of the stream.
+        * @return New {@link ListenerAdapter} listener from
+        *         {@link InstanceIdentifier} path and stream name.
+        */
+       public static ListenerAdapter createListener(InstanceIdentifier path,
+                       String streamName) {
+               ListenerAdapter listener = new ListenerAdapter(path, streamName);
+               try {
+                       lock.lock();
+                       listenersByInstanceIdentifier.put(path, listener);
+                       listenersByStreamName.put(streamName, listener);
+               } finally {
+                       lock.unlock();
+               }
+               return listener;
+       }
 
-    public static void removeListener(InstanceIdentifier path) {
-        ListenerAdapter listener = listenersByInstanceIdentifier.get(path);
-        deleteListener(listener);
-    }
+       /**
+        * Looks for listener determined by {@link InstanceIdentifier} path and
+        * removes it.
+        * 
+        * @param path
+        *            InstanceIdentifier
+        */
+       public static void removeListener(InstanceIdentifier path) {
+               ListenerAdapter listener = listenersByInstanceIdentifier.get(path);
+               deleteListener(listener);
+       }
 
-    public static String createStreamNameFromUri(String uri) {
-        if (uri == null) {
-            return null;
-        }
-        String result = uri;
-        if (result.startsWith("/")) {
-            result = result.substring(1);
-        }
-        if (result.endsWith("/")) {
-            result = result.substring(0, result.length());
-        }
-        return result;
-    }
+       /**
+        * Creates String representation of stream name from URI. Removes slash from
+        * URI in start and end position.
+        * 
+        * @param uri
+        *            URI for creation stream name.
+        * @return String representation of stream name.
+        */
+       public static String createStreamNameFromUri(String uri) {
+               if (uri == null) {
+                       return null;
+               }
+               String result = uri;
+               if (result.startsWith("/")) {
+                       result = result.substring(1);
+               }
+               if (result.endsWith("/")) {
+                       result = result.substring(0, result.length());
+               }
+               return result;
+       }
 
-    public static void removeAllListeners() {
-        for (ListenerAdapter listener : listenersByInstanceIdentifier.values()) {
-            try {
-                listener.close();
-            } catch (Exception e) {
-            }
-        }
-        try {
-            lock.lock();
-            listenersByStreamName = new ConcurrentHashMap<>();
-            listenersByInstanceIdentifier = new ConcurrentHashMap<>();
-        } finally {
-            lock.unlock();
-        }
-    }
+       /**
+        * Removes all listeners.
+        */
+       public static void removeAllListeners() {
+               for (ListenerAdapter listener : listenersByInstanceIdentifier.values()) {
+                       try {
+                               listener.close();
+                       } catch (Exception e) {
+                       }
+               }
+               try {
+                       lock.lock();
+                       listenersByStreamName = new ConcurrentHashMap<>();
+                       listenersByInstanceIdentifier = new ConcurrentHashMap<>();
+               } finally {
+                       lock.unlock();
+               }
+       }
 
-    public static void removeListenerIfNoSubscriberExists(ListenerAdapter listener) {
-        if (!listener.hasSubscribers()) {
-            deleteListener(listener);
-        }
-    }
+       /**
+        * Checks if listener has at least one subscriber. In case it has any, delete
+        * listener.
+        * 
+        * @param listener
+        *            ListenerAdapter
+        */
+       public static void removeListenerIfNoSubscriberExists(
+                       ListenerAdapter listener) {
+               if (!listener.hasSubscribers()) {
+                       deleteListener(listener);
+               }
+       }
 
-    private static void deleteListener(ListenerAdapter listener) {
-        if (listener != null) {
-            try {
-                listener.close();
-            } catch (Exception e) {
-            }
-            try {
-                lock.lock();
-                listenersByInstanceIdentifier.remove(listener.getPath());
-                listenersByStreamName.remove(listener.getStreamName());
-            } finally {
-                lock.unlock();
-            }
-        }
-    }
+       /**
+        * Delete {@link ListenerAdapter} listener specified in parameter.
+        * 
+        * @param listener
+        *            ListenerAdapter
+        */
+       private static void deleteListener(ListenerAdapter listener) {
+               if (listener != null) {
+                       try {
+                               listener.close();
+                       } catch (Exception e) {
+                       }
+                       try {
+                               lock.lock();
+                               listenersByInstanceIdentifier.remove(listener.getPath());
+                               listenersByStreamName.remove(listener.getStreamName());
+                       } finally {
+                               lock.unlock();
+                       }
+               }
+       }
 
 }
\ No newline at end of file
index 142cde14001ebeaab5530ef717ce11e6a498b0f5..d7b4bd9d9e0754c2bfc71958a845820c39eab8f1 100644 (file)
@@ -6,47 +6,56 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 
+/**
+ * {@link WebSocketServer} is responsible to start and stop web socket server at
+ * {@link #PORT}.
+ */
 public class WebSocketServer implements Runnable {
 
-    private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
-
-    public static final int PORT = 8181;
-    private EventLoopGroup bossGroup;
-    private EventLoopGroup workerGroup;
-
-    @Override
-    public void run() {
-        bossGroup = new NioEventLoopGroup();
-        workerGroup = new NioEventLoopGroup();
-        try {
-            ServerBootstrap b = new ServerBootstrap();
-            b.group(bossGroup, workerGroup)
-                .channel(NioServerSocketChannel.class)
-                .childHandler(new WebSocketServerInitializer());
-
-            Channel ch = b.bind(PORT).sync().channel();
-            logger.info("Web socket server started at port {}.", PORT);
-
-            ch.closeFuture().sync();
-        } catch (InterruptedException e) {
-            // NOOP
-        } finally {
-            stop();
-        }
-    }
-
-    private void stop() {
-        Notificator.removeAllListeners();
-        if (bossGroup != null) {
-            bossGroup.shutdownGracefully();
-        }
-        if (workerGroup != null) {
-            workerGroup.shutdownGracefully();
-        }
-    }
+       private static final Logger logger = LoggerFactory
+                       .getLogger(WebSocketServer.class);
+
+       public static final int PORT = 8181;
+       private EventLoopGroup bossGroup;
+       private EventLoopGroup workerGroup;
+
+       @Override
+       public void run() {
+               bossGroup = new NioEventLoopGroup();
+               workerGroup = new NioEventLoopGroup();
+               try {
+                       ServerBootstrap b = new ServerBootstrap();
+                       b.group(bossGroup, workerGroup)
+                                       .channel(NioServerSocketChannel.class)
+                                       .childHandler(new WebSocketServerInitializer());
+
+                       Channel ch = b.bind(PORT).sync().channel();
+                       logger.info("Web socket server started at port {}.", PORT);
+
+                       ch.closeFuture().sync();
+               } catch (InterruptedException e) {
+                       // NOOP
+               } finally {
+                       stop();
+               }
+       }
+
+       /**
+        * Stops the web socket server and removes all listeners.
+        */
+       private void stop() {
+               Notificator.removeAllListeners();
+               if (bossGroup != null) {
+                       bossGroup.shutdownGracefully();
+               }
+               if (workerGroup != null) {
+                       workerGroup.shutdownGracefully();
+               }
+       }
 
 }
index 618ee57abab305668d069f7a6031cc0fabc8d8f5..bf899a0b2521297603b5d538e0c5158e31dba2d7 100644 (file)
@@ -33,102 +33,154 @@ import org.opendaylight.controller.sal.streams.listeners.Notificator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * {@link WebSocketServerHandler} is implementation of
+ * {@link SimpleChannelInboundHandler} which allow handle
+ * {@link FullHttpRequest} and {@link WebSocketFrame} messages.
+ */
 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
 
-    private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class);
-
-    private WebSocketServerHandshaker handshaker;
-
-    @Override
-    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
-        if (msg instanceof FullHttpRequest) {
-            handleHttpRequest(ctx, (FullHttpRequest) msg);
-        } else if (msg instanceof WebSocketFrame) {
-            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
-        }
-    }
-
-    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req)
-            throws Exception {
-        // Handle a bad request.
-        if (!req.getDecoderResult().isSuccess()) {
-            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
-            return;
-        }
-
-        // Allow only GET methods.
-        if (req.getMethod() != GET) {
-            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
-            return;
-        }
-
-        String streamName = Notificator.createStreamNameFromUri(req.getUri());
-        ListenerAdapter listener = Notificator.getListenerFor(streamName);
-        if (listener != null) {
-            listener.addSubscriber(ctx.channel());
-            logger.debug("Subscriber successfully registered.");
-        } else {
-            logger.error("Listener for stream with name '{}' was not found.", streamName);
-            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR));
-        }
-
-        // Handshake
-        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
-                getWebSocketLocation(req), null, false);
-        handshaker = wsFactory.newHandshaker(req);
-        if (handshaker == null) {
-            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
-        } else {
-            handshaker.handshake(ctx.channel(), req);
-        }
-
-    }
-
-    private static void sendHttpResponse(ChannelHandlerContext ctx,
-            HttpRequest req, FullHttpResponse res) {
-        // Generate an error page if response getStatus code is not OK (200).
-        if (res.getStatus().code() != 200) {
-            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
-            res.content().writeBytes(buf);
-            buf.release();
-            setContentLength(res, res.content().readableBytes());
-        }
-
-        // Send the response and close the connection if necessary.
-        ChannelFuture f = ctx.channel().writeAndFlush(res);
-        if (!isKeepAlive(req) || res.getStatus().code() != 200) {
-            f.addListener(ChannelFutureListener.CLOSE);
-        }
-    }
-
-    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws IOException {
-        if (frame instanceof CloseWebSocketFrame) {
-            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
-            String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText());
-            ListenerAdapter listener = Notificator.getListenerFor(streamName);
-            if (listener != null) {
-                listener.removeSubscriber(ctx.channel());
-                logger.debug("Subscriber successfully registered.");
-            }
-            Notificator.removeListenerIfNoSubscriberExists(listener);
-            return;
-        } else if (frame instanceof PingWebSocketFrame) {
-            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
-            return;
-        }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-            throws Exception {
-        if (cause instanceof java.nio.channels.ClosedChannelException == false) {
-            //cause.printStackTrace();
-        }
-        ctx.close();
-    }
-
-    private static String getWebSocketLocation(HttpRequest req) {
-        return "http://" + req.headers().get(HOST) + req.getUri();
-    }
+       private static final Logger logger = LoggerFactory
+                       .getLogger(WebSocketServerHandler.class);
+
+       private WebSocketServerHandshaker handshaker;
+
+       @Override
+       protected void channelRead0(ChannelHandlerContext ctx, Object msg)
+                       throws Exception {
+               if (msg instanceof FullHttpRequest) {
+                       handleHttpRequest(ctx, (FullHttpRequest) msg);
+               } else if (msg instanceof WebSocketFrame) {
+                       handleWebSocketFrame(ctx, (WebSocketFrame) msg);
+               }
+       }
+
+       /**
+        * Checks if HTTP request method is GET and if is possible to decode HTTP
+        * result of request.
+        * 
+        * @param ctx
+        *            ChannelHandlerContext
+        * @param req
+        *            FullHttpRequest
+        */
+       private void handleHttpRequest(ChannelHandlerContext ctx,
+                       FullHttpRequest req) throws Exception {
+               // Handle a bad request.
+               if (!req.getDecoderResult().isSuccess()) {
+                       sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1,
+                                       BAD_REQUEST));
+                       return;
+               }
+
+               // Allow only GET methods.
+               if (req.getMethod() != GET) {
+                       sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1,
+                                       FORBIDDEN));
+                       return;
+               }
+
+               String streamName = Notificator.createStreamNameFromUri(req.getUri());
+               ListenerAdapter listener = Notificator.getListenerFor(streamName);
+               if (listener != null) {
+                       listener.addSubscriber(ctx.channel());
+                       logger.debug("Subscriber successfully registered.");
+               } else {
+                       logger.error("Listener for stream with name '{}' was not found.",
+                                       streamName);
+                       sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1,
+                                       INTERNAL_SERVER_ERROR));
+               }
+
+               // Handshake
+               WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
+                               getWebSocketLocation(req), null, false);
+               handshaker = wsFactory.newHandshaker(req);
+               if (handshaker == null) {
+                       WebSocketServerHandshakerFactory
+                                       .sendUnsupportedWebSocketVersionResponse(ctx.channel());
+               } else {
+                       handshaker.handshake(ctx.channel(), req);
+               }
+
+       }
+
+       /**
+        * Checks response status, send response and close connection if necessary
+        * 
+        * @param ctx
+        *            ChannelHandlerContext
+        * @param req
+        *            HttpRequest
+        * @param res
+        *            FullHttpResponse
+        */
+       private static void sendHttpResponse(ChannelHandlerContext ctx,
+                       HttpRequest req, FullHttpResponse res) {
+               // Generate an error page if response getStatus code is not OK (200).
+               if (res.getStatus().code() != 200) {
+                       ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(),
+                                       CharsetUtil.UTF_8);
+                       res.content().writeBytes(buf);
+                       buf.release();
+                       setContentLength(res, res.content().readableBytes());
+               }
+
+               // Send the response and close the connection if necessary.
+               ChannelFuture f = ctx.channel().writeAndFlush(res);
+               if (!isKeepAlive(req) || res.getStatus().code() != 200) {
+                       f.addListener(ChannelFutureListener.CLOSE);
+               }
+       }
+
+       /**
+        * Handles web socket frame.
+        * 
+        * @param ctx
+        *            {@link ChannelHandlerContext}
+        * @param frame
+        *            {@link WebSocketFrame}
+        */
+       private void handleWebSocketFrame(ChannelHandlerContext ctx,
+                       WebSocketFrame frame) throws IOException {
+               if (frame instanceof CloseWebSocketFrame) {
+                       handshaker.close(ctx.channel(),
+                                       (CloseWebSocketFrame) frame.retain());
+                       String streamName = Notificator
+                                       .createStreamNameFromUri(((CloseWebSocketFrame) frame)
+                                                       .reasonText());
+                       ListenerAdapter listener = Notificator.getListenerFor(streamName);
+                       if (listener != null) {
+                               listener.removeSubscriber(ctx.channel());
+                               logger.debug("Subscriber successfully registered.");
+                       }
+                       Notificator.removeListenerIfNoSubscriberExists(listener);
+                       return;
+               } else if (frame instanceof PingWebSocketFrame) {
+                       ctx.channel().write(
+                                       new PongWebSocketFrame(frame.content().retain()));
+                       return;
+               }
+       }
+
+       @Override
+       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+                       throws Exception {
+               if (cause instanceof java.nio.channels.ClosedChannelException == false) {
+                       // cause.printStackTrace();
+               }
+               ctx.close();
+       }
+
+       /**
+        * Get web socket location from HTTP request.
+        * 
+        * @param req
+        *            HTTP request from which the location will be returned
+        * @return String representation of web socket location.
+        */
+       private static String getWebSocketLocation(HttpRequest req) {
+               return "http://" + req.headers().get(HOST) + req.getUri();
+       }
 
 }
index 5eb71ef491be83b727d82a891d15b1171f74c3cd..65ae5d6fab1af08b9aaabaed03e1cd824513fa7f 100644 (file)
@@ -1,19 +1,25 @@
 package org.opendaylight.controller.sal.streams.websockets;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpServerCodec;
 
-public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
+/**
+ * {@link WebSocketServerInitializer} is used to setup the
+ * {@link ChannelPipeline} of a {@link Channel}.
+ */
+public class WebSocketServerInitializer extends
+               ChannelInitializer<SocketChannel> {
 
-    @Override
-    protected void initChannel(SocketChannel ch) throws Exception {
-        ChannelPipeline pipeline = ch.pipeline();
-        pipeline.addLast("codec-http", new HttpServerCodec());
-        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
-        pipeline.addLast("handler", new WebSocketServerHandler());
-    }
+       @Override
+       protected void initChannel(SocketChannel ch) throws Exception {
+               ChannelPipeline pipeline = ch.pipeline();
+               pipeline.addLast("codec-http", new HttpServerCodec());
+               pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
+               pipeline.addLast("handler", new WebSocketServerHandler());
+       }
 
 }
index e6659c226520f2cd7cf7c9de03716d333d9de7f5..9410d17007dc1072f16074d27193505805f52e06 100644 (file)
@@ -7,10 +7,10 @@
  */
 package org.opendaylight.controller.sal.restconf.broker.impl;
 
-import com.google.common.base.Optional;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+
 import org.opendaylight.controller.sal.binding.api.data.DataBrokerService;
 import org.opendaylight.controller.sal.binding.api.data.DataChangeListener;
 import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction;
@@ -33,11 +33,13 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+
 public class DataBrokerServiceImpl implements DataBrokerService  {
 
     private static final Logger logger = LoggerFactory.getLogger(DataBrokerServiceImpl.class.toString());
-    private RestconfClientContext restconfClientContext;
-    private SalRemoteService salRemoteService;
+    private final RestconfClientContext restconfClientContext;
+    private final SalRemoteService salRemoteService;
 
     public DataBrokerServiceImpl(RestconfClientContext restconfClientContext) {
         this.restconfClientContext = restconfClientContext;
@@ -152,7 +154,7 @@ public class DataBrokerServiceImpl implements DataBrokerService  {
     }
 
     private class SalRemoteDataListenerRegistration implements ListenerRegistration<DataChangeListener> {
-        private DataChangeListener dataChangeListener;
+        private final DataChangeListener dataChangeListener;
         public SalRemoteDataListenerRegistration(DataChangeListener dataChangeListener){
             this.dataChangeListener = dataChangeListener;
         }
@@ -161,7 +163,7 @@ public class DataBrokerServiceImpl implements DataBrokerService  {
             return this.dataChangeListener;
         }
         @Override
-        public void close() throws Exception {
+        public void close() {
             //noop
         }
     }
index ea9e599d83c06ef9af96493935f7ebe53081ef97..958b15978cf648114597047198c07375a7133802 100644 (file)
@@ -137,7 +137,7 @@ public class UserConfig extends ConfigurationObject implements Serializable {
     }
 
     public byte[] getSalt() {
-        return salt.clone();
+        return (salt == null) ? null : salt.clone();
     }
 
     @Override