2a09f79d0c6feb5b90fc9a34e3d82d5e422eddeb
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timeout;
18 import io.netty.util.TimerTask;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
28 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
30 import org.opendaylight.yangtools.yang.binding.DataObject;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * openflowplugin-impl
37  * org.opendaylight.openflowplugin.impl.device
38  * <p/>
39  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
40  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
41  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
42  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
43  *
44  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
45  *         <p/>
46  *         Created: Apr 2, 2015
47  */
48 @VisibleForTesting
49 class TransactionChainManager implements TransactionChainListener {
50
51     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
52
53     private final HashedWheelTimer hashedWheelTimer;
54     private final DataBroker dataBroker;
55     private final long maxTx;
56     private final long timerValue;
57     private BindingTransactionChain txChainFactory;
58     private WriteTransaction wTx;
59     private Timeout submitTaskTime;
60     private long nrOfActualTx;
61     private boolean submitIsEnabled;
62
63     TransactionChainManager(@Nonnull final DataBroker dataBroker,
64                             @Nonnull final HashedWheelTimer hashedWheelTimer,
65                             final long maxTx,
66                             final long timerValue) {
67         this.dataBroker = Preconditions.checkNotNull(dataBroker);
68         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
69         this.maxTx = maxTx;
70         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
71         nrOfActualTx = 0L;
72         this.timerValue = timerValue;
73         LOG.debug("created txChainManager with operation limit {}", maxTx);
74     }
75
76
77     public void commitOperationsGatheredInOneTransaction() {
78         enableSubmit();
79         submitTransaction();
80     }
81
82     public void startGatheringOperationsToOneTransaction() {
83         submitIsEnabled = false;
84     }
85
86     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
87                                                    final InstanceIdentifier<T> path, final T data) {
88         try {
89             final WriteTransaction writeTx = getTransactionSafely();
90             writeTx.put(store, path, data);
91             countTxInAndCommit();
92         } catch (final Exception e) {
93             LOG.warn("failed to put into writeOnlyTransaction: {}", e.getMessage());
94             LOG.trace("failed to put into writeOnlyTransaction.. ", e);
95         }
96     }
97
98     private WriteTransaction getTransactionSafely() {
99         if (wTx == null) {
100             wTx = txChainFactory.newWriteOnlyTransaction();
101         }
102         return wTx;
103     }
104
105     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
106                                                              final InstanceIdentifier<T> path) {
107         try {
108             final WriteTransaction writeTx = getTransactionSafely();
109             writeTx.delete(store, path);
110             countTxInAndCommit();
111         } catch (final Exception e) {
112             LOG.warn("failed to put into writeOnlyTransaction : {}", e.getMessage());
113             LOG.trace("failed to put into writeOnlyTransaction.. ", e);
114         }
115     }
116
117     private void countTxInAndCommit() {
118         nrOfActualTx += 1L;
119         if (nrOfActualTx >= maxTx) {
120             submitTransaction();
121         }
122     }
123
124     void submitScheduledTransaction(final Timeout timeout) {
125         if (timeout.isCancelled()) {
126             // zombie timer executed
127             return;
128         }
129
130         if (submitIsEnabled) {
131             if (nrOfActualTx > 0L) {
132                 submitTransaction();
133             }
134         } else {
135             LOG.info("transaction submit task will not be scheduled - submit block issued.");
136         }
137     }
138
139     void submitTransaction() {
140         if (submitIsEnabled) {
141             if (wTx != null && nrOfActualTx > 0) {
142                 LOG.trace("submitting transaction, counter: {}", nrOfActualTx);
143                 final CheckedFuture<Void, TransactionCommitFailedException> submitResult = wTx.submit();
144                 try {
145                     submitResult.get();
146                 } catch (ExecutionException | InterruptedException e) {
147                     recreateTxChain();
148                 }
149                 hookTimeExpenseCounter(submitResult, String.valueOf(wTx.getIdentifier()) + "::" + nrOfActualTx);
150                 wTx = null;
151                 nrOfActualTx = 0L;
152             }
153             if (submitTaskTime != null) {
154                 // if possible then cancel current timer (even if being executed via timer)
155                 submitTaskTime.cancel();
156             }
157             submitTaskTime = hashedWheelTimer.newTimeout(new TimerTask() {
158                 @Override
159                 public void run(final Timeout timeout) throws Exception {
160                     submitScheduledTransaction(timeout);
161                 }
162             }, timerValue, TimeUnit.MILLISECONDS);
163
164         } else {
165             LOG.trace("transaction not committed - submit block issued");
166         }
167     }
168
169     private static void hookTimeExpenseCounter(final CheckedFuture<Void, TransactionCommitFailedException> submitResult, final String name) {
170         final long submitFiredTime = System.nanoTime();
171         LOG.debug("submit of {} fired", name);
172         Futures.addCallback(submitResult, new FutureCallback<Void>() {
173             @Override
174             public void onSuccess(final Void result) {
175                 LOG.debug("submit of {} finished in {} ms", name, System.nanoTime() - submitFiredTime);
176             }
177
178             @Override
179             public void onFailure(final Throwable t) {
180                 LOG.warn("transaction submit failed: {}", t.getMessage());
181             }
182         });
183     }
184
185     void enableSubmit() {
186         submitIsEnabled = true;
187     }
188
189     @Override
190     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
191                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
192         LOG.warn("txChain failed -> recreating", cause);
193         recreateTxChain();
194     }
195
196     private void recreateTxChain() {
197         txChainFactory.close();
198         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
199     }
200
201     @Override
202     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
203         // NOOP - only yet, here is probably place for notification to get new WriteTransaction
204     }
205
206 }