2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl.tx;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorSystem;
13 import akka.dispatch.OnComplete;
14 import akka.pattern.Patterns;
15 import com.google.common.base.Optional;
16 import org.opendaylight.controller.config.util.xml.DocumentedException;
17 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
18 import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
19 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
20 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
21 import org.opendaylight.netconf.topology.singleton.messages.SubmitFailedReply;
22 import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
23 import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
24 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
25 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
26 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
27 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
28 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
29 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.Await;
34 import scala.concurrent.Future;
35 import scala.concurrent.impl.Promise.DefaultPromise;
38 public class NetconfProxyDOMTransaction implements NetconfDOMTransaction {
40 private static final Logger LOG = LoggerFactory.getLogger(NetconfProxyDOMTransaction.class);
42 private final ActorSystem actorSystem;
43 private final ActorRef masterContextRef;
45 public NetconfProxyDOMTransaction(final ActorSystem actorSystem, final ActorRef masterContextRef) {
46 this.actorSystem = actorSystem;
47 this.masterContextRef = masterContextRef;
51 public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
52 final YangInstanceIdentifier path) {
54 final Future<Object> readScalaFuture =
55 Patterns.ask(masterContextRef, new ReadRequest(store, path), NetconfTopologyUtils.TIMEOUT);
57 final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
59 readScalaFuture.onComplete(new OnComplete<Object>() {
61 public void onComplete(final Throwable failure, final Object success) throws Throwable {
62 if (failure != null) { // ask timeout
63 Exception exception = new DocumentedException("Master is down. Please try again.",
64 DocumentedException.ErrorType.application, DocumentedException.ErrorTag.operation_failed,
65 DocumentedException.ErrorSeverity.warning);
66 promise.failure(exception);
69 if (success instanceof Throwable) { // Error sended by master
70 promise.failure((Throwable) success);
73 if (success instanceof EmptyReadResponse) {
74 promise.success(Optional.absent());
78 promise.success(Optional.of((NormalizedNodeMessage) success));
80 }, actorSystem.dispatcher());
82 return promise.future();
86 public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
87 final Future<Object> existsScalaFuture =
88 Patterns.ask(masterContextRef, new ExistsRequest(store, path), NetconfTopologyUtils.TIMEOUT);
90 final DefaultPromise<Boolean> promise = new DefaultPromise<>();
91 existsScalaFuture.onComplete(new OnComplete<Object>() {
93 public void onComplete(final Throwable failure, final Object success) throws Throwable {
94 if (failure != null) { // ask timeout
95 Exception exception = new DocumentedException("Master is down. Please try again.",
96 DocumentedException.ErrorType.application, DocumentedException.ErrorTag.operation_failed,
97 DocumentedException.ErrorSeverity.warning);
98 promise.failure(exception);
101 if (success instanceof Throwable) {
102 promise.failure((Throwable) success);
105 promise.success((Boolean) success);
107 }, actorSystem.dispatcher());
108 return promise.future();
112 public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
113 masterContextRef.tell(new PutRequest(store, data), ActorRef.noSender());
118 public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
119 masterContextRef.tell(new MergeRequest(store, data), ActorRef.noSender());
123 public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
124 masterContextRef.tell(new DeleteRequest(store, path), ActorRef.noSender());
128 public boolean cancel() {
129 final Future<Object> cancelScalaFuture =
130 Patterns.ask(masterContextRef, new CancelRequest(), NetconfTopologyUtils.TIMEOUT);
132 // here must be Await because AsyncWriteTransaction do not return future
133 return (boolean) Await.result(cancelScalaFuture, NetconfTopologyUtils.TIMEOUT.duration());
134 } catch (Exception e) {
140 public Future<Void> submit() {
141 final Future<Object> submitScalaFuture =
142 Patterns.ask(masterContextRef, new SubmitRequest(), NetconfTopologyUtils.TIMEOUT);
144 final DefaultPromise<Void> promise = new DefaultPromise<>();
146 submitScalaFuture.onComplete(new OnComplete<Object>() {
148 public void onComplete(final Throwable failure, final Object success) throws Throwable {
149 if (failure != null) { // ask timeout
150 Exception exception = new DocumentedException("Master is down. Please try again.",
151 DocumentedException.ErrorType.application, DocumentedException.ErrorTag.operation_failed,
152 DocumentedException.ErrorSeverity.warning);
153 promise.failure(exception);
156 if (success instanceof Throwable) {
157 promise.failure((Throwable) success);
159 if (success instanceof SubmitFailedReply) {
160 LOG.error("Transaction was not submitted.");
162 promise.success(null);
165 }, actorSystem.dispatcher());
167 return promise.future();