+
+package class NetconfDeviceListener extends NetconfClientSessionListener {
+
+ val NetconfDevice device
+ val EventExecutor eventExecutor
+
+ new(NetconfDevice device,EventExecutor eventExecutor) {
+ this.device = device
+ this.eventExecutor = eventExecutor
+ }
+
+ var Promise<NetconfMessage> messagePromise;
+ val promiseLock = new ReentrantLock;
+
+ override onMessage(NetconfClientSession session, NetconfMessage message) {
+ if (isNotification(message)) {
+ onNotification(session, message);
+ } else try {
+ promiseLock.lock
+ if (messagePromise != null) {
+ messagePromise.setSuccess(message);
+ messagePromise = null;
+ }
+ } finally {
+ promiseLock.unlock
+ }
+ }
+
+ /**
+ * Method intended to customize notification processing.
+ *
+ * @param session
+ * {@see
+ * NetconfClientSessionListener#onMessage(NetconfClientSession,
+ * NetconfMessage)}
+ * @param message
+ * {@see
+ * NetconfClientSessionListener#onMessage(NetconfClientSession,
+ * NetconfMessage)}
+ */
+ def void onNotification(NetconfClientSession session, NetconfMessage message) {
+ device.logger.debug("Received NETCONF notification.",message);
+ val domNotification = message?.toCompositeNode?.notificationBody;
+ if(domNotification != null) {
+ device?.mountInstance?.publish(domNotification);
+ }
+ }
+
+ private static def CompositeNode getNotificationBody(CompositeNode node) {
+ for(child : node.children) {
+ if(child instanceof CompositeNode) {
+ return child as CompositeNode;
+ }
+ }
+ }
+
+ override getLastMessage(int attempts, int attemptMsDelay) throws InterruptedException {
+ val promise = promiseReply();
+ val messageAvailable = promise.await(attempts + attemptMsDelay);
+ if (messageAvailable) {
+ try {
+ return promise.get();
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ throw new IllegalStateException("Unsuccessful after " + attempts + " attempts.");
+
+ // throw new TimeoutException("Message was not received on time.");
+ }
+
+ def Promise<NetconfMessage> promiseReply() {
+ promiseLock.lock
+ try {
+ if (messagePromise == null) {
+ messagePromise = eventExecutor.newPromise();
+ return messagePromise;
+ }
+ return messagePromise;
+ } finally {
+ promiseLock.unlock
+ }
+ }
+
+ def boolean isNotification(NetconfMessage message) {
+ val xmle = XmlElement.fromDomDocument(message.getDocument());
+ return XmlNetconfConstants.NOTIFICATION_ELEMENT_NAME.equals(xmle.getName());
+ }
+}
+
+package class NetconfDeviceSchemaContextProvider {
+
+ @Property
+ val NetconfDevice device;
+
+ @Property
+ val SchemaSourceProvider<InputStream> sourceProvider;
+
+ @Property
+ var Optional<SchemaContext> currentContext;
+
+ new(NetconfDevice device, SchemaSourceProvider<InputStream> sourceProvider) {
+ _device = device
+ _sourceProvider = sourceProvider
+ }
+
+ def createContextFromCapabilities(Iterable<QName> capabilities) {
+
+ val modelsToParse = ImmutableMap.<QName, InputStream>builder();
+ for (cap : capabilities) {
+ val source = sourceProvider.getSchemaSource(cap.localName, Optional.fromNullable(cap.formattedRevision));
+ if (source.present) {
+ modelsToParse.put(cap, source.get());
+ }
+ }
+ val context = tryToCreateContext(modelsToParse.build);
+ currentContext = Optional.fromNullable(context);
+ }
+
+ def SchemaContext tryToCreateContext(Map<QName, InputStream> modelsToParse) {
+ val parser = new YangParserImpl();
+ try {
+ val models = parser.parseYangModelsFromStreams(ImmutableList.copyOf(modelsToParse.values));
+ val result = parser.resolveSchemaContext(models);
+ return result;
+ } catch (Exception e) {
+ device.logger.debug("Error occured during parsing YANG schemas", e);
+ return null;
+ }
+ }
+}
+
+package class NetconfDeviceSchemaSourceProvider implements SchemaSourceProvider<String> {
+
+ val NetconfDevice device;
+
+ new(NetconfDevice device) {
+ this.device = device;
+ }
+
+ override getSchemaSource(String moduleName, Optional<String> revision) {
+ val it = ImmutableCompositeNode.builder() //
+ setQName(QName::create(NetconfState.QNAME, "get-schema")) //
+ addLeaf("format", "yang")
+ addLeaf("identifier", moduleName)
+ if (revision.present) {
+ addLeaf("version", revision.get())
+ }
+
+ device.logger.info("Loading YANG schema source for {}:{}", moduleName, revision)
+ val schemaReply = device.invokeRpc(getQName(), toInstance());
+
+ if (schemaReply.successful) {
+ val schemaBody = schemaReply.result.getFirstSimpleByName(
+ QName::create(NetconfState.QNAME.namespace, null, "data"))?.value;
+ device.logger.info("YANG Schema successfully received for: {}:{}", moduleName, revision);
+ return Optional.of(schemaBody as String);
+ }
+ return Optional.absent();
+ }
+}