Switch to MD-SAL APIs
[openflowplugin.git] / openflowplugin-common / src / main / java / org / opendaylight / openflowplugin / common / txchain / TransactionChainManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.common.txchain;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.util.concurrent.FluentFuture;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.ListenableFuture;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import java.util.Objects;
16 import java.util.Optional;
17 import java.util.concurrent.CancellationException;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.TimeoutException;
21 import javax.annotation.Nonnull;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.mdsal.binding.api.DataBroker;
24 import org.opendaylight.mdsal.binding.api.ReadWriteTransaction;
25 import org.opendaylight.mdsal.binding.api.Transaction;
26 import org.opendaylight.mdsal.binding.api.TransactionChain;
27 import org.opendaylight.mdsal.binding.api.TransactionChainClosedException;
28 import org.opendaylight.mdsal.binding.api.TransactionChainListener;
29 import org.opendaylight.mdsal.binding.api.WriteTransaction;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
32 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
33 import org.opendaylight.yangtools.yang.binding.DataObject;
34 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * The openflowplugin-impl.org.opendaylight.openflowplugin.impl.device
40  * package protected class for controlling {@link WriteTransaction} life cycle. It is
41  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
42  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
43  * and submitTransaction method (wrapped {@link WriteTransaction#commit()}).
44  */
45 public class TransactionChainManager implements TransactionChainListener, AutoCloseable {
46
47     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
48     private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
49
50     private final Object txLock = new Object();
51     private final DataBroker dataBroker;
52     private final String nodeId;
53
54     @GuardedBy("txLock")
55     private ReadWriteTransaction writeTx;
56     @GuardedBy("txLock")
57     private TransactionChain transactionChain;
58     @GuardedBy("txLock")
59     private boolean submitIsEnabled;
60     @GuardedBy("txLock")
61     private FluentFuture<? extends CommitInfo> lastSubmittedFuture;
62
63     private volatile boolean initCommit;
64
65     @GuardedBy("txLock")
66     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
67
68     public TransactionChainManager(@Nonnull final DataBroker dataBroker,
69                                    @Nonnull final String deviceIdentifier) {
70         this.dataBroker = dataBroker;
71         this.nodeId = deviceIdentifier;
72         this.lastSubmittedFuture = CommitInfo.emptyFluentFuture();
73     }
74
75     @GuardedBy("txLock")
76     private void createTxChain() {
77         TransactionChain txChainFactoryTemp = transactionChain;
78         transactionChain = dataBroker.createTransactionChain(TransactionChainManager.this);
79         Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
80     }
81
82     public boolean initialSubmitWriteTransaction() {
83         enableSubmit();
84         return submitTransaction();
85     }
86
87     /**
88      * Method change status for TxChainManager to WORKING and it has to make
89      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
90      * transactions. Call this method for MASTER role only.
91      */
92     public void activateTransactionManager() {
93         if (LOG.isDebugEnabled()) {
94             LOG.debug("activateTransactionManager for node {} transaction submit is set to {}",
95                     this.nodeId, submitIsEnabled);
96         }
97         synchronized (txLock) {
98             if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
99                 Preconditions.checkState(transactionChain == null,
100                         "TxChainFactory survive last close.");
101                 Preconditions.checkState(writeTx == null,
102                         "We have some unexpected WriteTransaction.");
103                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
104                 this.submitIsEnabled = false;
105                 this.initCommit = true;
106                 createTxChain();
107             }
108         }
109     }
110
111     /**
112      * Method change status for TxChainManger to SLEEPING and it unregisters
113      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
114      * Call this method for SLAVE only.
115      * @return Future
116      */
117     public FluentFuture<?> deactivateTransactionManager() {
118         if (LOG.isDebugEnabled()) {
119             LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
120         }
121         final FluentFuture<? extends CommitInfo> future;
122         synchronized (txLock) {
123             if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
124                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
125                 future = txChainShuttingDown();
126                 Preconditions.checkState(writeTx == null,
127                         "We have some unexpected WriteTransaction.");
128                 future.addCallback(new FutureCallback<CommitInfo>() {
129                     @Override
130                     public void onSuccess(final CommitInfo result) {
131                         closeTransactionChain();
132                     }
133
134                     @Override
135                     public void onFailure(@Nonnull final Throwable throwable) {
136                         closeTransactionChain();
137                     }
138                 }, MoreExecutors.directExecutor());
139             } else {
140                 // ignoring redundant deactivate invocation
141                 future = CommitInfo.emptyFluentFuture();
142             }
143         }
144         return future;
145     }
146
147     private void closeTransactionChain() {
148         if (writeTx != null) {
149             writeTx.cancel();
150             writeTx = null;
151         }
152         Optional.ofNullable(transactionChain).ifPresent(TransactionChain::close);
153         transactionChain = null;
154     }
155
156     @GuardedBy("txLock")
157     public boolean submitTransaction() {
158         return submitTransaction(false);
159     }
160
161     @GuardedBy("txLock")
162     public boolean submitTransaction(boolean doSync) {
163         synchronized (txLock) {
164             if (!submitIsEnabled) {
165                 if (LOG.isTraceEnabled()) {
166                     LOG.trace("transaction not committed - submit block issued");
167                 }
168                 return false;
169             }
170             if (Objects.isNull(writeTx)) {
171                 if (LOG.isTraceEnabled()) {
172                     LOG.trace("nothing to commit - submit returns true");
173                 }
174                 return true;
175             }
176             Preconditions.checkState(TransactionChainManagerStatus.WORKING == transactionChainManagerStatus,
177                     "we have here Uncompleted Transaction for node {} and we are not MASTER",
178                     this.nodeId);
179             final FluentFuture<? extends CommitInfo> submitFuture = writeTx.commit();
180             lastSubmittedFuture = submitFuture;
181             writeTx = null;
182
183             if (initCommit || doSync) {
184                 try {
185                     submitFuture.get(5L, TimeUnit.SECONDS);
186                 } catch (InterruptedException | ExecutionException | TimeoutException ex) {
187                     LOG.error("Exception during INITIAL({}) || doSync({}) transaction submitting. ",
188                             initCommit, doSync, ex);
189                     return false;
190                 }
191                 initCommit = false;
192                 return true;
193             }
194
195             submitFuture.addCallback(new FutureCallback<CommitInfo>() {
196                 @Override
197                 public void onSuccess(final CommitInfo result) {
198                     //NOOP
199                 }
200
201                 @Override
202                 public void onFailure(final Throwable throwable) {
203                     if (throwable instanceof TransactionCommitFailedException) {
204                         LOG.error("Transaction commit failed. ", throwable);
205                     } else {
206                         if (throwable instanceof CancellationException) {
207                             LOG.warn("Submit task was canceled");
208                             LOG.trace("Submit exception: ", throwable);
209                         } else {
210                             LOG.error("Exception during transaction submitting. ", throwable);
211                         }
212                     }
213                 }
214             }, MoreExecutors.directExecutor());
215         }
216         return true;
217     }
218
219     public <T extends DataObject> void addDeleteOperationToTxChain(final LogicalDatastoreType store,
220                                                                     final InstanceIdentifier<T> path) {
221         synchronized (txLock) {
222             ensureTransaction();
223             if (writeTx == null) {
224                 LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path);
225                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
226             }
227
228             writeTx.delete(store, path);
229         }
230     }
231
232     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
233                                                           final InstanceIdentifier<T> path,
234                                                           final T data,
235                                                           final boolean createParents) {
236         synchronized (txLock) {
237             ensureTransaction();
238             if (writeTx == null) {
239                 LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path);
240                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
241             }
242
243             writeTx.put(store, path, data, createParents);
244         }
245     }
246
247     public <T extends DataObject> void mergeToTransaction(final LogicalDatastoreType store,
248                                                           final InstanceIdentifier<T> path,
249                                                           final T data,
250                                                           final boolean createParents) {
251         synchronized (txLock) {
252             ensureTransaction();
253             if (writeTx == null) {
254                 LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path);
255                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
256             }
257
258             writeTx.merge(store, path, data, createParents);
259         }
260     }
261
262     public <T extends DataObject> ListenableFuture<Optional<T>>
263         readFromTransaction(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
264         synchronized (txLock) {
265             ensureTransaction();
266             if (writeTx == null) {
267                 LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path);
268                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
269             }
270
271             return writeTx.read(store, path);
272         }
273     }
274
275     @Override
276     public void onTransactionChainFailed(final TransactionChain chain,
277                                          final Transaction transaction, final Throwable cause) {
278         synchronized (txLock) {
279             if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
280                     && chain.equals(this.transactionChain)) {
281                 LOG.warn("Transaction chain failed, recreating chain due to ", cause);
282                 closeTransactionChain();
283                 createTxChain();
284                 writeTx = null;
285             }
286         }
287     }
288
289     @Override
290     public void onTransactionChainSuccessful(final TransactionChain chain) {
291         // NOOP
292     }
293
294     @GuardedBy("txLock")
295     private void ensureTransaction() {
296         if (writeTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
297                 && transactionChain != null) {
298             writeTx = transactionChain.newReadWriteTransaction();
299         }
300     }
301
302     private void enableSubmit() {
303         synchronized (txLock) {
304             /* !!!IMPORTANT: never set true without transactionChain */
305             submitIsEnabled = transactionChain != null;
306         }
307     }
308
309     public ListenableFuture<?> shuttingDown() {
310         if (LOG.isDebugEnabled()) {
311             LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
312         }
313         synchronized (txLock) {
314             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
315             return txChainShuttingDown();
316         }
317     }
318
319     @GuardedBy("txLock")
320     private FluentFuture<? extends CommitInfo> txChainShuttingDown() {
321         boolean wasSubmitEnabled = submitIsEnabled;
322         submitIsEnabled = false;
323         FluentFuture<? extends CommitInfo> future;
324
325         if (!wasSubmitEnabled || transactionChain == null) {
326             // stay with actual thread
327             future = CommitInfo.emptyFluentFuture();
328
329             if (writeTx != null) {
330                 writeTx.cancel();
331                 writeTx = null;
332             }
333         } else if (writeTx == null) {
334             // hijack md-sal thread
335             future = lastSubmittedFuture;
336         } else {
337             if (LOG.isDebugEnabled()) {
338                 LOG.debug("Submitting all transactions for Node {}", this.nodeId);
339             }
340             // hijack md-sal thread
341             future = writeTx.commit();
342             writeTx = null;
343         }
344
345         return future;
346     }
347
348     @Override
349     public void close() {
350         if (LOG.isDebugEnabled()) {
351             LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
352         }
353         synchronized (txLock) {
354             closeTransactionChain();
355         }
356     }
357
358     private enum TransactionChainManagerStatus {
359         /**
360          * txChainManager is working - is active (MASTER).
361          */
362         WORKING,
363         /**
364          * txChainManager is sleeping - is not active (SLAVE or default init value).
365          */
366         SLEEPING,
367         /**
368          * txChainManager is trying to be closed - device disconnecting.
369          */
370         SHUTTING_DOWN
371     }
372 }