Do not leak internal state
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.FutureFallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
40 import org.opendaylight.yangtools.yang.binding.DataObject;
41 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
42 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * openflowplugin-impl
48  * org.opendaylight.openflowplugin.impl.device
49  * <p/>
50  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
51  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
52  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
53  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
54  *
55  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
56  *         </p>
57  *         Created: Apr 2, 2015
58  */
59 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
60
61     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
62
63     private final Object txLock = new Object();
64
65     private final DataBroker dataBroker;
66     private final DeviceState deviceState;
67     @GuardedBy("txLock")
68     private WriteTransaction wTx;
69     @GuardedBy("txLock")
70     private BindingTransactionChain txChainFactory;
71     private boolean submitIsEnabled;
72
73     @GuardedBy("txLock")
74     private TransactionChainManagerStatus transactionChainManagerStatus;
75     private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
76
77     TransactionChainManager(@Nonnull final DataBroker dataBroker,
78                             @Nonnull final DeviceState deviceState) {
79         this.dataBroker = Preconditions.checkNotNull(dataBroker);
80         this.deviceState = Preconditions.checkNotNull(deviceState);
81         this.nodeII = Preconditions.checkNotNull(deviceState.getNodeInstanceIdentifier());
82         this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
83         LOG.debug("created txChainManager");
84     }
85
86     @GuardedBy("txLock")
87     private void createTxChain() {
88         if (txChainFactory != null) {
89             txChainFactory.close();
90         }
91         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
92     }
93
94     void initialSubmitWriteTransaction() {
95         enableSubmit();
96         submitWriteTransaction();
97     }
98
99     /**
100      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
101      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
102      * transactions. Call this method for MASTER role only.
103      */
104     public void activateTransactionManager() {
105         LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), this.submitIsEnabled);
106         synchronized (txLock) {
107             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
108                 LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
109                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
110                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
111                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
112                 createTxChain();
113             } else {
114                 LOG.debug("Transaction is active {}", deviceState.getNodeId());
115             }
116         }
117     }
118
119     /**
120      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
121      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
122      * Call this method for SLAVE only.
123      * @return Future
124      */
125     public ListenableFuture<Void> deactivateTransactionManager() {
126         final ListenableFuture<Void> future;
127         synchronized (txLock) {
128             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
129                 LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId());
130                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
131                 future = txChainShuttingDown();
132                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
133                 LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId());
134                 Futures.addCallback(future, new FutureCallback<Void>() {
135                     @Override
136                     public void onSuccess(final Void result) {
137                         txChainFactory.close();
138                         txChainFactory = null;
139                     }
140
141                     @Override
142                     public void onFailure(final Throwable t) {
143                         txChainFactory.close();
144                         txChainFactory = null;
145                     }
146                 });
147             } else {
148                 // TODO : ignoring redundant deactivate invocation
149                 future = Futures.immediateCheckedFuture(null);
150             }
151         }
152         return future;
153     }
154
155     boolean submitWriteTransaction() {
156         if (!submitIsEnabled) {
157             LOG.trace("transaction not committed - submit block issued");
158             return false;
159         }
160         synchronized (txLock) {
161             if (wTx == null) {
162                 LOG.trace("nothing to commit - submit returns true");
163                 return true;
164             }
165             Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
166                     "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
167             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
168             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
169                 @Override
170                 public void onSuccess(final Void result) {
171                     //no action required
172                 }
173
174                 @Override
175                 public void onFailure(final Throwable t) {
176                     if (t instanceof TransactionCommitFailedException) {
177                         LOG.error("Transaction commit failed. {}", t);
178                     } else {
179                         LOG.error("Exception during transaction submitting. {}", t);
180                     }
181                 }
182             });
183             wTx = null;
184         }
185         return true;
186     }
187
188     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
189                                                              final InstanceIdentifier<T> path) {
190         final WriteTransaction writeTx = getTransactionSafely();
191         if (writeTx != null) {
192             writeTx.delete(store, path);
193         } else {
194             LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
195         }
196     }
197
198     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
199                                                    final InstanceIdentifier<T> path, final T data) {
200         final WriteTransaction writeTx = getTransactionSafely();
201         if (writeTx != null) {
202             writeTx.put(store, path, data);
203         } else {
204             LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
205         }
206     }
207
208     @Override
209     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
210                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
211         if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
212             LOG.warn("txChain failed -> recreating", cause);
213             recreateTxChain();
214         }
215     }
216
217     @Override
218     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
219         // NOOP
220     }
221
222     private void recreateTxChain() {
223         synchronized (txLock) {
224             createTxChain();
225             wTx = null;
226         }
227     }
228
229     @Nullable
230     private WriteTransaction getTransactionSafely() {
231         if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
232             synchronized (txLock) {
233                 if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
234                     if (wTx == null && txChainFactory != null) {
235                         wTx = txChainFactory.newWriteOnlyTransaction();
236                     }
237                 }
238             }
239         }
240         return wTx;
241     }
242
243     @VisibleForTesting
244     void enableSubmit() {
245         submitIsEnabled = true;
246     }
247
248     ListenableFuture<Void> shuttingDown() {
249         LOG.debug("TxManager is going SUTTING_DOWN for node {}", nodeII);
250         ListenableFuture<Void> future;
251         synchronized (txLock) {
252             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
253             future = txChainShuttingDown();
254         }
255         return future;
256     }
257
258     private ListenableFuture<Void> txChainShuttingDown() {
259         ListenableFuture<Void> future;
260         if (txChainFactory == null) {
261             // stay with actual thread
262             future = Futures.immediateCheckedFuture(null);
263         } else {
264             // hijack md-sal thread
265             if (wTx == null) {
266                 wTx = txChainFactory.newWriteOnlyTransaction();
267             }
268             final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId());
269             wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
270             future = wTx.submit();
271             wTx = null;
272
273             future = Futures.withFallback(future, new FutureFallback<Void>() {
274
275                 @Override
276                 public ListenableFuture<Void> create(final Throwable t) throws Exception {
277                     LOG.debug("Last ShuttingDown Transaction for node {} fail. Put empty FlowCapableNode",
278                             deviceState.getNodeId());
279                     final ReadOnlyTransaction readWriteTx = dataBroker.newReadOnlyTransaction();
280                     final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readFlowNode = readWriteTx
281                             .read(LogicalDatastoreType.OPERATIONAL, nodeII.augmentation(FlowCapableNode.class));
282                     return Futures.transform(readFlowNode, new AsyncFunction<Optional<FlowCapableNode>, Void>() {
283
284                         @Override
285                         public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) {
286                             if (input.isPresent()) {
287                                 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
288                                 nodeBuilder.addAugmentation(FlowCapableNode.class, new FlowCapableNodeBuilder().build());
289                                 delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
290                                 return delWtx.submit();
291                             }
292                             return Futures.immediateFuture(null);
293                         }
294                     });
295                 }
296             });
297         }
298         return future;
299     }
300
301     /**
302      * Transaction could be close if we are not submit anything. We have property submitIsEnable what
303      * could protect us for check it is NEW transaction from chain and we are able close everything
304      * safely.
305      */
306     void clearUnsubmittedTransaction() {
307         LOG.debug("Cleaning unsubmited Transaction for Device {}", deviceState.getNodeId());
308         Verify.verify(!submitIsEnabled, "We are not able clean TxChain {}", deviceState.getNodeId());
309         synchronized (txLock) {
310             if (wTx != null) {
311                 wTx.cancel();
312                 wTx = null;
313             }
314             if (txChainFactory != null) {
315                 txChainFactory.close();
316                 txChainFactory = null;
317             }
318             transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
319         }
320     }
321
322     @Override
323     public void close() {
324         LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN, will wait for ownershipservice to notify", nodeII);
325         Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus));
326         Preconditions.checkState(wTx == null);
327         synchronized (txLock) {
328             if (txChainFactory != null) {
329                 txChainFactory.close();
330                 txChainFactory = null;
331             }
332         }
333         Preconditions.checkState(txChainFactory == null);
334     }
335
336     private enum TransactionChainManagerStatus {
337         /** txChainManager is sleeping - is not active (SLAVE or default init value) */
338         WORKING,
339         /** txChainManager is working - is active (MASTER) */
340         SLEEPING,
341         /** txChainManager is trying to be closed - device disconnecting */
342         SHUTTING_DOWN;
343     }
344 }