/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.netconf.topology.singleton.impl.tx;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.dispatch.OnComplete;
14 import akka.pattern.Patterns;
15 import com.google.common.base.Optional;
16 import org.opendaylight.controller.config.util.xml.DocumentedException;
17 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
18 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
19 import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
20 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
21 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
22 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
23 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
24 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
25 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
26 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
27 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
28 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
29 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
30 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import scala.concurrent.Await;
35 import scala.concurrent.Future;
36 import scala.concurrent.impl.Promise.DefaultPromise;
39 public class NetconfProxyDOMTransaction implements NetconfDOMTransaction {
41 private static final Logger LOG = LoggerFactory.getLogger(NetconfProxyDOMTransaction.class);
43 private final RemoteDeviceId id;
44 private final ActorSystem actorSystem;
45 private final ActorRef masterContextRef;
47 public NetconfProxyDOMTransaction(final RemoteDeviceId id,
48 final ActorSystem actorSystem,
49 final ActorRef masterContextRef) {
51 this.actorSystem = actorSystem;
52 this.masterContextRef = masterContextRef;
56 public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
57 final YangInstanceIdentifier path) {
59 final Future<Object> readScalaFuture =
60 Patterns.ask(masterContextRef, new ReadRequest(store, path), NetconfTopologyUtils.TIMEOUT);
62 LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
64 final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
66 readScalaFuture.onComplete(new OnComplete<Object>() {
68 public void onComplete(final Throwable failure, final Object success) throws Throwable {
69 if (failure != null) { // ask timeout
70 Exception exception = new DocumentedException(id + ":Master is down. Please try again.",
71 DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
72 DocumentedException.ErrorSeverity.WARNING);
73 promise.failure(exception);
76 if (success instanceof Throwable) { // Error sended by master
77 promise.failure((Throwable) success);
80 if (success instanceof EmptyReadResponse) {
81 promise.success(Optional.absent());
84 promise.success(Optional.of((NormalizedNodeMessage) success));
86 }, actorSystem.dispatcher());
88 return promise.future();
92 public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
93 final Future<Object> existsScalaFuture =
94 Patterns.ask(masterContextRef, new ExistsRequest(store, path), NetconfTopologyUtils.TIMEOUT);
96 LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
98 final DefaultPromise<Boolean> promise = new DefaultPromise<>();
99 existsScalaFuture.onComplete(new OnComplete<Object>() {
101 public void onComplete(final Throwable failure, final Object success) throws Throwable {
102 if (failure != null) { // ask timeout
103 Exception exception = new DocumentedException(id + ":Master is down. Please try again.",
104 DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
105 DocumentedException.ErrorSeverity.WARNING);
106 promise.failure(exception);
109 if (success instanceof Throwable) {
110 promise.failure((Throwable) success);
113 promise.success((Boolean) success);
115 }, actorSystem.dispatcher());
116 return promise.future();
120 public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
121 LOG.trace("{}: Write {} via NETCONF: {} with payload {}", id, store, data.getIdentifier(), data.getNode());
123 masterContextRef.tell(new PutRequest(store, data), ActorRef.noSender());
128 public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
129 LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, data.getIdentifier(), data.getNode());
131 masterContextRef.tell(new MergeRequest(store, data), ActorRef.noSender());
135 public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
136 LOG.trace("{}: Delete {} via NETCONF: {}", id, store, path);
138 masterContextRef.tell(new DeleteRequest(store, path), ActorRef.noSender());
142 public boolean cancel() {
143 final Future<Object> cancelScalaFuture =
144 Patterns.ask(masterContextRef, new CancelRequest(), NetconfTopologyUtils.TIMEOUT);
146 LOG.trace("{}: Cancel {} via NETCONF", id);
149 // here must be Await because AsyncWriteTransaction do not return future
150 return (boolean) Await.result(cancelScalaFuture, NetconfTopologyUtils.TIMEOUT.duration());
151 } catch (Exception e) {
157 public Future<Void> submit() {
158 final Future<Object> submitScalaFuture =
159 Patterns.ask(masterContextRef, new SubmitRequest(), NetconfTopologyUtils.TIMEOUT);
161 LOG.trace("{}: Submit {} via NETCONF", id);
163 final DefaultPromise<Void> promise = new DefaultPromise<>();
165 submitScalaFuture.onComplete(new OnComplete<Object>() {
167 public void onComplete(final Throwable failure, final Object success) throws Throwable {
168 if (failure != null) { // ask timeout
169 Exception exception = new DocumentedException(id + ":Master is down. Please try again.",
170 DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
171 DocumentedException.ErrorSeverity.WARNING);
172 promise.failure(exception);
175 if (success instanceof Throwable) {
176 promise.failure((Throwable) success);
178 if (success instanceof SubmitFailedReply) {
179 LOG.error("{}: Transaction was not submitted because already closed.", id);
181 promise.success(null);
184 }, actorSystem.dispatcher());
186 return promise.future();