Do not retain DeviceState
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Optional;
13 import com.google.common.base.Preconditions;
14 import com.google.common.base.Verify;
15 import com.google.common.util.concurrent.AsyncFunction;
16 import com.google.common.util.concurrent.CheckedFuture;
17 import com.google.common.util.concurrent.FutureCallback;
18 import com.google.common.util.concurrent.FutureFallback;
19 import com.google.common.util.concurrent.Futures;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
27 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
29 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
30 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
33 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
34 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
41 import org.opendaylight.yangtools.yang.binding.DataObject;
42 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
43 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * openflowplugin-impl
49  * org.opendaylight.openflowplugin.impl.device
50  * <p/>
51  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
52  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
53  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
54  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
55  *
56  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
57  *         </p>
58  *         Created: Apr 2, 2015
59  */
60 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
61
62     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
63
64     private final Object txLock = new Object();
65     private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
66     private final DataBroker dataBroker;
67
68     @GuardedBy("txLock")
69     private WriteTransaction wTx;
70     @GuardedBy("txLock")
71     private BindingTransactionChain txChainFactory;
72     @GuardedBy("txLock")
73     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
74
75     private boolean submitIsEnabled;
76
77     TransactionChainManager(@Nonnull final DataBroker dataBroker, @Nonnull final DeviceState deviceState) {
78         this.dataBroker = Preconditions.checkNotNull(dataBroker);
79         this.nodeII = Preconditions.checkNotNull(deviceState.getNodeInstanceIdentifier());
80         LOG.debug("created txChainManager for {}", nodeII);
81     }
82
83     private NodeId nodeId() {
84         return nodeII.getKey().getId();
85     }
86
87     @GuardedBy("txLock")
88     private void createTxChain() {
89         if (txChainFactory != null) {
90             txChainFactory.close();
91         }
92         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
93     }
94
95     void initialSubmitWriteTransaction() {
96         enableSubmit();
97         submitWriteTransaction();
98     }
99
100     /**
101      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
102      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
103      * transactions. Call this method for MASTER role only.
104      */
105     public void activateTransactionManager() {
106         LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", nodeId(), submitIsEnabled);
107         synchronized (txLock) {
108             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
109                 LOG.debug("Transaction Factory create {}", nodeId());
110                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
111                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
112                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
113                 createTxChain();
114             } else {
115                 LOG.debug("Transaction is active {}", nodeId());
116             }
117         }
118     }
119
120     /**
121      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
122      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
123      * Call this method for SLAVE only.
124      * @return Future
125      */
126     public ListenableFuture<Void> deactivateTransactionManager() {
127         final ListenableFuture<Void> future;
128         synchronized (txLock) {
129             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
130                 LOG.debug("Submitting all transactions if we were in status WORKING for Node", nodeId());
131                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
132                 future = txChainShuttingDown();
133                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
134                 LOG.debug("Transaction Factory delete for Node {}", nodeId());
135                 Futures.addCallback(future, new FutureCallback<Void>() {
136                     @Override
137                     public void onSuccess(final Void result) {
138                         txChainFactory.close();
139                         txChainFactory = null;
140                     }
141
142                     @Override
143                     public void onFailure(final Throwable t) {
144                         txChainFactory.close();
145                         txChainFactory = null;
146                     }
147                 });
148             } else {
149                 // TODO : ignoring redundant deactivate invocation
150                 future = Futures.immediateCheckedFuture(null);
151             }
152         }
153         return future;
154     }
155
156     boolean submitWriteTransaction() {
157         if (!submitIsEnabled) {
158             LOG.trace("transaction not committed - submit block issued");
159             return false;
160         }
161         synchronized (txLock) {
162             if (wTx == null) {
163                 LOG.trace("nothing to commit - submit returns true");
164                 return true;
165             }
166             Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
167                     "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
168             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
169             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
170                 @Override
171                 public void onSuccess(final Void result) {
172                     //no action required
173                 }
174
175                 @Override
176                 public void onFailure(final Throwable t) {
177                     if (t instanceof TransactionCommitFailedException) {
178                         LOG.error("Transaction commit failed. {}", t);
179                     } else {
180                         LOG.error("Exception during transaction submitting. {}", t);
181                     }
182                 }
183             });
184             wTx = null;
185         }
186         return true;
187     }
188
189     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
190                                                              final InstanceIdentifier<T> path) {
191         final WriteTransaction writeTx = getTransactionSafely();
192         if (writeTx != null) {
193             writeTx.delete(store, path);
194         } else {
195             LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
196         }
197     }
198
199     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
200                                                    final InstanceIdentifier<T> path, final T data) {
201         final WriteTransaction writeTx = getTransactionSafely();
202         if (writeTx != null) {
203             writeTx.put(store, path, data);
204         } else {
205             LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
206         }
207     }
208
209     @Override
210     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
211                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
212         if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
213             LOG.warn("txChain failed -> recreating", cause);
214             recreateTxChain();
215         }
216     }
217
218     @Override
219     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
220         // NOOP
221     }
222
223     private void recreateTxChain() {
224         synchronized (txLock) {
225             createTxChain();
226             wTx = null;
227         }
228     }
229
230     @Nullable
231     private WriteTransaction getTransactionSafely() {
232         if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
233             synchronized (txLock) {
234                 if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
235                     if (wTx == null && txChainFactory != null) {
236                         wTx = txChainFactory.newWriteOnlyTransaction();
237                     }
238                 }
239             }
240         }
241         return wTx;
242     }
243
244     @VisibleForTesting
245     void enableSubmit() {
246         submitIsEnabled = true;
247     }
248
249     ListenableFuture<Void> shuttingDown() {
250         LOG.debug("TxManager is going SUTTING_DOWN for node {}", nodeII);
251         ListenableFuture<Void> future;
252         synchronized (txLock) {
253             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
254             future = txChainShuttingDown();
255         }
256         return future;
257     }
258
259     private ListenableFuture<Void> txChainShuttingDown() {
260         ListenableFuture<Void> future;
261         if (txChainFactory == null) {
262             // stay with actual thread
263             future = Futures.immediateCheckedFuture(null);
264         } else {
265             // hijack md-sal thread
266             if (wTx == null) {
267                 wTx = txChainFactory.newWriteOnlyTransaction();
268             }
269             final NodeBuilder nodeBuilder = new NodeBuilder().setId(nodeId());
270             wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
271             future = wTx.submit();
272             wTx = null;
273
274             future = Futures.withFallback(future, new FutureFallback<Void>() {
275
276                 @Override
277                 public ListenableFuture<Void> create(final Throwable t) throws Exception {
278                     LOG.debug("Last ShuttingDown Transaction for node {} fail. Put empty FlowCapableNode", nodeId());
279                     final ReadOnlyTransaction readWriteTx = dataBroker.newReadOnlyTransaction();
280                     final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readFlowNode = readWriteTx
281                             .read(LogicalDatastoreType.OPERATIONAL, nodeII.augmentation(FlowCapableNode.class));
282                     return Futures.transform(readFlowNode, new AsyncFunction<Optional<FlowCapableNode>, Void>() {
283
284                         @Override
285                         public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) {
286                             if (input.isPresent()) {
287                                 final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
288                                 nodeBuilder.addAugmentation(FlowCapableNode.class, new FlowCapableNodeBuilder().build());
289                                 delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
290                                 return delWtx.submit();
291                             }
292                             return Futures.immediateFuture(null);
293                         }
294                     });
295                 }
296             });
297         }
298         return future;
299     }
300
301     /**
302      * Transaction could be close if we are not submit anything. We have property submitIsEnable what
303      * could protect us for check it is NEW transaction from chain and we are able close everything
304      * safely.
305      */
306     void clearUnsubmittedTransaction() {
307         LOG.debug("Cleaning unsubmited Transaction for Device {}", nodeId());
308         Verify.verify(!submitIsEnabled, "We are not able clean TxChain {}", nodeId());
309         synchronized (txLock) {
310             if (wTx != null) {
311                 wTx.cancel();
312                 wTx = null;
313             }
314             if (txChainFactory != null) {
315                 txChainFactory.close();
316                 txChainFactory = null;
317             }
318             transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
319         }
320     }
321
322     @Override
323     public void close() {
324         LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN, will wait for ownershipservice to notify", nodeII);
325         Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus));
326         Preconditions.checkState(wTx == null);
327         synchronized (txLock) {
328             if (txChainFactory != null) {
329                 txChainFactory.close();
330                 txChainFactory = null;
331             }
332         }
333         Preconditions.checkState(txChainFactory == null);
334     }
335
336     private enum TransactionChainManagerStatus {
337         /** txChainManager is sleeping - is not active (SLAVE or default init value) */
338         WORKING,
339         /** txChainManager is working - is active (MASTER) */
340         SLEEPING,
341         /** txChainManager is trying to be closed - device disconnecting */
342         SHUTTING_DOWN;
343     }
344 }