1b94724e0a4d93b20c2e4f9450f8b1e32e005abd
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / tx / ProxyReadWriteTransaction.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
9
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import java.util.ArrayList;
17 import java.util.Collection;
18 import java.util.List;
19 import java.util.Optional;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import java.util.function.Consumer;
22 import org.checkerframework.checker.lock.qual.GuardedBy;
23 import org.eclipse.jdt.annotation.NonNull;
24 import org.opendaylight.mdsal.common.api.CommitInfo;
25 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
28 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
30 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import scala.concurrent.ExecutionContext;
34 import scala.concurrent.Future;
35
36 /**
37  * ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
38  * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}.
39  */
40 public class ProxyReadWriteTransaction implements DOMDataTreeReadWriteTransaction {
41     private static final Logger LOG = LoggerFactory.getLogger(ProxyReadWriteTransaction.class);
42
43     private final RemoteDeviceId id;
44     private final AtomicBoolean opened = new AtomicBoolean(true);
45
46     @GuardedBy("queuedTxOperations")
47     private final List<Consumer<ProxyTransactionFacade>> queuedTxOperations = new ArrayList<>();
48
49     private volatile ProxyTransactionFacade transactionFacade;
50
51     public ProxyReadWriteTransaction(final RemoteDeviceId id, final Future<Object> masterTxActorFuture,
52             final ExecutionContext executionContext, final Timeout askTimeout) {
53         this.id = id;
54
55         masterTxActorFuture.onComplete(new OnComplete<Object>() {
56             @Override
57             public void onComplete(final Throwable failure, final Object masterTxActor) {
58                 final ProxyTransactionFacade newTransactionFacade;
59                 if (failure != null) {
60                     LOG.debug("{}: Failed to obtain master actor", id, failure);
61                     newTransactionFacade = new FailedProxyTransactionFacade(id, failure);
62                 } else {
63                     LOG.debug("{}: Obtained master actor {}", id, masterTxActor);
64                     newTransactionFacade = new ActorProxyTransactionFacade((ActorRef)masterTxActor, id,
65                             executionContext, askTimeout);
66                 }
67
68                 executePriorTransactionOperations(newTransactionFacade);
69             }
70         }, executionContext);
71     }
72
73     @Override
74     public boolean cancel() {
75         if (!opened.compareAndSet(true, false)) {
76             return false;
77         }
78
79         processTransactionOperation(DOMDataTreeWriteTransaction::cancel);
80         return true;
81     }
82
83     @Override
84     public void close() {
85         cancel();
86     }
87
88     @Override
89     public FluentFuture<Optional<NormalizedNode<?, ?>>> read(final LogicalDatastoreType store,
90             final YangInstanceIdentifier path) {
91         LOG.debug("{}: Read {} {}", id, store, path);
92
93         final SettableFuture<Optional<NormalizedNode<?, ?>>> returnFuture = SettableFuture.create();
94         processTransactionOperation(facade -> returnFuture.setFuture(facade.read(store, path)));
95         return FluentFuture.from(returnFuture);
96     }
97
98     @Override
99     public FluentFuture<Boolean> exists(final LogicalDatastoreType store,
100             final YangInstanceIdentifier path) {
101         LOG.debug("{}: Exists {} {}", id, store, path);
102
103         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
104         processTransactionOperation(facade -> returnFuture.setFuture(facade.exists(store, path)));
105         return FluentFuture.from(returnFuture);
106     }
107
108     @Override
109     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
110         checkOpen();
111         LOG.debug("{}: Delete {} {}", id, store, path);
112         processTransactionOperation(facade -> facade.delete(store, path));
113     }
114
115     @Override
116     public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path,
117                     final NormalizedNode<?, ?> data) {
118         checkOpen();
119         LOG.debug("{}: Put {} {}", id, store, path);
120         processTransactionOperation(facade -> facade.put(store, path, data));
121     }
122
123     @Override
124     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path,
125                       final NormalizedNode<?, ?> data) {
126         checkOpen();
127         LOG.debug("{}: Merge {} {}", id, store, path);
128         processTransactionOperation(facade -> facade.merge(store, path, data));
129     }
130
131     @Override
132     public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
133         Preconditions.checkState(opened.compareAndSet(true, false), "%s: Transaction is already closed", id);
134         LOG.debug("{}: Commit", id);
135
136         final SettableFuture<CommitInfo> returnFuture = SettableFuture.create();
137         processTransactionOperation(facade -> returnFuture.setFuture(facade.commit()));
138         return FluentFuture.from(returnFuture);
139     }
140
141     @Override
142     public Object getIdentifier() {
143         return id;
144     }
145
146     private void processTransactionOperation(final Consumer<ProxyTransactionFacade> operation) {
147         final ProxyTransactionFacade facadeOnEntry;
148         synchronized (queuedTxOperations) {
149             if (transactionFacade == null) {
150                 LOG.debug("{}: Queuing transaction operation", id);
151
152                 queuedTxOperations.add(operation);
153                 facadeOnEntry = null;
154             }  else {
155                 facadeOnEntry = transactionFacade;
156             }
157         }
158
159         if (facadeOnEntry != null) {
160             operation.accept(facadeOnEntry);
161         }
162     }
163
164     private void executePriorTransactionOperations(final ProxyTransactionFacade newTransactionFacade) {
165         while (true) {
166             // Access to queuedTxOperations and transactionFacade must be protected and atomic
167             // (ie synchronized) with respect to #processTransactionOperation to handle timing
168             // issues and ensure no ProxyTransactionFacade is missed and that they are processed
169             // in the order they occurred.
170
171             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
172             // in case a transaction operation results in another transaction operation being
173             // queued (eg a put operation from a client read Future callback that is notified
174             // synchronously).
175             final Collection<Consumer<ProxyTransactionFacade>> operationsBatch;
176             synchronized (queuedTxOperations) {
177                 if (queuedTxOperations.isEmpty()) {
178                     // We're done invoking the transaction operations so we can now publish the
179                     // ProxyTransactionFacade.
180                     transactionFacade = newTransactionFacade;
181                     break;
182                 }
183
184                 operationsBatch = new ArrayList<>(queuedTxOperations);
185                 queuedTxOperations.clear();
186             }
187
188             // Invoke transaction operations outside the sync block to avoid unnecessary blocking.
189             for (Consumer<ProxyTransactionFacade> oper : operationsBatch) {
190                 oper.accept(newTransactionFacade);
191             }
192         }
193     }
194
195     private void checkOpen() {
196         Preconditions.checkState(opened.get(), "%s: Transaction is closed", id);
197     }
198 }