X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FTransactionChainManager.java;h=350372d3a94a30a1a6911dfbaf6ca1a1847246ca;hb=refs%2Fchanges%2F14%2F57814%2F38;hp=b782688d02e4dac3f738d506bfa6cadda587d93a;hpb=d10c556113e37e0cb9fa08ed47932f72ba454ee3;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java index b782688d02..350372d3a9 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java @@ -14,8 +14,13 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; @@ -23,16 +28,12 @@ import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; +import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,20 +45,15 @@ import org.slf4j.LoggerFactory; * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)}) * and submitTransaction method (wrapped {@link WriteTransaction#submit()}) - * - * @author Vaclav Demcak - *

- * Created: Apr 2, 2015 */ class TransactionChainManager implements TransactionChainListener, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class); + private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction."; private final Object txLock = new Object(); - private final KeyedInstanceIdentifier nodeII; - private final DeviceInfo deviceInfo; private final DataBroker dataBroker; - private final LifecycleConductor conductor; + private final String nodeId; @GuardedBy("txLock") private WriteTransaction wTx; @@ -68,42 +64,28 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable @GuardedBy("txLock") private ListenableFuture lastSubmittedFuture; - private boolean initCommit; - - public TransactionChainManagerStatus getTransactionChainManagerStatus() { - return transactionChainManagerStatus; - } + private volatile boolean initCommit; @GuardedBy("txLock") private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; TransactionChainManager(@Nonnull final DataBroker dataBroker, - @Nonnull final DeviceInfo deviceInfo, - @Nonnull final LifecycleConductor conductor) { - this.dataBroker = Preconditions.checkNotNull(dataBroker); - this.conductor = Preconditions.checkNotNull(conductor); - this.deviceInfo = deviceInfo; - this.nodeII = deviceInfo.getNodeInstanceIdentifier(); - this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; - lastSubmittedFuture = Futures.immediateFuture(null); - LOG.debug("created txChainManager for {}", this.nodeII); - } - - private NodeId nodeId() { - return nodeII.getKey().getId(); + @Nonnull final DeviceInfo deviceInfo) { + this.dataBroker = dataBroker; + this.nodeId = deviceInfo.getLOGValue(); + this.lastSubmittedFuture = Futures.immediateFuture(null); } @GuardedBy("txLock") private void createTxChain() { - if (txChainFactory != null) { - txChainFactory.close(); - } + BindingTransactionChain txChainFactoryTemp = txChainFactory; txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this); + Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close); } - void initialSubmitWriteTransaction() { + boolean initialSubmitWriteTransaction() { enableSubmit(); - submitWriteTransaction(); + return submitWriteTransaction(); } /** @@ -112,18 +94,20 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable * transactions. Call this method for MASTER role only. */ void activateTransactionManager() { - LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", nodeId(), submitIsEnabled); + if (LOG.isDebugEnabled()) { + LOG.debug("activateTransactionManager for node {} transaction submit is set to {}", + this.nodeId, submitIsEnabled); + } synchronized (txLock) { - if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) { - LOG.debug("Transaction Factory create {}", nodeId()); - Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close."); - Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction."); + if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) { + Preconditions.checkState(txChainFactory == null, + "TxChainFactory survive last close."); + Preconditions.checkState(wTx == null, + "We have some unexpected WriteTransaction."); this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING; this.submitIsEnabled = false; this.initCommit = true; createTxChain(); - } else { - LOG.debug("Transaction is active {}", nodeId()); } } } @@ -135,107 +119,133 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable * @return Future */ ListenableFuture deactivateTransactionManager() { + if (LOG.isDebugEnabled()) { + LOG.debug("deactivateTransactionManager for node {}", this.nodeId); + } final ListenableFuture future; synchronized (txLock) { - if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) { - LOG.debug("Submitting all transactions if we were in status WORKING for Node {}", nodeId()); + if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) { transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING; future = txChainShuttingDown(); - Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction."); - LOG.debug("Transaction Factory deactivate for Node {}", nodeId()); + Preconditions.checkState(wTx == null, + "We have some unexpected WriteTransaction."); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(final Void result) { - txChainFactory.close(); - txChainFactory = null; + removeTxChainFactory(); } @Override public void onFailure(final Throwable t) { - txChainFactory.close(); - txChainFactory = null; + removeTxChainFactory(); } }); } else { - // TODO : ignoring redundant deactivate invocation + // ignoring redundant deactivate invocation future = Futures.immediateCheckedFuture(null); } } return future; } + private void removeTxChainFactory() { + Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close); + txChainFactory = null; + } + boolean submitWriteTransaction() { synchronized (txLock) { if (!submitIsEnabled) { - LOG.trace("transaction not committed - submit block issued"); + if (LOG.isTraceEnabled()) { + LOG.trace("transaction not committed - submit block issued"); + } return false; } - if (wTx == null) { - LOG.trace("nothing to commit - submit returns true"); + if (Objects.isNull(wTx)) { + if (LOG.isTraceEnabled()) { + LOG.trace("nothing to commit - submit returns true"); + } return true; } - Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus), - "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII); + Preconditions.checkState(TransactionChainManagerStatus.WORKING == transactionChainManagerStatus, + "we have here Uncompleted Transaction for node {} and we are not MASTER", + this.nodeId); final CheckedFuture submitFuture = wTx.submit(); + lastSubmittedFuture = submitFuture; + wTx = null; + + if (initCommit) { + try { + submitFuture.get(5L, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException ex) { + LOG.error("Exception during INITIAL transaction submitting. ", ex); + return false; + } + initCommit = false; + return true; + } + Futures.addCallback(submitFuture, new FutureCallback() { @Override public void onSuccess(final Void result) { - if (initCommit) { - initCommit = false; - } + //NOOP } @Override public void onFailure(final Throwable t) { if (t instanceof TransactionCommitFailedException) { - LOG.error("Transaction commit failed. {}", t); + LOG.error("Transaction commit failed. ", t); } else { - LOG.error("Exception during transaction submitting. {}", t); - } - if (initCommit) { - LOG.error("Initial commit failed. {}", t); - conductor.closeConnection(deviceInfo); + if (t instanceof CancellationException) { + LOG.warn("Submit task was canceled"); + LOG.trace("Submit exception: ", t); + } else { + LOG.error("Exception during transaction submitting. ", t); + } } } }); - lastSubmittedFuture = submitFuture; - wTx = null; } return true; } void addDeleteOperationTotTxChain(final LogicalDatastoreType store, - final InstanceIdentifier path) throws Exception { - final WriteTransaction writeTx = getTransactionSafely(); - if (writeTx != null) { - LOG.trace("addDeleteOperation called with path {} ", path); - writeTx.delete(store, path); - } else { - LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path); - throw new Exception("Cannot write into transaction."); + final InstanceIdentifier path){ + synchronized (txLock) { + ensureTransaction(); + if (wTx == null) { + LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path); + throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION); + } + + wTx.delete(store, path); } } void writeToTransaction(final LogicalDatastoreType store, final InstanceIdentifier path, final T data, - final boolean createParents) throws Exception { - final WriteTransaction writeTx = getTransactionSafely(); - if (writeTx != null) { - LOG.trace("writeToTransaction called with path {} ", path); - writeTx.put(store, path, data, createParents); - } else { - LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path); - throw new Exception("Cannot write into transaction."); + final boolean createParents){ + synchronized (txLock) { + ensureTransaction(); + if (wTx == null) { + LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path); + throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION); + } + + wTx.put(store, path, data, createParents); } } @Override public void onTransactionChainFailed(final TransactionChain chain, final AsyncTransaction transaction, final Throwable cause) { - if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) { - LOG.warn("txChain failed -> recreating due to {}", cause); - recreateTxChain(); + synchronized (txLock) { + if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) { + LOG.warn("Transaction chain failed, recreating chain due to ", cause); + createTxChain(); + wTx = null; + } } } @@ -244,25 +254,12 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable // NOOP } - private void recreateTxChain() { - synchronized (txLock) { - createTxChain(); - wTx = null; - } - } - - @Nullable - private WriteTransaction getTransactionSafely() { - if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) { - synchronized (txLock) { - if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) { - if (wTx == null && txChainFactory != null) { - wTx = txChainFactory.newWriteOnlyTransaction(); - } - } - } + @GuardedBy("txLock") + private void ensureTransaction() { + if (wTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus + && txChainFactory != null) { + wTx = txChainFactory.newWriteOnlyTransaction(); } - return wTx; } @VisibleForTesting @@ -274,46 +271,52 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable } ListenableFuture shuttingDown() { - LOG.debug("TxManager is going SHUTTING_DOWN for node {}", nodeII); - ListenableFuture future; + if (LOG.isDebugEnabled()) { + LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId); + } synchronized (txLock) { this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN; - future = txChainShuttingDown(); + return txChainShuttingDown(); } - return future; } @GuardedBy("txLock") private ListenableFuture txChainShuttingDown() { + boolean wasSubmitEnabled = submitIsEnabled; submitIsEnabled = false; ListenableFuture future; - if (txChainFactory == null) { + + if (!wasSubmitEnabled || txChainFactory == null) { // stay with actual thread future = Futures.immediateCheckedFuture(null); + + if (wTx != null) { + wTx.cancel(); + wTx = null; + } } else if (wTx == null) { // hijack md-sal thread future = lastSubmittedFuture; } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Submitting all transactions for Node {}", this.nodeId); + } // hijack md-sal thread future = wTx.submit(); wTx = null; } + return future; } @Override public void close() { - LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}, will wait for ownershipservice to notify" - , nodeII); - Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)); - Preconditions.checkState(wTx == null); + if (LOG.isDebugEnabled()) { + LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId); + } synchronized (txLock) { - if (txChainFactory != null) { - txChainFactory.close(); - txChainFactory = null; - } + removeTxChainFactory(); } - Preconditions.checkState(txChainFactory == null); } private enum TransactionChainManagerStatus {