Merge "Terminate SLAVE task before sending role change"
[openflowplugin.git] / openflowplugin-common / src / main / java / org / opendaylight / openflowplugin / common / txchain / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.common.txchain;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Objects;
18 import java.util.Optional;
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23 import javax.annotation.Nonnull;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * The openflowplugin-impl.org.opendaylight.openflowplugin.impl.device
43  * package protected class for controlling {@link WriteTransaction} life cycle. It is
44  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
45  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
46  * and submitTransaction method (wrapped {@link WriteTransaction#submit()}).
47  */
48 public class TransactionChainManager implements TransactionChainListener, AutoCloseable {
49
50     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
51     private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
52
53     private final Object txLock = new Object();
54     private final DataBroker dataBroker;
55     private final String nodeId;
56
57     @GuardedBy("txLock")
58     private ReadWriteTransaction writeTx;
59     @GuardedBy("txLock")
60     private BindingTransactionChain transactionChain;
61     @GuardedBy("txLock")
62     private boolean submitIsEnabled;
63     @GuardedBy("txLock")
64     private ListenableFuture<Void> lastSubmittedFuture;
65
66     private volatile boolean initCommit;
67
68     @GuardedBy("txLock")
69     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
70
71     public TransactionChainManager(@Nonnull final DataBroker dataBroker,
72                                    @Nonnull final String deviceIdentifier) {
73         this.dataBroker = dataBroker;
74         this.nodeId = deviceIdentifier;
75         this.lastSubmittedFuture = Futures.immediateFuture(null);
76     }
77
78     @GuardedBy("txLock")
79     private void createTxChain() {
80         BindingTransactionChain txChainFactoryTemp = transactionChain;
81         transactionChain = dataBroker.createTransactionChain(TransactionChainManager.this);
82         Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
83     }
84
85     public boolean initialSubmitWriteTransaction() {
86         enableSubmit();
87         return submitTransaction();
88     }
89
90     /**
91      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
92      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
93      * transactions. Call this method for MASTER role only.
94      */
95     public void activateTransactionManager() {
96         if (LOG.isDebugEnabled()) {
97             LOG.debug("activateTransactionManager for node {} transaction submit is set to {}",
98                     this.nodeId, submitIsEnabled);
99         }
100         synchronized (txLock) {
101             if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
102                 Preconditions.checkState(transactionChain == null,
103                         "TxChainFactory survive last close.");
104                 Preconditions.checkState(writeTx == null,
105                         "We have some unexpected WriteTransaction.");
106                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
107                 this.submitIsEnabled = false;
108                 this.initCommit = true;
109                 createTxChain();
110             }
111         }
112     }
113
114     /**
115      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
116      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
117      * Call this method for SLAVE only.
118      * @return Future
119      */
120     public ListenableFuture<Void> deactivateTransactionManager() {
121         if (LOG.isDebugEnabled()) {
122             LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
123         }
124         final ListenableFuture<Void> future;
125         synchronized (txLock) {
126             if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
127                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
128                 future = txChainShuttingDown();
129                 Preconditions.checkState(writeTx == null,
130                         "We have some unexpected WriteTransaction.");
131                 Futures.addCallback(future, new FutureCallback<Void>() {
132                     @Override
133                     public void onSuccess(final Void result) {
134                         closeTransactionChain();
135                     }
136
137                     @Override
138                     public void onFailure(@Nonnull final Throwable t) {
139                         closeTransactionChain();
140                     }
141                 });
142             } else {
143                 // ignoring redundant deactivate invocation
144                 future = Futures.immediateFuture(null);
145             }
146         }
147         return future;
148     }
149
150     private void closeTransactionChain() {
151         if (writeTx != null) {
152             writeTx.cancel();
153             writeTx = null;
154         }
155         Optional.ofNullable(transactionChain).ifPresent(TransactionChain::close);
156         transactionChain = null;
157     }
158
159     public boolean submitTransaction() {
160         synchronized (txLock) {
161             if (!submitIsEnabled) {
162                 if (LOG.isTraceEnabled()) {
163                     LOG.trace("transaction not committed - submit block issued");
164                 }
165                 return false;
166             }
167             if (Objects.isNull(writeTx)) {
168                 if (LOG.isTraceEnabled()) {
169                     LOG.trace("nothing to commit - submit returns true");
170                 }
171                 return true;
172             }
173             Preconditions.checkState(TransactionChainManagerStatus.WORKING == transactionChainManagerStatus,
174                     "we have here Uncompleted Transaction for node {} and we are not MASTER",
175                     this.nodeId);
176             final ListenableFuture<Void> submitFuture = writeTx.submit();
177             lastSubmittedFuture = submitFuture;
178             writeTx = null;
179
180             if (initCommit) {
181                 try {
182                     submitFuture.get(5L, TimeUnit.SECONDS);
183                 } catch (InterruptedException | ExecutionException | TimeoutException ex) {
184                     LOG.error("Exception during INITIAL transaction submitting. ", ex);
185                     return false;
186                 }
187                 initCommit = false;
188                 return true;
189             }
190
191             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
192                 @Override
193                 public void onSuccess(final Void result) {
194                     //NOOP
195                 }
196
197                 @Override
198                 public void onFailure(final Throwable throwable) {
199                     if (throwable instanceof TransactionCommitFailedException) {
200                         LOG.error("Transaction commit failed. ", throwable);
201                     } else {
202                         if (throwable instanceof CancellationException) {
203                             LOG.warn("Submit task was canceled");
204                             LOG.trace("Submit exception: ", throwable);
205                         } else {
206                             LOG.error("Exception during transaction submitting. ", throwable);
207                         }
208                     }
209                 }
210             }, MoreExecutors.directExecutor());
211         }
212         return true;
213     }
214
215     public <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
216                                                                     final InstanceIdentifier<T> path){
217         synchronized (txLock) {
218             ensureTransaction();
219             if (writeTx == null) {
220                 LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path);
221                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
222             }
223
224             writeTx.delete(store, path);
225         }
226     }
227
228     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
229                                                           final InstanceIdentifier<T> path,
230                                                           final T data,
231                                                           final boolean createParents){
232         synchronized (txLock) {
233             ensureTransaction();
234             if (writeTx == null) {
235                 LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path);
236                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
237             }
238
239             writeTx.put(store, path, data, createParents);
240         }
241     }
242
243     public <T extends DataObject> void mergeToTransaction(final LogicalDatastoreType store,
244                                                           final InstanceIdentifier<T> path,
245                                                           final T data,
246                                                           final boolean createParents){
247         synchronized (txLock) {
248             ensureTransaction();
249             if (writeTx == null) {
250                 LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path);
251                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
252             }
253
254             writeTx.merge(store, path, data, createParents);
255         }
256     }
257
258     public <T extends DataObject> CheckedFuture<com.google.common.base.Optional<T>, ReadFailedException>
259     readFromTransaction(final LogicalDatastoreType store,
260                         final InstanceIdentifier<T> path){
261         synchronized (txLock) {
262             ensureTransaction();
263             if (writeTx == null) {
264                 LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path);
265                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
266             }
267
268             return writeTx.read(store, path);
269         }
270     }
271
272     @Override
273     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
274                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
275         synchronized (txLock) {
276             if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus &&
277                     chain.equals(this.transactionChain)) {
278                 LOG.warn("Transaction chain failed, recreating chain due to ", cause);
279                 closeTransactionChain();
280                 createTxChain();
281                 writeTx = null;
282             }
283         }
284     }
285
286     @Override
287     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
288         // NOOP
289     }
290
291     @GuardedBy("txLock")
292    private void ensureTransaction() {
293         if (writeTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
294             && transactionChain != null) {
295                 writeTx = transactionChain.newReadWriteTransaction();
296         }
297     }
298
299     private void enableSubmit() {
300         synchronized (txLock) {
301             /* !!!IMPORTANT: never set true without transactionChain */
302             submitIsEnabled = transactionChain != null;
303         }
304     }
305
306     public ListenableFuture<Void> shuttingDown() {
307         if (LOG.isDebugEnabled()) {
308             LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
309         }
310         synchronized (txLock) {
311             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
312             return txChainShuttingDown();
313         }
314     }
315
316     @GuardedBy("txLock")
317     private ListenableFuture<Void> txChainShuttingDown() {
318         boolean wasSubmitEnabled = submitIsEnabled;
319         submitIsEnabled = false;
320         ListenableFuture<Void> future;
321
322         if (!wasSubmitEnabled || transactionChain == null) {
323             // stay with actual thread
324             future = Futures.immediateCheckedFuture(null);
325
326             if (writeTx != null) {
327                 writeTx.cancel();
328                 writeTx = null;
329             }
330         } else if (writeTx == null) {
331             // hijack md-sal thread
332             future = lastSubmittedFuture;
333         } else {
334             if (LOG.isDebugEnabled()) {
335                 LOG.debug("Submitting all transactions for Node {}", this.nodeId);
336             }
337             // hijack md-sal thread
338             future = writeTx.submit();
339             writeTx = null;
340         }
341
342         return future;
343     }
344
345     @Override
346     public void close() {
347         if (LOG.isDebugEnabled()) {
348             LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
349         }
350         synchronized (txLock) {
351             closeTransactionChain();
352         }
353     }
354
355     private enum TransactionChainManagerStatus {
356         /**
357          * txChainManager is working - is active (MASTER).
358          */
359         WORKING,
360         /**
361          * txChainManager is sleeping - is not active (SLAVE or default init value).
362          */
363         SLEEPING,
364         /**
365          * txChainManager is trying to be closed - device disconnecting.
366          */
367         SHUTTING_DOWN
368     }
369 }