Prevent ConfigPusher from killing its thread
[controller.git] / opendaylight / md-sal / sal-netconf-connector / src / main / java / org / opendaylight / controller / sal / connect / netconf / NetconfDeviceListener.java
1 package org.opendaylight.controller.sal.connect.netconf;
2
3 import com.google.common.base.Objects;
4
5 import io.netty.util.concurrent.EventExecutor;
6 import io.netty.util.concurrent.Promise;
7
8 import java.util.List;
9 import java.util.concurrent.ConcurrentMap;
10 import java.util.concurrent.ExecutionException;
11 import java.util.concurrent.locks.ReentrantLock;
12
13 import org.eclipse.xtext.xbase.lib.Exceptions;
14 import org.eclipse.xtext.xbase.lib.Functions.Function0;
15 import org.opendaylight.controller.netconf.api.NetconfMessage;
16 import org.opendaylight.controller.netconf.client.NetconfClientSession;
17 import org.opendaylight.controller.netconf.client.NetconfClientSessionListener;
18 import org.opendaylight.controller.netconf.util.xml.XmlElement;
19 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
20 import org.opendaylight.controller.sal.connect.netconf.NetconfDevice;
21 import org.opendaylight.controller.sal.connect.netconf.NetconfMapping;
22 import org.opendaylight.controller.sal.core.api.mount.MountProvisionInstance;
23 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
24 import org.opendaylight.yangtools.yang.data.api.Node;
25 import org.w3c.dom.Document;
26
27 @SuppressWarnings("all")
28 class NetconfDeviceListener extends NetconfClientSessionListener {
29     private final NetconfDevice device;
30     private final EventExecutor eventExecutor;
31
32     public NetconfDeviceListener(final NetconfDevice device, final EventExecutor eventExecutor) {
33         this.device = device;
34         this.eventExecutor = eventExecutor;
35     }
36
37     private Promise<NetconfMessage> messagePromise;
38     private ConcurrentMap<String, Promise<NetconfMessage>> promisedMessages;
39
40     private final ReentrantLock promiseLock = new ReentrantLock();
41
42     public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
43         if (isNotification(message)) {
44             this.onNotification(session, message);
45         } else {
46             try {
47                 this.promiseLock.lock();
48                 boolean _notEquals = (!Objects.equal(this.messagePromise, null));
49                 if (_notEquals) {
50                     this.device.logger.debug("Setting promised reply {} with message {}", this.messagePromise, message);
51                     this.messagePromise.setSuccess(message);
52                     this.messagePromise = null;
53                 }
54             } finally {
55                 this.promiseLock.unlock();
56             }
57         }
58     }
59
60     /**
61      * Method intended to customize notification processing.
62      * 
63      * @param session
64      *            {@see
65      *            NetconfClientSessionListener#onMessage(NetconfClientSession,
66      *            NetconfMessage)}
67      * @param message
68      *            {@see
69      *            NetconfClientSessionListener#onMessage(NetconfClientSession,
70      *            NetconfMessage)}
71      */
72     public void onNotification(final NetconfClientSession session, final NetconfMessage message) {
73         this.device.logger.debug("Received NETCONF notification.", message);
74         CompositeNode _notificationBody = null;
75         CompositeNode _compositeNode = null;
76         if (message != null) {
77             _compositeNode = NetconfMapping.toCompositeNode(message,device.getSchemaContext());
78         }
79         if (_compositeNode != null) {
80             _notificationBody = NetconfDeviceListener.getNotificationBody(_compositeNode);
81         }
82         final CompositeNode domNotification = _notificationBody;
83         boolean _notEquals = (!Objects.equal(domNotification, null));
84         if (_notEquals) {
85             MountProvisionInstance _mountInstance = null;
86             if (this.device != null) {
87                 _mountInstance = this.device.getMountInstance();
88             }
89             if (_mountInstance != null) {
90                 _mountInstance.publish(domNotification);
91             }
92         }
93     }
94
95     private static CompositeNode getNotificationBody(final CompositeNode node) {
96         List<Node<? extends Object>> _children = node.getChildren();
97         for (final Node<? extends Object> child : _children) {
98             if ((child instanceof CompositeNode)) {
99                 return ((CompositeNode) child);
100             }
101         }
102         return null;
103     }
104
105     public NetconfMessage getLastMessage(final int attempts, final int attemptMsDelay) throws InterruptedException {
106         final Promise<NetconfMessage> promise = this.promiseReply();
107         this.device.logger.debug("Waiting for reply {}", promise);
108         int _plus = (attempts * attemptMsDelay);
109         final boolean messageAvailable = promise.await(_plus);
110         if (messageAvailable) {
111             try {
112                 try {
113                     return promise.get();
114                 } catch (Throwable _e) {
115                     throw Exceptions.sneakyThrow(_e);
116                 }
117             } catch (final Throwable _t) {
118                 if (_t instanceof ExecutionException) {
119                     final ExecutionException e = (ExecutionException) _t;
120                     IllegalStateException _illegalStateException = new IllegalStateException(e);
121                     throw _illegalStateException;
122                 } else {
123                     throw Exceptions.sneakyThrow(_t);
124                 }
125             }
126         }
127         String _plus_1 = ("Unsuccessful after " + Integer.valueOf(attempts));
128         String _plus_2 = (_plus_1 + " attempts.");
129         IllegalStateException _illegalStateException_1 = new IllegalStateException(_plus_2);
130         throw _illegalStateException_1;
131     }
132
133     public synchronized Promise<NetconfMessage> promiseReply() {
134         this.device.logger.debug("Promising reply.");
135         this.promiseLock.lock();
136         try {
137             boolean _equals = Objects.equal(this.messagePromise, null);
138             if (_equals) {
139                 Promise<NetconfMessage> _newPromise = this.eventExecutor.<NetconfMessage> newPromise();
140                 this.messagePromise = _newPromise;
141                 return this.messagePromise;
142             }
143             return this.messagePromise;
144         } finally {
145             this.promiseLock.unlock();
146         }
147     }
148
149     public boolean isNotification(final NetconfMessage message) {
150         Document _document = message.getDocument();
151         final XmlElement xmle = XmlElement.fromDomDocument(_document);
152         String _name = xmle.getName();
153         return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(_name);
154     }
155 }