Merge "BUG-362: add some diagnostic information Changed Remote RPC Server Implementat...
[controller.git] / opendaylight / md-sal / sal-rest-connector / src / main / java / org / opendaylight / controller / sal / streams / listeners / ListenerAdapter.java
1 package org.opendaylight.controller.sal.streams.listeners;
2
3 import io.netty.channel.Channel;
4 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
5 import io.netty.util.internal.ConcurrentSet;
6
7 import java.io.ByteArrayOutputStream;
8 import java.io.OutputStreamWriter;
9 import java.io.UnsupportedEncodingException;
10 import java.text.SimpleDateFormat;
11 import java.util.Collection;
12 import java.util.Date;
13 import java.util.HashMap;
14 import java.util.Map;
15 import java.util.Map.Entry;
16 import java.util.Random;
17 import java.util.Set;
18 import java.util.concurrent.Executors;
19
20 import javax.activation.UnsupportedDataTypeException;
21 import javax.xml.parsers.DocumentBuilder;
22 import javax.xml.parsers.DocumentBuilderFactory;
23 import javax.xml.parsers.ParserConfigurationException;
24 import javax.xml.transform.OutputKeys;
25 import javax.xml.transform.Transformer;
26 import javax.xml.transform.TransformerException;
27 import javax.xml.transform.TransformerFactory;
28 import javax.xml.transform.dom.DOMSource;
29 import javax.xml.transform.stream.StreamResult;
30
31 import org.opendaylight.controller.md.sal.common.api.data.DataChangeEvent;
32 import org.opendaylight.controller.sal.core.api.data.DataChangeListener;
33 import org.opendaylight.controller.sal.rest.impl.XmlMapper;
34 import org.opendaylight.controller.sal.restconf.impl.ControllerContext;
35 import org.opendaylight.yangtools.concepts.ListenerRegistration;
36 import org.opendaylight.yangtools.yang.common.QName;
37 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeIdentifierWithPredicates;
40 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.NodeWithValue;
41 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
42 import org.opendaylight.yangtools.yang.model.api.DataNodeContainer;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45 import org.w3c.dom.Document;
46 import org.w3c.dom.Element;
47 import org.w3c.dom.Node;
48
49 import com.google.common.base.Preconditions;
50 import com.google.common.eventbus.AsyncEventBus;
51 import com.google.common.eventbus.EventBus;
52 import com.google.common.eventbus.Subscribe;
53
54 public class ListenerAdapter implements DataChangeListener {
55
56     private static final Logger logger = LoggerFactory.getLogger(ListenerAdapter.class);
57     private final XmlMapper xmlMapper = new XmlMapper();
58     private final SimpleDateFormat rfc3339 = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ssZ");
59
60     private final InstanceIdentifier path;
61     private ListenerRegistration<DataChangeListener> registration;
62     private final String streamName;
63     private Set<Channel> subscribers = new ConcurrentSet<>();
64     private final EventBus eventBus;
65     private final EventBusChangeRecorder eventBusChangeRecorder;
66
67     ListenerAdapter(InstanceIdentifier path, String streamName) {
68         Preconditions.checkNotNull(path);
69         Preconditions.checkArgument(streamName != null && !streamName.isEmpty());
70         this.path = path;
71         this.streamName = streamName;
72         eventBus = new AsyncEventBus(Executors.newSingleThreadExecutor());
73         eventBusChangeRecorder = new EventBusChangeRecorder();
74         eventBus.register(eventBusChangeRecorder);
75     }
76
77     @Override
78     public void onDataChanged(DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
79         if (!change.getCreatedConfigurationData().isEmpty() || !change.getCreatedOperationalData().isEmpty()
80                 || !change.getUpdatedConfigurationData().isEmpty() || !change.getUpdatedOperationalData().isEmpty()
81                 || !change.getRemovedConfigurationData().isEmpty() || !change.getRemovedOperationalData().isEmpty()) {
82             String xml = prepareXmlFrom(change);
83             Event event = new Event(EventType.NOTIFY);
84             event.setData(xml);
85             eventBus.post(event);
86         }
87     }
88
89     private final class EventBusChangeRecorder {
90         @Subscribe public void recordCustomerChange(Event event) {
91             if (event.getType() == EventType.REGISTER) {
92                 Channel subscriber = event.getSubscriber();
93                 if (!subscribers.contains(subscriber)) {
94                     subscribers.add(subscriber);
95                 }
96             } else if (event.getType() == EventType.DEREGISTER) {
97                 subscribers.remove(event.getSubscriber());
98                 Notificator.removeListenerIfNoSubscriberExists(ListenerAdapter.this);
99             } else if (event.getType() == EventType.NOTIFY) {
100                 for (Channel subscriber : subscribers) {
101                     if (subscriber.isActive()) {
102                         logger.debug("Data are sent to subscriber {}:", subscriber.remoteAddress());
103                         subscriber.writeAndFlush(new TextWebSocketFrame(event.getData()));
104                     } else {
105                         logger.debug("Subscriber {} is removed - channel is not active yet.", subscriber.remoteAddress());
106                         subscribers.remove(subscriber);
107                     }
108                 }
109             }
110         }
111     }
112
113     private final class Event {
114         private final EventType type;
115         private Channel subscriber;
116         private String data;
117
118         public Event(EventType type) {
119             this.type = type;
120         }
121
122         public Channel getSubscriber() {
123             return subscriber;
124         }
125
126         public void setSubscriber(Channel subscriber) {
127             this.subscriber = subscriber;
128         }
129
130         public String getData() {
131             return data;
132         }
133
134         public void setData(String data) {
135             this.data = data;
136         }
137
138         public EventType getType() {
139             return type;
140         }
141     }
142
143     private enum EventType {
144         REGISTER,
145         DEREGISTER,
146         NOTIFY;
147     }
148
149     private String prepareXmlFrom(DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
150         Document doc = createDocument();
151         Element notificationElement = doc.createElementNS("urn:ietf:params:xml:ns:netconf:notification:1.0",
152                 "notification");
153         doc.appendChild(notificationElement);
154
155         Element eventTimeElement = doc.createElement("eventTime");
156         eventTimeElement.setTextContent(toRFC3339(new Date()));
157         notificationElement.appendChild(eventTimeElement);
158
159         Element dataChangedNotificationEventElement = doc.createElementNS(
160                 "urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote", "data-changed-notification");
161         addValuesToDataChangedNotificationEventElement(doc, dataChangedNotificationEventElement, change);
162         notificationElement.appendChild(dataChangedNotificationEventElement);
163
164         try {
165             ByteArrayOutputStream out = new ByteArrayOutputStream();
166             TransformerFactory tf = TransformerFactory.newInstance();
167             Transformer transformer = tf.newTransformer();
168             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
169             transformer.setOutputProperty(OutputKeys.METHOD, "xml");
170             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
171             transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
172             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");
173             transformer.transform(new DOMSource(doc), new StreamResult(new OutputStreamWriter(out, "UTF-8")));
174             byte[] charData = out.toByteArray();
175             return new String(charData, "UTF-8");
176         } catch (TransformerException | UnsupportedEncodingException e) {
177             String msg = "Error during transformation of Document into String";
178             logger.error(msg, e);
179             return msg;
180         }
181     }
182
183     private String toRFC3339(Date d) {
184         return rfc3339.format(d).replaceAll("(\\d\\d)(\\d\\d)$", "$1:$2");
185     }
186
187     private Document createDocument() {
188         DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
189         Document doc = null;
190         try {
191             DocumentBuilder bob = dbf.newDocumentBuilder();
192             doc = bob.newDocument();
193         } catch (ParserConfigurationException e) {
194             return null;
195         }
196         return doc;
197     }
198
199     private void addValuesToDataChangedNotificationEventElement(Document doc,
200             Element dataChangedNotificationEventElement, DataChangeEvent<InstanceIdentifier, CompositeNode> change) {
201         addValuesFromDataToElement(doc, change.getCreatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.CREATED);
202         addValuesFromDataToElement(doc, change.getCreatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.CREATED);
203         if (change.getCreatedConfigurationData().isEmpty()) {
204             addValuesFromDataToElement(doc, change.getUpdatedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.UPDATED);
205         }
206         if (change.getCreatedOperationalData().isEmpty()) {
207             addValuesFromDataToElement(doc, change.getUpdatedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.UPDATED);
208         }
209         addValuesFromDataToElement(doc, change.getRemovedConfigurationData(), dataChangedNotificationEventElement, Store.CONFIG, Operation.DELETED);
210         addValuesFromDataToElement(doc, change.getRemovedOperationalData(), dataChangedNotificationEventElement, Store.OPERATION, Operation.DELETED);
211     }
212
213     private void addValuesFromDataToElement(Document doc, Set<InstanceIdentifier> data, Element element, Store store,
214             Operation operation) {
215         if (data == null || data.isEmpty()) {
216             return;
217         }
218         for (InstanceIdentifier path : data) {
219             Node node = createDataChangeEventElement(doc, path, null, store, operation);
220             element.appendChild(node);
221         }
222     }
223
224     private void addValuesFromDataToElement(Document doc, Map<InstanceIdentifier, CompositeNode> data, Element element, Store store,
225             Operation operation) {
226         if (data == null || data.isEmpty()) {
227             return;
228         }
229         for (Entry<InstanceIdentifier, CompositeNode> entry : data.entrySet()) {
230             Node node = createDataChangeEventElement(doc, entry.getKey(), entry.getValue(), store, operation);
231             element.appendChild(node);
232         }
233     }
234
235     private Node createDataChangeEventElement(Document doc, InstanceIdentifier path, CompositeNode data, Store store,
236             Operation operation) {
237         Element dataChangeEventElement = doc.createElement("data-change-event");
238
239         Element pathElement = doc.createElement("path");
240         addPathAsValueToElement(path, pathElement);
241         dataChangeEventElement.appendChild(pathElement);
242
243         Element storeElement = doc.createElement("store");
244         storeElement.setTextContent(store.value);
245         dataChangeEventElement.appendChild(storeElement);
246
247         Element operationElement = doc.createElement("operation");
248         operationElement.setTextContent(operation.value);
249         dataChangeEventElement.appendChild(operationElement);
250
251         if (data != null) {
252             Element dataElement = doc.createElement("data");
253             Node dataAnyXml = translateToXml(path, data);
254             Node adoptedNode = doc.adoptNode(dataAnyXml);
255             dataElement.appendChild(adoptedNode);
256             dataChangeEventElement.appendChild(dataElement);
257         }
258
259         return dataChangeEventElement;
260     }
261
262     private Node translateToXml(InstanceIdentifier path, CompositeNode data) {
263         DataNodeContainer schemaNode = ControllerContext.getInstance().getDataNodeContainerFor(path);
264         if (schemaNode == null) {
265             logger.info("Path '{}' contains node with unsupported type (supported type is Container or List) or some node was not found.", path);
266             return null;
267         }
268         try {
269             Document xml = xmlMapper.write(data, schemaNode);
270             return xml.getFirstChild();
271         } catch (UnsupportedDataTypeException e) {
272             logger.error("Error occured during translation of notification to XML.", e);
273             return null;
274         }
275     }
276
277     private void addPathAsValueToElement(InstanceIdentifier path, Element element) {
278         // Map< key = namespace, value = prefix>
279         Map<String, String> prefixes = new HashMap<>();
280         InstanceIdentifier instanceIdentifier = path;
281         StringBuilder textContent = new StringBuilder();
282         for (PathArgument pathArgument : instanceIdentifier.getPath()) {
283             textContent.append("/");
284             writeIdentifierWithNamespacePrefix(element, textContent, pathArgument.getNodeType(), prefixes);
285             if (pathArgument instanceof NodeIdentifierWithPredicates) {
286                 Map<QName, Object> predicates = ((NodeIdentifierWithPredicates) pathArgument).getKeyValues();
287                 for (QName keyValue : predicates.keySet()) {
288                     String predicateValue = String.valueOf(predicates.get(keyValue));
289                     textContent.append("[");
290                     writeIdentifierWithNamespacePrefix(element, textContent, keyValue, prefixes);
291                     textContent.append("='");
292                     textContent.append(predicateValue);
293                     textContent.append("'");
294                     textContent.append("]");
295                 }
296             } else if (pathArgument instanceof NodeWithValue) {
297                 textContent.append("[.='");
298                 textContent.append(((NodeWithValue)pathArgument).getValue());
299                 textContent.append("'");
300                 textContent.append("]");
301             }
302         }
303         element.setTextContent(textContent.toString());
304     }
305
306     private static void writeIdentifierWithNamespacePrefix(Element element, StringBuilder textContent, QName qName,
307             Map<String, String> prefixes) {
308         String namespace = qName.getNamespace().toString();
309         String prefix = prefixes.get(namespace);
310         if (prefix == null) {
311             prefix = qName.getPrefix();
312             if (prefix == null || prefix.isEmpty() || prefixes.containsValue(prefix)) {
313                 prefix = generateNewPrefix(prefixes.values());
314             }
315         }
316
317         element.setAttribute("xmlns:" + prefix, namespace.toString());
318         textContent.append(prefix);
319         prefixes.put(namespace, prefix);
320
321         textContent.append(":");
322         textContent.append(qName.getLocalName());
323     }
324
325     private static String generateNewPrefix(Collection<String> prefixes) {
326         StringBuilder result = null;
327         Random random = new Random();
328         do {
329             result = new StringBuilder();
330             for (int i = 0; i < 4; i++) {
331                 int randomNumber = 0x61 + (Math.abs(random.nextInt()) % 26);
332                 result.append(Character.toChars(randomNumber));
333             }
334         } while (prefixes.contains(result.toString()));
335
336         return result.toString();
337     }
338
339     public InstanceIdentifier getPath() {
340         return path;
341     }
342
343     public void setRegistration(ListenerRegistration<DataChangeListener> registration) {
344         this.registration = registration;
345     }
346
347     public String getStreamName() {
348         return streamName;
349     }
350
351     public void close() throws Exception {
352         subscribers = new ConcurrentSet<>();
353         registration.close();
354         registration = null;
355         eventBus.unregister(eventBusChangeRecorder);
356     }
357
358     public boolean isListening() {
359         return registration == null ? false : true;
360     }
361
362     public void addSubscriber(Channel subscriber) {
363         if (!subscriber.isActive()) {
364             logger.debug("Channel is not active between websocket server and subscriber {}"
365                     + subscriber.remoteAddress());
366         }
367         Event event = new Event(EventType.REGISTER);
368         event.setSubscriber(subscriber);
369         eventBus.post(event);
370     }
371
372     public void removeSubscriber(Channel subscriber) {
373         logger.debug("Subscriber {} is removed.", subscriber.remoteAddress());
374         Event event = new Event(EventType.DEREGISTER);
375         event.setSubscriber(subscriber);
376         eventBus.post(event);
377     }
378
379     public boolean hasSubscribers() {
380         return !subscribers.isEmpty();
381     }
382
383     private static enum Store {
384         CONFIG("config"),
385         OPERATION("operation");
386
387         private final String value;
388
389         private Store(String value) {
390             this.value = value;
391         }
392     }
393
394     private static enum Operation {
395         CREATED("created"),
396         UPDATED("updated"),
397         DELETED("deleted");
398
399         private final String value;
400
401         private Operation(String value) {
402             this.value = value;
403         }
404     }
405
406 }