X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FTransactionChainManager.java;h=657aebaf072d119a63cfbccdfcdd057efdd57713;hb=ad3b6f4df6aad9bbb09b98cc00ca0b7b3534d0a0;hp=993d79340cedf82a00e86c559460ca4a38e39308;hpb=a845c4f39cf5ab145af181d1e0b9c4706971238f;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java index 993d79340c..657aebaf07 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java @@ -9,107 +9,336 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.FutureFallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * openflowplugin-impl * org.opendaylight.openflowplugin.impl.device - * + *

* Package protected class for controlling {@link WriteTransaction} life cycle. It is * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)}) * and submitTransaction method (wrapped {@link WriteTransaction#submit()}) * * @author Vaclav Demcak - * - * Created: Apr 2, 2015 + *

+ * Created: Apr 2, 2015 */ -@VisibleForTesting -class TransactionChainManager implements TransactionChainListener { +class TransactionChainManager implements TransactionChainListener, AutoCloseable { - private static Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class); + private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class); + + private final Object txLock = new Object(); private final DataBroker dataBroker; - private final long maxTx; - private BindingTransactionChain txChainFactory; + private final DeviceState deviceState; + @GuardedBy("txLock") private WriteTransaction wTx; - private long nrOfActualTx; - private boolean counterIsEnabled; + @GuardedBy("txLock") + private BindingTransactionChain txChainFactory; + private boolean submitIsEnabled; + + @GuardedBy("txLock") + private TransactionChainManagerStatus transactionChainManagerStatus; + private final KeyedInstanceIdentifier nodeII; - TransactionChainManager(@Nonnull final DataBroker dataBroker, final long maxTx) { + TransactionChainManager(@Nonnull final DataBroker dataBroker, + @Nonnull final DeviceState deviceState) { this.dataBroker = Preconditions.checkNotNull(dataBroker); - this.maxTx = maxTx; - txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this); - nrOfActualTx = 0L; - LOG.debug("created txChainManager with operation limit {}", maxTx); + this.deviceState = Preconditions.checkNotNull(deviceState); + this.nodeII = Preconditions.checkNotNull(deviceState.getNodeInstanceIdentifier()); + this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; + LOG.debug("created txChainManager"); } - synchronized void writeToTransaction(final LogicalDatastoreType store, - final InstanceIdentifier path, final T data) { - if (wTx == null) { - wTx = txChainFactory.newWriteOnlyTransaction(); + @GuardedBy("txLock") + private void createTxChain() { + if (txChainFactory != null) { + txChainFactory.close(); } - wTx.put(store, path, data); - if ( ! counterIsEnabled) { - return; - } - nrOfActualTx += 1L; - if (nrOfActualTx == maxTx) { - submitTransaction(); + txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this); + } + + void initialSubmitWriteTransaction() { + enableSubmit(); + submitWriteTransaction(); + } + + /** + * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make + * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS + * transactions. Call this method for MASTER role only. + */ + public void activateTransactionManager() { + LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), this.submitIsEnabled); + synchronized (txLock) { + if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) { + LOG.debug("Transaction Factory create {}", deviceState.getNodeId()); + Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close."); + Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction."); + this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING; + createTxChain(); + } else { + LOG.debug("Transaction is active {}", deviceState.getNodeId()); + } } } - synchronized void addDeleteOperationTotTxChain(final LogicalDatastoreType store, - final InstanceIdentifier path) { - if (wTx == null) { - wTx = txChainFactory.newWriteOnlyTransaction(); + /** + * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters + * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS. + * Call this method for SLAVE only. + * @return Future + */ + public ListenableFuture deactivateTransactionManager() { + final ListenableFuture future; + synchronized (txLock) { + if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) { + LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId()); + transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; + future = txChainShuttingDown(); + Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction."); + LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId()); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + txChainFactory.close(); + txChainFactory = null; + } + + @Override + public void onFailure(final Throwable t) { + txChainFactory.close(); + txChainFactory = null; + } + }); + } else { + // TODO : ignoring redundant deactivate invocation + future = Futures.immediateCheckedFuture(null); + } } - wTx.delete(store, path); - if ( ! counterIsEnabled) { - return; + return future; + } + + boolean submitWriteTransaction() { + if (!submitIsEnabled) { + LOG.trace("transaction not committed - submit block issued"); + return false; } - nrOfActualTx += 1L; - if (nrOfActualTx == maxTx) { - submitTransaction(); + synchronized (txLock) { + if (wTx == null) { + LOG.trace("nothing to commit - submit returns true"); + return true; + } + Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus), + "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII); + final CheckedFuture submitFuture = wTx.submit(); + Futures.addCallback(submitFuture, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + //no action required + } + + @Override + public void onFailure(final Throwable t) { + if (t instanceof TransactionCommitFailedException) { + LOG.error("Transaction commit failed. {}", t); + } else { + LOG.error("Exception during transaction submitting. {}", t); + } + } + }); + wTx = null; } + return true; } - synchronized void submitTransaction() { - if (wTx != null) { - LOG.trace("submitting transaction, counter: {}", nrOfActualTx); - wTx.submit(); - wTx = null; - nrOfActualTx = 0L; + void addDeleteOperationTotTxChain(final LogicalDatastoreType store, + final InstanceIdentifier path) { + final WriteTransaction writeTx = getTransactionSafely(); + if (writeTx != null) { + writeTx.delete(store, path); + } else { + LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path); } } - synchronized void enableCounter() { - counterIsEnabled = true; + void writeToTransaction(final LogicalDatastoreType store, + final InstanceIdentifier path, final T data) { + final WriteTransaction writeTx = getTransactionSafely(); + if (writeTx != null) { + writeTx.put(store, path, data); + } else { + LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path); + } } @Override public void onTransactionChainFailed(final TransactionChain chain, - final AsyncTransaction transaction, final Throwable cause) { - LOG.debug("txChain failed -> recreating"); - LOG.trace("reason", cause); - txChainFactory.close(); - txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this); + final AsyncTransaction transaction, final Throwable cause) { + if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) { + LOG.warn("txChain failed -> recreating", cause); + recreateTxChain(); + } } @Override public void onTransactionChainSuccessful(final TransactionChain chain) { - // NOOP - only yet, here is probably place for notification to get new WriteTransaction + // NOOP + } + + private void recreateTxChain() { + synchronized (txLock) { + createTxChain(); + wTx = null; + } + } + + @Nullable + private WriteTransaction getTransactionSafely() { + if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) { + synchronized (txLock) { + if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) { + if (wTx == null && txChainFactory != null) { + wTx = txChainFactory.newWriteOnlyTransaction(); + } + } + } + } + return wTx; + } + + @VisibleForTesting + void enableSubmit() { + submitIsEnabled = true; + } + + ListenableFuture shuttingDown() { + LOG.debug("TxManager is going SUTTING_DOWN for node {}", nodeII); + ListenableFuture future; + synchronized (txLock) { + this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN; + future = txChainShuttingDown(); + } + return future; + } + + private ListenableFuture txChainShuttingDown() { + ListenableFuture future; + if (txChainFactory == null) { + // stay with actual thread + future = Futures.immediateCheckedFuture(null); + } else { + // hijack md-sal thread + if (wTx == null) { + wTx = txChainFactory.newWriteOnlyTransaction(); + } + final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId()); + wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build()); + future = wTx.submit(); + wTx = null; + + future = Futures.withFallback(future, new FutureFallback() { + + @Override + public ListenableFuture create(final Throwable t) throws Exception { + LOG.debug("Last ShuttingDown Transaction for node {} fail. Put empty FlowCapableNode", + deviceState.getNodeId()); + final ReadOnlyTransaction readWriteTx = dataBroker.newReadOnlyTransaction(); + final CheckedFuture, ReadFailedException> readFlowNode = readWriteTx + .read(LogicalDatastoreType.OPERATIONAL, nodeII.augmentation(FlowCapableNode.class)); + return Futures.transform(readFlowNode, new AsyncFunction, Void>() { + + @Override + public ListenableFuture apply(final Optional input) { + if (input.isPresent()) { + final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction(); + nodeBuilder.addAugmentation(FlowCapableNode.class, new FlowCapableNodeBuilder().build()); + delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build()); + return delWtx.submit(); + } + return Futures.immediateFuture(null); + } + }); + } + }); + } + return future; + } + + /** + * Transaction could be close if we are not submit anything. We have property submitIsEnable what + * could protect us for check it is NEW transaction from chain and we are able close everything + * safely. + */ + void clearUnsubmittedTransaction() { + LOG.debug("Cleaning unsubmited Transaction for Device {}", deviceState.getNodeId()); + Verify.verify(!submitIsEnabled, "We are not able clean TxChain {}", deviceState.getNodeId()); + synchronized (txLock) { + if (wTx != null) { + wTx.cancel(); + wTx = null; + } + if (txChainFactory != null) { + txChainFactory.close(); + txChainFactory = null; + } + transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; + } + } + + @Override + public void close() { + LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN, will wait for ownershipservice to notify", nodeII); + Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)); + Preconditions.checkState(wTx == null); + synchronized (txLock) { + if (txChainFactory != null) { + txChainFactory.close(); + txChainFactory = null; + } + } + Preconditions.checkState(txChainFactory == null); + } + + private enum TransactionChainManagerStatus { + /** txChainManager is sleeping - is not active (SLAVE or default init value) */ + WORKING, + /** txChainManager is working - is active (MASTER) */ + SLEEPING, + /** txChainManager is trying to be closed - device disconnecting */ + SHUTTING_DOWN; } }