Bug 7011 - Race condition in statistics collection related transaction chain handling
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import java.util.Objects;
18 import java.util.Optional;
19 import java.util.concurrent.CancellationException;
20 import javax.annotation.Nonnull;
21 import javax.annotation.Nullable;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
34 import org.opendaylight.yangtools.yang.binding.DataObject;
35 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * openflowplugin-impl
41  * org.opendaylight.openflowplugin.impl.device
42  * <p/>
43  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
44  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
45  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
46  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
47  */
48 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
49
50     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
51     private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
52
53     private final Object txLock = new Object();
54     private final DataBroker dataBroker;
55     private final String nodeId;
56     private LifecycleService lifecycleService;
57
58     @GuardedBy("txLock")
59     private WriteTransaction wTx;
60     @GuardedBy("txLock")
61     private BindingTransactionChain txChainFactory;
62     @GuardedBy("txLock")
63     private boolean submitIsEnabled;
64     @GuardedBy("txLock")
65     private ListenableFuture<Void> lastSubmittedFuture;
66
67     private boolean initCommit;
68
69     @GuardedBy("txLock")
70     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
71
72     TransactionChainManager(@Nonnull final DataBroker dataBroker,
73                             @Nonnull final DeviceInfo deviceInfo) {
74         this.dataBroker = dataBroker;
75         this.nodeId = deviceInfo.getNodeInstanceIdentifier().getKey().getId().getValue();
76         this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
77         this.lastSubmittedFuture = Futures.immediateFuture(null);
78     }
79
80     @GuardedBy("txLock")
81     private void createTxChain() {
82         BindingTransactionChain txChainFactoryTemp = txChainFactory;
83         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
84         Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
85     }
86
87     public void setLifecycleService(final LifecycleService lifecycleService) {
88         this.lifecycleService = lifecycleService;
89     }
90
91     void initialSubmitWriteTransaction() {
92         enableSubmit();
93         submitWriteTransaction();
94     }
95
96     /**
97      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
98      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
99      * transactions. Call this method for MASTER role only.
100      */
101     void activateTransactionManager() {
102         if (LOG.isDebugEnabled()) {
103             LOG.debug("activateTransactionManager for node {} transaction submit is set to {}", this.nodeId, submitIsEnabled);
104         }
105         synchronized (txLock) {
106             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
107                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
108                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
109                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
110                 this.submitIsEnabled = false;
111                 this.initCommit = true;
112                 createTxChain();
113             }
114         }
115     }
116
117     /**
118      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
119      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
120      * Call this method for SLAVE only.
121      * @return Future
122      */
123     ListenableFuture<Void> deactivateTransactionManager() {
124         if (LOG.isDebugEnabled()) {
125             LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
126         }
127         final ListenableFuture<Void> future;
128         synchronized (txLock) {
129             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
130                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
131                 future = txChainShuttingDown();
132                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
133                 Futures.addCallback(future, new FutureCallback<Void>() {
134                     @Override
135                     public void onSuccess(final Void result) {
136                         removeTxChainFactory();
137                     }
138
139                     @Override
140                     public void onFailure(final Throwable t) {
141                         removeTxChainFactory();
142                     }
143                 });
144             } else {
145                 // TODO : ignoring redundant deactivate invocation
146                 future = Futures.immediateCheckedFuture(null);
147             }
148         }
149         return future;
150     }
151
152     private void removeTxChainFactory() {
153         Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close);
154         txChainFactory = null;
155     }
156
157     boolean submitWriteTransaction() {
158         synchronized (txLock) {
159             if (!submitIsEnabled) {
160                 if (LOG.isTraceEnabled()) {
161                     LOG.trace("transaction not committed - submit block issued");
162                 }
163                 return false;
164             }
165             if (Objects.isNull(wTx)) {
166                 if (LOG.isTraceEnabled()) {
167                     LOG.trace("nothing to commit - submit returns true");
168                 }
169                 return true;
170             }
171             Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
172                     "we have here Uncompleted Transaction for node {} and we are not MASTER", this.nodeId);
173             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
174             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
175                 @Override
176                 public void onSuccess(final Void result) {
177                     if (initCommit) {
178                         initCommit = false;
179                     }
180                 }
181
182                 @Override
183                 public void onFailure(final Throwable t) {
184                     if (t instanceof TransactionCommitFailedException) {
185                         LOG.error("Transaction commit failed. ", t);
186                     } else {
187                         if (t instanceof CancellationException) {
188                             LOG.warn("Submit task was canceled");
189                             LOG.trace("Submit exception: ", t);
190                         } else {
191                             LOG.error("Exception during transaction submitting. ", t);
192                         }
193                     }
194                     if (initCommit) {
195                         wTx = null;
196                         Optional.ofNullable(lifecycleService).ifPresent(LifecycleService::closeConnection);
197                     }
198                 }
199             });
200             lastSubmittedFuture = submitFuture;
201             wTx = null;
202         }
203         return true;
204     }
205
206     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
207                                                              final InstanceIdentifier<T> path){
208         final WriteTransaction writeTx = getTransactionSafely();
209         if (Objects.nonNull(writeTx)) {
210             writeTx.delete(store, path);
211         } else {
212             if (LOG.isDebugEnabled()) {
213                 LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path);
214             }
215             throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
216         }
217     }
218
219     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
220                                                    final InstanceIdentifier<T> path,
221                                                    final T data,
222                                                    final boolean createParents){
223         final WriteTransaction writeTx = getTransactionSafely();
224         if (Objects.nonNull(writeTx)) {
225             writeTx.put(store, path, data, createParents);
226         } else {
227             if (LOG.isDebugEnabled()) {
228                 LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path);
229             }
230             throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
231         }
232     }
233
234     @Override
235     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
236                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
237         if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
238             LOG.warn("Transaction chain failed, recreating chain due to ", cause);
239             recreateTxChain();
240         }
241     }
242
243     @Override
244     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
245         // NOOP
246     }
247
248     private void recreateTxChain() {
249         synchronized (txLock) {
250             createTxChain();
251             wTx = null;
252         }
253     }
254
255     @Nullable
256     private WriteTransaction getTransactionSafely() {
257             synchronized (txLock) {
258                 if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
259                     Optional.ofNullable(txChainFactory).ifPresent(bindingTransactionChain -> wTx = txChainFactory.newWriteOnlyTransaction());
260                 }
261             }
262         return wTx;
263     }
264
265     @VisibleForTesting
266     void enableSubmit() {
267         synchronized (txLock) {
268             /* !!!IMPORTANT: never set true without txChainFactory */
269             submitIsEnabled = txChainFactory != null;
270         }
271     }
272
273     ListenableFuture<Void> shuttingDown() {
274         if (LOG.isDebugEnabled()) {
275             LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
276         }
277         ListenableFuture<Void> future;
278         synchronized (txLock) {
279             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
280             future = txChainShuttingDown();
281         }
282         return future;
283     }
284
285     @GuardedBy("txLock")
286     private ListenableFuture<Void> txChainShuttingDown() {
287         submitIsEnabled = false;
288         ListenableFuture<Void> future;
289         if (txChainFactory == null) {
290             // stay with actual thread
291             future = Futures.immediateCheckedFuture(null);
292         } else if (wTx == null) {
293             // hijack md-sal thread
294             future = lastSubmittedFuture;
295         } else {
296             if (LOG.isDebugEnabled()) {
297                 LOG.debug("Submitting all transactions for Node {}", this.nodeId);
298             }
299             // hijack md-sal thread
300             future = wTx.submit();
301             wTx = null;
302         }
303         return future;
304     }
305
306     @Override
307     public void close() {
308         if (LOG.isDebugEnabled()) {
309             LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
310         }
311         synchronized (txLock) {
312             removeTxChainFactory();
313         }
314     }
315
316     private enum TransactionChainManagerStatus {
317         /** txChainManager is sleeping - is not active (SLAVE or default init value) */
318         WORKING,
319         /** txChainManager is working - is active (MASTER) */
320         SLEEPING,
321         /** txChainManager is trying to be closed - device disconnecting */
322         SHUTTING_DOWN;
323     }
324 }