Bump odlparent to 6.0.0
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / tx / ActorProxyTransactionFacade.java
1 /*
2  * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
9
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.pattern.AskTimeoutException;
13 import akka.pattern.Patterns;
14 import akka.util.Timeout;
15 import com.google.common.util.concurrent.FluentFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.Objects;
19 import java.util.Optional;
20 import org.opendaylight.mdsal.common.api.CommitInfo;
21 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
22 import org.opendaylight.mdsal.common.api.ReadFailedException;
23 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
24 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
25 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
26 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
27 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
28 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
29 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
30 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
31 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
32 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
33 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
34 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import scala.concurrent.ExecutionContext;
40 import scala.concurrent.Future;
41
42 /**
43  * ProxyTransactionFacade implementation that interfaces with an actor.
44  *
45  * @author Thomas Pantelis
46  */
47 class ActorProxyTransactionFacade implements ProxyTransactionFacade {
48     private static final Logger LOG = LoggerFactory.getLogger(ActorProxyTransactionFacade.class);
49
50     private final ActorRef masterTxActor;
51     private final RemoteDeviceId id;
52     private final ExecutionContext executionContext;
53     private final Timeout askTimeout;
54
55     ActorProxyTransactionFacade(final ActorRef masterTxActor, final RemoteDeviceId id,
56             final ExecutionContext executionContext, final Timeout askTimeout) {
57         this.masterTxActor = Objects.requireNonNull(masterTxActor);
58         this.id = Objects.requireNonNull(id);
59         this.executionContext = Objects.requireNonNull(executionContext);
60         this.askTimeout = Objects.requireNonNull(askTimeout);
61     }
62
63     @Override
64     public Object getIdentifier() {
65         return id;
66     }
67
68     @Override
69     public boolean cancel() {
70         LOG.debug("{}: Cancel via actor {}", id, masterTxActor);
71
72         final Future<Object> future = Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
73
74         future.onComplete(new OnComplete<Object>() {
75             @Override
76             public void onComplete(final Throwable failure, final Object response) {
77                 if (failure != null) {
78                     LOG.warn("{}: Cancel failed", id, failure);
79                     return;
80                 }
81
82                 LOG.debug("{}: Cancel succeeded", id);
83             }
84         }, executionContext);
85
86         return true;
87     }
88
89     @Override
90     public FluentFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
91             final YangInstanceIdentifier path) {
92         LOG.debug("{}: Read {} {} via actor {}", id, store, path, masterTxActor);
93
94         final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
95
96         final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
97         future.onComplete(new OnComplete<Object>() {
98             @Override
99             public void onComplete(final Throwable failure, final Object response) {
100                 if (failure != null) {
101                     LOG.debug("{}: Read {} {} failed", id, store, path, failure);
102
103                     final Throwable processedFailure = processFailure(failure);
104                     if (processedFailure instanceof ReadFailedException) {
105                         settableFuture.setException(processedFailure);
106                     } else {
107                         settableFuture.setException(new ReadFailedException("Read of store " + store + " path " + path
108                             + " failed", processedFailure));
109                     }
110                     return;
111                 }
112
113                 LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
114
115                 if (response instanceof EmptyReadResponse) {
116                     settableFuture.set(Optional.empty());
117                     return;
118                 }
119
120                 if (response instanceof NormalizedNodeMessage) {
121                     final NormalizedNodeMessage data = (NormalizedNodeMessage) response;
122                     settableFuture.set(Optional.of(data.getNode()));
123                 }
124             }
125         }, executionContext);
126
127         return FluentFuture.from(settableFuture);
128     }
129
130     @Override
131     public FluentFuture<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
132         LOG.debug("{}: Exists {} {} via actor {}", id, store, path, masterTxActor);
133
134         final Future<Object> future = Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
135
136         final SettableFuture<Boolean> settableFuture = SettableFuture.create();
137         future.onComplete(new OnComplete<Object>() {
138             @Override
139             public void onComplete(final Throwable failure, final Object response) {
140                 if (failure != null) {
141                     LOG.debug("{}: Exists {} {} failed", id, store, path, failure);
142
143                     final Throwable processedFailure = processFailure(failure);
144                     if (processedFailure instanceof ReadFailedException) {
145                         settableFuture.setException(processedFailure);
146                     } else {
147                         settableFuture.setException(new ReadFailedException("Exists of store " + store + " path " + path
148                             + " failed", processedFailure));
149                     }
150                     return;
151                 }
152
153                 LOG.debug("{}: Exists {} {} succeeded: {}", id, store, path, response);
154
155                 settableFuture.set((Boolean) response);
156             }
157         }, executionContext);
158
159         return FluentFuture.from(settableFuture);
160     }
161
162     @Override
163     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
164         LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterTxActor);
165         masterTxActor.tell(new DeleteRequest(store, path), ActorRef.noSender());
166     }
167
168     @Override
169     public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
170             final NormalizedNode<?, ?> data) {
171         LOG.debug("{}: Put {} {} via actor {}", id, store, path, masterTxActor);
172         masterTxActor.tell(new PutRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
173     }
174
175     @Override
176     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
177             final NormalizedNode<?, ?> data) {
178         LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterTxActor);
179         masterTxActor.tell(new MergeRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
180     }
181
182     @Override
183     public FluentFuture<? extends CommitInfo> commit() {
184         LOG.debug("{}: Commit via actor {}", id, masterTxActor);
185
186         final Future<Object> future = Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
187
188         final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
189         future.onComplete(new OnComplete<Object>() {
190             @Override
191             public void onComplete(final Throwable failure, final Object response) {
192                 if (failure != null) {
193                     LOG.debug("{}: Commit failed", id, failure);
194                     settableFuture.setException(newTransactionCommitFailedException(processFailure(failure)));
195                     return;
196                 }
197
198                 LOG.debug("{}: Commit succeeded", id);
199
200                 settableFuture.set(CommitInfo.empty());
201             }
202
203             private TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure) {
204                 return new TransactionCommitFailedException(String.format("%s: Commit of transaction failed",
205                     getIdentifier()), failure);
206             }
207         }, executionContext);
208
209         return FluentFuture.from(settableFuture);
210     }
211
212     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
213             justification = "https://github.com/spotbugs/spotbugs/issues/811")
214     private Throwable processFailure(final Throwable failure) {
215         return failure instanceof AskTimeoutException
216                 ? NetconfTopologyUtils.createMasterIsDownException(id, (Exception)failure) : failure;
217     }
218 }