+ }
+
+ device.bringDown();
+ }
+
+ @Override
+ public void onSessionDown(final NetconfClientSession session, final Exception e) {
+ LOG.debug("Session with {} went down", device.getName(), e);
+ tearDown(e);
+ }
+
+ @Override
+ public void onSessionTerminated(final NetconfClientSession session, final NetconfTerminationReason reason) {
+ LOG.debug("Session with {} terminated {}", session, reason);
+ tearDown(new RuntimeException(reason.getErrorMessage()));
+ }
+
+ @Override
+ public void onMessage(final NetconfClientSession session, final NetconfMessage message) {
+ /*
+ * Dispatch between notifications and messages. Messages need to be processed
+ * with lock held, notifications do not.
+ */
+ if (isNotification(message)) {
+ processNotification(message);
+ } else {
+ processMessage(message);
+ }
+ }
+
+ private synchronized void processMessage(final NetconfMessage message) {
+ final Request r = requests.peek();
+ if (r.future.isUncancellable()) {
+ requests.poll();
+ LOG.debug("Matched {} to {}", r.request, message);
+
+ // FIXME: this can throw exceptions, which should result
+ // in the future failing
+ NetconfMapping.checkValidReply(r.request, message);
+ r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
+ Collections.<RpcError>emptyList()));
+ } else {
+ LOG.warn("Ignoring unsolicited message", message);
+ }
+ }
+
+ synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
+ if (session == null) {
+ LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
+ return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
+ @Override
+ public boolean isSuccessful() {
+ return false;
+ }
+
+ @Override
+ public CompositeNode getResult() {
+ return null;
+ }
+
+ @Override
+ public Collection<RpcError> getErrors() {
+ // FIXME: indicate that the session is down
+ return Collections.emptySet();
+ }
+ });
+ }
+
+ final Request req = new Request(new UncancellableFuture<RpcResult<CompositeNode>>(true), message);
+ requests.add(req);
+
+ session.sendMessage(req.request).addListener(new FutureListener<Void>() {
+ @Override
+ public void operationComplete(final Future<Void> future) throws Exception {
+ if (!future.isSuccess()) {
+ // We expect that a session down will occur at this point
+ LOG.debug("Failed to send request {}", req.request, future.cause());
+ req.future.setException(future.cause());
+ } else {
+ LOG.trace("Finished sending request {}", req.request);
+ }