d85dab8f59632457938b55b0344792df1767e0c4
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import javax.annotation.Nonnull;
18 import javax.annotation.Nullable;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
29 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
33 import org.opendaylight.yangtools.yang.binding.DataObject;
34 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 /**
40  * openflowplugin-impl
41  * org.opendaylight.openflowplugin.impl.device
42  * <p/>
43  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
44  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
45  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
46  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
47  *
48  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
49  *         </p>
50  *         Created: Apr 2, 2015
51  */
52 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
53
54     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
55
56     private final Object txLock = new Object();
57     private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
58     private final DataBroker dataBroker;
59     private final LifecycleConductor conductor;
60
61     @GuardedBy("txLock")
62     private WriteTransaction wTx;
63     @GuardedBy("txLock")
64     private BindingTransactionChain txChainFactory;
65     @GuardedBy("txLock")
66     private boolean submitIsEnabled;
67     @GuardedBy("txLock")
68     private ListenableFuture<Void> lastSubmittedFuture;
69
70     private boolean initCommit;
71
72     public TransactionChainManagerStatus getTransactionChainManagerStatus() {
73         return transactionChainManagerStatus;
74     }
75
76     @GuardedBy("txLock")
77     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
78
79     TransactionChainManager(@Nonnull final DataBroker dataBroker,
80                             @Nonnull final DeviceState deviceState,
81                             @Nonnull final LifecycleConductor conductor) {
82         this.dataBroker = Preconditions.checkNotNull(dataBroker);
83         this.conductor = Preconditions.checkNotNull(conductor);
84         this.nodeII = Preconditions.checkNotNull(deviceState.getNodeInstanceIdentifier());
85         this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
86         lastSubmittedFuture = Futures.immediateFuture(null);
87         LOG.debug("created txChainManager for {}", nodeII);
88     }
89
90     private NodeId nodeId() {
91         return nodeII.getKey().getId();
92     }
93
94     @GuardedBy("txLock")
95     private void createTxChain() {
96         if (txChainFactory != null) {
97             txChainFactory.close();
98         }
99         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
100     }
101
102     void initialSubmitWriteTransaction() {
103         enableSubmit();
104         submitWriteTransaction();
105     }
106
107     /**
108      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
109      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
110      * transactions. Call this method for MASTER role only.
111      */
112     public void activateTransactionManager() {
113         LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", nodeId(), submitIsEnabled);
114         synchronized (txLock) {
115             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
116                 LOG.debug("Transaction Factory create {}", nodeId());
117                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
118                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
119                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
120                 this.submitIsEnabled = false;
121                 this.initCommit = true;
122                 createTxChain();
123             } else {
124                 LOG.debug("Transaction is active {}", nodeId());
125             }
126         }
127     }
128
129     /**
130      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
131      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
132      * Call this method for SLAVE only.
133      * @return Future
134      */
135     public ListenableFuture<Void> deactivateTransactionManager() {
136         final ListenableFuture<Void> future;
137         synchronized (txLock) {
138             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
139                 LOG.debug("Submitting all transactions if we were in status WORKING for Node {}", nodeId());
140                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
141                 future = txChainShuttingDown();
142                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
143                 LOG.debug("Transaction Factory deactivate for Node {}", nodeId());
144                 Futures.addCallback(future, new FutureCallback<Void>() {
145                     @Override
146                     public void onSuccess(final Void result) {
147                         txChainFactory.close();
148                         txChainFactory = null;
149                     }
150
151                     @Override
152                     public void onFailure(final Throwable t) {
153                         txChainFactory.close();
154                         txChainFactory = null;
155                     }
156                 });
157             } else {
158                 // TODO : ignoring redundant deactivate invocation
159                 future = Futures.immediateCheckedFuture(null);
160             }
161         }
162         return future;
163     }
164
165     boolean submitWriteTransaction() {
166         synchronized (txLock) {
167             if (!submitIsEnabled) {
168                 LOG.trace("transaction not committed - submit block issued");
169                 return false;
170             }
171             if (wTx == null) {
172                 LOG.trace("nothing to commit - submit returns true");
173                 return true;
174             }
175             Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
176                     "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
177             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
178             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
179                 @Override
180                 public void onSuccess(final Void result) {
181                     if (initCommit) {
182                         initCommit = false;
183                     }
184                 }
185
186                 @Override
187                 public void onFailure(final Throwable t) {
188                     if (t instanceof TransactionCommitFailedException) {
189                         LOG.error("Transaction commit failed. {}", t);
190                     } else {
191                         LOG.error("Exception during transaction submitting. {}", t);
192                     }
193                     if (initCommit) {
194                         LOG.error("Initial commit failed. {}", t);
195                         conductor.closeConnection(nodeId());
196                     }
197                 }
198             });
199             lastSubmittedFuture = submitFuture;
200             wTx = null;
201         }
202         return true;
203     }
204
205     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
206                                                              final InstanceIdentifier<T> path) throws Exception {
207         final WriteTransaction writeTx = getTransactionSafely();
208         if (writeTx != null) {
209             LOG.trace("addDeleteOperation called with path {} ", path);
210             writeTx.delete(store, path);
211         } else {
212             LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
213             throw new Exception("Cannot write into transaction.");
214         }
215     }
216
217     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
218                                                    final InstanceIdentifier<T> path,
219                                                    final T data,
220                                                    final boolean createParents) throws Exception {
221         final WriteTransaction writeTx = getTransactionSafely();
222         if (writeTx != null) {
223             LOG.trace("writeToTransaction called with path {} ", path);
224             writeTx.put(store, path, data, createParents);
225         } else {
226             LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
227             throw new Exception("Cannot write into transaction.");
228         }
229     }
230
231     @Override
232     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
233                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
234         if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
235             LOG.warn("txChain failed -> recreating due to {}", cause);
236             recreateTxChain();
237         }
238     }
239
240     @Override
241     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
242         // NOOP
243     }
244
245     private void recreateTxChain() {
246         synchronized (txLock) {
247             createTxChain();
248             wTx = null;
249         }
250     }
251
252     @Nullable
253     private WriteTransaction getTransactionSafely() {
254         if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
255             synchronized (txLock) {
256                 if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
257                     if (wTx == null && txChainFactory != null) {
258                         wTx = txChainFactory.newWriteOnlyTransaction();
259                     }
260                 }
261             }
262         }
263         return wTx;
264     }
265
266     @VisibleForTesting
267     void enableSubmit() {
268         synchronized (txLock) {
269             /* !!!IMPORTANT: never set true without txChainFactory */
270             submitIsEnabled = txChainFactory != null;
271         }
272     }
273
274     ListenableFuture<Void> shuttingDown() {
275         LOG.debug("TxManager is going SHUTTING_DOWN for node {}", nodeII);
276         ListenableFuture<Void> future;
277         synchronized (txLock) {
278             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
279             future = txChainShuttingDown();
280         }
281         return future;
282     }
283
284     @GuardedBy("txLock")
285     private ListenableFuture<Void> txChainShuttingDown() {
286         submitIsEnabled = false;
287         ListenableFuture<Void> future;
288         if (txChainFactory == null) {
289             // stay with actual thread
290             future = Futures.immediateCheckedFuture(null);
291         } else if (wTx == null) {
292             // hijack md-sal thread
293             future = lastSubmittedFuture;
294         } else {
295             // hijack md-sal thread
296             future = wTx.submit();
297             wTx = null;
298         }
299         return future;
300     }
301
302     @Override
303     public void close() {
304         LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}, will wait for ownershipservice to notify"
305                 , nodeII);
306         Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus));
307         Preconditions.checkState(wTx == null);
308         synchronized (txLock) {
309             if (txChainFactory != null) {
310                 txChainFactory.close();
311                 txChainFactory = null;
312             }
313         }
314         Preconditions.checkState(txChainFactory == null);
315     }
316
317     private enum TransactionChainManagerStatus {
318         /** txChainManager is sleeping - is not active (SLAVE or default init value) */
319         WORKING,
320         /** txChainManager is working - is active (MASTER) */
321         SLEEPING,
322         /** txChainManager is trying to be closed - device disconnecting */
323         SHUTTING_DOWN;
324     }
325 }