Bug 6176 - Decrease logging level in Sal-F/G/M-Service and use synchronized BiMap
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17
18 import java.util.Objects;
19 import java.util.concurrent.CancellationException;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import javax.annotation.Nullable;
23 import javax.annotation.concurrent.GuardedBy;
24 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
25 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
26 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
27 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
28 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
30 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
31 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
32 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
33 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
34 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
38 import org.opendaylight.yangtools.yang.binding.DataObject;
39 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
40 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * openflowplugin-impl
46  * org.opendaylight.openflowplugin.impl.device
47  * <p/>
48  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
49  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
50  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
51  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
52  */
53 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
54
55     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
56     private static final Long RETRY_DELAY = 100L;
57     private static final int RETRY_COUNT = 3;
58
59     private final Object txLock = new Object();
60     private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
61     private final DataBroker dataBroker;
62     private LifecycleService lifecycleService;
63
64     @GuardedBy("txLock")
65     private WriteTransaction wTx;
66     @GuardedBy("txLock")
67     private BindingTransactionChain txChainFactory;
68     @GuardedBy("txLock")
69     private boolean submitIsEnabled;
70     @GuardedBy("txLock")
71     private ListenableFuture<Void> lastSubmittedFuture;
72
73     private boolean initCommit;
74
75     @GuardedBy("txLock")
76     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
77
78     TransactionChainManager(@Nonnull final DataBroker dataBroker,
79                             @Nonnull final DeviceInfo deviceInfo) {
80         this.dataBroker = Preconditions.checkNotNull(dataBroker);
81         this.nodeII = deviceInfo.getNodeInstanceIdentifier();
82         this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
83         lastSubmittedFuture = Futures.immediateFuture(null);
84         LOG.debug("created txChainManager for {}", this.nodeII);
85     }
86
87     private NodeId nodeId() {
88         return nodeII.getKey().getId();
89     }
90
91     @GuardedBy("txLock")
92     private void createTxChain() {
93         if (txChainFactory != null) {
94             txChainFactory.close();
95         }
96         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
97     }
98
99     public void setLifecycleService(final LifecycleService lifecycleService) {
100         this.lifecycleService = lifecycleService;
101     }
102
103     void initialSubmitWriteTransaction() {
104         enableSubmit();
105         submitWriteTransaction();
106     }
107
108     /**
109      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
110      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
111      * transactions. Call this method for MASTER role only.
112      */
113     void activateTransactionManager() {
114         LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", nodeId(), submitIsEnabled);
115         synchronized (txLock) {
116             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
117                 LOG.debug("Transaction Factory create {}", nodeId());
118                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
119                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
120                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
121                 this.submitIsEnabled = false;
122                 this.initCommit = true;
123                 createTxChain();
124             } else {
125                 LOG.debug("Transaction is active {}", nodeId());
126             }
127         }
128     }
129
130     /**
131      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
132      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
133      * Call this method for SLAVE only.
134      * @return Future
135      */
136     ListenableFuture<Void> deactivateTransactionManager() {
137         final ListenableFuture<Void> future;
138         synchronized (txLock) {
139             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
140                 LOG.debug("Submitting all transactions if we were in status WORKING for Node {}", nodeId());
141                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
142                 future = txChainShuttingDown();
143                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
144                 LOG.debug("Transaction Factory deactivate for Node {}", nodeId());
145                 Futures.addCallback(future, new FutureCallback<Void>() {
146                     @Override
147                     public void onSuccess(final Void result) {
148                         txChainFactory.close();
149                         txChainFactory = null;
150                     }
151
152                     @Override
153                     public void onFailure(final Throwable t) {
154                         txChainFactory.close();
155                         txChainFactory = null;
156                     }
157                 });
158             } else {
159                 // TODO : ignoring redundant deactivate invocation
160                 future = Futures.immediateCheckedFuture(null);
161             }
162         }
163         return future;
164     }
165
166     boolean submitWriteTransaction() {
167         synchronized (txLock) {
168             if (!submitIsEnabled) {
169                 LOG.trace("transaction not committed - submit block issued");
170                 return false;
171             }
172             if (wTx == null) {
173                 LOG.trace("nothing to commit - submit returns true");
174                 return true;
175             }
176             Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
177                     "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
178             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
179             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
180                 @Override
181                 public void onSuccess(final Void result) {
182                     if (initCommit) {
183                         initCommit = false;
184                     }
185                 }
186
187                 @Override
188                 public void onFailure(final Throwable t) {
189                     if (t instanceof TransactionCommitFailedException) {
190                         LOG.error("Transaction commit failed. ", t);
191                     } else {
192                         if (t instanceof CancellationException) {
193                             LOG.warn("Submit task was canceled");
194                             LOG.trace("Submit exception: ", t);
195                         } else {
196                             LOG.error("Exception during transaction submitting. ", t);
197                         }
198                     }
199                     if (initCommit) {
200                         LOG.warn("Initial commit failed. ", t);
201                         wTx = null;
202                         if (Objects.nonNull(lifecycleService)) {
203                             lifecycleService.closeConnection();
204                         }
205                     }
206                 }
207             });
208             lastSubmittedFuture = submitFuture;
209             wTx = null;
210         }
211         return true;
212     }
213
214     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
215                                                              final InstanceIdentifier<T> path){
216         final WriteTransaction writeTx = getTransactionSafely();
217         if (writeTx != null) {
218             LOG.trace("addDeleteOperation called with path {} ", path);
219             writeTx.delete(store, path);
220         } else {
221             LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
222             throw new TransactionChainClosedException("Cannot write into transaction.");
223         }
224     }
225
226     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
227                                                    final InstanceIdentifier<T> path,
228                                                    final T data,
229                                                    final boolean createParents){
230         final WriteTransaction writeTx = getTransactionSafely();
231         if (writeTx != null) {
232             LOG.trace("writeToTransaction called with path {} ", path);
233             writeTx.put(store, path, data, createParents);
234         } else {
235             LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
236             throw new TransactionChainClosedException("Cannot write into transaction.");
237         }
238     }
239
240     @Override
241     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
242                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
243         if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
244             LOG.warn("txChain failed -> recreating due to {}", cause);
245             recreateTxChain();
246         }
247     }
248
249     @Override
250     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
251         // NOOP
252     }
253
254     private void recreateTxChain() {
255         synchronized (txLock) {
256             createTxChain();
257             wTx = null;
258         }
259     }
260
261     @Nullable
262     private WriteTransaction getTransactionSafely() {
263             synchronized (txLock) {
264                 int trial = 0;
265                 while(trial <=RETRY_COUNT) {
266                     if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
267                         if (wTx == null && txChainFactory != null) {
268                             wTx = txChainFactory.newWriteOnlyTransaction();
269                             break;
270                         }
271                     } else {
272                         try {
273                             TimeUnit.MILLISECONDS.sleep(RETRY_DELAY);
274                         } catch (InterruptedException e) {
275                             LOG.debug("Timer interrupted in getTransactionSafely : {}", e);
276                         }
277                         trial++;
278                     }
279                 }
280             }
281         return wTx;
282     }
283
284     @VisibleForTesting
285     void enableSubmit() {
286         synchronized (txLock) {
287             /* !!!IMPORTANT: never set true without txChainFactory */
288             submitIsEnabled = txChainFactory != null;
289         }
290     }
291
292     ListenableFuture<Void> shuttingDown() {
293         LOG.debug("TxManager is going SHUTTING_DOWN for node {}", nodeII.getKey().getId().getValue());
294         ListenableFuture<Void> future;
295         synchronized (txLock) {
296             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
297             future = txChainShuttingDown();
298         }
299         return future;
300     }
301
302     @GuardedBy("txLock")
303     private ListenableFuture<Void> txChainShuttingDown() {
304         submitIsEnabled = false;
305         ListenableFuture<Void> future;
306         if (txChainFactory == null) {
307             // stay with actual thread
308             future = Futures.immediateCheckedFuture(null);
309         } else if (wTx == null) {
310             // hijack md-sal thread
311             future = lastSubmittedFuture;
312         } else {
313             // hijack md-sal thread
314             future = wTx.submit();
315             wTx = null;
316         }
317         return future;
318     }
319
320     @Override
321     public void close() {
322         LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}, will wait for ownershipservice to notify"
323                 , nodeII);
324         Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus));
325         Preconditions.checkState(wTx == null);
326         synchronized (txLock) {
327             if (txChainFactory != null) {
328                 txChainFactory.close();
329                 txChainFactory = null;
330             }
331         }
332         Preconditions.checkState(txChainFactory == null);
333     }
334
335     private enum TransactionChainManagerStatus {
336         /** txChainManager is sleeping - is not active (SLAVE or default init value) */
337         WORKING,
338         /** txChainManager is working - is active (MASTER) */
339         SLEEPING,
340         /** txChainManager is trying to be closed - device disconnecting */
341         SHUTTING_DOWN;
342     }
343 }