From ac317ecb6d40d51ed026272b3b6b788e0e87ea9d Mon Sep 17 00:00:00 2001 From: Ladislav Borak Date: Fri, 21 Feb 2014 13:19:50 +0100 Subject: [PATCH] Added documentation into web socket in RestConf Change-Id: I36fae4c8916c6d25a1b9619390467e48b64e8a3d Signed-off-by: Ladislav Borak --- .../streams/listeners/ListenerAdapter.java | 959 +++++++++++------- .../sal/streams/listeners/Notificator.java | 224 ++-- .../streams/websockets/WebSocketServer.java | 81 +- .../websockets/WebSocketServerHandler.java | 242 +++-- .../WebSocketServerInitializer.java | 22 +- .../broker/impl/DataBrokerServiceImpl.java | 12 +- 6 files changed, 970 insertions(+), 570 deletions(-) diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java index fdd6ba0317..5fd76de98b 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/ListenerAdapter.java @@ -51,356 +51,617 @@ import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; +/** + * {@link ListenerAdapter} is responsible to track events, which occurred by + * changing data in data source. + */ public class ListenerAdapter implements DataChangeListener { - private static final Logger logger = LoggerFactory.getLogger(ListenerAdapter.class); - private final XmlMapper xmlMapper = new XmlMapper(); - private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ"); - - private final InstanceIdentifier path; - private ListenerRegistration registration; - private final String streamName; - private Set subscribers = new ConcurrentSet<>(); - private final EventBus eventBus; - private final EventBusChangeRecorder eventBusChangeRecorder; - - ListenerAdapter(InstanceIdentifier path, String streamName) { - Preconditions.checkNotNull(path); - Preconditions.checkArgument(streamName != null && !streamName.isEmpty()); - this.path = path; - this.streamName = streamName; - eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); - eventBusChangeRecorder = new EventBusChangeRecorder(); - eventBus.register(eventBusChangeRecorder); - } - - @Override - public void onDataChanged(DataChangeEvent change) { - if (!change.getCreatedConfigurationData().isEmpty() || !change.getCreatedOperationalData().isEmpty() - || !change.getUpdatedConfigurationData().isEmpty() || !change.getUpdatedOperationalData().isEmpty() - || !change.getRemovedConfigurationData().isEmpty() || !change.getRemovedOperationalData().isEmpty()) { - String xml = prepareXmlFrom(change); - Event event = new Event(EventType.NOTIFY); - event.setData(xml); - eventBus.post(event); - } - } - - private final class EventBusChangeRecorder { - @Subscribe public void recordCustomerChange(Event event) { - if (event.getType() == EventType.REGISTER) { - Channel subscriber = event.getSubscriber(); - if (!subscribers.contains(subscriber)) { - subscribers.add(subscriber); - } - } else if (event.getType() == EventType.DEREGISTER) { - subscribers.remove(event.getSubscriber()); - Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this); - } else if (event.getType() == EventType.NOTIFY) { - for (Channel subscriber : subscribers) { - if (subscriber.isActive()) { - logger.debug("Data are sent to subscriber {}:", subscriber.remoteAddress()); - subscriber.writeAndFlush(new TextWebSocketFrame(event.getData())); - } else { - logger.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress()); - subscribers.remove(subscriber); - } - } - } - } - } - - private final class Event { - private final EventType type; - private Channel subscriber; - private String data; - - public Event(EventType type) { - this.type = type; - } - - public Channel getSubscriber() { - return subscriber; - } - - public void setSubscriber(Channel subscriber) { - this.subscriber = subscriber; - } - - public String getData() { - return data; - } - - public void setData(String data) { - this.data = data; - } - - public EventType getType() { - return type; - } - } - - private enum EventType { - REGISTER, - DEREGISTER, - NOTIFY; - } - - private String prepareXmlFrom(DataChangeEvent change) { - Document doc = createDocument(); - Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0", - "notification"); - doc.appendChild(notificationElement); - - Element eventTimeElement = doc.createElement("eventTime"); - eventTimeElement.setTextContent(toRFC3339(new Date())); - notificationElement.appendChild(eventTimeElement); - - Element dataChangedNotificationEventElement = doc.createElementNS( - "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification"); - addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change); - notificationElement.appendChild(dataChangedNotificationEventElement); - - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - TransformerFactory tf = TransformerFactory.newInstance(); - Transformer transformer = tf.newTransformer(); - transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); - transformer.setOutputProperty(OutputKeys.METHOD, "xml"); - transformer.setOutputProperty(OutputKeys.INDENT, "yes"); - transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); - transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4"); - transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, "UTF-8"))); - byte[] charData = out.toByteArray(); - return new String(charData, "UTF-8"); - } catch (TransformerException | UnsupportedEncodingException e) { - String msg = "Error during transformation of Document into String"; - logger.error(msg, e); - return msg; - } - } - - private String toRFC3339(Date d) { - return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2"); - } - - private Document createDocument() { - DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); - Document doc = null; - try { - DocumentBuilder bob = dbf.newDocumentBuilder(); - doc = bob.newDocument(); - } catch (ParserConfigurationException e) { - return null; - } - return doc; - } - - private void addValuesToDataChangedNotificationEventElement(Document doc, - Element dataChangedNotificationEventElement, DataChangeEvent change) { - addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.CREATED); - addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.CREATED); - if (change.getCreatedConfigurationData().isEmpty()) { - addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.UPDATED); - } - if (change.getCreatedOperationalData().isEmpty()) { - addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.UPDATED); - } - addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.DELETED); - addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.DELETED); - } - - private void addValuesFromDataToElement(Document doc, Set data, Element element, Store store, - Operation operation) { - if (data == null || data.isEmpty()) { - return; - } - for (InstanceIdentifier path : data) { - Node node = createDataChangeEventElement(doc, path, null, store, operation); - element.appendChild(node); - } - } - - private void addValuesFromDataToElement(Document doc, Map data, Element element, Store store, - Operation operation) { - if (data == null || data.isEmpty()) { - return; - } - for (Entry entry : data.entrySet()) { - Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), store, operation); - element.appendChild(node); - } - } - - private Node createDataChangeEventElement(Document doc, InstanceIdentifier path, CompositeNode data, Store store, - Operation operation) { - Element dataChangeEventElement = doc.createElement("data-change-event"); - - Element pathElement = doc.createElement("path"); - addPathAsValueToElement(path, pathElement); - dataChangeEventElement.appendChild(pathElement); - - Element storeElement = doc.createElement("store"); - storeElement.setTextContent(store.value); - dataChangeEventElement.appendChild(storeElement); - - Element operationElement = doc.createElement("operation"); - operationElement.setTextContent(operation.value); - dataChangeEventElement.appendChild(operationElement); - - if (data != null) { - Element dataElement = doc.createElement("data"); - Node dataAnyXml = translateToXml(path, data); - Node adoptedNode = doc.adoptNode(dataAnyXml); - dataElement.appendChild(adoptedNode); - dataChangeEventElement.appendChild(dataElement); - } - - return dataChangeEventElement; - } - - private Node translateToXml(InstanceIdentifier path, CompositeNode data) { - DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path); - if (schemaNode == null) { - logger.info("Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", path); - return null; - } - try { - Document xml = xmlMapper.write(data, schemaNode); - return xml.getFirstChild(); - } catch (UnsupportedDataTypeException e) { - logger.error("Error occured during translation of notification to XML.", e); - return null; - } - } - - private void addPathAsValueToElement(InstanceIdentifier path, Element element) { - // Map< key = namespace, value = prefix> - Map prefixes = new HashMap<>(); - InstanceIdentifier instanceIdentifier = path; - StringBuilder textContent = new StringBuilder(); - for (PathArgument pathArgument : instanceIdentifier.getPath()) { - textContent.append("/"); - writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes); - if (pathArgument instanceof NodeIdentifierWithPredicates) { - Map predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues(); - for (QName keyValue : predicates.keySet()) { - String predicateValue = String.valueOf(predicates.get(keyValue)); - textContent.append("["); - writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes); - textContent.append("='"); - textContent.append(predicateValue); - textContent.append("'"); - textContent.append("]"); - } - } else if (pathArgument instanceof NodeWithValue) { - textContent.append("[.='"); - textContent.append(((NodeWithValue)pathArgument).getValue()); - textContent.append("'"); - textContent.append("]"); - } - } - element.setTextContent(textContent.toString()); - } - - private static void writeIdentifierWithNamespacePrefix(Element element, StringBuilder textContent, QName qName, - Map prefixes) { - String namespace = qName.getNamespace().toString(); - String prefix = prefixes.get(namespace); - if (prefix == null) { - prefix = qName.getPrefix(); - if (prefix == null || prefix.isEmpty() || prefixes.containsValue(prefix)) { - prefix = generateNewPrefix(prefixes.values()); - } - } - - element.setAttribute("xmlns:" + prefix, namespace.toString()); - textContent.append(prefix); - prefixes.put(namespace, prefix); - - textContent.append(":"); - textContent.append(qName.getLocalName()); - } - - private static String generateNewPrefix(Collection prefixes) { - StringBuilder result = null; - Random random = new Random(); - do { - result = new StringBuilder(); - for (int i = 0; i < 4; i++) { - int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26); - result.append(Character.toChars(randomNumber)); - } - } while (prefixes.contains(result.toString())); - - return result.toString(); - } - - public InstanceIdentifier getPath() { - return path; - } - - public void setRegistration(ListenerRegistration registration) { - this.registration = registration; - } - - public String getStreamName() { - return streamName; - } - - public void close() throws Exception { - subscribers = new ConcurrentSet<>(); - registration.close(); - registration = null; - eventBus.unregister(eventBusChangeRecorder); - } - - public boolean isListening() { - return registration == null ? false : true; - } - - public void addSubscriber(Channel subscriber) { - if (!subscriber.isActive()) { - logger.debug("Channel is not active between websocket server and subscriber {}" - + subscriber.remoteAddress()); - } - Event event = new Event(EventType.REGISTER); - event.setSubscriber(subscriber); - eventBus.post(event); - } - - public void removeSubscriber(Channel subscriber) { - logger.debug("Subscriber {} is removed.", subscriber.remoteAddress()); - Event event = new Event(EventType.DEREGISTER); - event.setSubscriber(subscriber); - eventBus.post(event); - } - - public boolean hasSubscribers() { - return !subscribers.isEmpty(); - } - - private static enum Store { - CONFIG("config"), - OPERATION("operation"); - - private final String value; - - private Store(String value) { - this.value = value; - } - } - - private static enum Operation { - CREATED("created"), - UPDATED("updated"), - DELETED("deleted"); - - private final String value; - - private Operation(String value) { - this.value = value; - } - } + private static final Logger logger = LoggerFactory + .getLogger(ListenerAdapter.class); + private final XmlMapper xmlMapper = new XmlMapper(); + private final SimpleDateFormat rfc3339 = new SimpleDateFormat( + "yyyy-MM-dd'T'hh:mm:ssZ"); + + private final InstanceIdentifier path; + private ListenerRegistration registration; + private final String streamName; + private Set subscribers = new ConcurrentSet<>(); + private final EventBus eventBus; + private final EventBusChangeRecorder eventBusChangeRecorder; + + /** + * Creates new {@link ListenerAdapter} listener specified by path and stream + * name. + * + * @param path + * Path to data in data store. + * @param streamName + * The name of the stream. + */ + ListenerAdapter(InstanceIdentifier path, String streamName) { + Preconditions.checkNotNull(path); + Preconditions + .checkArgument(streamName != null && !streamName.isEmpty()); + this.path = path; + this.streamName = streamName; + eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor()); + eventBusChangeRecorder = new EventBusChangeRecorder(); + eventBus.register(eventBusChangeRecorder); + } + + @Override + public void onDataChanged( + DataChangeEvent change) { + if (!change.getCreatedConfigurationData().isEmpty() + || !change.getCreatedOperationalData().isEmpty() + || !change.getUpdatedConfigurationData().isEmpty() + || !change.getUpdatedOperationalData().isEmpty() + || !change.getRemovedConfigurationData().isEmpty() + || !change.getRemovedOperationalData().isEmpty()) { + String xml = prepareXmlFrom(change); + Event event = new Event(EventType.NOTIFY); + event.setData(xml); + eventBus.post(event); + } + } + + /** + * Tracks events of data change by customer. + */ + private final class EventBusChangeRecorder { + @Subscribe + public void recordCustomerChange(Event event) { + if (event.getType() == EventType.REGISTER) { + Channel subscriber = event.getSubscriber(); + if (!subscribers.contains(subscriber)) { + subscribers.add(subscriber); + } + } else if (event.getType() == EventType.DEREGISTER) { + subscribers.remove(event.getSubscriber()); + Notificator + .removeListenerIfNoSubscriberExists(ListenerAdapter.this); + } else if (event.getType() == EventType.NOTIFY) { + for (Channel subscriber : subscribers) { + if (subscriber.isActive()) { + logger.debug("Data are sent to subscriber {}:", + subscriber.remoteAddress()); + subscriber.writeAndFlush(new TextWebSocketFrame(event + .getData())); + } else { + logger.debug( + "Subscriber {} is removed - channel is not active yet.", + subscriber.remoteAddress()); + subscribers.remove(subscriber); + } + } + } + } + } + + /** + * Represents event of specific {@link EventType} type, holds data and + * {@link Channel} subscriber. + */ + private final class Event { + private final EventType type; + private Channel subscriber; + private String data; + + /** + * Creates new event specified by {@link EventType} type. + * + * @param type + * EventType + */ + public Event(EventType type) { + this.type = type; + } + + /** + * Gets the {@link Channel} subscriber. + * + * @return Channel + */ + public Channel getSubscriber() { + return subscriber; + } + + /** + * Sets subscriber for event. + * + * @param subscriber + * Channel + */ + public void setSubscriber(Channel subscriber) { + this.subscriber = subscriber; + } + + /** + * Gets event data. + * + * @return String representation of event data. + */ + public String getData() { + return data; + } + + /** + * Sets event data. + * + * @param String + * data. + */ + public void setData(String data) { + this.data = data; + } + + /** + * Gets event type. + * + * @return The type of the event. + */ + public EventType getType() { + return type; + } + } + + /** + * Type of the event. + */ + private enum EventType { + REGISTER, DEREGISTER, NOTIFY; + } + + /** + * Prepare data in printable form and transform it to String. + * + * @param change + * DataChangeEvent + * @return Data in printable form. + */ + private String prepareXmlFrom( + DataChangeEvent change) { + Document doc = createDocument(); + Element notificationElement = doc.createElementNS( + "urn:ietf:params:xml:ns:netconf:notification:1.0", + "notification"); + doc.appendChild(notificationElement); + + Element eventTimeElement = doc.createElement("eventTime"); + eventTimeElement.setTextContent(toRFC3339(new Date())); + notificationElement.appendChild(eventTimeElement); + + Element dataChangedNotificationEventElement = doc.createElementNS( + "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", + "data-changed-notification"); + addValuesToDataChangedNotificationEventElement(doc, + dataChangedNotificationEventElement, change); + notificationElement.appendChild(dataChangedNotificationEventElement); + + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + TransformerFactory tf = TransformerFactory.newInstance(); + Transformer transformer = tf.newTransformer(); + transformer + .setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no"); + transformer.setOutputProperty(OutputKeys.METHOD, "xml"); + transformer.setOutputProperty(OutputKeys.INDENT, "yes"); + transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); + transformer.setOutputProperty( + "{http://xml.apache.org/xslt}indent-amount", "4"); + transformer.transform(new DOMSource(doc), new StreamResult( + new OutputStreamWriter(out, "UTF-8"))); + byte[] charData = out.toByteArray(); + return new String(charData, "UTF-8"); + } catch (TransformerException | UnsupportedEncodingException e) { + String msg = "Error during transformation of Document into String"; + logger.error(msg, e); + return msg; + } + } + + /** + * Formats data specified by RFC3339. + * + * @param d + * Date + * @return Data specified by RFC3339. + */ + private String toRFC3339(Date d) { + return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2"); + } + + /** + * Creates {@link Document} document. + * + * @return {@link Document} document. + */ + private Document createDocument() { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + Document doc = null; + try { + DocumentBuilder bob = dbf.newDocumentBuilder(); + doc = bob.newDocument(); + } catch (ParserConfigurationException e) { + return null; + } + return doc; + } + + /** + * Adds values to data changed notification event element. + * + * @param doc + * {@link Document} + * @param dataChangedNotificationEventElement + * {@link Element} + * @param change + * {@link DataChangeEvent} + */ + private void addValuesToDataChangedNotificationEventElement(Document doc, + Element dataChangedNotificationEventElement, + DataChangeEvent change) { + addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), + dataChangedNotificationEventElement, Store.CONFIG, + Operation.CREATED); + addValuesFromDataToElement(doc, change.getCreatedOperationalData(), + dataChangedNotificationEventElement, Store.OPERATION, + Operation.CREATED); + if (change.getCreatedConfigurationData().isEmpty()) { + addValuesFromDataToElement(doc, + change.getUpdatedConfigurationData(), + dataChangedNotificationEventElement, Store.CONFIG, + Operation.UPDATED); + } + if (change.getCreatedOperationalData().isEmpty()) { + addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), + dataChangedNotificationEventElement, Store.OPERATION, + Operation.UPDATED); + } + addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), + dataChangedNotificationEventElement, Store.CONFIG, + Operation.DELETED); + addValuesFromDataToElement(doc, change.getRemovedOperationalData(), + dataChangedNotificationEventElement, Store.OPERATION, + Operation.DELETED); + } + + /** + * Adds values from data to element. + * + * @param doc + * {@link Document} + * @param data + * Set of {@link InstanceIdentifier}. + * @param element + * {@link Element} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + */ + private void addValuesFromDataToElement(Document doc, + Set data, Element element, Store store, + Operation operation) { + if (data == null || data.isEmpty()) { + return; + } + for (InstanceIdentifier path : data) { + Node node = createDataChangeEventElement(doc, path, null, store, + operation); + element.appendChild(node); + } + } + + /** + * Adds values from data to element. + * + * @param doc + * {@link Document} + * @param data + * Map of {@link InstanceIdentifier} and {@link CompositeNode}. + * @param element + * {@link Element} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + */ + private void addValuesFromDataToElement(Document doc, + Map data, Element element, + Store store, Operation operation) { + if (data == null || data.isEmpty()) { + return; + } + for (Entry entry : data.entrySet()) { + Node node = createDataChangeEventElement(doc, entry.getKey(), + entry.getValue(), store, operation); + element.appendChild(node); + } + } + + /** + * Creates changed event element from data. + * + * @param doc + * {@link Document} + * @param path + * Path to data in data store. + * @param data + * {@link CompositeNode} + * @param store + * {@link Store} + * @param operation + * {@link Operation} + * @return {@link Node} node represented by changed event element. + */ + private Node createDataChangeEventElement(Document doc, + InstanceIdentifier path, CompositeNode data, Store store, + Operation operation) { + Element dataChangeEventElement = doc.createElement("data-change-event"); + + Element pathElement = doc.createElement("path"); + addPathAsValueToElement(path, pathElement); + dataChangeEventElement.appendChild(pathElement); + + Element storeElement = doc.createElement("store"); + storeElement.setTextContent(store.value); + dataChangeEventElement.appendChild(storeElement); + + Element operationElement = doc.createElement("operation"); + operationElement.setTextContent(operation.value); + dataChangeEventElement.appendChild(operationElement); + + if (data != null) { + Element dataElement = doc.createElement("data"); + Node dataAnyXml = translateToXml(path, data); + Node adoptedNode = doc.adoptNode(dataAnyXml); + dataElement.appendChild(adoptedNode); + dataChangeEventElement.appendChild(dataElement); + } + + return dataChangeEventElement; + } + + /** + * Translates {@link CompositeNode} data to XML format. + * + * @param path + * Path to data in data store. + * @param data + * {@link CompositeNode} + * @return Data in XML format. + */ + private Node translateToXml(InstanceIdentifier path, CompositeNode data) { + DataNodeContainer schemaNode = ControllerContext.getInstance() + .getDataNodeContainerFor(path); + if (schemaNode == null) { + logger.info( + "Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", + path); + return null; + } + try { + Document xml = xmlMapper.write(data, schemaNode); + return xml.getFirstChild(); + } catch (UnsupportedDataTypeException e) { + logger.error( + "Error occured during translation of notification to XML.", + e); + return null; + } + } + + /** + * Adds path as value to element. + * + * @param path + * Path to data in data store. + * @param element + * {@link Element} + */ + private void addPathAsValueToElement(InstanceIdentifier path, + Element element) { + // Map< key = namespace, value = prefix> + Map prefixes = new HashMap<>(); + InstanceIdentifier instanceIdentifier = path; + StringBuilder textContent = new StringBuilder(); + for (PathArgument pathArgument : instanceIdentifier.getPath()) { + textContent.append("/"); + writeIdentifierWithNamespacePrefix(element, textContent, + pathArgument.getNodeType(), prefixes); + if (pathArgument instanceof NodeIdentifierWithPredicates) { + Map predicates = ((NodeIdentifierWithPredicates) pathArgument) + .getKeyValues(); + for (QName keyValue : predicates.keySet()) { + String predicateValue = String.valueOf(predicates + .get(keyValue)); + textContent.append("["); + writeIdentifierWithNamespacePrefix(element, textContent, + keyValue, prefixes); + textContent.append("='"); + textContent.append(predicateValue); + textContent.append("'"); + textContent.append("]"); + } + } else if (pathArgument instanceof NodeWithValue) { + textContent.append("[.='"); + textContent.append(((NodeWithValue) pathArgument).getValue()); + textContent.append("'"); + textContent.append("]"); + } + } + element.setTextContent(textContent.toString()); + } + + /** + * Writes identifier that consists of prefix and QName. + * + * @param element + * {@link Element} + * @param textContent + * StringBuilder + * @param qName + * QName + * @param prefixes + * Map of namespaces and prefixes. + */ + private static void writeIdentifierWithNamespacePrefix(Element element, + StringBuilder textContent, QName qName, Map prefixes) { + String namespace = qName.getNamespace().toString(); + String prefix = prefixes.get(namespace); + if (prefix == null) { + prefix = qName.getPrefix(); + if (prefix == null || prefix.isEmpty() + || prefixes.containsValue(prefix)) { + prefix = generateNewPrefix(prefixes.values()); + } + } + + element.setAttribute("xmlns:" + prefix, namespace.toString()); + textContent.append(prefix); + prefixes.put(namespace, prefix); + + textContent.append(":"); + textContent.append(qName.getLocalName()); + } + + /** + * Generates new prefix which consists of four random characters . + * + * @param prefixes + * Collection of prefixes. + * @return New prefix which consists of four random characters . + */ + private static String generateNewPrefix(Collection prefixes) { + StringBuilder result = null; + Random random = new Random(); + do { + result = new StringBuilder(); + for (int i = 0; i < 4; i++) { + int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26); + result.append(Character.toChars(randomNumber)); + } + } while (prefixes.contains(result.toString())); + + return result.toString(); + } + + /** + * Gets path pointed to data in data store. + * + * @return Path pointed to data in data store. + */ + public InstanceIdentifier getPath() { + return path; + } + + /** + * Sets {@link ListenerRegistration} registration. + * + * @param registration + * ListenerRegistration + */ + public void setRegistration( + ListenerRegistration registration) { + this.registration = registration; + } + + /** + * Gets the name of the stream. + * + * @return The name of the stream. + */ + public String getStreamName() { + return streamName; + } + + /** + * Removes all subscribers and unregisters event bus change recorder form + * event bus. + */ + public void close() throws Exception { + subscribers = new ConcurrentSet<>(); + registration.close(); + registration = null; + eventBus.unregister(eventBusChangeRecorder); + } + + /** + * Checks if {@link ListenerRegistration} registration exist. + * + * @return True if exist, false otherwise. + */ + public boolean isListening() { + return registration == null ? false : true; + } + + /** + * Creates event of type {@link EventType#REGISTER}, set {@link Channel} + * subscriber to the event and post event into event bus. + * + * @param subscriber + * Channel + */ + public void addSubscriber(Channel subscriber) { + if (!subscriber.isActive()) { + logger.debug("Channel is not active between websocket server and subscriber {}" + + subscriber.remoteAddress()); + } + Event event = new Event(EventType.REGISTER); + event.setSubscriber(subscriber); + eventBus.post(event); + } + + /** + * Creates event of type {@link EventType#DEREGISTER}, sets {@link Channel} + * subscriber to the event and posts event into event bus. + * + * @param subscriber + */ + public void removeSubscriber(Channel subscriber) { + logger.debug("Subscriber {} is removed.", subscriber.remoteAddress()); + Event event = new Event(EventType.DEREGISTER); + event.setSubscriber(subscriber); + eventBus.post(event); + } + + /** + * Checks if exists at least one {@link Channel} subscriber. + * + * @return True if exist at least one {@link Channel} subscriber, false + * otherwise. + */ + public boolean hasSubscribers() { + return !subscribers.isEmpty(); + } + + /** + * Consists of two types {@link Store#CONFIG} and {@link Store#OPERATION}. + */ + private static enum Store { + CONFIG("config"), OPERATION("operation"); + + private final String value; + + private Store(String value) { + this.value = value; + } + } + + /** + * Consists of three types {@link Operation#CREATED}, + * {@link Operation#UPDATED} and {@link Operation#DELETED}. + */ + private static enum Operation { + CREATED("created"), UPDATED("updated"), DELETED("deleted"); + + private final String value; + + private Operation(String value) { + this.value = value; + } + } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java index d1cb25861a..36c9c67ffc 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/listeners/Notificator.java @@ -7,94 +7,164 @@ import java.util.concurrent.locks.ReentrantLock; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +/** + * {@link Notificator} is responsible to create, remove and find {@link ListenerAdapter} listener. + */ public class Notificator { - private static Map listenersByStreamName = new ConcurrentHashMap<>(); - private static Map listenersByInstanceIdentifier = new ConcurrentHashMap<>(); - private static final Lock lock = new ReentrantLock(); + private static Map listenersByStreamName = new ConcurrentHashMap<>(); + private static Map listenersByInstanceIdentifier = new ConcurrentHashMap<>(); + private static final Lock lock = new ReentrantLock(); - private Notificator() { - } + private Notificator() { + } - public static ListenerAdapter getListenerFor(String streamName) { - return listenersByStreamName.get(streamName); - } + /** + * Gets {@link ListenerAdapter} specified by stream name. + * + * @param streamName + * The name of the stream. + * @return {@link ListenerAdapter} specified by stream name. + */ + public static ListenerAdapter getListenerFor(String streamName) { + return listenersByStreamName.get(streamName); + } - public static ListenerAdapter getListenerFor(InstanceIdentifier path) { - return listenersByInstanceIdentifier.get(path); - } + /** + * Gets {@link ListenerAdapter} listener specified by + * {@link InstanceIdentifier} path. + * + * @param path + * Path to data in data repository. + * @return ListenerAdapter + */ + public static ListenerAdapter getListenerFor(InstanceIdentifier path) { + return listenersByInstanceIdentifier.get(path); + } - public static boolean existListenerFor(InstanceIdentifier path) { - return listenersByInstanceIdentifier.containsKey(path); - } + /** + * Checks if the listener specified by {@link InstanceIdentifier} path + * exist. + * + * @param path + * Path to data in data repository. + * @return True if the listener exist, false otherwise. + */ + public static boolean existListenerFor(InstanceIdentifier path) { + return listenersByInstanceIdentifier.containsKey(path); + } - public static ListenerAdapter createListener(InstanceIdentifier path, String streamName) { - ListenerAdapter listener = new ListenerAdapter(path, streamName); - try { - lock.lock(); - listenersByInstanceIdentifier.put(path, listener); - listenersByStreamName.put(streamName, listener); - } finally { - lock.unlock(); - } - return listener; - } + /** + * Creates new {@link ListenerAdapter} listener from + * {@link InstanceIdentifier} path and stream name. + * + * @param path + * Path to data in data repository. + * @param streamName + * The name of the stream. + * @return New {@link ListenerAdapter} listener from + * {@link InstanceIdentifier} path and stream name. + */ + public static ListenerAdapter createListener(InstanceIdentifier path, + String streamName) { + ListenerAdapter listener = new ListenerAdapter(path, streamName); + try { + lock.lock(); + listenersByInstanceIdentifier.put(path, listener); + listenersByStreamName.put(streamName, listener); + } finally { + lock.unlock(); + } + return listener; + } - public static void removeListener(InstanceIdentifier path) { - ListenerAdapter listener = listenersByInstanceIdentifier.get(path); - deleteListener(listener); - } + /** + * Looks for listener determined by {@link InstanceIdentifier} path and + * removes it. + * + * @param path + * InstanceIdentifier + */ + public static void removeListener(InstanceIdentifier path) { + ListenerAdapter listener = listenersByInstanceIdentifier.get(path); + deleteListener(listener); + } - public static String createStreamNameFromUri(String uri) { - if (uri == null) { - return null; - } - String result = uri; - if (result.startsWith("/")) { - result = result.substring(1); - } - if (result.endsWith("/")) { - result = result.substring(0, result.length()); - } - return result; - } + /** + * Creates String representation of stream name from URI. Removes slash from + * URI in start and end position. + * + * @param uri + * URI for creation stream name. + * @return String representation of stream name. + */ + public static String createStreamNameFromUri(String uri) { + if (uri == null) { + return null; + } + String result = uri; + if (result.startsWith("/")) { + result = result.substring(1); + } + if (result.endsWith("/")) { + result = result.substring(0, result.length()); + } + return result; + } - public static void removeAllListeners() { - for (ListenerAdapter listener : listenersByInstanceIdentifier.values()) { - try { - listener.close(); - } catch (Exception e) { - } - } - try { - lock.lock(); - listenersByStreamName = new ConcurrentHashMap<>(); - listenersByInstanceIdentifier = new ConcurrentHashMap<>(); - } finally { - lock.unlock(); - } - } + /** + * Removes all listeners. + */ + public static void removeAllListeners() { + for (ListenerAdapter listener : listenersByInstanceIdentifier.values()) { + try { + listener.close(); + } catch (Exception e) { + } + } + try { + lock.lock(); + listenersByStreamName = new ConcurrentHashMap<>(); + listenersByInstanceIdentifier = new ConcurrentHashMap<>(); + } finally { + lock.unlock(); + } + } - public static void removeListenerIfNoSubscriberExists(ListenerAdapter listener) { - if (!listener.hasSubscribers()) { - deleteListener(listener); - } - } + /** + * Checks if listener has at least one subscriber. In case it has any, delete + * listener. + * + * @param listener + * ListenerAdapter + */ + public static void removeListenerIfNoSubscriberExists( + ListenerAdapter listener) { + if (!listener.hasSubscribers()) { + deleteListener(listener); + } + } - private static void deleteListener(ListenerAdapter listener) { - if (listener != null) { - try { - listener.close(); - } catch (Exception e) { - } - try { - lock.lock(); - listenersByInstanceIdentifier.remove(listener.getPath()); - listenersByStreamName.remove(listener.getStreamName()); - } finally { - lock.unlock(); - } - } - } + /** + * Delete {@link ListenerAdapter} listener specified in parameter. + * + * @param listener + * ListenerAdapter + */ + private static void deleteListener(ListenerAdapter listener) { + if (listener != null) { + try { + listener.close(); + } catch (Exception e) { + } + try { + lock.lock(); + listenersByInstanceIdentifier.remove(listener.getPath()); + listenersByStreamName.remove(listener.getStreamName()); + } finally { + lock.unlock(); + } + } + } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java index 142cde1400..d7b4bd9d9e 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServer.java @@ -6,47 +6,56 @@ import org.slf4j.LoggerFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; +/** + * {@link WebSocketServer} is responsible to start and stop web socket server at + * {@link #PORT}. + */ public class WebSocketServer implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class); - - public static final int PORT = 8181; - private EventLoopGroup bossGroup; - private EventLoopGroup workerGroup; - - @Override - public void run() { - bossGroup = new NioEventLoopGroup(); - workerGroup = new NioEventLoopGroup(); - try { - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new WebSocketServerInitializer()); - - Channel ch = b.bind(PORT).sync().channel(); - logger.info("Web socket server started at port {}.", PORT); - - ch.closeFuture().sync(); - } catch (InterruptedException e) { - // NOOP - } finally { - stop(); - } - } - - private void stop() { - Notificator.removeAllListeners(); - if (bossGroup != null) { - bossGroup.shutdownGracefully(); - } - if (workerGroup != null) { - workerGroup.shutdownGracefully(); - } - } + private static final Logger logger = LoggerFactory + .getLogger(WebSocketServer.class); + + public static final int PORT = 8181; + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + @Override + public void run() { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new WebSocketServerInitializer()); + + Channel ch = b.bind(PORT).sync().channel(); + logger.info("Web socket server started at port {}.", PORT); + + ch.closeFuture().sync(); + } catch (InterruptedException e) { + // NOOP + } finally { + stop(); + } + } + + /** + * Stops the web socket server and removes all listeners. + */ + private void stop() { + Notificator.removeAllListeners(); + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java index 618ee57aba..bf899a0b25 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerHandler.java @@ -33,102 +33,154 @@ import org.opendaylight.controller.sal.streams.listeners.Notificator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * {@link WebSocketServerHandler} is implementation of + * {@link SimpleChannelInboundHandler} which allow handle + * {@link FullHttpRequest} and {@link WebSocketFrame} messages. + */ public class WebSocketServerHandler extends SimpleChannelInboundHandler { - private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class); - - private WebSocketServerHandshaker handshaker; - - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof FullHttpRequest) { - handleHttpRequest(ctx, (FullHttpRequest) msg); - } else if (msg instanceof WebSocketFrame) { - handleWebSocketFrame(ctx, (WebSocketFrame) msg); - } - } - - private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) - throws Exception { - // Handle a bad request. - if (!req.getDecoderResult().isSuccess()) { - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST)); - return; - } - - // Allow only GET methods. - if (req.getMethod() != GET) { - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN)); - return; - } - - String streamName = Notificator.createStreamNameFromUri(req.getUri()); - ListenerAdapter listener = Notificator.getListenerFor(streamName); - if (listener != null) { - listener.addSubscriber(ctx.channel()); - logger.debug("Subscriber successfully registered."); - } else { - logger.error("Listener for stream with name '{}' was not found.", streamName); - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)); - } - - // Handshake - WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( - getWebSocketLocation(req), null, false); - handshaker = wsFactory.newHandshaker(req); - if (handshaker == null) { - WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); - } else { - handshaker.handshake(ctx.channel(), req); - } - - } - - private static void sendHttpResponse(ChannelHandlerContext ctx, - HttpRequest req, FullHttpResponse res) { - // Generate an error page if response getStatus code is not OK (200). - if (res.getStatus().code() != 200) { - ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); - res.content().writeBytes(buf); - buf.release(); - setContentLength(res, res.content().readableBytes()); - } - - // Send the response and close the connection if necessary. - ChannelFuture f = ctx.channel().writeAndFlush(res); - if (!isKeepAlive(req) || res.getStatus().code() != 200) { - f.addListener(ChannelFutureListener.CLOSE); - } - } - - private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws IOException { - if (frame instanceof CloseWebSocketFrame) { - handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); - String streamName = Notificator.createStreamNameFromUri(((CloseWebSocketFrame) frame).reasonText()); - ListenerAdapter listener = Notificator.getListenerFor(streamName); - if (listener != null) { - listener.removeSubscriber(ctx.channel()); - logger.debug("Subscriber successfully registered."); - } - Notificator.removeListenerIfNoSubscriberExists(listener); - return; - } else if (frame instanceof PingWebSocketFrame) { - ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); - return; - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - if (cause instanceof java.nio.channels.ClosedChannelException == false) { - //cause.printStackTrace(); - } - ctx.close(); - } - - private static String getWebSocketLocation(HttpRequest req) { - return "http://" + req.headers().get(HOST) + req.getUri(); - } + private static final Logger logger = LoggerFactory + .getLogger(WebSocketServerHandler.class); + + private WebSocketServerHandshaker handshaker; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (msg instanceof FullHttpRequest) { + handleHttpRequest(ctx, (FullHttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + handleWebSocketFrame(ctx, (WebSocketFrame) msg); + } + } + + /** + * Checks if HTTP request method is GET and if is possible to decode HTTP + * result of request. + * + * @param ctx + * ChannelHandlerContext + * @param req + * FullHttpRequest + */ + private void handleHttpRequest(ChannelHandlerContext ctx, + FullHttpRequest req) throws Exception { + // Handle a bad request. + if (!req.getDecoderResult().isSuccess()) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, + BAD_REQUEST)); + return; + } + + // Allow only GET methods. + if (req.getMethod() != GET) { + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, + FORBIDDEN)); + return; + } + + String streamName = Notificator.createStreamNameFromUri(req.getUri()); + ListenerAdapter listener = Notificator.getListenerFor(streamName); + if (listener != null) { + listener.addSubscriber(ctx.channel()); + logger.debug("Subscriber successfully registered."); + } else { + logger.error("Listener for stream with name '{}' was not found.", + streamName); + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, + INTERNAL_SERVER_ERROR)); + } + + // Handshake + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( + getWebSocketLocation(req), null, false); + handshaker = wsFactory.newHandshaker(req); + if (handshaker == null) { + WebSocketServerHandshakerFactory + .sendUnsupportedWebSocketVersionResponse(ctx.channel()); + } else { + handshaker.handshake(ctx.channel(), req); + } + + } + + /** + * Checks response status, send response and close connection if necessary + * + * @param ctx + * ChannelHandlerContext + * @param req + * HttpRequest + * @param res + * FullHttpResponse + */ + private static void sendHttpResponse(ChannelHandlerContext ctx, + HttpRequest req, FullHttpResponse res) { + // Generate an error page if response getStatus code is not OK (200). + if (res.getStatus().code() != 200) { + ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), + CharsetUtil.UTF_8); + res.content().writeBytes(buf); + buf.release(); + setContentLength(res, res.content().readableBytes()); + } + + // Send the response and close the connection if necessary. + ChannelFuture f = ctx.channel().writeAndFlush(res); + if (!isKeepAlive(req) || res.getStatus().code() != 200) { + f.addListener(ChannelFutureListener.CLOSE); + } + } + + /** + * Handles web socket frame. + * + * @param ctx + * {@link ChannelHandlerContext} + * @param frame + * {@link WebSocketFrame} + */ + private void handleWebSocketFrame(ChannelHandlerContext ctx, + WebSocketFrame frame) throws IOException { + if (frame instanceof CloseWebSocketFrame) { + handshaker.close(ctx.channel(), + (CloseWebSocketFrame) frame.retain()); + String streamName = Notificator + .createStreamNameFromUri(((CloseWebSocketFrame) frame) + .reasonText()); + ListenerAdapter listener = Notificator.getListenerFor(streamName); + if (listener != null) { + listener.removeSubscriber(ctx.channel()); + logger.debug("Subscriber successfully registered."); + } + Notificator.removeListenerIfNoSubscriberExists(listener); + return; + } else if (frame instanceof PingWebSocketFrame) { + ctx.channel().write( + new PongWebSocketFrame(frame.content().retain())); + return; + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + if (cause instanceof java.nio.channels.ClosedChannelException == false) { + // cause.printStackTrace(); + } + ctx.close(); + } + + /** + * Get web socket location from HTTP request. + * + * @param req + * HTTP request from which the location will be returned + * @return String representation of web socket location. + */ + private static String getWebSocketLocation(HttpRequest req) { + return "http://" + req.headers().get(HOST) + req.getUri(); + } } diff --git a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java index 5eb71ef491..65ae5d6fab 100644 --- a/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java +++ b/opendaylight/md-sal/sal-rest-connector/src/main/java/org/opendaylight/controller/sal/streams/websockets/WebSocketServerInitializer.java @@ -1,19 +1,25 @@ package org.opendaylight.controller.sal.streams.websockets; +import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; -public class WebSocketServerInitializer extends ChannelInitializer { +/** + * {@link WebSocketServerInitializer} is used to setup the + * {@link ChannelPipeline} of a {@link Channel}. + */ +public class WebSocketServerInitializer extends + ChannelInitializer { - @Override - protected void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("codec-http", new HttpServerCodec()); - pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); - pipeline.addLast("handler", new WebSocketServerHandler()); - } + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("codec-http", new HttpServerCodec()); + pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); + pipeline.addLast("handler", new WebSocketServerHandler()); + } } diff --git a/opendaylight/md-sal/sal-restconf-broker/src/main/java/org/opendaylight/controller/sal/restconf/broker/impl/DataBrokerServiceImpl.java b/opendaylight/md-sal/sal-restconf-broker/src/main/java/org/opendaylight/controller/sal/restconf/broker/impl/DataBrokerServiceImpl.java index e6659c2265..9410d17007 100644 --- a/opendaylight/md-sal/sal-restconf-broker/src/main/java/org/opendaylight/controller/sal/restconf/broker/impl/DataBrokerServiceImpl.java +++ b/opendaylight/md-sal/sal-restconf-broker/src/main/java/org/opendaylight/controller/sal/restconf/broker/impl/DataBrokerServiceImpl.java @@ -7,10 +7,10 @@ */ package org.opendaylight.controller.sal.restconf.broker.impl; -import com.google.common.base.Optional; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; + import org.opendaylight.controller.sal.binding.api.data.DataBrokerService; import org.opendaylight.controller.sal.binding.api.data.DataChangeListener; import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; @@ -33,11 +33,13 @@ import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Optional; + public class DataBrokerServiceImpl implements DataBrokerService { private static final Logger logger = LoggerFactory.getLogger(DataBrokerServiceImpl.class.toString()); - private RestconfClientContext restconfClientContext; - private SalRemoteService salRemoteService; + private final RestconfClientContext restconfClientContext; + private final SalRemoteService salRemoteService; public DataBrokerServiceImpl(RestconfClientContext restconfClientContext) { this.restconfClientContext = restconfClientContext; @@ -152,7 +154,7 @@ public class DataBrokerServiceImpl implements DataBrokerService { } private class SalRemoteDataListenerRegistration implements ListenerRegistration { - private DataChangeListener dataChangeListener; + private final DataChangeListener dataChangeListener; public SalRemoteDataListenerRegistration(DataChangeListener dataChangeListener){ this.dataChangeListener = dataChangeListener; } @@ -161,7 +163,7 @@ public class DataBrokerServiceImpl implements DataBrokerService { return this.dataChangeListener; } @Override - public void close() throws Exception { + public void close() { //noop } } -- 2.36.6