Bug 6911 - RPC support in singleton
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / tx / NetconfProxyDOMTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.tx;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.dispatch.OnComplete;
14 import akka.pattern.Patterns;
15 import com.google.common.base.Optional;
16 import org.opendaylight.controller.config.util.xml.DocumentedException;
17 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
18 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
19 import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
20 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
21 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
22 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
23 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
24 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
25 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
26 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
27 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
28 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
29 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
30 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import scala.concurrent.Await;
35 import scala.concurrent.Future;
36 import scala.concurrent.impl.Promise.DefaultPromise;
37
38
39 public class NetconfProxyDOMTransaction implements NetconfDOMTransaction {
40
41     private static final Logger LOG = LoggerFactory.getLogger(NetconfProxyDOMTransaction.class);
42
43     private final RemoteDeviceId id;
44     private final ActorSystem actorSystem;
45     private final ActorRef masterContextRef;
46
47     public NetconfProxyDOMTransaction(final RemoteDeviceId id,
48                                       final ActorSystem actorSystem,
49                                       final ActorRef masterContextRef) {
50         this.id = id;
51         this.actorSystem = actorSystem;
52         this.masterContextRef = masterContextRef;
53     }
54
55     @Override
56     public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
57                                                         final YangInstanceIdentifier path) {
58
59         final Future<Object> readScalaFuture =
60                 Patterns.ask(masterContextRef, new ReadRequest(store, path), NetconfTopologyUtils.TIMEOUT);
61
62         LOG.trace("{}: Read {} via NETCONF: {}", id, store, path);
63
64         final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
65
66         readScalaFuture.onComplete(new OnComplete<Object>() {
67             @Override
68             public void onComplete(final Throwable failure, final Object success) throws Throwable {
69                 if (failure != null) { // ask timeout
70                     Exception exception = new DocumentedException(id + ":Master is down. Please try again.",
71                             DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
72                             DocumentedException.ErrorSeverity.WARNING);
73                     promise.failure(exception);
74                     return;
75                 }
76                 if (success instanceof Throwable) { // Error sended by master
77                     promise.failure((Throwable) success);
78                     return;
79                 }
80                 if (success instanceof EmptyReadResponse) {
81                     promise.success(Optional.absent());
82                     return;
83                 }
84                 promise.success(Optional.of((NormalizedNodeMessage) success));
85             }
86         }, actorSystem.dispatcher());
87
88         return promise.future();
89     }
90
91     @Override
92     public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
93         final Future<Object> existsScalaFuture =
94                 Patterns.ask(masterContextRef, new ExistsRequest(store, path), NetconfTopologyUtils.TIMEOUT);
95
96         LOG.trace("{}: Exists {} via NETCONF: {}", id, store, path);
97
98         final DefaultPromise<Boolean> promise = new DefaultPromise<>();
99         existsScalaFuture.onComplete(new OnComplete<Object>() {
100             @Override
101             public void onComplete(final Throwable failure, final Object success) throws Throwable {
102                 if (failure != null) { // ask timeout
103                     Exception exception = new DocumentedException(id + ":Master is down. Please try again.",
104                             DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
105                             DocumentedException.ErrorSeverity.WARNING);
106                     promise.failure(exception);
107                     return;
108                 }
109                 if (success instanceof Throwable) {
110                     promise.failure((Throwable) success);
111                     return;
112                 }
113                 promise.success((Boolean) success);
114             }
115         }, actorSystem.dispatcher());
116         return promise.future();
117     }
118
119     @Override
120     public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
121         LOG.trace("{}: Write {} via NETCONF: {} with payload {}", id, store, data.getIdentifier(), data.getNode());
122
123         masterContextRef.tell(new PutRequest(store, data), ActorRef.noSender());
124
125     }
126
127     @Override
128     public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
129         LOG.trace("{}: Merge {} via NETCONF: {} with payload {}", id, store, data.getIdentifier(), data.getNode());
130
131         masterContextRef.tell(new MergeRequest(store, data), ActorRef.noSender());
132     }
133
134     @Override
135     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
136         LOG.trace("{}: Delete {} via NETCONF: {}", id, store, path);
137
138         masterContextRef.tell(new DeleteRequest(store, path), ActorRef.noSender());
139     }
140
141     @Override
142     public boolean cancel() {
143         final Future<Object> cancelScalaFuture =
144                 Patterns.ask(masterContextRef, new CancelRequest(), NetconfTopologyUtils.TIMEOUT);
145
146         LOG.trace("{}: Cancel {} via NETCONF", id);
147
148         try {
149             // here must be Await because AsyncWriteTransaction do not return future
150             return (boolean) Await.result(cancelScalaFuture, NetconfTopologyUtils.TIMEOUT.duration());
151         } catch (Exception e) {
152             return false;
153         }
154     }
155
156     @Override
157     public Future<Void> submit() {
158         final Future<Object> submitScalaFuture =
159                 Patterns.ask(masterContextRef, new SubmitRequest(), NetconfTopologyUtils.TIMEOUT);
160
161         LOG.trace("{}: Submit {} via NETCONF", id);
162
163         final DefaultPromise<Void> promise = new DefaultPromise<>();
164
165         submitScalaFuture.onComplete(new OnComplete<Object>() {
166             @Override
167             public void onComplete(final Throwable failure, final Object success) throws Throwable {
168                 if (failure != null) { // ask timeout
169                     Exception exception = new DocumentedException(id + ":Master is down. Please try again.",
170                             DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED,
171                             DocumentedException.ErrorSeverity.WARNING);
172                     promise.failure(exception);
173                     return;
174                 }
175                 if (success instanceof Throwable) {
176                     promise.failure((Throwable) success);
177                 } else {
178                     if (success instanceof SubmitFailedReply) {
179                         LOG.error("{}: Transaction was not submitted because already closed.", id);
180                     }
181                     promise.success(null);
182                 }
183             }
184         }, actorSystem.dispatcher());
185
186         return promise.future();
187     }
188
189 }