- public NetconfMessage getLastMessage(final int attempts, final int attemptMsDelay) throws InterruptedException {
- final Promise<NetconfMessage> promise = this.promiseReply();
- this.device.logger.debug("Waiting for reply {}", promise);
- int _plus = (attempts * attemptMsDelay);
- final boolean messageAvailable = promise.await(_plus);
- if (messageAvailable) {
- try {
- try {
- return promise.get();
- } catch (Throwable _e) {
- throw Exceptions.sneakyThrow(_e);
+ private synchronized void processMessage(final NetconfMessage message) {
+ final Request r = requests.peek();
+ if (r.future.isUncancellable()) {
+ requests.poll();
+ LOG.debug("Matched {} to {}", r.request, message);
+
+ // FIXME: this can throw exceptions, which should result
+ // in the future failing
+ NetconfMapping.checkValidReply(r.request, message);
+ r.future.set(Rpcs.getRpcResult(true, NetconfMapping.toNotificationNode(message, device.getSchemaContext()),
+ Collections.<RpcError>emptyList()));
+ } else {
+ LOG.warn("Ignoring unsolicited message", message);
+ }
+ }
+
+ synchronized ListenableFuture<RpcResult<CompositeNode>> sendRequest(final NetconfMessage message) {
+ if (session == null) {
+ LOG.debug("Session to {} is disconnected, failing RPC request {}", device.getName(), message);
+ return Futures.<RpcResult<CompositeNode>>immediateFuture(new RpcResult<CompositeNode>() {
+ @Override
+ public boolean isSuccessful() {
+ return false;
+ }
+
+ @Override
+ public CompositeNode getResult() {
+ return null;
+ }
+
+ @Override
+ public Collection<RpcError> getErrors() {
+ // FIXME: indicate that the session is down
+ return Collections.emptySet();