Bump MRI upstreams
[netconf.git] / netconf / netconf-topology-singleton / src / main / java / org / opendaylight / netconf / topology / singleton / impl / tx / ProxyReadWriteTransaction.java
1 /*
2  * Copyright (c) 2017 Pantheon Technologies s.r.o. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.netconf.topology.singleton.impl.tx;
9
10 import akka.actor.ActorRef;
11 import akka.dispatch.OnComplete;
12 import akka.util.Timeout;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.SettableFuture;
16 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.List;
20 import java.util.Optional;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.function.Consumer;
23 import org.checkerframework.checker.lock.qual.GuardedBy;
24 import org.eclipse.jdt.annotation.NonNull;
25 import org.opendaylight.mdsal.common.api.CommitInfo;
26 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
29 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
30 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import scala.concurrent.ExecutionContext;
35 import scala.concurrent.Future;
36
37 /**
38  * ProxyReadWriteTransaction uses provided {@link ActorRef} to delegate method calls to master
39  * {@link org.opendaylight.netconf.topology.singleton.impl.actors.ReadWriteTransactionActor}.
40  */
41 public class ProxyReadWriteTransaction implements DOMDataTreeReadWriteTransaction {
42     private static final Logger LOG = LoggerFactory.getLogger(ProxyReadWriteTransaction.class);
43
44     private final RemoteDeviceId id;
45     private final AtomicBoolean opened = new AtomicBoolean(true);
46
47     @GuardedBy("queuedTxOperations")
48     private final List<Consumer<ProxyTransactionFacade>> queuedTxOperations = new ArrayList<>();
49
50     private volatile ProxyTransactionFacade transactionFacade;
51
52     public ProxyReadWriteTransaction(final RemoteDeviceId id, final Future<Object> masterTxActorFuture,
53             final ExecutionContext executionContext, final Timeout askTimeout) {
54         this.id = id;
55
56         masterTxActorFuture.onComplete(new OnComplete<>() {
57             @Override
58             public void onComplete(final Throwable failure, final Object masterTxActor) {
59                 final ProxyTransactionFacade newTransactionFacade;
60                 if (failure != null) {
61                     LOG.debug("{}: Failed to obtain master actor", id, failure);
62                     newTransactionFacade = new FailedProxyTransactionFacade(id, failure);
63                 } else {
64                     LOG.debug("{}: Obtained master actor {}", id, masterTxActor);
65                     newTransactionFacade = new ActorProxyTransactionFacade((ActorRef)masterTxActor, id,
66                             executionContext, askTimeout);
67                 }
68
69                 executePriorTransactionOperations(newTransactionFacade);
70             }
71         }, executionContext);
72     }
73
74     @Override
75     public boolean cancel() {
76         if (!opened.compareAndSet(true, false)) {
77             return false;
78         }
79
80         processTransactionOperation(DOMDataTreeWriteTransaction::cancel);
81         return true;
82     }
83
84     @Override
85     public FluentFuture<Optional<NormalizedNode>> read(final LogicalDatastoreType store,
86             final YangInstanceIdentifier path) {
87         LOG.debug("{}: Read {} {}", id, store, path);
88
89         final SettableFuture<Optional<NormalizedNode>> returnFuture = SettableFuture.create();
90         processTransactionOperation(facade -> returnFuture.setFuture(facade.read(store, path)));
91         return FluentFuture.from(returnFuture);
92     }
93
94     @Override
95     public FluentFuture<Boolean> exists(final LogicalDatastoreType store,
96             final YangInstanceIdentifier path) {
97         LOG.debug("{}: Exists {} {}", id, store, path);
98
99         final SettableFuture<Boolean> returnFuture = SettableFuture.create();
100         processTransactionOperation(facade -> returnFuture.setFuture(facade.exists(store, path)));
101         return FluentFuture.from(returnFuture);
102     }
103
104     @Override
105     public void delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
106         checkOpen();
107         LOG.debug("{}: Delete {} {}", id, store, path);
108         processTransactionOperation(facade -> facade.delete(store, path));
109     }
110
111     @Override
112     public void put(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) {
113         checkOpen();
114         LOG.debug("{}: Put {} {}", id, store, path);
115         processTransactionOperation(facade -> facade.put(store, path, data));
116     }
117
118     @Override
119     public void merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data) {
120         checkOpen();
121         LOG.debug("{}: Merge {} {}", id, store, path);
122         processTransactionOperation(facade -> facade.merge(store, path, data));
123     }
124
125     @Override
126     public @NonNull FluentFuture<? extends @NonNull CommitInfo> commit() {
127         Preconditions.checkState(opened.compareAndSet(true, false), "%s: Transaction is already closed", id);
128         LOG.debug("{}: Commit", id);
129
130         final SettableFuture<CommitInfo> returnFuture = SettableFuture.create();
131         processTransactionOperation(facade -> returnFuture.setFuture(facade.commit()));
132         return FluentFuture.from(returnFuture);
133     }
134
135     @Override
136     public Object getIdentifier() {
137         return id;
138     }
139
140     private void processTransactionOperation(final Consumer<ProxyTransactionFacade> operation) {
141         final ProxyTransactionFacade facadeOnEntry;
142         synchronized (queuedTxOperations) {
143             if (transactionFacade == null) {
144                 LOG.debug("{}: Queuing transaction operation", id);
145
146                 queuedTxOperations.add(operation);
147                 facadeOnEntry = null;
148             }  else {
149                 facadeOnEntry = transactionFacade;
150             }
151         }
152
153         if (facadeOnEntry != null) {
154             operation.accept(facadeOnEntry);
155         }
156     }
157
158     @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
159             justification = "https://github.com/spotbugs/spotbugs/issues/811")
160     private void executePriorTransactionOperations(final ProxyTransactionFacade newTransactionFacade) {
161         while (true) {
162             // Access to queuedTxOperations and transactionFacade must be protected and atomic
163             // (ie synchronized) with respect to #processTransactionOperation to handle timing
164             // issues and ensure no ProxyTransactionFacade is missed and that they are processed
165             // in the order they occurred.
166
167             // We'll make a local copy of the queuedTxOperations list to handle re-entrancy
168             // in case a transaction operation results in another transaction operation being
169             // queued (eg a put operation from a client read Future callback that is notified
170             // synchronously).
171             final Collection<Consumer<ProxyTransactionFacade>> operationsBatch;
172             synchronized (queuedTxOperations) {
173                 if (queuedTxOperations.isEmpty()) {
174                     // We're done invoking the transaction operations so we can now publish the
175                     // ProxyTransactionFacade.
176                     transactionFacade = newTransactionFacade;
177                     break;
178                 }
179
180                 operationsBatch = new ArrayList<>(queuedTxOperations);
181                 queuedTxOperations.clear();
182             }
183
184             // Invoke transaction operations outside the sync block to avoid unnecessary blocking.
185             for (Consumer<ProxyTransactionFacade> oper : operationsBatch) {
186                 oper.accept(newTransactionFacade);
187             }
188         }
189     }
190
191     private void checkOpen() {
192         Preconditions.checkState(opened.get(), "%s: Transaction is closed", id);
193     }
194 }