bc40b88b387bbff230f6d50917cc94953fb722d9
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
1 /**
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.impl.device;
10
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timeout;
18 import io.netty.util.TimerTask;
19 import java.util.concurrent.TimeUnit;
20 import javax.annotation.Nonnull;
21 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
22 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
23 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
24 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
26 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
28 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
29 import org.opendaylight.yangtools.yang.binding.DataObject;
30 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 /**
35  * openflowplugin-impl
36  * org.opendaylight.openflowplugin.impl.device
37  * <p/>
38  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
39  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
40  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
41  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
42  *
43  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
44  *         <p/>
45  *         Created: Apr 2, 2015
46  */
47 @VisibleForTesting
48 class TransactionChainManager implements TransactionChainListener {
49
50     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
51
52     private final HashedWheelTimer hashedWheelTimer;
53     private final DataBroker dataBroker;
54     private final long maxTx;
55     private final long timerValue;
56     private BindingTransactionChain txChainFactory;
57     private WriteTransaction wTx;
58     private Timeout submitTaskTime;
59     private long nrOfActualTx;
60     private boolean submitIsEnabled;
61
62     TransactionChainManager(@Nonnull final DataBroker dataBroker,
63                             @Nonnull final HashedWheelTimer hashedWheelTimer,
64                             final long maxTx,
65                             final long timerValue) {
66         this.dataBroker = Preconditions.checkNotNull(dataBroker);
67         this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
68         this.maxTx = maxTx;
69         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
70         nrOfActualTx = 0L;
71         this.timerValue = timerValue;
72         LOG.debug("created txChainManager with operation limit {}", maxTx);
73     }
74
75
76     public void commitOperationsGatheredInOneTransaction(){
77         enableSubmit();
78         submitTransaction();
79     }
80     public void startGatheringOperationsToOneTransaction(){
81         submitIsEnabled = false;
82     }
83
84     synchronized <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
85                                                                 final InstanceIdentifier<T> path, final T data) {
86         if (wTx == null) {
87             wTx = txChainFactory.newWriteOnlyTransaction();
88         }
89         wTx.put(store, path, data);
90         countTxInAndCommit();
91     }
92
93     synchronized <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
94                                                                           final InstanceIdentifier<T> path) {
95         if (wTx == null) {
96             wTx = txChainFactory.newWriteOnlyTransaction();
97         }
98         wTx.delete(store, path);
99         countTxInAndCommit();
100     }
101
102     private void countTxInAndCommit() {
103         nrOfActualTx += 1L;
104         if (nrOfActualTx >= maxTx) {
105             submitTransaction();
106         }
107     }
108
109     synchronized void submitScheduledTransaction(Timeout timeout) {
110         if (timeout.isCancelled()) {
111             // zombie timer executed
112             return;
113         }
114
115         if (submitIsEnabled) {
116             submitTransaction();
117         } else {
118             LOG.info("transaction submit task will not be scheduled - submit block issued.");
119         }
120     }
121
122     synchronized void submitTransaction() {
123         if (submitIsEnabled) {
124             if (wTx != null && nrOfActualTx > 0) {
125                 LOG.trace("submitting transaction, counter: {}", nrOfActualTx);
126                 CheckedFuture<Void, TransactionCommitFailedException> submitResult = wTx.submit();
127                 hookTimeExpenseCounter(submitResult, String.valueOf(wTx.getIdentifier()));
128                 wTx = null;
129                 nrOfActualTx = 0L;
130             }
131             if (submitTaskTime != null) {
132                 // if possible then cancel current timer (even if being executed via timer)
133                 submitTaskTime.cancel();
134             }
135             submitTaskTime = hashedWheelTimer.newTimeout(new TimerTask() {
136                 @Override
137                 public void run(final Timeout timeout) throws Exception {
138                     submitScheduledTransaction(timeout);
139                 }
140             }, timerValue, TimeUnit.MILLISECONDS);
141
142         } else {
143             LOG.debug("transaction not committed - submit block issued");
144         }
145     }
146
147     private void hookTimeExpenseCounter(CheckedFuture<Void, TransactionCommitFailedException> submitResult, final String name) {
148         final long submitFiredTime = System.currentTimeMillis();
149         LOG.debug("submit of {} fired", name);
150         Futures.addCallback(submitResult, new FutureCallback<Void>() {
151             @Override
152             public void onSuccess(Void result) {
153                 LOG.debug("submit of {} finished in {} ms", name, System.currentTimeMillis() - submitFiredTime);
154             }
155
156             @Override
157             public void onFailure(Throwable t) {
158                 LOG.warn("transaction submit failed: {}", t.getMessage());
159             }
160         });
161     }
162
163     synchronized void enableSubmit() {
164         submitIsEnabled = true;
165     }
166
167     @Override
168     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
169                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
170         LOG.warn("txChain failed -> recreating", cause);
171         txChainFactory.close();
172         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
173     }
174
175     @Override
176     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
177         // NOOP - only yet, here is probably place for notification to get new WriteTransaction
178     }
179
180 }