Rename addDeleteOperationTotTxChain => addDeleteOperationToTxChain
[openflowplugin.git] / openflowplugin-common / src / main / java / org / opendaylight / openflowplugin / common / txchain / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.common.txchain;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Objects;
18 import java.util.Optional;
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23 import javax.annotation.Nonnull;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * The openflowplugin-impl.org.opendaylight.openflowplugin.impl.device
43  * package protected class for controlling {@link WriteTransaction} life cycle. It is
44  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
45  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
46  * and submitTransaction method (wrapped {@link WriteTransaction#submit()}).
47  */
48 public class TransactionChainManager implements TransactionChainListener, AutoCloseable {
49
50     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
51     private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
52
53     private final Object txLock = new Object();
54     private final DataBroker dataBroker;
55     private final String nodeId;
56
57     @GuardedBy("txLock")
58     private ReadWriteTransaction writeTx;
59     @GuardedBy("txLock")
60     private BindingTransactionChain transactionChain;
61     @GuardedBy("txLock")
62     private boolean submitIsEnabled;
63     @GuardedBy("txLock")
64     private ListenableFuture<Void> lastSubmittedFuture;
65
66     private volatile boolean initCommit;
67
68     @GuardedBy("txLock")
69     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
70
71     public TransactionChainManager(@Nonnull final DataBroker dataBroker,
72                                    @Nonnull final String deviceIdentifier) {
73         this.dataBroker = dataBroker;
74         this.nodeId = deviceIdentifier;
75         this.lastSubmittedFuture = Futures.immediateFuture(null);
76     }
77
78     @GuardedBy("txLock")
79     private void createTxChain() {
80         BindingTransactionChain txChainFactoryTemp = transactionChain;
81         transactionChain = dataBroker.createTransactionChain(TransactionChainManager.this);
82         Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
83     }
84
85     public boolean initialSubmitWriteTransaction() {
86         enableSubmit();
87         return submitTransaction();
88     }
89
90     /**
91      * Method change status for TxChainManager to WORKING and it has to make
92      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
93      * transactions. Call this method for MASTER role only.
94      */
95     public void activateTransactionManager() {
96         if (LOG.isDebugEnabled()) {
97             LOG.debug("activateTransactionManager for node {} transaction submit is set to {}",
98                     this.nodeId, submitIsEnabled);
99         }
100         synchronized (txLock) {
101             if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
102                 Preconditions.checkState(transactionChain == null,
103                         "TxChainFactory survive last close.");
104                 Preconditions.checkState(writeTx == null,
105                         "We have some unexpected WriteTransaction.");
106                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
107                 this.submitIsEnabled = false;
108                 this.initCommit = true;
109                 createTxChain();
110             }
111         }
112     }
113
114     /**
115      * Method change status for TxChainManger to SLEEPING and it unregisters
116      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
117      * Call this method for SLAVE only.
118      * @return Future
119      */
120     public ListenableFuture<Void> deactivateTransactionManager() {
121         if (LOG.isDebugEnabled()) {
122             LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
123         }
124         final ListenableFuture<Void> future;
125         synchronized (txLock) {
126             if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
127                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
128                 future = txChainShuttingDown();
129                 Preconditions.checkState(writeTx == null,
130                         "We have some unexpected WriteTransaction.");
131                 Futures.addCallback(future, new FutureCallback<Void>() {
132                     @Override
133                     public void onSuccess(final Void result) {
134                         closeTransactionChain();
135                     }
136
137                     @Override
138                     public void onFailure(@Nonnull final Throwable throwable) {
139                         closeTransactionChain();
140                     }
141                 });
142             } else {
143                 // ignoring redundant deactivate invocation
144                 future = Futures.immediateFuture(null);
145             }
146         }
147         return future;
148     }
149
150     private void closeTransactionChain() {
151         if (writeTx != null) {
152             writeTx.cancel();
153             writeTx = null;
154         }
155         Optional.ofNullable(transactionChain).ifPresent(TransactionChain::close);
156         transactionChain = null;
157     }
158
159     public boolean submitTransaction() {
160         synchronized (txLock) {
161             if (!submitIsEnabled) {
162                 if (LOG.isTraceEnabled()) {
163                     LOG.trace("transaction not committed - submit block issued");
164                 }
165                 return false;
166             }
167             if (Objects.isNull(writeTx)) {
168                 if (LOG.isTraceEnabled()) {
169                     LOG.trace("nothing to commit - submit returns true");
170                 }
171                 return true;
172             }
173             Preconditions.checkState(TransactionChainManagerStatus.WORKING == transactionChainManagerStatus,
174                     "we have here Uncompleted Transaction for node {} and we are not MASTER",
175                     this.nodeId);
176             final ListenableFuture<Void> submitFuture = writeTx.submit();
177             lastSubmittedFuture = submitFuture;
178             writeTx = null;
179
180             if (initCommit) {
181                 try {
182                     submitFuture.get(5L, TimeUnit.SECONDS);
183                 } catch (InterruptedException | ExecutionException | TimeoutException ex) {
184                     LOG.error("Exception during INITIAL transaction submitting. ", ex);
185                     return false;
186                 }
187                 initCommit = false;
188                 return true;
189             }
190
191             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
192                 @Override
193                 public void onSuccess(final Void result) {
194                     //NOOP
195                 }
196
197                 @Override
198                 public void onFailure(final Throwable throwable) {
199                     if (throwable instanceof TransactionCommitFailedException) {
200                         LOG.error("Transaction commit failed. ", throwable);
201                     } else {
202                         if (throwable instanceof CancellationException) {
203                             LOG.warn("Submit task was canceled");
204                             LOG.trace("Submit exception: ", throwable);
205                         } else {
206                             LOG.error("Exception during transaction submitting. ", throwable);
207                         }
208                     }
209                 }
210             }, MoreExecutors.directExecutor());
211         }
212         return true;
213     }
214
215     public <T extends DataObject> void addDeleteOperationToTxChain(final LogicalDatastoreType store,
216                                                                     final InstanceIdentifier<T> path) {
217         synchronized (txLock) {
218             ensureTransaction();
219             if (writeTx == null) {
220                 LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path);
221                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
222             }
223
224             writeTx.delete(store, path);
225         }
226     }
227
228     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
229                                                           final InstanceIdentifier<T> path,
230                                                           final T data,
231                                                           final boolean createParents) {
232         synchronized (txLock) {
233             ensureTransaction();
234             if (writeTx == null) {
235                 LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path);
236                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
237             }
238
239             writeTx.put(store, path, data, createParents);
240         }
241     }
242
243     public <T extends DataObject> void mergeToTransaction(final LogicalDatastoreType store,
244                                                           final InstanceIdentifier<T> path,
245                                                           final T data,
246                                                           final boolean createParents) {
247         synchronized (txLock) {
248             ensureTransaction();
249             if (writeTx == null) {
250                 LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path);
251                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
252             }
253
254             writeTx.merge(store, path, data, createParents);
255         }
256     }
257
258     public <T extends DataObject> CheckedFuture<com.google.common.base.Optional<T>, ReadFailedException>
259         readFromTransaction(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
260         synchronized (txLock) {
261             ensureTransaction();
262             if (writeTx == null) {
263                 LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path);
264                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
265             }
266
267             return writeTx.read(store, path);
268         }
269     }
270
271     @Override
272     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
273                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
274         synchronized (txLock) {
275             if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
276                     && chain.equals(this.transactionChain)) {
277                 LOG.warn("Transaction chain failed, recreating chain due to ", cause);
278                 closeTransactionChain();
279                 createTxChain();
280                 writeTx = null;
281             }
282         }
283     }
284
285     @Override
286     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
287         // NOOP
288     }
289
290     @GuardedBy("txLock")
291     private void ensureTransaction() {
292         if (writeTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
293                 && transactionChain != null) {
294             writeTx = transactionChain.newReadWriteTransaction();
295         }
296     }
297
298     private void enableSubmit() {
299         synchronized (txLock) {
300             /* !!!IMPORTANT: never set true without transactionChain */
301             submitIsEnabled = transactionChain != null;
302         }
303     }
304
305     public ListenableFuture<Void> shuttingDown() {
306         if (LOG.isDebugEnabled()) {
307             LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
308         }
309         synchronized (txLock) {
310             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
311             return txChainShuttingDown();
312         }
313     }
314
315     @GuardedBy("txLock")
316     private ListenableFuture<Void> txChainShuttingDown() {
317         boolean wasSubmitEnabled = submitIsEnabled;
318         submitIsEnabled = false;
319         ListenableFuture<Void> future;
320
321         if (!wasSubmitEnabled || transactionChain == null) {
322             // stay with actual thread
323             future = Futures.immediateCheckedFuture(null);
324
325             if (writeTx != null) {
326                 writeTx.cancel();
327                 writeTx = null;
328             }
329         } else if (writeTx == null) {
330             // hijack md-sal thread
331             future = lastSubmittedFuture;
332         } else {
333             if (LOG.isDebugEnabled()) {
334                 LOG.debug("Submitting all transactions for Node {}", this.nodeId);
335             }
336             // hijack md-sal thread
337             future = writeTx.submit();
338             writeTx = null;
339         }
340
341         return future;
342     }
343
344     @Override
345     public void close() {
346         if (LOG.isDebugEnabled()) {
347             LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
348         }
349         synchronized (txLock) {
350             closeTransactionChain();
351         }
352     }
353
354     private enum TransactionChainManagerStatus {
355         /**
356          * txChainManager is working - is active (MASTER).
357          */
358         WORKING,
359         /**
360          * txChainManager is sleeping - is not active (SLAVE or default init value).
361          */
362         SLEEPING,
363         /**
364          * txChainManager is trying to be closed - device disconnecting.
365          */
366         SHUTTING_DOWN
367     }
368 }