Merge "Bundle Reconciliation: Close bundle on switch before reconciliation"
[openflowplugin.git] / openflowplugin-common / src / main / java / org / opendaylight / openflowplugin / common / txchain / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.common.txchain;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.CheckedFuture;
13 import com.google.common.util.concurrent.FutureCallback;
14 import com.google.common.util.concurrent.Futures;
15 import com.google.common.util.concurrent.ListenableFuture;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.Objects;
18 import java.util.Optional;
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23 import javax.annotation.Nonnull;
24 import javax.annotation.concurrent.GuardedBy;
25 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
26 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
27 import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
28 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
30 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
31 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
34 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
35 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
36 import org.opendaylight.yangtools.yang.binding.DataObject;
37 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 /**
42  * The openflowplugin-impl.org.opendaylight.openflowplugin.impl.device
43  * package protected class for controlling {@link WriteTransaction} life cycle. It is
44  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
45  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
46  * and submitTransaction method (wrapped {@link WriteTransaction#submit()}).
47  */
48 public class TransactionChainManager implements TransactionChainListener, AutoCloseable {
49
50     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
51     private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
52
53     private final Object txLock = new Object();
54     private final DataBroker dataBroker;
55     private final String nodeId;
56
57     @GuardedBy("txLock")
58     private ReadWriteTransaction writeTx;
59     @GuardedBy("txLock")
60     private BindingTransactionChain transactionChain;
61     @GuardedBy("txLock")
62     private boolean submitIsEnabled;
63     @GuardedBy("txLock")
64     private ListenableFuture<Void> lastSubmittedFuture;
65
66     private volatile boolean initCommit;
67
68     @GuardedBy("txLock")
69     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
70
71     public TransactionChainManager(@Nonnull final DataBroker dataBroker,
72                                    @Nonnull final String deviceIdentifier) {
73         this.dataBroker = dataBroker;
74         this.nodeId = deviceIdentifier;
75         this.lastSubmittedFuture = Futures.immediateFuture(null);
76     }
77
78     @GuardedBy("txLock")
79     private void createTxChain() {
80         BindingTransactionChain txChainFactoryTemp = transactionChain;
81         transactionChain = dataBroker.createTransactionChain(TransactionChainManager.this);
82         Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
83     }
84
85     public boolean initialSubmitWriteTransaction() {
86         enableSubmit();
87         return submitTransaction();
88     }
89
90     /**
91      * Method change status for TxChainManager to WORKING and it has to make
92      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
93      * transactions. Call this method for MASTER role only.
94      */
95     public void activateTransactionManager() {
96         if (LOG.isDebugEnabled()) {
97             LOG.debug("activateTransactionManager for node {} transaction submit is set to {}",
98                     this.nodeId, submitIsEnabled);
99         }
100         synchronized (txLock) {
101             if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
102                 Preconditions.checkState(transactionChain == null,
103                         "TxChainFactory survive last close.");
104                 Preconditions.checkState(writeTx == null,
105                         "We have some unexpected WriteTransaction.");
106                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
107                 this.submitIsEnabled = false;
108                 this.initCommit = true;
109                 createTxChain();
110             }
111         }
112     }
113
114     /**
115      * Method change status for TxChainManger to SLEEPING and it unregisters
116      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
117      * Call this method for SLAVE only.
118      * @return Future
119      */
120     public ListenableFuture<Void> deactivateTransactionManager() {
121         if (LOG.isDebugEnabled()) {
122             LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
123         }
124         final ListenableFuture<Void> future;
125         synchronized (txLock) {
126             if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
127                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
128                 future = txChainShuttingDown();
129                 Preconditions.checkState(writeTx == null,
130                         "We have some unexpected WriteTransaction.");
131                 Futures.addCallback(future, new FutureCallback<Void>() {
132                     @Override
133                     public void onSuccess(final Void result) {
134                         closeTransactionChain();
135                     }
136
137                     @Override
138                     public void onFailure(@Nonnull final Throwable throwable) {
139                         closeTransactionChain();
140                     }
141                 }, MoreExecutors.directExecutor());
142             } else {
143                 // ignoring redundant deactivate invocation
144                 future = Futures.immediateFuture(null);
145             }
146         }
147         return future;
148     }
149
150     private void closeTransactionChain() {
151         if (writeTx != null) {
152             writeTx.cancel();
153             writeTx = null;
154         }
155         Optional.ofNullable(transactionChain).ifPresent(TransactionChain::close);
156         transactionChain = null;
157     }
158
159     @GuardedBy("txLock")
160     public boolean submitTransaction() {
161         return submitTransaction(false);
162     }
163
164     @GuardedBy("txLock")
165     public boolean submitTransaction(boolean doSync) {
166         synchronized (txLock) {
167             if (!submitIsEnabled) {
168                 if (LOG.isTraceEnabled()) {
169                     LOG.trace("transaction not committed - submit block issued");
170                 }
171                 return false;
172             }
173             if (Objects.isNull(writeTx)) {
174                 if (LOG.isTraceEnabled()) {
175                     LOG.trace("nothing to commit - submit returns true");
176                 }
177                 return true;
178             }
179             Preconditions.checkState(TransactionChainManagerStatus.WORKING == transactionChainManagerStatus,
180                     "we have here Uncompleted Transaction for node {} and we are not MASTER",
181                     this.nodeId);
182             final ListenableFuture<Void> submitFuture = writeTx.submit();
183             lastSubmittedFuture = submitFuture;
184             writeTx = null;
185
186             if (initCommit || doSync) {
187                 try {
188                     submitFuture.get(5L, TimeUnit.SECONDS);
189                 } catch (InterruptedException | ExecutionException | TimeoutException ex) {
190                     LOG.error("Exception during INITIAL({}) || doSync({}) transaction submitting. ",
191                             initCommit, doSync, ex);
192                     return false;
193                 }
194                 initCommit = false;
195                 return true;
196             }
197
198             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
199                 @Override
200                 public void onSuccess(final Void result) {
201                     //NOOP
202                 }
203
204                 @Override
205                 public void onFailure(final Throwable throwable) {
206                     if (throwable instanceof TransactionCommitFailedException) {
207                         LOG.error("Transaction commit failed. ", throwable);
208                     } else {
209                         if (throwable instanceof CancellationException) {
210                             LOG.warn("Submit task was canceled");
211                             LOG.trace("Submit exception: ", throwable);
212                         } else {
213                             LOG.error("Exception during transaction submitting. ", throwable);
214                         }
215                     }
216                 }
217             }, MoreExecutors.directExecutor());
218         }
219         return true;
220     }
221
222     public <T extends DataObject> void addDeleteOperationToTxChain(final LogicalDatastoreType store,
223                                                                     final InstanceIdentifier<T> path) {
224         synchronized (txLock) {
225             ensureTransaction();
226             if (writeTx == null) {
227                 LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path);
228                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
229             }
230
231             writeTx.delete(store, path);
232         }
233     }
234
235     public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
236                                                           final InstanceIdentifier<T> path,
237                                                           final T data,
238                                                           final boolean createParents) {
239         synchronized (txLock) {
240             ensureTransaction();
241             if (writeTx == null) {
242                 LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path);
243                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
244             }
245
246             writeTx.put(store, path, data, createParents);
247         }
248     }
249
250     public <T extends DataObject> void mergeToTransaction(final LogicalDatastoreType store,
251                                                           final InstanceIdentifier<T> path,
252                                                           final T data,
253                                                           final boolean createParents) {
254         synchronized (txLock) {
255             ensureTransaction();
256             if (writeTx == null) {
257                 LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path);
258                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
259             }
260
261             writeTx.merge(store, path, data, createParents);
262         }
263     }
264
265     public <T extends DataObject> CheckedFuture<com.google.common.base.Optional<T>, ReadFailedException>
266         readFromTransaction(final LogicalDatastoreType store, final InstanceIdentifier<T> path) {
267         synchronized (txLock) {
268             ensureTransaction();
269             if (writeTx == null) {
270                 LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path);
271                 throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
272             }
273
274             return writeTx.read(store, path);
275         }
276     }
277
278     @Override
279     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
280                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
281         synchronized (txLock) {
282             if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
283                     && chain.equals(this.transactionChain)) {
284                 LOG.warn("Transaction chain failed, recreating chain due to ", cause);
285                 closeTransactionChain();
286                 createTxChain();
287                 writeTx = null;
288             }
289         }
290     }
291
292     @Override
293     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
294         // NOOP
295     }
296
297     @GuardedBy("txLock")
298     private void ensureTransaction() {
299         if (writeTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
300                 && transactionChain != null) {
301             writeTx = transactionChain.newReadWriteTransaction();
302         }
303     }
304
305     private void enableSubmit() {
306         synchronized (txLock) {
307             /* !!!IMPORTANT: never set true without transactionChain */
308             submitIsEnabled = transactionChain != null;
309         }
310     }
311
312     public ListenableFuture<Void> shuttingDown() {
313         if (LOG.isDebugEnabled()) {
314             LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
315         }
316         synchronized (txLock) {
317             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
318             return txChainShuttingDown();
319         }
320     }
321
322     @GuardedBy("txLock")
323     private ListenableFuture<Void> txChainShuttingDown() {
324         boolean wasSubmitEnabled = submitIsEnabled;
325         submitIsEnabled = false;
326         ListenableFuture<Void> future;
327
328         if (!wasSubmitEnabled || transactionChain == null) {
329             // stay with actual thread
330             future = Futures.immediateCheckedFuture(null);
331
332             if (writeTx != null) {
333                 writeTx.cancel();
334                 writeTx = null;
335             }
336         } else if (writeTx == null) {
337             // hijack md-sal thread
338             future = lastSubmittedFuture;
339         } else {
340             if (LOG.isDebugEnabled()) {
341                 LOG.debug("Submitting all transactions for Node {}", this.nodeId);
342             }
343             // hijack md-sal thread
344             future = writeTx.submit();
345             writeTx = null;
346         }
347
348         return future;
349     }
350
351     @Override
352     public void close() {
353         if (LOG.isDebugEnabled()) {
354             LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
355         }
356         synchronized (txLock) {
357             closeTransactionChain();
358         }
359     }
360
361     private enum TransactionChainManagerStatus {
362         /**
363          * txChainManager is working - is active (MASTER).
364          */
365         WORKING,
366         /**
367          * txChainManager is sleeping - is not active (SLAVE or default init value).
368          */
369         SLEEPING,
370         /**
371          * txChainManager is trying to be closed - device disconnecting.
372          */
373         SHUTTING_DOWN
374     }
375 }