2 * Copyright (c) 2018 Inocybe Technologies and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.pattern.AskTimeoutException;
13 import akka.pattern.Patterns;
14 import akka.util.Timeout;
15 import com.google.common.base.Optional;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FluentFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.Objects;
20 import org.eclipse.jdt.annotation.NonNull;
21 import org.opendaylight.controller.md.sal.common.api.MappingCheckedFuture;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
25 import org.opendaylight.mdsal.common.api.CommitInfo;
26 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
27 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
28 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
29 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
30 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
31 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
32 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
33 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
34 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
35 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
36 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
37 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
38 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import scala.concurrent.ExecutionContext;
42 import scala.concurrent.Future;
45 * ProxyTransactionFacade implementation that interfaces with an actor.
47 * @author Thomas Pantelis
49 class ActorProxyTransactionFacade implements ProxyTransactionFacade {
50 private static final Logger LOG = LoggerFactory.getLogger(ActorProxyTransactionFacade.class);
52 private final ActorRef masterTxActor;
53 private final RemoteDeviceId id;
54 private final ExecutionContext executionContext;
55 private final Timeout askTimeout;
57 ActorProxyTransactionFacade(ActorRef masterTxActor, RemoteDeviceId id, ExecutionContext executionContext,
59 this.masterTxActor = Objects.requireNonNull(masterTxActor);
60 this.id = Objects.requireNonNull(id);
61 this.executionContext = Objects.requireNonNull(executionContext);
62 this.askTimeout = Objects.requireNonNull(askTimeout);
66 public Object getIdentifier() {
71 public boolean cancel() {
72 LOG.debug("{}: Cancel via actor {}", id, masterTxActor);
74 final Future<Object> future = Patterns.ask(masterTxActor, new CancelRequest(), askTimeout);
76 future.onComplete(new OnComplete<Object>() {
78 public void onComplete(final Throwable failure, final Object response) {
79 if (failure != null) {
80 LOG.warn("{}: Cancel failed", id, failure);
84 LOG.debug("{}: Cancel succeeded", id);
92 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(LogicalDatastoreType store,
93 YangInstanceIdentifier path) {
94 LOG.debug("{}: Read {} {} via actor {}", id, store, path, masterTxActor);
96 final Future<Object> future = Patterns.ask(masterTxActor, new ReadRequest(store, path), askTimeout);
98 final SettableFuture<Optional<NormalizedNode<?, ?>>> settableFuture = SettableFuture.create();
99 future.onComplete(new OnComplete<Object>() {
101 public void onComplete(final Throwable failure, final Object response) {
102 if (failure != null) {
103 LOG.debug("{}: Read {} {} failed", id, store, path, failure);
104 settableFuture.setException(processFailure(failure));
108 LOG.debug("{}: Read {} {} succeeded: {}", id, store, path, response);
110 if (response instanceof EmptyReadResponse) {
111 settableFuture.set(Optional.absent());
115 if (response instanceof NormalizedNodeMessage) {
116 final NormalizedNodeMessage data = (NormalizedNodeMessage) response;
117 settableFuture.set(Optional.of(data.getNode()));
120 }, executionContext);
122 return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER);
126 public CheckedFuture<Boolean, ReadFailedException> exists(LogicalDatastoreType store, YangInstanceIdentifier path) {
127 LOG.debug("{}: Exists {} {} via actor {}", id, store, path, masterTxActor);
129 final Future<Object> future = Patterns.ask(masterTxActor, new ExistsRequest(store, path), askTimeout);
131 final SettableFuture<Boolean> settableFuture = SettableFuture.create();
132 future.onComplete(new OnComplete<Object>() {
134 public void onComplete(final Throwable failure, final Object response) {
135 if (failure != null) {
136 LOG.debug("{}: Exists {} {} failed", id, store, path, failure);
137 settableFuture.setException(processFailure(failure));
141 LOG.debug("{}: Exists {} {} succeeded: {}", id, store, path, response);
143 settableFuture.set((Boolean) response);
145 }, executionContext);
147 return MappingCheckedFuture.create(settableFuture, ReadFailedException.MAPPER);
151 public void delete(LogicalDatastoreType store, YangInstanceIdentifier path) {
152 LOG.debug("{}: Delete {} {} via actor {}", id, store, path, masterTxActor);
153 masterTxActor.tell(new DeleteRequest(store, path), ActorRef.noSender());
157 public void put(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
158 LOG.debug("{}: Put {} {} via actor {}", id, store, path, masterTxActor);
159 masterTxActor.tell(new PutRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
163 public void merge(LogicalDatastoreType store, YangInstanceIdentifier path, NormalizedNode<?, ?> data) {
164 LOG.debug("{}: Merge {} {} via actor {}", id, store, path, masterTxActor);
165 masterTxActor.tell(new MergeRequest(store, new NormalizedNodeMessage(path, data)), ActorRef.noSender());
169 public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
170 LOG.debug("{}: Commit via actor {}", id, masterTxActor);
172 final Future<Object> future = Patterns.ask(masterTxActor, new SubmitRequest(), askTimeout);
174 final SettableFuture<CommitInfo> settableFuture = SettableFuture.create();
175 future.onComplete(new OnComplete<Object>() {
177 public void onComplete(final Throwable failure, final Object response) {
178 if (failure != null) {
179 LOG.debug("{}: Commit failed", id, failure);
180 settableFuture.setException(newTransactionCommitFailedException(processFailure(failure)));
184 LOG.debug("{}: Commit succeeded", id);
186 settableFuture.set(CommitInfo.empty());
188 }, executionContext);
190 return settableFuture;
193 private TransactionCommitFailedException newTransactionCommitFailedException(final Throwable failure) {
194 return new TransactionCommitFailedException(String.format("%s: Commit of transaction failed", getIdentifier()),
198 private Throwable processFailure(Throwable failure) {
199 if (failure instanceof AskTimeoutException) {
200 return NetconfTopologyUtils.createMasterIsDownException(id, (Exception)failure);