Bug 6714 - Use singleton service in clustered netconf topology
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / tx / NetconfMasterDOMTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.singleton.impl.tx;
10
11 import com.google.common.base.Optional;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import javax.annotation.Nonnull;
16 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
17 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
18 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
19 import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
20 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
21 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
22 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
23 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
24 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
25 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
26 import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
27 import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
30 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
31 import scala.concurrent.Future;
32 import scala.concurrent.impl.Promise.DefaultPromise;
33
34 public class NetconfMasterDOMTransaction implements NetconfDOMTransaction {
35
36     private final DOMDataBroker delegateBroker;
37
38     private DOMDataReadOnlyTransaction readTx;
39     private DOMDataWriteTransaction writeTx;
40
41     public NetconfMasterDOMTransaction(final RemoteDeviceId id,
42                                        final SchemaContext schemaContext, final DOMRpcService rpc,
43                                        final NetconfSessionPreferences netconfSessionPreferences) {
44
45         delegateBroker = new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences);
46
47         // only ever need 1 readTx since it doesnt need to be closed
48         readTx = delegateBroker.newReadOnlyTransaction();
49     }
50
51     public NetconfMasterDOMTransaction(final DOMDataBroker delegateBroker) {
52         this.delegateBroker = delegateBroker;
53
54         // only ever need 1 readTx since it doesnt need to be closed
55         readTx = delegateBroker.newReadOnlyTransaction();
56     }
57
58     @Override
59     public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
60                                                         final YangInstanceIdentifier path) {
61         final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture = readTx.read(store, path);
62
63         final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
64         Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
65             @Override
66             public void onSuccess(final Optional<NormalizedNode<?, ?>> result) {
67                 if (!result.isPresent()) {
68                     promise.success(Optional.absent());
69                 } else {
70                     promise.success(Optional.of(new NormalizedNodeMessage(path, result.get())));
71                 }
72             }
73
74             @Override
75             public void onFailure(@Nonnull final Throwable throwable) {
76                 promise.failure(throwable);
77             }
78         });
79         return promise.future();
80     }
81
82     @Override
83     public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
84         final CheckedFuture<Boolean, ReadFailedException> existsFuture = readTx.exists(store, path);
85
86         final DefaultPromise<Boolean> promise = new DefaultPromise<>();
87         Futures.addCallback(existsFuture, new FutureCallback<Boolean>() {
88             @Override
89             public void onSuccess(final Boolean result) {
90                 promise.success(result);
91             }
92
93             @Override
94             public void onFailure(@Nonnull final Throwable throwable) {
95                 promise.failure(throwable);
96             }
97         });
98         return promise.future();
99     }
100
101     @Override
102     public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
103         if (writeTx == null) {
104             writeTx = delegateBroker.newWriteOnlyTransaction();
105         }
106         writeTx.put(store, data.getIdentifier(), data.getNode());
107     }
108
109     @Override
110     public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
111         if (writeTx == null) {
112             writeTx = delegateBroker.newWriteOnlyTransaction();
113         }
114         writeTx.merge(store, data.getIdentifier(), data.getNode());
115     }
116
117     @Override
118     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
119         if (writeTx == null) {
120             writeTx = delegateBroker.newWriteOnlyTransaction();
121         }
122         writeTx.delete(store, path);
123     }
124
125     @Override
126     public boolean cancel() {
127         return writeTx.cancel();
128     }
129
130     @Override
131     public Future<Void> submit() {
132         final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
133         final DefaultPromise<Void> promise = new DefaultPromise<>();
134         Futures.addCallback(submitFuture, new FutureCallback<Void>() {
135             @Override
136             public void onSuccess(final Void result) {
137                 promise.success(result);
138                 writeTx = null;
139             }
140
141             @Override
142             public void onFailure(@Nonnull final Throwable throwable) {
143                 promise.failure(throwable);
144                 writeTx = null;
145             }
146         });
147         return promise.future();
148     }
149
150 }