Bug 4957 Role lifecycle support for TxChainManager in DeviceContext
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import javax.annotation.Nonnull;
12 import javax.annotation.Nullable;
13 import javax.annotation.concurrent.GuardedBy;
14
15 import com.google.common.annotations.VisibleForTesting;
16 import com.google.common.base.Preconditions;
17 import com.google.common.util.concurrent.CheckedFuture;
18 import com.google.common.util.concurrent.FutureCallback;
19 import com.google.common.util.concurrent.Futures;
20 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
30 import org.opendaylight.yangtools.concepts.Registration;
31 import org.opendaylight.yangtools.yang.binding.DataObject;
32 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
33 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * openflowplugin-impl
39  * org.opendaylight.openflowplugin.impl.device
40  * <p/>
41  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
42  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
43  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
44  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
45  *
46  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
47  *         </p>
48  *         Created: Apr 2, 2015
49  */
50 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
51
52     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
53
54     private final Object txLock = new Object();
55
56     private final DataBroker dataBroker;
57 //    private final DeviceState deviceState;
58     @GuardedBy("txLock")
59     private WriteTransaction wTx;
60     @GuardedBy("txLock")
61     private BindingTransactionChain txChainFactory;
62     private boolean submitIsEnabled;
63
64     public TransactionChainManagerStatus getTransactionChainManagerStatus() {
65         return transactionChainManagerStatus;
66     }
67
68     @GuardedBy("txLock")
69     private TransactionChainManagerStatus transactionChainManagerStatus;
70     private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
71     private volatile Registration managerRegistration;
72
73     TransactionChainManager(@Nonnull final DataBroker dataBroker,
74                             @Nonnull final KeyedInstanceIdentifier<Node, NodeKey> nodeII,
75                             @Nonnull final Registration managerRegistration) {
76         this.dataBroker = Preconditions.checkNotNull(dataBroker);
77         this.nodeII = Preconditions.checkNotNull(nodeII);
78         this.managerRegistration = Preconditions.checkNotNull(managerRegistration);
79         this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
80         createTxChain();
81         LOG.debug("created txChainManager");
82     }
83
84     @GuardedBy("txLock")
85     private void createTxChain() {
86         if (txChainFactory != null) {
87             txChainFactory.close();
88         }
89         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
90     }
91
92     void initialSubmitWriteTransaction() {
93         enableSubmit();
94         submitWriteTransaction();
95     }
96
97     /**
98      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
99      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
100      * transactions. Call this method for MASTER role only.
101      * @param enableSubmit - marker to be sure a WriteTransaction submit is not blocking
102      *            (Blocking write is used for initialization part only)
103      */
104     public void activateTransactionManager(final boolean enableSubmit) {
105 //        LOG.trace("activetTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), enableSubmit);
106         synchronized (txLock) {
107             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
108 //                LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
109                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
110                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
111                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
112                 createTxChain();
113                 this.submitIsEnabled = enableSubmit;
114             } else {
115 //                LOG.debug("Transaction is active {}", deviceState.getNodeId());
116             }
117         }
118     }
119
120     /**
121      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
122      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
123      * Call this method for SLAVE only.
124      */
125     public void deactivateTransactionManager() {
126         synchronized (txLock) {
127             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
128 //                LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId());
129                 submitWriteTransaction();
130                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
131 //                LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId());
132                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
133                 txChainFactory.close();
134                 txChainFactory = null;
135             }
136         }
137     }
138
139     boolean submitWriteTransaction() {
140         if (!submitIsEnabled) {
141             LOG.trace("transaction not committed - submit block issued");
142             return false;
143         }
144         synchronized (txLock) {
145             Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
146                     "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
147             if (wTx == null) {
148                 LOG.trace("nothing to commit - submit returns true");
149                 return true;
150             }
151             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
152             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
153                 @Override
154                 public void onSuccess(Void result) {
155                     //no action required
156                 }
157
158                 @Override
159                 public void onFailure(Throwable t) {
160                     if (t instanceof TransactionCommitFailedException) {
161                         LOG.error("Transaction commit failed. {}", t);
162                     } else {
163                         LOG.error("Exception during transaction submitting. {}", t);
164                     }
165                 }
166             });
167             wTx = null;
168         }
169         return true;
170     }
171
172     @Deprecated
173     public void cancelWriteTransaction() {
174         // there is no cancel txn in ping-pong broker. So we need to drop the chain and recreate it.
175         // since the chain is created per device, there won't be any other txns other than ones we created.
176         recreateTxChain();
177     }
178
179     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
180                                                              final InstanceIdentifier<T> path) {
181         final WriteTransaction writeTx = getTransactionSafely();
182         if (writeTx != null) {
183             writeTx.delete(store, path);
184         } else {
185             LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
186         }
187     }
188
189     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
190                                                    final InstanceIdentifier<T> path, final T data) {
191         final WriteTransaction writeTx = getTransactionSafely();
192         if (writeTx != null) {
193             writeTx.put(store, path, data);
194         } else {
195             LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
196         }
197     }
198
199     @Override
200     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
201                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
202         LOG.warn("txChain failed -> recreating", cause);
203         if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
204             recreateTxChain();
205         }
206     }
207
208     @Override
209     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
210         // NOOP
211     }
212
213     private void recreateTxChain() {
214         synchronized (txLock) {
215             createTxChain();
216             wTx = null;
217         }
218     }
219
220     @Nullable
221     private WriteTransaction getTransactionSafely() {
222         if (wTx == null && !TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) {
223             synchronized (txLock) {
224                 if (wTx == null && txChainFactory != null) {
225                     wTx = txChainFactory.newWriteOnlyTransaction();
226                 }
227             }
228         }
229         return wTx;
230     }
231
232     @VisibleForTesting
233     void enableSubmit() {
234         submitIsEnabled = true;
235     }
236
237     /**
238      * @deprecated will be removed
239      * @param removeDSNode
240      */
241     @Deprecated
242     public void cleanupPostClosure(final boolean removeDSNode) {
243         synchronized (txLock) {
244             if (removeDSNode) {
245                 LOG.info("Removing from operational DS, node {} ", nodeII);
246                 final WriteTransaction writeTx = getTransactionSafely();
247                 this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
248                 writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodeII);
249                 LOG.debug("Delete from operational DS put to write transaction. node {} ", nodeII);
250                 final CheckedFuture<Void, TransactionCommitFailedException> submitsFuture = writeTx.submit();
251                 LOG.info("Delete from operational DS write transaction submitted. node {} ", nodeII);
252                 Futures.addCallback(submitsFuture, new FutureCallback<Void>() {
253                     @Override
254                     public void onSuccess(final Void aVoid) {
255                         LOG.debug("Removing from operational DS successful . node {} ", nodeII);
256                         notifyReadyForNewTransactionChainAndCloseFactory();
257                     }
258
259                     @Override
260                     public void onFailure(final Throwable throwable) {
261                         LOG.info("Attempt to close transaction chain factory failed.", throwable);
262                         notifyReadyForNewTransactionChainAndCloseFactory();
263                     }
264                 });
265                 wTx = null;
266             } else {
267                 if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WAITING_TO_BE_SHUT)) {
268                     LOG.info("This is a disconnect, but not the last node,transactionChainManagerStatus={}, node:{}",
269                             transactionChainManagerStatus, nodeII);
270                     // a disconnect has happened, but this is not the last node in the cluster, so just close the chain
271                     this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
272                     notifyReadyForNewTransactionChainAndCloseFactory();
273                     wTx = null;
274                 } else {
275                     LOG.trace("This is not a disconnect, hence we are not closing txnChainMgr,transactionChainManagerStatus={}, node:{}",
276                             transactionChainManagerStatus, nodeII);
277                 }
278
279             }
280         }
281     }
282
283     /**
284      * @deprecated will be removed
285      */
286     @Deprecated
287     private void notifyReadyForNewTransactionChainAndCloseFactory() {
288         synchronized (this) {
289             try {
290                 LOG.info("Closing registration in manager.node:{} ", nodeII);
291                 if (managerRegistration != null) {
292                     managerRegistration.close();
293                 }
294             } catch (final Exception e) {
295                 LOG.warn("Failed to close transaction chain manager's registration..node:{} ", nodeII, e);
296             }
297             managerRegistration = null;
298         }
299         txChainFactory.close();
300         LOG.info("Transaction chain factory closed. node:{} ", nodeII);
301     }
302
303     @Override
304     public void close() {
305         LOG.info("Setting transactionChainManagerStatus to WAITING_TO_BE_SHUT, will wait for ownershipservice to notify", nodeII);
306         // we can finish in initial phase
307         initialSubmitWriteTransaction();
308         synchronized (txLock) {
309             if (txChainFactory != null) {
310                 txChainFactory.close();
311                 txChainFactory = null;
312             }
313             this.transactionChainManagerStatus = TransactionChainManagerStatus.WAITING_TO_BE_SHUT;
314         }
315         Preconditions.checkState(wTx == null);
316         Preconditions.checkState(txChainFactory == null);
317     }
318
319     public enum TransactionChainManagerStatus {
320         WORKING, SLEEPING, WAITING_TO_BE_SHUT, SHUTTING_DOWN;
321     }
322
323
324
325 }