2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.impl.device;
11 import com.google.common.annotations.VisibleForTesting;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import io.netty.util.HashedWheelTimer;
17 import io.netty.util.Timeout;
18 import io.netty.util.TimerTask;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import javax.annotation.Nonnull;
22 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
23 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
24 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
25 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
26 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
27 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
28 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
29 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
30 import org.opendaylight.yangtools.yang.binding.DataObject;
31 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
37 * org.opendaylight.openflowplugin.impl.device
39 * Package protected class for controlling {@link WriteTransaction} life cycle. It is
40 * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
41 * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
42 * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
44 * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
46 * Created: Apr 2, 2015
49 class TransactionChainManager implements TransactionChainListener {
51 private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
53 private final HashedWheelTimer hashedWheelTimer;
54 private final DataBroker dataBroker;
55 private final long maxTx;
56 private final long timerValue;
57 private BindingTransactionChain txChainFactory;
58 private WriteTransaction wTx;
59 private Timeout submitTaskTime;
60 private long nrOfActualTx;
61 private boolean submitIsEnabled;
63 TransactionChainManager(@Nonnull final DataBroker dataBroker,
64 @Nonnull final HashedWheelTimer hashedWheelTimer,
66 final long timerValue) {
67 this.dataBroker = Preconditions.checkNotNull(dataBroker);
68 this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
70 txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
72 this.timerValue = timerValue;
73 LOG.debug("created txChainManager with operation limit {}", maxTx);
77 public void commitOperationsGatheredInOneTransaction() {
82 public void startGatheringOperationsToOneTransaction() {
83 submitIsEnabled = false;
86 <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
87 final InstanceIdentifier<T> path, final T data) {
89 final WriteTransaction writeTx = getTransactionSafely();
90 writeTx.put(store, path, data);
92 } catch (final Exception e) {
93 LOG.warn("failed to put into writeOnlyTransaction: {}", e.getMessage());
94 LOG.trace("failed to put into writeOnlyTransaction.. ", e);
98 private WriteTransaction getTransactionSafely() {
100 wTx = txChainFactory.newWriteOnlyTransaction();
105 <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
106 final InstanceIdentifier<T> path) {
108 final WriteTransaction writeTx = getTransactionSafely();
109 writeTx.delete(store, path);
110 countTxInAndCommit();
111 } catch (final Exception e) {
112 LOG.warn("failed to put into writeOnlyTransaction : {}", e.getMessage());
113 LOG.trace("failed to put into writeOnlyTransaction.. ", e);
117 private void countTxInAndCommit() {
119 if (nrOfActualTx >= maxTx) {
124 void submitScheduledTransaction(final Timeout timeout) {
125 if (timeout.isCancelled()) {
126 // zombie timer executed
130 if (submitIsEnabled) {
131 if (nrOfActualTx > 0L) {
135 LOG.info("transaction submit task will not be scheduled - submit block issued.");
139 void submitTransaction() {
140 if (submitIsEnabled) {
141 if (wTx != null && nrOfActualTx > 0) {
142 LOG.trace("submitting transaction, counter: {}", nrOfActualTx);
143 final CheckedFuture<Void, TransactionCommitFailedException> submitResult = wTx.submit();
146 } catch (ExecutionException | InterruptedException e) {
149 hookTimeExpenseCounter(submitResult, String.valueOf(wTx.getIdentifier()) + "::" + nrOfActualTx);
153 if (submitTaskTime != null) {
154 // if possible then cancel current timer (even if being executed via timer)
155 submitTaskTime.cancel();
157 submitTaskTime = hashedWheelTimer.newTimeout(new TimerTask() {
159 public void run(final Timeout timeout) throws Exception {
160 submitScheduledTransaction(timeout);
162 }, timerValue, TimeUnit.MILLISECONDS);
165 LOG.trace("transaction not committed - submit block issued");
169 private static void hookTimeExpenseCounter(final CheckedFuture<Void, TransactionCommitFailedException> submitResult, final String name) {
170 final long submitFiredTime = System.nanoTime();
171 LOG.debug("submit of {} fired", name);
172 Futures.addCallback(submitResult, new FutureCallback<Void>() {
174 public void onSuccess(final Void result) {
175 LOG.debug("submit of {} finished in {} ms", name, System.nanoTime() - submitFiredTime);
179 public void onFailure(final Throwable t) {
180 LOG.warn("transaction submit failed: {}", t.getMessage());
185 void enableSubmit() {
186 submitIsEnabled = true;
190 public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
191 final AsyncTransaction<?, ?> transaction, final Throwable cause) {
192 LOG.warn("txChain failed -> recreating", cause);
196 private void recreateTxChain() {
197 txChainFactory.close();
198 txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
202 public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
203 // NOOP - only yet, here is probably place for notification to get new WriteTransaction