Bug 6714 - Use singleton service in clustered netconf topology
[netconf.git] / netconf / netconf-topology / src / main / java / org / opendaylight / netconf / topology / pipeline / NetconfDeviceMasterDataBroker.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.netconf.topology.pipeline;
10
11 import akka.actor.ActorSystem;
12 import akka.actor.TypedActor;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.Collections;
19 import java.util.Map;
20 import javax.annotation.Nonnull;
21 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
32 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
33 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
34 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
35 import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
36 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
37 import org.opendaylight.netconf.topology.pipeline.tx.ProxyReadOnlyTransaction;
38 import org.opendaylight.netconf.topology.pipeline.tx.ProxyWriteOnlyTransaction;
39 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import scala.concurrent.Future;
46 import scala.concurrent.impl.Promise.DefaultPromise;
47
48 public class NetconfDeviceMasterDataBroker implements ProxyNetconfDeviceDataBroker {
49
50     private final RemoteDeviceId id;
51
52     private final NetconfDeviceDataBroker delegateBroker;
53     private final ActorSystem actorSystem;
54
55     private DOMDataReadOnlyTransaction readTx;
56     private DOMDataWriteTransaction writeTx;
57
58     public NetconfDeviceMasterDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
59                                          final SchemaContext schemaContext, final DOMRpcService rpc,
60                                          final NetconfSessionPreferences netconfSessionPreferences) {
61         this.id = id;
62         delegateBroker = new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences);
63         this.actorSystem = actorSystem;
64
65         // only ever need 1 readTx since it doesnt need to be closed
66         readTx = delegateBroker.newReadOnlyTransaction();
67     }
68
69     @Override
70     public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
71         return new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.<NetconfDeviceMasterDataBroker>self());
72     }
73
74     @Override
75     public DOMDataReadWriteTransaction newReadWriteTransaction() {
76         return new ReadWriteTx(new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.<NetconfDeviceMasterDataBroker>self()),
77                 newWriteOnlyTransaction());
78     }
79
80     @Override
81     public DOMDataWriteTransaction newWriteOnlyTransaction() {
82         writeTx = delegateBroker.newWriteOnlyTransaction();
83         return new ProxyWriteOnlyTransaction(actorSystem, TypedActor.<NetconfDeviceMasterDataBroker>self());
84     }
85
86     @Override
87     public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener, DataChangeScope triggeringScope) {
88         throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
89     }
90
91     @Override
92     public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
93         throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
94     }
95
96     @Nonnull
97     @Override
98     public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
99         return Collections.emptyMap();
100     }
101
102     @Override
103     public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
104         final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture = readTx.read(store, path);
105
106         final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
107         Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
108             @Override
109             public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
110                 if (!result.isPresent()) {
111                     promise.success(Optional.<NormalizedNodeMessage>absent());
112                 } else {
113                     promise.success(Optional.of(new NormalizedNodeMessage(path, result.get())));
114                 }
115             }
116
117             @Override
118             public void onFailure(Throwable t) {
119                 promise.failure(t);
120             }
121         });
122         return promise.future();
123     }
124
125     @Override
126     public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
127         final CheckedFuture<Boolean, ReadFailedException> existsFuture = readTx.exists(store, path);
128
129         final DefaultPromise<Boolean> promise = new DefaultPromise<>();
130         Futures.addCallback(existsFuture, new FutureCallback<Boolean>() {
131             @Override
132             public void onSuccess(Boolean result) {
133                 promise.success(result);
134             }
135
136             @Override
137             public void onFailure(Throwable t) {
138                 promise.failure(t);
139             }
140         });
141         return promise.future();
142     }
143
144     @Override
145     public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
146         if (writeTx == null) {
147             writeTx = delegateBroker.newWriteOnlyTransaction();
148         }
149         writeTx.put(store, data.getIdentifier(), data.getNode());
150     }
151
152     @Override
153     public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
154         if (writeTx == null) {
155             writeTx = delegateBroker.newWriteOnlyTransaction();
156         }
157         writeTx.merge(store, data.getIdentifier(), data.getNode());
158     }
159
160     @Override
161     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
162         if (writeTx == null) {
163             writeTx = delegateBroker.newWriteOnlyTransaction();
164         }
165         writeTx.delete(store, path);
166     }
167
168     @Override
169     public boolean cancel() {
170         return writeTx.cancel();
171     }
172
173     @Override
174     public Future<Void> submit() {
175         final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
176         final DefaultPromise<Void> promise = new DefaultPromise<>();
177         Futures.addCallback(submitFuture, new FutureCallback<Void>() {
178             @Override
179             public void onSuccess(Void result) {
180                 promise.success(result);
181                 writeTx = null;
182             }
183
184             @Override
185             public void onFailure(Throwable t) {
186                 promise.failure(t);
187                 writeTx = null;
188             }
189         });
190         return promise.future();
191     }
192
193     @Override
194     @Deprecated
195     public Future<RpcResult<TransactionStatus>> commit() {
196         final ListenableFuture<RpcResult<TransactionStatus>> commitFuture = writeTx.commit();
197         final DefaultPromise<RpcResult<TransactionStatus>> promise = new DefaultPromise<>();
198         Futures.addCallback(commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
199             @Override
200             public void onSuccess(RpcResult<TransactionStatus> result) {
201                 promise.success(result);
202                 writeTx = null;
203             }
204
205             @Override
206             public void onFailure(Throwable t) {
207                 promise.failure(t);
208                 writeTx = null;
209             }
210         });
211         return promise.future();
212     }
213
214 }