2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.netconf.topology.singleton.impl.tx;
11 import akka.actor.ActorRef;
12 import akka.dispatch.OnComplete;
13 import akka.util.Timeout;
14 import com.google.common.base.Optional;
15 import com.google.common.base.Preconditions;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FluentFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.concurrent.atomic.AtomicBoolean;
23 import java.util.function.Consumer;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.eclipse.jdt.annotation.NonNull;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncWriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
29 import org.opendaylight.controller.md.sal.dom.api.DOMDataReadWriteTransaction;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.mdsal.common.api.MappingCheckedFuture;
32 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import scala.concurrent.ExecutionContext;
38 import scala.concurrent.Future;
41 * ProxyReadWriteTransaction uses the provided {@link ActorRef} to delegate method calls to the master
42 * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}.
44 public class ProxyReadWriteTransaction implements DOMDataReadWriteTransaction {
45 private static final Logger LOG = LoggerFactory.getLogger(ProxyReadWriteTransaction.class);
// Identifier of the remote device; used as the leading argument of every log and error message.
47 private final RemoteDeviceId id;
// Starts as true; cancel() and commit() flip it to false with compareAndSet(true, false).
48 private final AtomicBoolean opened = new AtomicBoolean(true);
// Operations requested while transactionFacade is still null. All access happens inside
// synchronized (queuedTxOperations) blocks, i.e. the list is its own lock.
50 @GuardedBy("queuedTxOperations")
51 private final List<Consumer<ProxyTransactionFacade>> queuedTxOperations = new ArrayList<>();
// Null until executePriorTransactionOperations publishes the facade once the queue is empty.
53 private volatile ProxyTransactionFacade transactionFacade;
/**
 * Creates the proxy transaction and registers a completion callback on {@code masterTxActorFuture}.
 *
 * <p>When the future completes, the callback builds the facade that all operations are delegated to:
 * a {@code FailedProxyTransactionFacade} carrying the failure when {@code failure} is non-null,
 * otherwise an {@code ActorProxyTransactionFacade} wrapping the result cast to {@link ActorRef}.
 * The facade is then handed to {@code executePriorTransactionOperations}.
 *
 * <p>NOTE(review): this listing has gaps in its embedded numbering (57-58, 60, 66, 70-71, 73 onward),
 * so the initialisation of the {@code id} field, the else-branch delimiter and the closing of the
 * {@code onComplete(...)} call are not visible here; confirm against the full file.
 *
 * @param id identifier of the remote device, used in log messages and passed to the facades
 * @param masterTxActorFuture future whose successful result is cast to {@link ActorRef}
 * @param executionContext passed to {@code ActorProxyTransactionFacade}
 * @param askTimeout passed to {@code ActorProxyTransactionFacade}
 */
55 public ProxyReadWriteTransaction(final RemoteDeviceId id, final Future<Object> masterTxActorFuture,
56 final ExecutionContext executionContext, final Timeout askTimeout) {
59 masterTxActorFuture.onComplete(new OnComplete<Object>() {
61 public void onComplete(final Throwable failure, final Object masterTxActor) {
62 final ProxyTransactionFacade newTransactionFacade;
63 if (failure != null) {
64 LOG.debug("{}: Failed to obtain master actor", id, failure);
// Failure path: the facade is constructed from the failure itself.
65 newTransactionFacade = new FailedProxyTransactionFacade(id, failure);
67 LOG.debug("{}: Obtained master actor {}", id, masterTxActor);
68 newTransactionFacade = new ActorProxyTransactionFacade((ActorRef)masterTxActor, id,
69 executionContext, askTimeout);
// Hand over the facade so that operations queued so far can be run against it.
72 executePriorTransactionOperations(newTransactionFacade);
/**
 * Closes this transaction by atomically flipping {@code opened} from true to false and submits a
 * cancel operation through {@code processTransactionOperation}.
 *
 * <p>NOTE(review): the return statements (embedded lines 80 and 84) are missing from this listing;
 * presumably {@code false} is returned when the transaction was already closed and {@code true}
 * after the cancel operation has been submitted. Confirm against the full file.
 */
78 public boolean cancel() {
// compareAndSet fails when the transaction was already closed by an earlier cancel() or commit().
79 if (!opened.compareAndSet(true, false)) {
83 processTransactionOperation(AsyncWriteTransaction::cancel);
/**
 * Reads the data at {@code path} in {@code store} through the transaction facade.
 *
 * <p>The call returns immediately: a {@link SettableFuture} is created and is later completed with
 * the facade's own read future, either right away or once the queued operation is executed.
 *
 * @param store datastore passed to the facade's read
 * @param path path passed to the facade's read
 * @return the settable future wrapped by {@code MappingCheckedFuture} using
 *         {@code ReadFailedException.MAPPER}
 */
88 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final LogicalDatastoreType store,
89 final YangInstanceIdentifier path) {
90 LOG.debug("{}: Read {} {}", id, store, path);
92 final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
// setFuture chains the facade's result (or failure) into the future handed back to the caller.
93 processTransactionOperation(facade -> returnFuture.setFuture(facade.read(store, path)));
94 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
/**
 * Checks through the transaction facade whether data exists at {@code path} in {@code store}.
 *
 * <p>The call returns immediately: a {@link SettableFuture} is created and is later completed with
 * the facade's own exists future, either right away or once the queued operation is executed.
 *
 * @param store datastore passed to the facade's exists
 * @param path path passed to the facade's exists
 * @return the settable future wrapped by {@code MappingCheckedFuture} using
 *         {@code ReadFailedException.MAPPER}
 */
98 public CheckedFuture<Boolean, ReadFailedException> exists(final LogicalDatastoreType store,
99 final YangInstanceIdentifier path) {
100 LOG.debug("{}: Exists {} {}", id, store, path);
102 final SettableFuture<Boolean> returnFuture = SettableFuture.create();
// setFuture chains the facade's result (or failure) into the future handed back to the caller.
103 processTransactionOperation(facade -> returnFuture.setFuture(facade.exists(store, path)));
104 return MappingCheckedFuture.create(returnFuture, ReadFailedException.MAPPER);
/**
 * Submits a delete of {@code path} in {@code store} to the transaction facade, either immediately
 * or queued until the facade is available.
 *
 * <p>NOTE(review): embedded line 109 is missing from this listing; it may hold a precondition
 * such as an open-state check. Confirm against the full file.
 *
 * @param store datastore passed to the facade's delete
 * @param path path passed to the facade's delete
 */
108 public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
110 LOG.debug("{}: Delete {} {}", id, store, path);
111 processTransactionOperation(facade -> facade.delete(store, path));
/**
 * Submits a put of {@code data} at {@code path} in {@code store} to the transaction facade,
 * either immediately or queued until the facade is available.
 *
 * <p>NOTE(review): embedded line 117 is missing from this listing; it may hold a precondition
 * such as an open-state check. Confirm against the full file.
 *
 * @param store datastore passed to the facade's put
 * @param path path passed to the facade's put
 * @param data node passed to the facade's put; it is not included in the debug log
 */
115 public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
116 final NormalizedNode<?, ?> data) {
118 LOG.debug("{}: Put {} {}", id, store, path);
119 processTransactionOperation(facade -> facade.put(store, path, data));
/**
 * Submits a merge of {@code data} at {@code path} in {@code store} to the transaction facade,
 * either immediately or queued until the facade is available.
 *
 * <p>NOTE(review): embedded line 125 is missing from this listing; it may hold a precondition
 * such as an open-state check. Confirm against the full file.
 *
 * @param store datastore passed to the facade's merge
 * @param path path passed to the facade's merge
 * @param data node passed to the facade's merge; it is not included in the debug log
 */
123 public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
124 final NormalizedNode<?, ?> data) {
126 LOG.debug("{}: Merge {} {}", id, store, path);
127 processTransactionOperation(facade -> facade.merge(store, path, data));
/**
 * Closes this transaction and submits a commit to the transaction facade.
 *
 * <p>The open flag is flipped atomically inside the precondition, so only the first of any
 * competing commit()/cancel() calls proceeds. The facade's commit future is chained into a
 * locally created {@link SettableFuture}.
 *
 * <p>NOTE(review): the return statement (embedded line 137) is missing from this listing;
 * presumably the settable future is returned as a {@link FluentFuture}. Confirm against the full file.
 *
 * @throws IllegalStateException if the transaction was already closed
 */
131 public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
// The check and the state change are one atomic step: compareAndSet both tests and closes.
132 Preconditions.checkState(opened.compareAndSet(true, false), "%s: Transaction is already closed", id);
133 LOG.debug("{}: Commit", id);
135 final SettableFuture<CommitInfo> returnFuture = SettableFuture.create();
136 processTransactionOperation(facade -> returnFuture.setFuture(facade.commit()));
/**
 * Returns the identifier of this transaction.
 *
 * <p>NOTE(review): the method body (embedded line 142) is missing from this listing; presumably
 * it returns the {@code id} field. Confirm against the full file.
 */
141 public Object getIdentifier() {
/**
 * Runs {@code operation} against the transaction facade, or queues it when no facade has been
 * published yet.
 *
 * <p>The decision is taken while holding the {@code queuedTxOperations} monitor, the same lock
 * {@code executePriorTransactionOperations} holds when it drains the queue and publishes the facade.
 * The operation itself is invoked after the synchronized block, using the facade captured inside it.
 *
 * @param operation the action to apply to the facade
 */
145 private void processTransactionOperation(final Consumer<ProxyTransactionFacade> operation) {
146 final ProxyTransactionFacade facadeOnEntry;
147 synchronized (queuedTxOperations) {
148 if (transactionFacade == null) {
149 LOG.debug("{}: Queuing transaction operation", id);
151 queuedTxOperations.add(operation);
// null marks "queued": nothing is executed on this call.
152 facadeOnEntry = null;
154 facadeOnEntry = transactionFacade;
// Invoke the operation using the facade reference captured under the lock.
158 if (facadeOnEntry != null) {
159 operation.accept(facadeOnEntry);
/**
 * Runs the operations queued before the facade was available against {@code newTransactionFacade}
 * and publishes the facade once the queue is found empty.
 *
 * <p>Under the {@code queuedTxOperations} monitor the method either publishes the facade (queue
 * empty) or copies and clears the queue; the copied batch is then invoked in list order.
 *
 * <p>NOTE(review): embedded lines 164, 173 and 180-182 are missing from this listing. The inline
 * comments imply an enclosing loop that repeats the drain until the queue is empty and then exits;
 * confirm against the full file.
 *
 * @param newTransactionFacade the facade the queued operations are applied to
 */
163 private void executePriorTransactionOperations(final ProxyTransactionFacade newTransactionFacade) {
165 // Access to queuedTxOperations and transactionFacade must be protected and atomic
166 // (i.e. synchronized) with respect to #processTransactionOperation to handle timing
167 // issues and ensure no transaction operation is missed and that they are processed
168 // in the order they occurred.
170 // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
171 // in case a transaction operation results in another transaction operation being
172 // queued (e.g. a put operation from a client read Future callback that is notified
// while this batch is being invoked).
174 final Collection<Consumer<ProxyTransactionFacade>> operationsBatch;
175 synchronized (queuedTxOperations) {
176 if (queuedTxOperations.isEmpty()) {
177 // We're done invoking the transaction operations so we can now publish the
178 // ProxyTransactionFacade.
179 transactionFacade = newTransactionFacade;
183 operationsBatch = new ArrayList<>(queuedTxOperations);
184 queuedTxOperations.clear();
187 // Invoke transaction operations outside the sync block to avoid unnecessary blocking.
188 for (Consumer<ProxyTransactionFacade> oper : operationsBatch) {
189 oper.accept(newTransactionFacade);
/**
 * Verifies that this transaction has not been closed.
 *
 * @throws IllegalStateException if {@code opened} is false
 */
194 private void checkOpen() {
195 Preconditions.checkState(opened.get(), "%s: Transaction is closed", id);