BUG 5523 DeviceManager handler for connection and disconnection too
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import javax.annotation.Nonnull;
18 import javax.annotation.Nullable;
19 import javax.annotation.concurrent.GuardedBy;
20 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
21 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
22 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
23 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
25 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
28 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
29 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
32 import org.opendaylight.yangtools.yang.binding.DataObject;
33 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
34 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 /**
39  * openflowplugin-impl
40  * org.opendaylight.openflowplugin.impl.device
41  * <p/>
42  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
43  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
44  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
45  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
46  *
47  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
48  *         </p>
49  *         Created: Apr 2, 2015
50  */
51 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
52
53     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
54
55     private final Object txLock = new Object();
56     private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
57     private final DataBroker dataBroker;
58
59     @GuardedBy("txLock")
60     private WriteTransaction wTx;
61     @GuardedBy("txLock")
62     private BindingTransactionChain txChainFactory;
63     @GuardedBy("txLock")
64     private boolean submitIsEnabled;
65     @GuardedBy("txLock")
66     private ListenableFuture<Void> lastSubmittedFuture;
67
68     public TransactionChainManagerStatus getTransactionChainManagerStatus() {
69         return transactionChainManagerStatus;
70     }
71
72     @GuardedBy("txLock")
73     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
74
75     TransactionChainManager(@Nonnull final DataBroker dataBroker, @Nonnull final DeviceState deviceState) {
76         this.dataBroker = Preconditions.checkNotNull(dataBroker);
77         this.nodeII = Preconditions.checkNotNull(deviceState.getNodeInstanceIdentifier());
78         this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
79         lastSubmittedFuture = Futures.immediateFuture(null);
80         LOG.debug("created txChainManager for {}", nodeII);
81     }
82
83     private NodeId nodeId() {
84         return nodeII.getKey().getId();
85     }
86
87     @GuardedBy("txLock")
88     private void createTxChain() {
89         if (txChainFactory != null) {
90             txChainFactory.close();
91         }
92         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
93     }
94
95     void initialSubmitWriteTransaction() {
96         enableSubmit();
97         submitWriteTransaction();
98     }
99
100     /**
101      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
102      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
103      * transactions. Call this method for MASTER role only.
104      */
105     public void activateTransactionManager() {
106         LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", nodeId(), submitIsEnabled);
107         synchronized (txLock) {
108             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
109                 LOG.debug("Transaction Factory create {}", nodeId());
110                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
111                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
112                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
113                 this.submitIsEnabled = false;
114                 createTxChain();
115             } else {
116                 LOG.debug("Transaction is active {}", nodeId());
117             }
118         }
119     }
120
121     /**
122      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
123      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
124      * Call this method for SLAVE only.
125      * @return Future
126      */
127     public ListenableFuture<Void> deactivateTransactionManager() {
128         final ListenableFuture<Void> future;
129         synchronized (txLock) {
130             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
131                 LOG.debug("Submitting all transactions if we were in status WORKING for Node", nodeId());
132                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
133                 future = txChainShuttingDown();
134                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
135                 LOG.debug("Transaction Factory delete for Node {}", nodeId());
136                 Futures.addCallback(future, new FutureCallback<Void>() {
137                     @Override
138                     public void onSuccess(final Void result) {
139                         txChainFactory.close();
140                         txChainFactory = null;
141                     }
142
143                     @Override
144                     public void onFailure(final Throwable t) {
145                         txChainFactory.close();
146                         txChainFactory = null;
147                     }
148                 });
149             } else {
150                 // TODO : ignoring redundant deactivate invocation
151                 future = Futures.immediateCheckedFuture(null);
152             }
153         }
154         return future;
155     }
156
157     boolean submitWriteTransaction() {
158         synchronized (txLock) {
159             if (!submitIsEnabled) {
160                 LOG.trace("transaction not committed - submit block issued");
161                 return false;
162             }
163             if (wTx == null) {
164                 LOG.trace("nothing to commit - submit returns true");
165                 return true;
166             }
167             Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
168                     "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
169             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
170             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
171                 @Override
172                 public void onSuccess(final Void result) {
173                     //no action required
174                 }
175
176                 @Override
177                 public void onFailure(final Throwable t) {
178                     if (t instanceof TransactionCommitFailedException) {
179                         LOG.error("Transaction commit failed. {}", t);
180                     } else {
181                         LOG.error("Exception during transaction submitting. {}", t);
182                     }
183                 }
184             });
185             lastSubmittedFuture = submitFuture;
186             wTx = null;
187         }
188         return true;
189     }
190
191     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
192                                                              final InstanceIdentifier<T> path) {
193         final WriteTransaction writeTx = getTransactionSafely();
194         if (writeTx != null) {
195             writeTx.delete(store, path);
196         } else {
197             LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
198         }
199     }
200
201     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
202                                                    final InstanceIdentifier<T> path, final T data) {
203         final WriteTransaction writeTx = getTransactionSafely();
204         if (writeTx != null) {
205             writeTx.put(store, path, data);
206         } else {
207             LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
208         }
209     }
210
211     @Override
212     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
213                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
214         if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
215             LOG.warn("txChain failed -> recreating", cause);
216             recreateTxChain();
217         }
218     }
219
220     @Override
221     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
222         // NOOP
223     }
224
225     private void recreateTxChain() {
226         synchronized (txLock) {
227             createTxChain();
228             wTx = null;
229         }
230     }
231
232     @Nullable
233     private WriteTransaction getTransactionSafely() {
234         if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
235             synchronized (txLock) {
236                 if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
237                     if (wTx == null && txChainFactory != null) {
238                         wTx = txChainFactory.newWriteOnlyTransaction();
239                     }
240                 }
241             }
242         }
243         return wTx;
244     }
245
246     @VisibleForTesting
247     void enableSubmit() {
248         synchronized (txLock) {
249             /* !!!IMPORTANT: never set true without txChainFactory */
250             submitIsEnabled = txChainFactory != null;
251         }
252     }
253
254     ListenableFuture<Void> shuttingDown() {
255         LOG.debug("TxManager is going SUTTING_DOWN for node {}", nodeII);
256         ListenableFuture<Void> future;
257         synchronized (txLock) {
258             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
259             future = txChainShuttingDown();
260         }
261         return future;
262     }
263
264     @GuardedBy("txLock")
265     private ListenableFuture<Void> txChainShuttingDown() {
266         submitIsEnabled = false;
267         ListenableFuture<Void> future;
268         if (txChainFactory == null) {
269             // stay with actual thread
270             future = Futures.immediateCheckedFuture(null);
271         } else if (wTx == null) {
272             // hijack md-sal thread
273             future = lastSubmittedFuture;
274         } else {
275             // hijack md-sal thread
276             future = wTx.submit();
277             wTx = null;
278         }
279         return future;
280     }
281
282     @Override
283     public void close() {
284         LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN, will wait for ownershipservice to notify", nodeII);
285         Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus));
286         Preconditions.checkState(wTx == null);
287         synchronized (txLock) {
288             if (txChainFactory != null) {
289                 txChainFactory.close();
290                 txChainFactory = null;
291             }
292         }
293         Preconditions.checkState(txChainFactory == null);
294     }
295
296     private enum TransactionChainManagerStatus {
297         /** txChainManager is sleeping - is not active (SLAVE or default init value) */
298         WORKING,
299         /** txChainManager is working - is active (MASTER) */
300         SLEEPING,
301         /** txChainManager is trying to be closed - device disconnecting */
302         SHUTTING_DOWN;
303     }
304 }