2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.pipeline;
11 import akka.actor.ActorSystem;
12 import akka.actor.TypedActor;
13 import com.google.common.base.Optional;
14 import com.google.common.util.concurrent.CheckedFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import java.util.Collections;
20 import javax.annotation.Nonnull;
21 import org.opendaylight.controller.md.sal.common.api.TransactionStatus;
22 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
23 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
26 import org.opendaylight.controller.md.sal.dom.api.DOMDataBrokerExtension;
27 import org.opendaylight.controller.md.sal.dom.api.DOMDataChangeListener;
28 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
30 import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
31 import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
32 import org.opendaylight.controller.md.sal.dom.api.DOMTransactionChain;
33 import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences;
34 import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceDataBroker;
35 import org.opendaylight.netconf.sal.connect.netconf.sal.tx.ReadWriteTx;
36 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
37 import org.opendaylight.netconf.topology.pipeline.tx.ProxyReadOnlyTransaction;
38 import org.opendaylight.netconf.topology.pipeline.tx.ProxyWriteOnlyTransaction;
39 import org.opendaylight.netconf.topology.util.messages.NormalizedNodeMessage;
40 import org.opendaylight.yangtools.concepts.ListenerRegistration;
41 import org.opendaylight.yangtools.yang.common.RpcResult;
42 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
43 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
44 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
45 import scala.concurrent.Future;
46 import scala.concurrent.impl.Promise.DefaultPromise;
48 public class NetconfDeviceMasterDataBroker implements ProxyNetconfDeviceDataBroker {
50 private final RemoteDeviceId id;
52 private final NetconfDeviceDataBroker delegateBroker;
53 private final ActorSystem actorSystem;
55 private DOMDataReadOnlyTransaction readTx;
56 private DOMDataWriteTransaction writeTx;
/**
 * Creates the master-side data broker for a single netconf device.
 *
 * @param actorSystem actor system handed to the proxy transactions this broker creates
 * @param id identifier of the remote device
 * @param schemaContext schema context passed to the delegate {@link NetconfDeviceDataBroker}
 * @param rpc RPC service passed to the delegate broker
 * @param netconfSessionPreferences session preferences passed to the delegate broker
 */
public NetconfDeviceMasterDataBroker(final ActorSystem actorSystem, final RemoteDeviceId id,
                                     final SchemaContext schemaContext, final DOMRpcService rpc,
                                     final NetconfSessionPreferences netconfSessionPreferences) {
    // id is a final field read by the proxy transactions and the "not supported" messages,
    // so it must be assigned here.
    this.id = id;
    this.delegateBroker = new NetconfDeviceDataBroker(id, schemaContext, rpc, netconfSessionPreferences);
    this.actorSystem = actorSystem;

    // only ever need 1 readTx since it doesn't need to be closed
    this.readTx = delegateBroker.newReadOnlyTransaction();
}
70 public DOMDataReadOnlyTransaction newReadOnlyTransaction() {
71 return new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.<NetconfDeviceMasterDataBroker>self());
75 public DOMDataReadWriteTransaction newReadWriteTransaction() {
76 return new ReadWriteTx(new ProxyReadOnlyTransaction(actorSystem, id, TypedActor.<NetconfDeviceMasterDataBroker>self()),
77 newWriteOnlyTransaction());
81 public DOMDataWriteTransaction newWriteOnlyTransaction() {
82 writeTx = delegateBroker.newWriteOnlyTransaction();
83 return new ProxyWriteOnlyTransaction(actorSystem, TypedActor.<NetconfDeviceMasterDataBroker>self());
87 public ListenerRegistration<DOMDataChangeListener> registerDataChangeListener(LogicalDatastoreType store, YangInstanceIdentifier path, DOMDataChangeListener listener, DataChangeScope triggeringScope) {
88 throw new UnsupportedOperationException(id + ": Data change listeners not supported for netconf mount point");
92 public DOMTransactionChain createTransactionChain(TransactionChainListener listener) {
93 throw new UnsupportedOperationException(id + ": Transaction chains not supported for netconf mount point");
98 public Map<Class<? extends DOMDataBrokerExtension>, DOMDataBrokerExtension> getSupportedExtensions() {
99 return Collections.emptyMap();
103 public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
104 final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> readFuture = readTx.read(store, path);
106 final DefaultPromise<Optional<NormalizedNodeMessage>> promise = new DefaultPromise<>();
107 Futures.addCallback(readFuture, new FutureCallback<Optional<NormalizedNode<?, ?>>>() {
109 public void onSuccess(Optional<NormalizedNode<?, ?>> result) {
110 if (!result.isPresent()) {
111 promise.success(Optional.<NormalizedNodeMessage>absent());
113 promise.success(Optional.of(new NormalizedNodeMessage(path, result.get())));
118 public void onFailure(Throwable t) {
122 return promise.future();
126 public Future<Boolean> exists(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
127 final CheckedFuture<Boolean, ReadFailedException> existsFuture = readTx.exists(store, path);
129 final DefaultPromise<Boolean> promise = new DefaultPromise<>();
130 Futures.addCallback(existsFuture, new FutureCallback<Boolean>() {
132 public void onSuccess(Boolean result) {
133 promise.success(result);
137 public void onFailure(Throwable t) {
141 return promise.future();
145 public void put(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
146 if (writeTx == null) {
147 writeTx = delegateBroker.newWriteOnlyTransaction();
149 writeTx.put(store, data.getIdentifier(), data.getNode());
153 public void merge(final LogicalDatastoreType store, final NormalizedNodeMessage data) {
154 if (writeTx == null) {
155 writeTx = delegateBroker.newWriteOnlyTransaction();
157 writeTx.merge(store, data.getIdentifier(), data.getNode());
161 public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
162 if (writeTx == null) {
163 writeTx = delegateBroker.newWriteOnlyTransaction();
165 writeTx.delete(store, path);
169 public boolean cancel() {
170 return writeTx.cancel();
174 public Future<Void> submit() {
175 final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
176 final DefaultPromise<Void> promise = new DefaultPromise<>();
177 Futures.addCallback(submitFuture, new FutureCallback<Void>() {
179 public void onSuccess(Void result) {
180 promise.success(result);
184 public void onFailure(Throwable t) {
188 return promise.future();
193 public Future<RpcResult<TransactionStatus>> commit() {
194 final ListenableFuture<RpcResult<TransactionStatus>> commitFuture = writeTx.commit();
195 final DefaultPromise<RpcResult<TransactionStatus>> promise = new DefaultPromise<>();
196 Futures.addCallback(commitFuture, new FutureCallback<RpcResult<TransactionStatus>>() {
198 public void onSuccess(RpcResult<TransactionStatus> result) {
199 promise.success(result);
203 public void onFailure(Throwable t) {
207 return promise.future();