Convert ProxyDOMDataBroker tx creation to async
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / tx / ActorProxyTransactionFacade.java
1 /*
2  * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
9
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.pattern.AskTimeoutException;
13 import akka.pattern.Patterns;
14 import akka.util.Timeout;
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FluentFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.Objects;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
22 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
23 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
24 import org.opendaylight.mdsal.common.api.CommitInfo;
25 import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
26 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
27 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
28 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
29 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
30 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
31 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
32 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
33 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
34 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
35 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import scala.concurrent.ExecutionContext;
42 import scala.concurrent.Future;
43
44 /**
45  * ProxyTransactionFacade implementation that interfaces with an actor.
46  *
47  * @author Thomas Pantelis
48  */
49 class ActorProxyTransactionFacade implements ProxyTransactionFacade {
50     private static final Logger LOG = LoggerFactory.getLogger(ActorProxyTransactionFacade.class);
51
52     private final ActorRef masterTxActor;
53     private final RemoteDeviceId id;
54     private final ExecutionContext executionContext;
55     private final Timeout askTimeout;
56
57     ActorProxyTransactionFacade(ActorRef masterTxActor, RemoteDeviceId id, ExecutionContext executionContext,
58             Timeout askTimeout) {
59         this.masterTxActor = Objects.requireNonNull(masterTxActor);
60         this.id = Objects.requireNonNull(id);
61         this.executionContext = Objects.requireNonNull(executionContext);
62         this.askTimeout = Objects.requireNonNull(askTimeout);
63     }
64
65     @Override
66     public Object getIdentifier() {
67         return id;
68     }
69
70     @Override
71     public boolean cancel() {
72         LOG.debug("{}: Cancel via actor {}", id, masterTxActor);
73
74         final Future<Object> future = Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
75
76         future.onComplete(new OnComplete<Object>() {
77             @Override
78             public void onComplete(final Throwable failure, final Object response) {
79                 if (failure != null) {
80                     LOG.warn("{}: Cancel failed", id, failure);
81                     return;
82                 }
83
84                 LOG.debug("{}: Cancel succeeded", id);
85             }
86         }, executionContext);
87
88         return true;
89     }
90
91     @Override
92     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(LogicalDatastoreType store,
93             YangInstanceIdentifier path) {
94         LOG.debug("{}: Read {} {} via actor {}", id, store, path, masterTxActor);
95
96         final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
97
98         final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
99         future.onComplete(new OnComplete<Object>() {
100             @Override
101             public void onComplete(final Throwable failure, final Object response) {
102                 if (failure != null) {
103                     LOG.debug("{}: Read {} {} failed", id, store, path, failure);
104                     settableFuture.setException(processFailure(failure));
105                     return;
106                 }
107
108                 LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
109
110                 if (response instanceof EmptyReadResponse) {
111                     settableFuture.set(Optional.absent());
112                     return;
113                 }
114
115                 if (response instanceof NormalizedNodeMessage) {
116                     final NormalizedNodeMessage data = (NormalizedNodeMessage) response;
117                     settableFuture.set(Optional.of(data.getNode()));
118                 }
119             }
120         }, executionContext);
121
122         return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER);
123     }
124
125     @Override
126     public CheckedFuture<Boolean, ReadFailedException> exists(LogicalDatastoreType store, YangInstanceIdentifier path) {
127         LOG.debug("{}: Exists {} {} via actor {}", id, store, path, masterTxActor);
128
129         final Future<Object> future = Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
130
131         final SettableFuture<Boolean> settableFuture = SettableFuture.create();
132         future.onComplete(new OnComplete<Object>() {
133             @Override
134             public void onComplete(final Throwable failure, final Object response) {
135                 if (failure != null) {
136                     LOG.debug("{}: Exists {} {} failed", id, store, path, failure);
137                     settableFuture.setException(processFailure(failure));
138                     return;
139                 }
140
141                 LOG.debug("{}: Exists {} {} succeeded: {}", id, store, path, response);
142
143                 settableFuture.set((Boolean) response);
144             }
145         }, executionContext);
146
147         return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER);
148     }
149
150     @Override
151     public void delete(LogicalDatastoreType store, YangInstanceIdentifier path) {
152         LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterTxActor);
153         masterTxActor.tell(new DeleteRequest(store, path), ActorRef.noSender());
154     }
155
156     @Override
157     public void put(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
158         LOG.debug("{}: Put {} {} via actor {}", id, store, path, masterTxActor);
159         masterTxActor.tell(new PutRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
160     }
161
162     @Override
163     public void merge(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
164         LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterTxActor);
165         masterTxActor.tell(new MergeRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
166     }
167
168     @Override
169     public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
170         LOG.debug("{}: Commit via actor {}", id, masterTxActor);
171
172         final Future<Object> future = Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
173
174         final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
175         future.onComplete(new OnComplete<Object>() {
176             @Override
177             public void onComplete(final Throwable failure, final Object response) {
178                 if (failure != null) {
179                     LOG.debug("{}: Commit failed", id, failure);
180                     settableFuture.setException(newTransactionCommitFailedException(processFailure(failure)));
181                     return;
182                 }
183
184                 LOG.debug("{}: Commit succeeded", id);
185
186                 settableFuture.set(CommitInfo.empty());
187             }
188         }, executionContext);
189
190         return settableFuture;
191     }
192
193     private TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure) {
194         return new TransactionCommitFailedException(String.format("%s: Commit of transaction failed", getIdentifier()),
195                 failure);
196     }
197
198     private Throwable processFailure(Throwable failure) {
199         if (failure instanceof AskTimeoutException) {
200             return NetconfTopologyUtils.createMasterIsDownException(id, (Exception)failure);
201         }
202
203         return failure;
204     }
205 }