32be30f054d768b77da5dce4fd779aff796fd9ae
[openflowplugin.git] / openflowplugin-common / src / main / java / org / opendaylight / openflowplugin / common / txchain / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.common.txchain;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.MoreExecutors;
16 import java.util.Objects;
17 import java.util.Optional;
18 import java.util.concurrent.CancellationException;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.TimeoutException;
22 import javax.annotation.Nonnull;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
34 import org.opendaylight.yangtools.yang.binding.DataObject;
35 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * The openflowplugin-impl.org.opendaylight.openflowplugin.impl.device
41  * package protected class for controlling {@link WriteTransaction} life cycle. It is
42  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
43  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
44  * and submitTransaction method (wrapped {@link WriteTransaction#submit()}).
45  */
46 public class TransactionChainManager implements TransactionChainListener, AutoCloseable {
47
48     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
49     private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
50
51     private final Object txLock = new Object();
52     private final DataBroker dataBroker;
53     private final String nodeId;
54
55     @GuardedBy("txLock")
56     private ReadWriteTransaction writeTx;
57     @GuardedBy("txLock")
58     private BindingTransactionChain transactionChain;
59     @GuardedBy("txLock")
60     private boolean submitIsEnabled;
61     @GuardedBy("txLock")
62     private ListenableFuture<Void> lastSubmittedFuture;
63
64     private volatile boolean initCommit;
65
66     @GuardedBy("txLock")
67     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
68
69     public TransactionChainManager(@Nonnull final DataBroker dataBroker,
70                                    @Nonnull final String deviceIdentifier) {
71         this.dataBroker = dataBroker;
72         this.nodeId = deviceIdentifier;
73         this.lastSubmittedFuture = Futures.immediateFuture(null);
74     }
75
76     @GuardedBy("txLock")
77     private void createTxChain() {
78         BindingTransactionChain txChainFactoryTemp = transactionChain;
79         transactionChain = dataBroker.createTransactionChain(TransactionChainManager.this);
80         Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
81     }
82
83     public boolean initialSubmitWriteTransaction() {
84         enableSubmit();
85         return submitTransaction();
86     }
87
88     /**
89      * Method change status for TxChainManager to WORKING and it has to make
90      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
91      * transactions. Call this method for MASTER role only.
92      */
93     public void activateTransactionManager() {
94         if (LOG.isDebugEnabled()) {
95             LOG.debug("activateTransactionManager for node {} transaction submit is set to {}",
96                     this.nodeId, submitIsEnabled);
97         }
98         synchronized (txLock) {
99             if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
100                 Preconditions.checkState(transactionChain == null,
101                         "TxChainFactory survive last close.");
102                 Preconditions.checkState(writeTx == null,
103                         "We have some unexpected WriteTransaction.");
104                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
105                 this.submitIsEnabled = false;
106                 this.initCommit = true;
107                 createTxChain();
108             }
109         }
110     }
111
112     /**
113      * Method change status for TxChainManger to SLEEPING and it unregisters
114      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
115      * Call this method for SLAVE only.
116      * @return Future
117      */
118     public ListenableFuture<Void> deactivateTransactionManager() {
119         if (LOG.isDebugEnabled()) {
120             LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
121         }
122         final ListenableFuture<Void> future;
123         synchronized (txLock) {
124             if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
125                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
126                 future = txChainShuttingDown();
127                 Preconditions.checkState(writeTx == null,
128                         "We have some unexpected WriteTransaction.");
129                 Futures.addCallback(future, new FutureCallback<Void>() {
130                     @Override
131                     public void onSuccess(final Void result) {
132                         closeTransactionChain();
133                     }
134
135                     @Override
136                     public void onFailure(@Nonnull final Throwable throwable) {
137                         closeTransactionChain();
138                     }
139                 }, MoreExecutors.directExecutor());
140             } else {
141                 // ignoring redundant deactivate invocation
142                 future = Futures.immediateFuture(null);
143             }
144         }
145         return future;
146     }
147
148     private void closeTransactionChain() {
149         if (writeTx != null) {
150             writeTx.cancel();
151             writeTx = null;
152         }
153         Optional.ofNullable(transactionChain).ifPresent(TransactionChain::close);
154         transactionChain = null;
155     }
156
157     @GuardedBy("txLock")
158     public boolean submitTransaction() {
159         return submitTransaction(false);
160     }
161
162     @GuardedBy("txLock")
163     public boolean submitTransaction(boolean doSync) {
164         synchronized (txLock) {
165             if (!submitIsEnabled) {
166                 if (LOG.isTraceEnabled()) {
167                     LOG.trace("transaction not committed - submit block issued");
168                 }
169                 return false;
170             }
171             if (Objects.isNull(writeTx)) {
172                 if (LOG.isTraceEnabled()) {
173                     LOG.trace("nothing to commit - submit returns true");
174                 }
175                 return true;
176             }
177             Preconditions.checkState(TransactionChainManagerStatus.WORKING == transactionChainManagerStatus,
178                     "we have here Uncompleted Transaction for node {} and we are not MASTER",
179                     this.nodeId);
180             final ListenableFuture<Void> submitFuture = writeTx.submit();
181             lastSubmittedFuture = submitFuture;
182             writeTx = null;
183
184             if (initCommit || doSync) {
185                 try {
186                     submitFuture.get(5L, TimeUnit.SECONDS);
187                 } catch (InterruptedException | ExecutionException | TimeoutException ex) {
188                     LOG.error("Exception during INITIAL({}) || doSync({}) transaction submitting. ",
189                             initCommit, doSync, ex);
190                     return false;
191                 }
192                 initCommit = false;
193                 return true;
194             }
195
196             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
197                 @Override
198                 public void onSuccess(final Void result) {
199                     //NOOP
200                 }
201
202                 @Override
203                 public void onFailure(final Throwable throwable) {
204                     if (throwable instanceof TransactionCommitFailedException) {
205                         LOG.error("Transaction commit failed. ", throwable);
206                     } else {
207                         if (throwable instanceof CancellationException) {
208                             LOG.warn("Submit task was canceled");
209                             LOG.trace("Submit exception: ", throwable);
210                         } else {
211                             LOG.error("Exception during transaction submitting. ", throwable);
212                         }
213                     }
214                 }
215             }, MoreExecutors.directExecutor());
216         }
217         return true;
218     }
219
220     public <T extends DataObject> void addDeleteOperationToTxChain(final LogicalDatastoreType store,
221                                                                     final InstanceIdentifier<T> path) {
222         synchronized (txLock) {
223             ensureTransaction();
224             if (writeTx == null) {
225                 LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path);
226                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
227             }
228
229             writeTx.delete(store, path);
230         }
231     }
232
233     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
234                                                           final InstanceIdentifier<T> path,
235                                                           final T data,
236                                                           final boolean createParents) {
237         synchronized (txLock) {
238             ensureTransaction();
239             if (writeTx == null) {
240                 LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path);
241                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
242             }
243
244             writeTx.put(store, path, data, createParents);
245         }
246     }
247
248     public <T extends DataObject> void mergeToTransaction(final LogicalDatastoreType store,
249                                                           final InstanceIdentifier<T> path,
250                                                           final T data,
251                                                           final boolean createParents) {
252         synchronized (txLock) {
253             ensureTransaction();
254             if (writeTx == null) {
255                 LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path);
256                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
257             }
258
259             writeTx.merge(store, path, data, createParents);
260         }
261     }
262
263     public <T extends DataObject> ListenableFuture<com.google.common.base.Optional<T>>
264         readFromTransaction(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
265         synchronized (txLock) {
266             ensureTransaction();
267             if (writeTx == null) {
268                 LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path);
269                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
270             }
271
272             return writeTx.read(store, path);
273         }
274     }
275
276     @Override
277     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
278                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
279         synchronized (txLock) {
280             if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
281                     && chain.equals(this.transactionChain)) {
282                 LOG.warn("Transaction chain failed, recreating chain due to ", cause);
283                 closeTransactionChain();
284                 createTxChain();
285                 writeTx = null;
286             }
287         }
288     }
289
290     @Override
291     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
292         // NOOP
293     }
294
295     @GuardedBy("txLock")
296     private void ensureTransaction() {
297         if (writeTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
298                 && transactionChain != null) {
299             writeTx = transactionChain.newReadWriteTransaction();
300         }
301     }
302
303     private void enableSubmit() {
304         synchronized (txLock) {
305             /* !!!IMPORTANT: never set true without transactionChain */
306             submitIsEnabled = transactionChain != null;
307         }
308     }
309
310     public ListenableFuture<Void> shuttingDown() {
311         if (LOG.isDebugEnabled()) {
312             LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
313         }
314         synchronized (txLock) {
315             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
316             return txChainShuttingDown();
317         }
318     }
319
320     @GuardedBy("txLock")
321     private ListenableFuture<Void> txChainShuttingDown() {
322         boolean wasSubmitEnabled = submitIsEnabled;
323         submitIsEnabled = false;
324         ListenableFuture<Void> future;
325
326         if (!wasSubmitEnabled || transactionChain == null) {
327             // stay with actual thread
328             future = Futures.immediateFuture(null);
329
330             if (writeTx != null) {
331                 writeTx.cancel();
332                 writeTx = null;
333             }
334         } else if (writeTx == null) {
335             // hijack md-sal thread
336             future = lastSubmittedFuture;
337         } else {
338             if (LOG.isDebugEnabled()) {
339                 LOG.debug("Submitting all transactions for Node {}", this.nodeId);
340             }
341             // hijack md-sal thread
342             future = writeTx.submit();
343             writeTx = null;
344         }
345
346         return future;
347     }
348
349     @Override
350     public void close() {
351         if (LOG.isDebugEnabled()) {
352             LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
353         }
354         synchronized (txLock) {
355             closeTransactionChain();
356         }
357     }
358
359     private enum TransactionChainManagerStatus {
360         /**
361          * txChainManager is working - is active (MASTER).
362          */
363         WORKING,
364         /**
365          * txChainManager is sleeping - is not active (SLAVE or default init value).
366          */
367         SLEEPING,
368         /**
369          * txChainManager is trying to be closed - device disconnecting.
370          */
371         SHUTTING_DOWN
372     }
373 }