X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FTransactionChainManager.java;h=657aebaf072d119a63cfbccdfcdd057efdd57713;hb=ad3b6f4df6aad9bbb09b98cc00ca0b7b3534d0a0;hp=4224b109bbedc6c0fa116bf721f03a3da7086367;hpb=95641a51c4a1c89d070c7da9071f35641cfb99c0;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java index 4224b109bb..657aebaf07 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java @@ -9,129 +9,336 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import java.util.concurrent.TimeUnit; +import com.google.common.base.Verify; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.FutureFallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * openflowplugin-impl * org.opendaylight.openflowplugin.impl.device - * + *

* Package protected class for controlling {@link WriteTransaction} life cycle. It is * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)}) * and submitTransaction method (wrapped {@link WriteTransaction#submit()}) * * @author Vaclav Demcak - * - * Created: Apr 2, 2015 + *

+ * Created: Apr 2, 2015 */ -@VisibleForTesting -class TransactionChainManager implements TransactionChainListener { +class TransactionChainManager implements TransactionChainListener, AutoCloseable { - private static Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class); + private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class); + + private final Object txLock = new Object(); - private final HashedWheelTimer hashedWheelTimer; private final DataBroker dataBroker; - private final long maxTx; - private final long timerValue; - private BindingTransactionChain txChainFactory; + private final DeviceState deviceState; + @GuardedBy("txLock") private WriteTransaction wTx; - private Timeout submitTaskTime; - private long nrOfActualTx; - private boolean counterIsEnabled; + @GuardedBy("txLock") + private BindingTransactionChain txChainFactory; + private boolean submitIsEnabled; + + @GuardedBy("txLock") + private TransactionChainManagerStatus transactionChainManagerStatus; + private final KeyedInstanceIdentifier nodeII; TransactionChainManager(@Nonnull final DataBroker dataBroker, - @Nonnull final HashedWheelTimer hashedWheelTimer, - final long maxTx, - final long timerValue) { + @Nonnull final DeviceState deviceState) { this.dataBroker = Preconditions.checkNotNull(dataBroker); - this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer); - this.maxTx = maxTx; + this.deviceState = Preconditions.checkNotNull(deviceState); + this.nodeII = Preconditions.checkNotNull(deviceState.getNodeInstanceIdentifier()); + this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; + LOG.debug("created txChainManager"); + } + + @GuardedBy("txLock") + private void createTxChain() { + if (txChainFactory != null) { + txChainFactory.close(); + } txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this); - nrOfActualTx = 0L; - this.timerValue = timerValue; - LOG.debug("created txChainManager with operation limit {}", maxTx); } - synchronized void writeToTransaction(final LogicalDatastoreType store, - final InstanceIdentifier path, final T data) { - if (wTx == null) { - wTx = txChainFactory.newWriteOnlyTransaction(); + void initialSubmitWriteTransaction() { + enableSubmit(); + submitWriteTransaction(); + } + + /** + * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make + * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS + * transactions. Call this method for MASTER role only. + */ + public void activateTransactionManager() { + LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), this.submitIsEnabled); + synchronized (txLock) { + if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) { + LOG.debug("Transaction Factory create {}", deviceState.getNodeId()); + Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close."); + Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction."); + this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING; + createTxChain(); + } else { + LOG.debug("Transaction is active {}", deviceState.getNodeId()); + } + } + } + + /** + * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters + * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS. + * Call this method for SLAVE only. + * @return Future + */ + public ListenableFuture deactivateTransactionManager() { + final ListenableFuture future; + synchronized (txLock) { + if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) { + LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId()); + transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; + future = txChainShuttingDown(); + Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction."); + LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId()); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + txChainFactory.close(); + txChainFactory = null; + } + + @Override + public void onFailure(final Throwable t) { + txChainFactory.close(); + txChainFactory = null; + } + }); + } else { + // TODO : ignoring redundant deactivate invocation + future = Futures.immediateCheckedFuture(null); + } } - wTx.put(store, path, data); - if ( ! counterIsEnabled) { - return; + return future; + } + + boolean submitWriteTransaction() { + if (!submitIsEnabled) { + LOG.trace("transaction not committed - submit block issued"); + return false; + } + synchronized (txLock) { + if (wTx == null) { + LOG.trace("nothing to commit - submit returns true"); + return true; + } + Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus), + "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII); + final CheckedFuture submitFuture = wTx.submit(); + Futures.addCallback(submitFuture, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + //no action required + } + + @Override + public void onFailure(final Throwable t) { + if (t instanceof TransactionCommitFailedException) { + LOG.error("Transaction commit failed. {}", t); + } else { + LOG.error("Exception during transaction submitting. {}", t); + } + } + }); + wTx = null; } - countTxInAndCommit(); + return true; } - synchronized void addDeleteOperationTotTxChain(final LogicalDatastoreType store, - final InstanceIdentifier path) { - if (wTx == null) { - wTx = txChainFactory.newWriteOnlyTransaction(); + void addDeleteOperationTotTxChain(final LogicalDatastoreType store, + final InstanceIdentifier path) { + final WriteTransaction writeTx = getTransactionSafely(); + if (writeTx != null) { + writeTx.delete(store, path); + } else { + LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path); } - wTx.delete(store, path); - if ( ! counterIsEnabled) { - return; + } + + void writeToTransaction(final LogicalDatastoreType store, + final InstanceIdentifier path, final T data) { + final WriteTransaction writeTx = getTransactionSafely(); + if (writeTx != null) { + writeTx.put(store, path, data); + } else { + LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path); } - countTxInAndCommit(); } - private void countTxInAndCommit() { - nrOfActualTx += 1L; - if (nrOfActualTx >= maxTx) { - submitTransaction(); + @Override + public void onTransactionChainFailed(final TransactionChain chain, + final AsyncTransaction transaction, final Throwable cause) { + if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) { + LOG.warn("txChain failed -> recreating", cause); + recreateTxChain(); } } - synchronized void submitTransaction() { - if (wTx != null) { - LOG.trace("submitting transaction, counter: {}", nrOfActualTx); - wTx.submit(); + @Override + public void onTransactionChainSuccessful(final TransactionChain chain) { + // NOOP + } + + private void recreateTxChain() { + synchronized (txLock) { + createTxChain(); wTx = null; - nrOfActualTx = 0L; } - if (submitTaskTime != null && ! submitTaskTime.isExpired()) { - submitTaskTime.cancel(); + } + + @Nullable + private WriteTransaction getTransactionSafely() { + if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) { + synchronized (txLock) { + if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) { + if (wTx == null && txChainFactory != null) { + wTx = txChainFactory.newWriteOnlyTransaction(); + } + } + } + } + return wTx; + } + + @VisibleForTesting + void enableSubmit() { + submitIsEnabled = true; + } + + ListenableFuture shuttingDown() { + LOG.debug("TxManager is going SUTTING_DOWN for node {}", nodeII); + ListenableFuture future; + synchronized (txLock) { + this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN; + future = txChainShuttingDown(); } - submitTaskTime = hashedWheelTimer.newTimeout(new TimerTask() { - @Override - public void run(final Timeout timeout) throws Exception { - submitTransaction(); + return future; + } + + private ListenableFuture txChainShuttingDown() { + ListenableFuture future; + if (txChainFactory == null) { + // stay with actual thread + future = Futures.immediateCheckedFuture(null); + } else { + // hijack md-sal thread + if (wTx == null) { + wTx = txChainFactory.newWriteOnlyTransaction(); } - }, timerValue, TimeUnit.MILLISECONDS); + final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId()); + wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build()); + future = wTx.submit(); + wTx = null; + + future = Futures.withFallback(future, new FutureFallback() { + + @Override + public ListenableFuture create(final Throwable t) throws Exception { + LOG.debug("Last ShuttingDown Transaction for node {} fail. Put empty FlowCapableNode", + deviceState.getNodeId()); + final ReadOnlyTransaction readWriteTx = dataBroker.newReadOnlyTransaction(); + final CheckedFuture, ReadFailedException> readFlowNode = readWriteTx + .read(LogicalDatastoreType.OPERATIONAL, nodeII.augmentation(FlowCapableNode.class)); + return Futures.transform(readFlowNode, new AsyncFunction, Void>() { + + @Override + public ListenableFuture apply(final Optional input) { + if (input.isPresent()) { + final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction(); + nodeBuilder.addAugmentation(FlowCapableNode.class, new FlowCapableNodeBuilder().build()); + delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build()); + return delWtx.submit(); + } + return Futures.immediateFuture(null); + } + }); + } + }); + } + return future; } - synchronized void enableCounter() { - counterIsEnabled = true; + /** + * Transaction could be close if we are not submit anything. We have property submitIsEnable what + * could protect us for check it is NEW transaction from chain and we are able close everything + * safely. + */ + void clearUnsubmittedTransaction() { + LOG.debug("Cleaning unsubmited Transaction for Device {}", deviceState.getNodeId()); + Verify.verify(!submitIsEnabled, "We are not able clean TxChain {}", deviceState.getNodeId()); + synchronized (txLock) { + if (wTx != null) { + wTx.cancel(); + wTx = null; + } + if (txChainFactory != null) { + txChainFactory.close(); + txChainFactory = null; + } + transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; + } } @Override - public void onTransactionChainFailed(final TransactionChain chain, - final AsyncTransaction transaction, final Throwable cause) { - LOG.debug("txChain failed -> recreating"); - LOG.trace("reason", cause); - txChainFactory.close(); - txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this); + public void close() { + LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN, will wait for ownershipservice to notify", nodeII); + Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)); + Preconditions.checkState(wTx == null); + synchronized (txLock) { + if (txChainFactory != null) { + txChainFactory.close(); + txChainFactory = null; + } + } + Preconditions.checkState(txChainFactory == null); } - @Override - public void onTransactionChainSuccessful(final TransactionChain chain) { - // NOOP - only yet, here is probably place for notification to get new WriteTransaction + private enum TransactionChainManagerStatus { + /** txChainManager is sleeping - is not active (SLAVE or default init value) */ + WORKING, + /** txChainManager is working - is active (MASTER) */ + SLEEPING, + /** txChainManager is trying to be closed - device disconnecting */ + SHUTTING_DOWN; } }