d39e960aeb32ee2e019e6d5d27d48a06c3f38e8c
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / tx / ActorProxyTransactionFacade.java
1 /*
2  * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
9
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.pattern.AskTimeoutException;
13 import akka.pattern.Patterns;
14 import akka.util.Timeout;
15 import com.google.common.util.concurrent.FluentFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import java.util.Objects;
18 import java.util.Optional;
19 import org.opendaylight.mdsal.common.api.CommitInfo;
20 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
21 import org.opendaylight.mdsal.common.api.ReadFailedException;
22 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
23 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
24 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
25 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
26 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
27 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
28 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
29 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
30 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
31 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
32 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
33 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
34 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import scala.concurrent.ExecutionContext;
39 import scala.concurrent.Future;
40
41 /**
42  * ProxyTransactionFacade implementation that interfaces with an actor.
43  *
44  * @author Thomas Pantelis
45  */
46 class ActorProxyTransactionFacade implements ProxyTransactionFacade {
47     private static final Logger LOG = LoggerFactory.getLogger(ActorProxyTransactionFacade.class);
48
49     private final ActorRef masterTxActor;
50     private final RemoteDeviceId id;
51     private final ExecutionContext executionContext;
52     private final Timeout askTimeout;
53
54     ActorProxyTransactionFacade(final ActorRef masterTxActor, final RemoteDeviceId id,
55             final ExecutionContext executionContext, final Timeout askTimeout) {
56         this.masterTxActor = Objects.requireNonNull(masterTxActor);
57         this.id = Objects.requireNonNull(id);
58         this.executionContext = Objects.requireNonNull(executionContext);
59         this.askTimeout = Objects.requireNonNull(askTimeout);
60     }
61
62     @Override
63     public Object getIdentifier() {
64         return id;
65     }
66
67     @Override
68     public boolean cancel() {
69         LOG.debug("{}: Cancel via actor {}", id, masterTxActor);
70
71         final Future<Object> future = Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
72
73         future.onComplete(new OnComplete<Object>() {
74             @Override
75             public void onComplete(final Throwable failure, final Object response) {
76                 if (failure != null) {
77                     LOG.warn("{}: Cancel failed", id, failure);
78                     return;
79                 }
80
81                 LOG.debug("{}: Cancel succeeded", id);
82             }
83         }, executionContext);
84
85         return true;
86     }
87
88     @Override
89     public void close() {
90         cancel();
91     }
92
93     @Override
94     public FluentFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
95             final YangInstanceIdentifier path) {
96         LOG.debug("{}: Read {} {} via actor {}", id, store, path, masterTxActor);
97
98         final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
99
100         final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
101         future.onComplete(new OnComplete<Object>() {
102             @Override
103             public void onComplete(final Throwable failure, final Object response) {
104                 if (failure != null) {
105                     LOG.debug("{}: Read {} {} failed", id, store, path, failure);
106
107                     final Throwable processedFailure = processFailure(failure);
108                     if (processedFailure instanceof ReadFailedException) {
109                         settableFuture.setException(processedFailure);
110                     } else {
111                         settableFuture.setException(new ReadFailedException("Read of store " + store + " path " + path
112                             + " failed", processedFailure));
113                     }
114                     return;
115                 }
116
117                 LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
118
119                 if (response instanceof EmptyReadResponse) {
120                     settableFuture.set(Optional.empty());
121                     return;
122                 }
123
124                 if (response instanceof NormalizedNodeMessage) {
125                     final NormalizedNodeMessage data = (NormalizedNodeMessage) response;
126                     settableFuture.set(Optional.of(data.getNode()));
127                 }
128             }
129         }, executionContext);
130
131         return FluentFuture.from(settableFuture);
132     }
133
134     @Override
135     public FluentFuture<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
136         LOG.debug("{}: Exists {} {} via actor {}", id, store, path, masterTxActor);
137
138         final Future<Object> future = Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
139
140         final SettableFuture<Boolean> settableFuture = SettableFuture.create();
141         future.onComplete(new OnComplete<Object>() {
142             @Override
143             public void onComplete(final Throwable failure, final Object response) {
144                 if (failure != null) {
145                     LOG.debug("{}: Exists {} {} failed", id, store, path, failure);
146
147                     final Throwable processedFailure = processFailure(failure);
148                     if (processedFailure instanceof ReadFailedException) {
149                         settableFuture.setException(processedFailure);
150                     } else {
151                         settableFuture.setException(new ReadFailedException("Exists of store " + store + " path " + path
152                             + " failed", processedFailure));
153                     }
154                     return;
155                 }
156
157                 LOG.debug("{}: Exists {} {} succeeded: {}", id, store, path, response);
158
159                 settableFuture.set((Boolean) response);
160             }
161         }, executionContext);
162
163         return FluentFuture.from(settableFuture);
164     }
165
166     @Override
167     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
168         LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterTxActor);
169         masterTxActor.tell(new DeleteRequest(store, path), ActorRef.noSender());
170     }
171
172     @Override
173     public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
174             final NormalizedNode<?, ?> data) {
175         LOG.debug("{}: Put {} {} via actor {}", id, store, path, masterTxActor);
176         masterTxActor.tell(new PutRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
177     }
178
179     @Override
180     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
181             final NormalizedNode<?, ?> data) {
182         LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterTxActor);
183         masterTxActor.tell(new MergeRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
184     }
185
186     @Override
187     public FluentFuture<? extends CommitInfo> commit() {
188         LOG.debug("{}: Commit via actor {}", id, masterTxActor);
189
190         final Future<Object> future = Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
191
192         final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
193         future.onComplete(new OnComplete<Object>() {
194             @Override
195             public void onComplete(final Throwable failure, final Object response) {
196                 if (failure != null) {
197                     LOG.debug("{}: Commit failed", id, failure);
198                     settableFuture.setException(newTransactionCommitFailedException(processFailure(failure)));
199                     return;
200                 }
201
202                 LOG.debug("{}: Commit succeeded", id);
203
204                 settableFuture.set(CommitInfo.empty());
205             }
206         }, executionContext);
207
208         return FluentFuture.from(settableFuture);
209     }
210
211     private TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure) {
212         return new TransactionCommitFailedException(String.format("%s: Commit of transaction failed", getIdentifier()),
213                 failure);
214     }
215
216     private Throwable processFailure(final Throwable failure) {
217         if (failure instanceof AskTimeoutException) {
218             return NetconfTopologyUtils.createMasterIsDownException(id, (Exception)failure);
219         }
220
221         return failure;
222     }
223 }