+ public synchronized void onSessionUp(final NetconfClientSession session) {
+ LOG.debug("Session with {} established as address {} session-id {}",
+ device.getName(), device.getSocketAddress(), session.getSessionId());
+
+ this.session = session;
+
+ final Set<QName> caps = device.getCapabilities(session.getServerCapabilities());
+ LOG.trace("Server {} advertized capabilities {}", device.getName(), caps);
+
+ // Select the appropriate provider
+ final SchemaSourceProvider<String> delegate;
+ if (NetconfRemoteSchemaSourceProvider.isSupportedFor(caps)) {
+ delegate = new NetconfRemoteSchemaSourceProvider(device);
+ // FIXME caps do not contain urn:ietf:params:xml:ns:yang:ietf-netconf-monitoring, since it is filtered out in getCapabilitites
+ } else if(session.getServerCapabilities().contains(NetconfRemoteSchemaSourceProvider.IETF_NETCONF_MONITORING.getNamespace().toString())) {
+ delegate = new NetconfRemoteSchemaSourceProvider(device);
+ } else {
+ LOG.info("Netconf server {} does not support IETF Netconf Monitoring", device.getName());
+ delegate = SchemaSourceProviders.noopProvider();
+ }
+
+ device.bringUp(delegate, caps, isRollbackSupported(session.getServerCapabilities()));
+
+ }
+
+ private static boolean isRollbackSupported(final Collection<String> serverCapabilities) {
+ // TODO rollback capability cannot be searched for in Set<QName> caps
+ // since this set does not contain module-less capabilities
+ return Sets.newHashSet(serverCapabilities).contains(NetconfMapping.NETCONF_ROLLBACK_ON_ERROR_URI.toString());
+ }
+
+ private synchronized void tearDown(final Exception e) {
+ session = null;
+
+ /*
+ * Walk all requests, check if they have been executing
+ * or cancelled and remove them from the queue.
+ */
+ final Iterator<Request> it = requests.iterator();
+ while (it.hasNext()) {
+ final Request r = it.next();
+ if (r.future.isUncancellable()) {
+ // FIXME: add a RpcResult instead?
+ r.future.setException(e);
+ it.remove();
+ } else if (r.future.isCancelled()) {
+ // This just does some house-cleaning
+ it.remove();
+ }
+ }
+
+ device.bringDown();
+ }
+
+ @Override
+ public void onSessionDown(final NetconfClientSession session, final Exception e) {
+ LOG.debug("Session with {} went down", device.getName(), e);
+ tearDown(e);
+ }
+
+ @Override
+ public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
+ LOG.debug("Session with {} terminated {}", session, reason);
+ tearDown(new RuntimeException(reason.getErrorMessage()));
+ }
+
+ @Override
+ public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
+ /*
+ * Dispatch between notifications and messages. Messages need to be processed
+ * with lock held, notifications do not.
+ */
+ if (isNotification(message)) {
+ processNotification(message);
+ } else {
+ processMessage(message);
+ }
+ }
+
+ private synchronized void processMessage(final NetconfMessage message) {
+ final Request r = requests.peek();
+ if (r.future.isUncancellable()) {
+ requests.poll();
+ LOG.debug("Matched {} to {}", r.request, message);
+
+ try {
+ NetconfMapping.checkValidReply(r.request, message);
+ } catch (IllegalStateException e) {
+ LOG.warn("Invalid request-reply match, reply message contains different message-id", e);
+ r.future.setException(e);
+ return;
+ }
+
+ try {
+ NetconfMapping.checkSuccessReply(message);
+ } catch (NetconfDocumentedException | IllegalStateException e) {
+ LOG.warn("Error reply from remote device", e);
+ r.future.setException(e);
+ return;