7ccdc42824ccb132454e54f60500ff5de2cae776
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17
18 import java.util.Objects;
19 import java.util.concurrent.CancellationException;
20 import javax.annotation.Nonnull;
21 import javax.annotation.Nullable;
22 import javax.annotation.concurrent.GuardedBy;
23 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
24 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
25 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
28 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
32 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
33 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
37 import org.opendaylight.yangtools.yang.binding.DataObject;
38 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * openflowplugin-impl
45  * org.opendaylight.openflowplugin.impl.device
46  * <p/>
47  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
48  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
49  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
50  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
51  */
52 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
53
54     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
55
56     private final Object txLock = new Object();
57     private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
58     private final DataBroker dataBroker;
59     private LifecycleService lifecycleService;
60
61     @GuardedBy("txLock")
62     private WriteTransaction wTx;
63     @GuardedBy("txLock")
64     private BindingTransactionChain txChainFactory;
65     @GuardedBy("txLock")
66     private boolean submitIsEnabled;
67     @GuardedBy("txLock")
68     private ListenableFuture<Void> lastSubmittedFuture;
69
70     private boolean initCommit;
71
72     @GuardedBy("txLock")
73     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
74
75     TransactionChainManager(@Nonnull final DataBroker dataBroker,
76                             @Nonnull final DeviceInfo deviceInfo) {
77         this.dataBroker = Preconditions.checkNotNull(dataBroker);
78         this.nodeII = deviceInfo.getNodeInstanceIdentifier();
79         this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
80         lastSubmittedFuture = Futures.immediateFuture(null);
81         LOG.debug("created txChainManager for {}", this.nodeII);
82     }
83
84     private NodeId nodeId() {
85         return nodeII.getKey().getId();
86     }
87
88     @GuardedBy("txLock")
89     private void createTxChain() {
90         if (txChainFactory != null) {
91             txChainFactory.close();
92         }
93         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
94     }
95
96     public void setLifecycleService(final LifecycleService lifecycleService) {
97         this.lifecycleService = lifecycleService;
98     }
99
100     void initialSubmitWriteTransaction() {
101         enableSubmit();
102         submitWriteTransaction();
103     }
104
105     /**
106      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
107      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
108      * transactions. Call this method for MASTER role only.
109      */
110     void activateTransactionManager() {
111         LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", nodeId(), submitIsEnabled);
112         synchronized (txLock) {
113             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
114                 LOG.debug("Transaction Factory create {}", nodeId());
115                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
116                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
117                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
118                 this.submitIsEnabled = false;
119                 this.initCommit = true;
120                 createTxChain();
121             } else {
122                 LOG.debug("Transaction is active {}", nodeId());
123             }
124         }
125     }
126
127     /**
128      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
129      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
130      * Call this method for SLAVE only.
131      * @return Future
132      */
133     ListenableFuture<Void> deactivateTransactionManager() {
134         final ListenableFuture<Void> future;
135         synchronized (txLock) {
136             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
137                 LOG.debug("Submitting all transactions if we were in status WORKING for Node {}", nodeId());
138                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
139                 future = txChainShuttingDown();
140                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
141                 LOG.debug("Transaction Factory deactivate for Node {}", nodeId());
142                 Futures.addCallback(future, new FutureCallback<Void>() {
143                     @Override
144                     public void onSuccess(final Void result) {
145                         txChainFactory.close();
146                         txChainFactory = null;
147                     }
148
149                     @Override
150                     public void onFailure(final Throwable t) {
151                         txChainFactory.close();
152                         txChainFactory = null;
153                     }
154                 });
155             } else {
156                 // TODO : ignoring redundant deactivate invocation
157                 future = Futures.immediateCheckedFuture(null);
158             }
159         }
160         return future;
161     }
162
163     boolean submitWriteTransaction() {
164         synchronized (txLock) {
165             if (!submitIsEnabled) {
166                 LOG.trace("transaction not committed - submit block issued");
167                 return false;
168             }
169             if (wTx == null) {
170                 LOG.trace("nothing to commit - submit returns true");
171                 return true;
172             }
173             Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
174                     "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
175             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
176             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
177                 @Override
178                 public void onSuccess(final Void result) {
179                     if (initCommit) {
180                         initCommit = false;
181                     }
182                 }
183
184                 @Override
185                 public void onFailure(final Throwable t) {
186                     if (t instanceof TransactionCommitFailedException) {
187                         LOG.error("Transaction commit failed. ", t);
188                     } else {
189                         if (t instanceof CancellationException) {
190                             LOG.warn("Submit task was canceled");
191                             LOG.trace("Submit exception: ", t);
192                         } else {
193                             LOG.error("Exception during transaction submitting. ", t);
194                         }
195                     }
196                     if (initCommit) {
197                         LOG.warn("Initial commit failed. ", t);
198                         wTx = null;
199                         if (Objects.nonNull(lifecycleService)) {
200                             lifecycleService.closeConnection();
201                         }
202                     }
203                 }
204             });
205             lastSubmittedFuture = submitFuture;
206             wTx = null;
207         }
208         return true;
209     }
210
211     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
212                                                              final InstanceIdentifier<T> path){
213         final WriteTransaction writeTx = getTransactionSafely();
214         if (writeTx != null) {
215             LOG.trace("addDeleteOperation called with path {} ", path);
216             writeTx.delete(store, path);
217         } else {
218             LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
219             throw new TransactionChainClosedException("Cannot write into transaction.");
220         }
221     }
222
223     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
224                                                    final InstanceIdentifier<T> path,
225                                                    final T data,
226                                                    final boolean createParents){
227         final WriteTransaction writeTx = getTransactionSafely();
228         if (writeTx != null) {
229             LOG.trace("writeToTransaction called with path {} ", path);
230             writeTx.put(store, path, data, createParents);
231         } else {
232             LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
233             throw new TransactionChainClosedException("Cannot write into transaction.");
234         }
235     }
236
237     @Override
238     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
239                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
240         if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
241             LOG.warn("txChain failed -> recreating due to {}", cause);
242             recreateTxChain();
243         }
244     }
245
246     @Override
247     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
248         // NOOP
249     }
250
251     private void recreateTxChain() {
252         synchronized (txLock) {
253             createTxChain();
254             wTx = null;
255         }
256     }
257
258     @Nullable
259     private WriteTransaction getTransactionSafely() {
260             synchronized (txLock) {
261                 if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
262                     if (wTx == null && txChainFactory != null) {
263                         wTx = txChainFactory.newWriteOnlyTransaction();
264                     }
265                 }
266             }
267         return wTx;
268     }
269
270     @VisibleForTesting
271     void enableSubmit() {
272         synchronized (txLock) {
273             /* !!!IMPORTANT: never set true without txChainFactory */
274             submitIsEnabled = txChainFactory != null;
275         }
276     }
277
278     ListenableFuture<Void> shuttingDown() {
279         LOG.debug("TxManager is going SHUTTING_DOWN for node {}", nodeII);
280         ListenableFuture<Void> future;
281         synchronized (txLock) {
282             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
283             future = txChainShuttingDown();
284         }
285         return future;
286     }
287
288     @GuardedBy("txLock")
289     private ListenableFuture<Void> txChainShuttingDown() {
290         submitIsEnabled = false;
291         ListenableFuture<Void> future;
292         if (txChainFactory == null) {
293             // stay with actual thread
294             future = Futures.immediateCheckedFuture(null);
295         } else if (wTx == null) {
296             // hijack md-sal thread
297             future = lastSubmittedFuture;
298         } else {
299             // hijack md-sal thread
300             future = wTx.submit();
301             wTx = null;
302         }
303         return future;
304     }
305
306     @Override
307     public void close() {
308         LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}, will wait for ownershipservice to notify"
309                 , nodeII);
310         Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus));
311         Preconditions.checkState(wTx == null);
312         synchronized (txLock) {
313             if (txChainFactory != null) {
314                 txChainFactory.close();
315                 txChainFactory = null;
316             }
317         }
318         Preconditions.checkState(txChainFactory == null);
319     }
320
321     private enum TransactionChainManagerStatus {
322         /** txChainManager is sleeping - is not active (SLAVE or default init value) */
323         WORKING,
324         /** txChainManager is working - is active (MASTER) */
325         SLEEPING,
326         /** txChainManager is trying to be closed - device disconnecting */
327         SHUTTING_DOWN;
328     }
329 }