2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Optional;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import java.util.function.Consumer;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.opendaylight.mdsal.common.api.CommitInfo;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
28 import org.opendaylight.netconf.sal.connect.api.RemoteDeviceId;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.ExecutionContext;
34 import scala.concurrent.Future;
37 * ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
38 * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}.
40 public class ProxyReadWriteTransaction implements DOMDataTreeReadWriteTransaction {
41 private static final Logger LOG = LoggerFactory.getLogger(ProxyReadWriteTransaction.class);
43 private final RemoteDeviceId id;
44 private final AtomicBoolean opened = new AtomicBoolean(true);
46 @GuardedBy("queuedTxOperations")
47 private final List<Consumer<ProxyTransactionFacade>> queuedTxOperations = new ArrayList<>();
49 private volatile ProxyTransactionFacade transactionFacade;
51 public ProxyReadWriteTransaction(final RemoteDeviceId id, final Future<Object> masterTxActorFuture,
52 final ExecutionContext executionContext, final Timeout askTimeout) {
55 masterTxActorFuture.onComplete(new OnComplete<>() {
57 public void onComplete(final Throwable failure, final Object masterTxActor) {
58 final ProxyTransactionFacade newTransactionFacade;
59 if (failure != null) {
60 LOG.debug("{}: Failed to obtain master actor", id, failure);
61 newTransactionFacade = new FailedProxyTransactionFacade(id, failure);
63 LOG.debug("{}: Obtained master actor {}", id, masterTxActor);
64 newTransactionFacade = new ActorProxyTransactionFacade((ActorRef)masterTxActor, id,
65 executionContext, askTimeout);
68 executePriorTransactionOperations(newTransactionFacade);
74 public boolean cancel() {
75 if (!opened.compareAndSet(true, false)) {
79 processTransactionOperation(DOMDataTreeWriteTransaction::cancel);
84 public FluentFuture<Optional<NormalizedNode>> read(final LogicalDatastoreType store,
85 final YangInstanceIdentifier path) {
86 LOG.debug("{}: Read {} {}", id, store, path);
88 final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
89 processTransactionOperation(facade -> returnFuture.setFuture(facade.read(store, path)));
90 return FluentFuture.from(returnFuture);
94 public FluentFuture<Boolean> exists(final LogicalDatastoreType store,
95 final YangInstanceIdentifier path) {
96 LOG.debug("{}: Exists {} {}", id, store, path);
98 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
99 processTransactionOperation(facade -> returnFuture.setFuture(facade.exists(store, path)));
100 return FluentFuture.from(returnFuture);
104 public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
106 LOG.debug("{}: Delete {} {}", id, store, path);
107 processTransactionOperation(facade -> facade.delete(store, path));
111 public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) {
113 LOG.debug("{}: Put {} {}", id, store, path);
114 processTransactionOperation(facade -> facade.put(store, path, data));
118 public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) {
120 LOG.debug("{}: Merge {} {}", id, store, path);
121 processTransactionOperation(facade -> facade.merge(store, path, data));
125 public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
126 Preconditions.checkState(opened.compareAndSet(true, false), "%s: Transaction is already closed", id);
127 LOG.debug("{}: Commit", id);
129 final SettableFuture<CommitInfo> returnFuture = SettableFuture.create();
130 processTransactionOperation(facade -> returnFuture.setFuture(facade.commit()));
131 return FluentFuture.from(returnFuture);
135 public Object getIdentifier() {
139 private void processTransactionOperation(final Consumer<ProxyTransactionFacade> operation) {
140 final ProxyTransactionFacade facadeOnEntry;
141 synchronized (queuedTxOperations) {
142 if (transactionFacade == null) {
143 LOG.debug("{}: Queuing transaction operation", id);
145 queuedTxOperations.add(operation);
146 facadeOnEntry = null;
148 facadeOnEntry = transactionFacade;
152 if (facadeOnEntry != null) {
153 operation.accept(facadeOnEntry);
157 private void executePriorTransactionOperations(final ProxyTransactionFacade newTransactionFacade) {
159 // Access to queuedTxOperations and transactionFacade must be protected and atomic
160 // (ie synchronized) with respect to #processTransactionOperation to handle timing
161 // issues and ensure no ProxyTransactionFacade is missed and that they are processed
162 // in the order they occurred.
164 // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
165 // in case a transaction operation results in another transaction operation being
166 // queued (eg a put operation from a client read Future callback that is notified
168 final Collection<Consumer<ProxyTransactionFacade>> operationsBatch;
169 synchronized (queuedTxOperations) {
170 if (queuedTxOperations.isEmpty()) {
171 // We're done invoking the transaction operations so we can now publish the
172 // ProxyTransactionFacade.
173 transactionFacade = newTransactionFacade;
177 operationsBatch = new ArrayList<>(queuedTxOperations);
178 queuedTxOperations.clear();
181 // Invoke transaction operations outside the sync block to avoid unnecessary blocking.
182 for (Consumer<ProxyTransactionFacade> oper : operationsBatch) {
183 oper.accept(newTransactionFacade);
188 private void checkOpen() {
189 Preconditions.checkState(opened.get(), "%s: Transaction is closed", id);