f19d0d535b98bf5afffe37bdaaa01979024afdbc
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / RemoteOperationTxProcessorImpl.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Status;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import javax.annotation.Nonnull;
18 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
19 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
20 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
21 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
22 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
23 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
24 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
25 import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
26 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
27 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
28 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
29 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 public class RemoteOperationTxProcessorImpl implements RemoteOperationTxProcessor, AutoCloseable {
36
37     private static final Logger LOG = LoggerFactory.getLogger(RemoteOperationTxProcessorImpl.class);
38
39     private final DOMDataBroker dataBroker;
40     private final RemoteDeviceId id;
41     private DOMDataWriteTransaction writeTx;
42     private DOMDataReadOnlyTransaction readTx;
43
44     private ActorRef currentUser = null;
45
46     public RemoteOperationTxProcessorImpl(final DOMDataBroker dataBroker, final RemoteDeviceId id) {
47         this.dataBroker = dataBroker;
48         this.id = id;
49         this.readTx = dataBroker.newReadOnlyTransaction();
50     }
51
52     @Override
53     public void doOpenTransaction(ActorRef recipient, ActorRef sender) {
54         if (currentUser != null) {
55             LOG.error("{}: Opening a new transaction for {} failed.", id, recipient);
56             recipient.tell(new Status.Failure(
57                     new IllegalStateException("Transaction is already opened for another user")), recipient);
58             return;
59         }
60
61         LOG.debug("{}: Opening a new transaction for {}", id, recipient);
62         currentUser = recipient;
63         recipient.tell(new Status.Success(null), sender);
64     }
65
66     @Override
67     public void doDelete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
68         if (writeTx == null) {
69             writeTx = dataBroker.newWriteOnlyTransaction();
70         }
71         writeTx.delete(store, path);
72     }
73
74     @Override
75     public void doSubmit(final ActorRef recipient, final ActorRef sender) {
76         currentUser = null;
77         if (writeTx != null) {
78             CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
79             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
80                 @Override
81                 public void onSuccess(Void result) {
82                     recipient.tell(new SubmitReply(), sender);
83                 }
84
85                 @Override
86                 public void onFailure(@Nonnull Throwable throwable) {
87                     recipient.tell(throwable, sender);
88                 }
89             });
90         } else {
91             recipient.tell(new SubmitFailedReply(), sender);
92             LOG.warn("{}: Couldn't submit transaction because it was already closed.", id);
93         }
94     }
95
96     @Override
97     public void doCancel(final ActorRef recipient, final ActorRef sender) {
98         currentUser = null;
99         boolean cancel = false;
100         if (writeTx != null) {
101             cancel = writeTx.cancel();
102         }
103         recipient.tell(cancel, sender);
104
105     }
106
107     @Override
108     public void doPut(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
109         if (writeTx == null) {
110             writeTx = dataBroker.newWriteOnlyTransaction();
111         }
112         writeTx.put(store, data.getIdentifier(), data.getNode());
113     }
114
115     @Override
116     public void doMerge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
117         if (writeTx == null) {
118             writeTx = dataBroker.newWriteOnlyTransaction();
119         }
120         writeTx.merge(store, data.getIdentifier(), data.getNode());
121     }
122
123     @Override
124     public void doRead(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
125                        final ActorRef sender) {
126         final CheckedFuture<Optional<NormalizedNode<?,?>>, ReadFailedException> readFuture =
127                 readTx.read(store, path);
128
129         Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
130
131             @Override
132             public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
133                 if (!result.isPresent()) {
134                     recipient.tell(new EmptyReadResponse(), sender);
135                     return;
136                 }
137                 recipient.tell(new NormalizedNodeMessage(path, result.get()), sender);
138             }
139
140             @Override
141             public void onFailure(@Nonnull final Throwable throwable) {
142                 recipient.tell(throwable, sender);
143             }
144         });
145     }
146
147     @Override
148     public void doExists(final LogicalDatastoreType store, final YangInstanceIdentifier path, final ActorRef recipient,
149                          final ActorRef sender) {
150         final CheckedFuture<Boolean, ReadFailedException> readFuture =
151                 readTx.exists(store, path);
152         Futures.addCallback(readFuture, new FutureCallback<Boolean>() {
153             @Override
154             public void onSuccess(final Boolean result) {
155                 if (result == null) {
156                     recipient.tell(false, sender);
157                 } else {
158                     recipient.tell(result, sender);
159                 }
160             }
161
162             @Override
163             public void onFailure(@Nonnull final Throwable throwable) {
164                 recipient.tell(throwable, sender);
165             }
166         });
167     }
168
169     @Override
170     public void close() throws Exception {
171         currentUser = null;
172         if (readTx != null) {
173             readTx.close();
174         }
175     }
176 }