X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FTransactionChainManager.java;h=657aebaf072d119a63cfbccdfcdd057efdd57713;hb=ad3b6f4df6aad9bbb09b98cc00ca0b7b3534d0a0;hp=80e8dae4a7daf15dab23429fee5305d00d6afcaa;hpb=d342b69bcd7bb2a7ad10316821025f774459b76f;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java index 80e8dae4a7..657aebaf07 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java @@ -9,22 +9,31 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.FutureFallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; @@ -61,10 +70,6 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable private BindingTransactionChain txChainFactory; private boolean submitIsEnabled; - public TransactionChainManagerStatus getTransactionChainManagerStatus() { - return transactionChainManagerStatus; - } - @GuardedBy("txLock") private TransactionChainManagerStatus transactionChainManagerStatus; private final KeyedInstanceIdentifier nodeII; @@ -97,7 +102,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable * transactions. Call this method for MASTER role only. */ public void activateTransactionManager() { - LOG.trace("activetTransactionManaager for node {} transaction submit is set to {}", deviceState.getNodeId()); + LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), this.submitIsEnabled); synchronized (txLock) { if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) { LOG.debug("Transaction Factory create {}", deviceState.getNodeId()); @@ -115,19 +120,36 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS. * Call this method for SLAVE only. + * @return Future */ - public void deactivateTransactionManager() { + public ListenableFuture deactivateTransactionManager() { + final ListenableFuture future; synchronized (txLock) { if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) { LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId()); - submitWriteTransaction(); + transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; + future = txChainShuttingDown(); Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction."); LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId()); - transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; - txChainFactory.close(); - txChainFactory = null; + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + txChainFactory.close(); + txChainFactory = null; + } + + @Override + public void onFailure(final Throwable t) { + txChainFactory.close(); + txChainFactory = null; + } + }); + } else { + // TODO : ignoring redundant deactivate invocation + future = Futures.immediateCheckedFuture(null); } } + return future; } boolean submitWriteTransaction() { @@ -223,26 +245,78 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable submitIsEnabled = true; } - CheckedFuture shuttingDown() { + ListenableFuture shuttingDown() { LOG.debug("TxManager is going SUTTING_DOWN for node {}", nodeII); - CheckedFuture future; + ListenableFuture future; synchronized (txLock) { this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN; - if (txChainFactory == null) { - // stay with actual thread - future = Futures.immediateCheckedFuture(null); - } else { - // hijack md-sal thread - if (wTx == null) { - wTx = txChainFactory.newWriteOnlyTransaction(); + future = txChainShuttingDown(); + } + return future; + } + + private ListenableFuture txChainShuttingDown() { + ListenableFuture future; + if (txChainFactory == null) { + // stay with actual thread + future = Futures.immediateCheckedFuture(null); + } else { + // hijack md-sal thread + if (wTx == null) { + wTx = txChainFactory.newWriteOnlyTransaction(); + } + final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId()); + wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build()); + future = wTx.submit(); + wTx = null; + + future = Futures.withFallback(future, new FutureFallback() { + + @Override + public ListenableFuture create(final Throwable t) throws Exception { + LOG.debug("Last ShuttingDown Transaction for node {} fail. Put empty FlowCapableNode", + deviceState.getNodeId()); + final ReadOnlyTransaction readWriteTx = dataBroker.newReadOnlyTransaction(); + final CheckedFuture, ReadFailedException> readFlowNode = readWriteTx + .read(LogicalDatastoreType.OPERATIONAL, nodeII.augmentation(FlowCapableNode.class)); + return Futures.transform(readFlowNode, new AsyncFunction, Void>() { + + @Override + public ListenableFuture apply(final Optional input) { + if (input.isPresent()) { + final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction(); + nodeBuilder.addAugmentation(FlowCapableNode.class, new FlowCapableNodeBuilder().build()); + delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build()); + return delWtx.submit(); + } + return Futures.immediateFuture(null); + } + }); } - final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId()); - wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build()); - future = wTx.submit(); + }); + } + return future; + } + + /** + * Transaction could be close if we are not submit anything. We have property submitIsEnable what + * could protect us for check it is NEW transaction from chain and we are able close everything + * safely. + */ + void clearUnsubmittedTransaction() { + LOG.debug("Cleaning unsubmited Transaction for Device {}", deviceState.getNodeId()); + Verify.verify(!submitIsEnabled, "We are not able clean TxChain {}", deviceState.getNodeId()); + synchronized (txLock) { + if (wTx != null) { + wTx.cancel(); wTx = null; } + if (txChainFactory != null) { + txChainFactory.close(); + txChainFactory = null; + } + transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; } - return future; } @Override @@ -259,7 +333,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable Preconditions.checkState(txChainFactory == null); } - public enum TransactionChainManagerStatus { + private enum TransactionChainManagerStatus { /** txChainManager is sleeping - is not active (SLAVE or default init value) */ WORKING, /** txChainManager is working - is active (MASTER) */