Merge "Bug-4957: Make async operational DS Read"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import javax.annotation.Nonnull;
17 import javax.annotation.Nullable;
18 import javax.annotation.concurrent.GuardedBy;
19 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
20 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
21 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
22 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
24 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
27 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
28 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
31 import org.opendaylight.yangtools.yang.binding.DataObject;
32 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
33 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * openflowplugin-impl
39  * org.opendaylight.openflowplugin.impl.device
40  * <p/>
41  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
42  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
43  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
44  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
45  *
46  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
47  *         </p>
48  *         Created: Apr 2, 2015
49  */
50 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
51
52     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
53
54     private final Object txLock = new Object();
55
56     private final DataBroker dataBroker;
57     private final DeviceState deviceState;
58     @GuardedBy("txLock")
59     private WriteTransaction wTx;
60     @GuardedBy("txLock")
61     private BindingTransactionChain txChainFactory;
62     private boolean submitIsEnabled;
63
64     public TransactionChainManagerStatus getTransactionChainManagerStatus() {
65         return transactionChainManagerStatus;
66     }
67
68     @GuardedBy("txLock")
69     private TransactionChainManagerStatus transactionChainManagerStatus;
70     private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
71
72     TransactionChainManager(@Nonnull final DataBroker dataBroker,
73                             @Nonnull final DeviceState deviceState) {
74         this.dataBroker = Preconditions.checkNotNull(dataBroker);
75         this.deviceState = Preconditions.checkNotNull(deviceState);
76         this.nodeII = Preconditions.checkNotNull(deviceState.getNodeInstanceIdentifier());
77         this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
78         LOG.debug("created txChainManager");
79     }
80
81     @GuardedBy("txLock")
82     private void createTxChain() {
83         if (txChainFactory != null) {
84             txChainFactory.close();
85         }
86         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
87     }
88
89     void initialSubmitWriteTransaction() {
90         enableSubmit();
91         submitWriteTransaction();
92     }
93
94     /**
95      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
96      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
97      * transactions. Call this method for MASTER role only.
98      */
99     public void activateTransactionManager() {
100         LOG.trace("activetTransactionManaager for node {} transaction submit is set to {}", deviceState.getNodeId());
101         synchronized (txLock) {
102             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
103                 LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
104                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
105                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
106                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
107                 createTxChain();
108             } else {
109                 LOG.debug("Transaction is active {}", deviceState.getNodeId());
110             }
111         }
112     }
113
114     /**
115      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
116      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
117      * Call this method for SLAVE only.
118      * @return Future
119      */
120     public CheckedFuture<Void, TransactionCommitFailedException> deactivateTransactionManager() {
121         final CheckedFuture<Void, TransactionCommitFailedException> future;
122         synchronized (txLock) {
123             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
124                 LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId());
125                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
126                 future = txChainShuttingDown();
127                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
128                 LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId());
129                 Futures.addCallback(future, new FutureCallback<Void>() {
130                     @Override
131                     public void onSuccess(final Void result) {
132                         txChainFactory.close();
133                         txChainFactory = null;
134                     }
135
136                     @Override
137                     public void onFailure(final Throwable t) {
138                         txChainFactory.close();
139                         txChainFactory = null;
140                     }
141                 });
142             } else {
143                 // TODO : ignoring redundant deactivate invocation
144                 future = Futures.immediateCheckedFuture(null);
145             }
146         }
147         return future;
148     }
149
150     boolean submitWriteTransaction() {
151         if (!submitIsEnabled) {
152             LOG.trace("transaction not committed - submit block issued");
153             return false;
154         }
155         synchronized (txLock) {
156             if (wTx == null) {
157                 LOG.trace("nothing to commit - submit returns true");
158                 return true;
159             }
160             Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
161                     "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
162             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
163             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
164                 @Override
165                 public void onSuccess(final Void result) {
166                     //no action required
167                 }
168
169                 @Override
170                 public void onFailure(final Throwable t) {
171                     if (t instanceof TransactionCommitFailedException) {
172                         LOG.error("Transaction commit failed. {}", t);
173                     } else {
174                         LOG.error("Exception during transaction submitting. {}", t);
175                     }
176                 }
177             });
178             wTx = null;
179         }
180         return true;
181     }
182
183     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
184                                                              final InstanceIdentifier<T> path) {
185         final WriteTransaction writeTx = getTransactionSafely();
186         if (writeTx != null) {
187             writeTx.delete(store, path);
188         } else {
189             LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
190         }
191     }
192
193     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
194                                                    final InstanceIdentifier<T> path, final T data) {
195         final WriteTransaction writeTx = getTransactionSafely();
196         if (writeTx != null) {
197             writeTx.put(store, path, data);
198         } else {
199             LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
200         }
201     }
202
203     @Override
204     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
205                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
206         if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
207             LOG.warn("txChain failed -> recreating", cause);
208             recreateTxChain();
209         }
210     }
211
212     @Override
213     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
214         // NOOP
215     }
216
217     private void recreateTxChain() {
218         synchronized (txLock) {
219             createTxChain();
220             wTx = null;
221         }
222     }
223
224     @Nullable
225     private WriteTransaction getTransactionSafely() {
226         if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
227             synchronized (txLock) {
228                 if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
229                     if (wTx == null && txChainFactory != null) {
230                         wTx = txChainFactory.newWriteOnlyTransaction();
231                     }
232                 }
233             }
234         }
235         return wTx;
236     }
237
238     @VisibleForTesting
239     void enableSubmit() {
240         submitIsEnabled = true;
241     }
242
243     CheckedFuture<Void, TransactionCommitFailedException> shuttingDown() {
244         LOG.debug("TxManager is going SUTTING_DOWN for node {}", nodeII);
245         CheckedFuture<Void, TransactionCommitFailedException> future;
246         synchronized (txLock) {
247             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
248             future = txChainShuttingDown();
249         }
250         return future;
251     }
252
253     private CheckedFuture<Void, TransactionCommitFailedException> txChainShuttingDown() {
254         CheckedFuture<Void, TransactionCommitFailedException> future;
255         if (txChainFactory == null) {
256             // stay with actual thread
257             future = Futures.immediateCheckedFuture(null);
258         } else {
259             // hijack md-sal thread
260             if (wTx == null) {
261                 wTx = txChainFactory.newWriteOnlyTransaction();
262             }
263             final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId());
264             wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
265             future = wTx.submit();
266             wTx = null;
267         }
268         return future;
269     }
270
271     @Override
272     public void close() {
273         LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN, will wait for ownershipservice to notify", nodeII);
274         Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus));
275         Preconditions.checkState(wTx == null);
276         synchronized (txLock) {
277             if (txChainFactory != null) {
278                 txChainFactory.close();
279                 txChainFactory = null;
280             }
281         }
282         Preconditions.checkState(txChainFactory == null);
283     }
284
285     public enum TransactionChainManagerStatus {
286         /** txChainManager is sleeping - is not active (SLAVE or default init value) */
287         WORKING,
288         /** txChainManager is working - is active (MASTER) */
289         SLEEPING,
290         /** txChainManager is trying to be closed - device disconnecting */
291         SHUTTING_DOWN;
292     }
293 }