/*
 * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.pattern.AskTimeoutException;
13 import akka.pattern.Patterns;
14 import akka.util.Timeout;
15 import com.google.common.util.concurrent.FluentFuture;
16 import com.google.common.util.concurrent.SettableFuture;
17 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
18 import java.util.Objects;
19 import java.util.Optional;
20 import org.opendaylight.mdsal.common.api.CommitInfo;
21 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
22 import org.opendaylight.mdsal.common.api.ReadFailedException;
23 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
24 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
25 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
26 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
27 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
28 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
29 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
30 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
31 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
32 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
33 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
34 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import scala.concurrent.ExecutionContext;
40 import scala.concurrent.Future;
43 * ProxyTransactionFacade implementation that interfaces with an actor.
45 * @author Thomas Pantelis
47 class ActorProxyTransactionFacade implements ProxyTransactionFacade {
48 private static final Logger LOG = LoggerFactory.getLogger(ActorProxyTransactionFacade.class);
50 private final ActorRef masterTxActor;
51 private final RemoteDeviceId id;
52 private final ExecutionContext executionContext;
53 private final Timeout askTimeout;
55 ActorProxyTransactionFacade(final ActorRef masterTxActor, final RemoteDeviceId id,
56 final ExecutionContext executionContext, final Timeout askTimeout) {
57 this.masterTxActor = Objects.requireNonNull(masterTxActor);
58 this.id = Objects.requireNonNull(id);
59 this.executionContext = Objects.requireNonNull(executionContext);
60 this.askTimeout = Objects.requireNonNull(askTimeout);
64 public Object getIdentifier() {
69 public boolean cancel() {
70 LOG.debug("{}: Cancel via actor {}", id, masterTxActor);
72 final Future<Object> future = Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
74 future.onComplete(new OnComplete<>() {
76 public void onComplete(final Throwable failure, final Object response) {
77 if (failure != null) {
78 LOG.warn("{}: Cancel failed", id, failure);
82 LOG.debug("{}: Cancel succeeded", id);
90 public FluentFuture<Optional<NormalizedNode>> read(final LogicalDatastoreType store,
91 final YangInstanceIdentifier path) {
92 LOG.debug("{}: Read {} {} via actor {}", id, store, path, masterTxActor);
94 final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
96 final SettableFuture<Optional<NormalizedNode>> settableFuture = SettableFuture.create();
97 future.onComplete(new OnComplete<>() {
99 public void onComplete(final Throwable failure, final Object response) {
100 if (failure != null) {
101 LOG.debug("{}: Read {} {} failed", id, store, path, failure);
103 final Throwable processedFailure = processFailure(failure);
104 if (processedFailure instanceof ReadFailedException) {
105 settableFuture.setException(processedFailure);
107 settableFuture.setException(new ReadFailedException("Read of store " + store + " path " + path
108 + " failed", processedFailure));
113 LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
115 if (response instanceof EmptyReadResponse) {
116 settableFuture.set(Optional.empty());
120 if (response instanceof NormalizedNodeMessage) {
121 final NormalizedNodeMessage data = (NormalizedNodeMessage) response;
122 settableFuture.set(Optional.of(data.getNode()));
125 }, executionContext);
127 return FluentFuture.from(settableFuture);
131 public FluentFuture<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
132 LOG.debug("{}: Exists {} {} via actor {}", id, store, path, masterTxActor);
134 final Future<Object> future = Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
136 final SettableFuture<Boolean> settableFuture = SettableFuture.create();
137 future.onComplete(new OnComplete<>() {
139 public void onComplete(final Throwable failure, final Object response) {
140 if (failure != null) {
141 LOG.debug("{}: Exists {} {} failed", id, store, path, failure);
143 final Throwable processedFailure = processFailure(failure);
144 if (processedFailure instanceof ReadFailedException) {
145 settableFuture.setException(processedFailure);
147 settableFuture.setException(new ReadFailedException("Exists of store " + store + " path " + path
148 + " failed", processedFailure));
153 LOG.debug("{}: Exists {} {} succeeded: {}", id, store, path, response);
155 settableFuture.set((Boolean) response);
157 }, executionContext);
159 return FluentFuture.from(settableFuture);
163 public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
164 LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterTxActor);
165 masterTxActor.tell(new DeleteRequest(store, path), ActorRef.noSender());
169 public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) {
170 LOG.debug("{}: Put {} {} via actor {}", id, store, path, masterTxActor);
171 masterTxActor.tell(new PutRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
175 public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) {
176 LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterTxActor);
177 masterTxActor.tell(new MergeRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
181 public FluentFuture<? extends CommitInfo> commit() {
182 LOG.debug("{}: Commit via actor {}", id, masterTxActor);
184 final Future<Object> future = Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
186 final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
187 future.onComplete(new OnComplete<>() {
189 public void onComplete(final Throwable failure, final Object response) {
190 if (failure != null) {
191 LOG.debug("{}: Commit failed", id, failure);
192 settableFuture.setException(newTransactionCommitFailedException(processFailure(failure)));
196 LOG.debug("{}: Commit succeeded", id);
198 settableFuture.set(CommitInfo.empty());
201 private TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure) {
202 return new TransactionCommitFailedException(String.format("%s: Commit of transaction failed",
203 getIdentifier()), failure);
205 }, executionContext);
207 return FluentFuture.from(settableFuture);
210 @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
211 justification = "https://github.com/spotbugs/spotbugs/issues/811")
212 private Throwable processFailure(final Throwable failure) {
213 return failure instanceof AskTimeoutException
214 ? NetconfTopologyUtils.createMasterIsDownException(id, (Exception)failure) : failure;