2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl;
11 import akka.actor.ActorRef;
12 import akka.actor.Status;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import javax.annotation.Nonnull;
18 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
19 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
21 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
22 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
23 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
24 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
25 import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
26 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
27 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
28 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
29 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 public class RemoteOperationTxProcessorImpl implements RemoteOperationTxProcessor, AutoCloseable {
37 private static final Logger LOG = LoggerFactory.getLogger(RemoteOperationTxProcessorImpl.class);
39 private final DOMDataBroker dataBroker;
40 private final RemoteDeviceId id;
41 private DOMDataWriteTransaction writeTx;
42 private DOMDataReadOnlyTransaction readTx;
44 private ActorRef currentUser = null;
46 public RemoteOperationTxProcessorImpl(final DOMDataBroker dataBroker, final RemoteDeviceId id) {
47 this.dataBroker = dataBroker;
49 this.readTx = dataBroker.newReadOnlyTransaction();
53 public void doOpenTransaction(ActorRef recipient, ActorRef sender) {
54 if (currentUser != null) {
55 LOG.error("{}: Opening a new transaction for {} failed.", id, recipient);
56 recipient.tell(new Status.Failure(
57 new IllegalStateException("Transaction is already opened for another user")), recipient);
61 LOG.debug("{}: Opening a new transaction for {}", id, recipient);
62 currentUser = recipient;
63 recipient.tell(new Status.Success(null), sender);
67 public void doDelete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
68 if (writeTx == null) {
69 writeTx = dataBroker.newWriteOnlyTransaction();
71 writeTx.delete(store, path);
75 public void doSubmit(final ActorRef recipient, final ActorRef sender) {
77 if (writeTx != null) {
78 CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
79 Futures.addCallback(submitFuture, new FutureCallback<Void>() {
81 public void onSuccess(Void result) {
82 recipient.tell(new SubmitReply(), sender);
86 public void onFailure(@Nonnull Throwable throwable) {
87 recipient.tell(throwable, sender);
91 recipient.tell(new SubmitFailedReply(), sender);
92 LOG.warn("{}: Couldn't submit transaction because it was already closed.", id);
97 public void doCancel(final ActorRef recipient, final ActorRef sender) {
99 boolean cancel = false;
100 if (writeTx != null) {
101 cancel = writeTx.cancel();
103 recipient.tell(cancel, sender);
108 public void doPut(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
109 if (writeTx == null) {
110 writeTx = dataBroker.newWriteOnlyTransaction();
112 writeTx.put(store, data.getIdentifier(), data.getNode());
116 public void doMerge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
117 if (writeTx == null) {
118 writeTx = dataBroker.newWriteOnlyTransaction();
120 writeTx.merge(store, data.getIdentifier(), data.getNode());
124 public void doRead(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
125 final ActorRef sender) {
126 final CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> readFuture =
127 readTx.read(store, path);
129 Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
132 public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
133 if (!result.isPresent()) {
134 recipient.tell(new EmptyReadResponse(), sender);
137 recipient.tell(new NormalizedNodeMessage(path, result.get()), sender);
141 public void onFailure(@Nonnull final Throwable throwable) {
142 recipient.tell(throwable, sender);
148 public void doExists(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
149 final ActorRef sender) {
150 final CheckedFuture<Boolean, ReadFailedException> readFuture =
151 readTx.exists(store, path);
152 Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
154 public void onSuccess(final Boolean result) {
155 if (result == null) {
156 recipient.tell(false, sender);
158 recipient.tell(result, sender);
163 public void onFailure(@Nonnull final Throwable throwable) {
164 recipient.tell(throwable, sender);
170 public void close() throws Exception {
172 if (readTx != null) {