X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FTransactionChainManager.java;h=4706cc1df8d789a24605306c3a32c3563d5fc6eb;hb=0b4b43cb55409bc1a6f8aa2be4bda5b05bd2a66e;hp=4224b109bbedc6c0fa116bf721f03a3da7086367;hpb=80087e9e52fe5e1869e5f0b813891fe6a9240162;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java index 4224b109bb..4706cc1df8 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java @@ -10,10 +10,9 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; -import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import javax.annotation.Nonnull; import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain; import org.opendaylight.controller.md.sal.binding.api.DataBroker; @@ -22,116 +21,207 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionChain; import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yangtools.concepts.Registration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * openflowplugin-impl * org.opendaylight.openflowplugin.impl.device - * + *

* Package protected class for controlling {@link WriteTransaction} life cycle. It is * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)}) * and submitTransaction method (wrapped {@link WriteTransaction#submit()}) * * @author Vaclav Demcak - * - * Created: Apr 2, 2015 + *

+ * Created: Apr 2, 2015 */ -@VisibleForTesting -class TransactionChainManager implements TransactionChainListener { +class TransactionChainManager implements TransactionChainListener, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class); - private static Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class); + private final Object txLock = new Object(); - private final HashedWheelTimer hashedWheelTimer; private final DataBroker dataBroker; - private final long maxTx; - private final long timerValue; - private BindingTransactionChain txChainFactory; private WriteTransaction wTx; - private Timeout submitTaskTime; - private long nrOfActualTx; - private boolean counterIsEnabled; + private BindingTransactionChain txChainFactory; + private boolean submitIsEnabled; + + public TransactionChainManagerStatus getTransactionChainManagerStatus() { + return transactionChainManagerStatus; + } + + private TransactionChainManagerStatus transactionChainManagerStatus; + private ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler; + private final KeyedInstanceIdentifier nodeII; + private Registration managerRegistration; TransactionChainManager(@Nonnull final DataBroker dataBroker, - @Nonnull final HashedWheelTimer hashedWheelTimer, - final long maxTx, - final long timerValue) { + @Nonnull final KeyedInstanceIdentifier nodeII, + @Nonnull final Registration managerRegistration) { this.dataBroker = Preconditions.checkNotNull(dataBroker); - this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer); - this.maxTx = maxTx; + this.nodeII = Preconditions.checkNotNull(nodeII); + this.managerRegistration = Preconditions.checkNotNull(managerRegistration); + this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING; + createTxChain(dataBroker); + LOG.debug("created txChainManager"); + } + + private void createTxChain(final DataBroker dataBroker) { txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this); - nrOfActualTx = 0L; - this.timerValue = timerValue; - LOG.debug("created txChainManager with operation limit {}", maxTx); } - synchronized void writeToTransaction(final LogicalDatastoreType store, - final InstanceIdentifier path, final T data) { - if (wTx == null) { - wTx = txChainFactory.newWriteOnlyTransaction(); - } - wTx.put(store, path, data); - if ( ! counterIsEnabled) { - return; - } - countTxInAndCommit(); + void initialSubmitWriteTransaction() { + enableSubmit(); + submitWriteTransaction(); } - synchronized void addDeleteOperationTotTxChain(final LogicalDatastoreType store, - final InstanceIdentifier path) { - if (wTx == null) { - wTx = txChainFactory.newWriteOnlyTransaction(); - } - wTx.delete(store, path); - if ( ! counterIsEnabled) { - return; + public synchronized boolean attemptToRegisterHandler(final ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler) { + if (TransactionChainManagerStatus.SHUTTING_DOWN.equals(this.transactionChainManagerStatus) + && null == this.readyForNewTransactionChainHandler) { + this.readyForNewTransactionChainHandler = readyForNewTransactionChainHandler; + if (managerRegistration == null) { + this.readyForNewTransactionChainHandler.onReadyForNewTransactionChain(); + } + return true; + } else { + return false; } - countTxInAndCommit(); } - private void countTxInAndCommit() { - nrOfActualTx += 1L; - if (nrOfActualTx >= maxTx) { - submitTransaction(); + boolean submitWriteTransaction() { + if (!submitIsEnabled) { + LOG.trace("transaction not committed - submit block issued"); + return false; } - } + synchronized (txLock) { + if (wTx == null) { + LOG.trace("nothing to commit - submit returns true"); + return true; + } + final CheckedFuture submitFuture = wTx.submit(); + Futures.addCallback(submitFuture, new FutureCallback() { + @Override + public void onSuccess(Void result) { + //no action required + } - synchronized void submitTransaction() { - if (wTx != null) { - LOG.trace("submitting transaction, counter: {}", nrOfActualTx); - wTx.submit(); + @Override + public void onFailure(Throwable t) { + if (t instanceof TransactionCommitFailedException) { + LOG.error("Transaction commit failed. {}", t); + } else { + LOG.error("Exception during transaction submitting. {}", t); + } + } + }); wTx = null; - nrOfActualTx = 0L; - } - if (submitTaskTime != null && ! submitTaskTime.isExpired()) { - submitTaskTime.cancel(); } - submitTaskTime = hashedWheelTimer.newTimeout(new TimerTask() { - @Override - public void run(final Timeout timeout) throws Exception { - submitTransaction(); - } - }, timerValue, TimeUnit.MILLISECONDS); + return true; } - synchronized void enableCounter() { - counterIsEnabled = true; + void addDeleteOperationTotTxChain(final LogicalDatastoreType store, + final InstanceIdentifier path) { + final WriteTransaction writeTx = getTransactionSafely(); + writeTx.delete(store, path); + } + + void writeToTransaction(final LogicalDatastoreType store, + final InstanceIdentifier path, final T data) { + final WriteTransaction writeTx = getTransactionSafely(); + writeTx.put(store, path, data); } @Override public void onTransactionChainFailed(final TransactionChain chain, - final AsyncTransaction transaction, final Throwable cause) { - LOG.debug("txChain failed -> recreating"); - LOG.trace("reason", cause); - txChainFactory.close(); - txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this); + final AsyncTransaction transaction, final Throwable cause) { + LOG.warn("txChain failed -> recreating", cause); + recreateTxChain(); } @Override public void onTransactionChainSuccessful(final TransactionChain chain) { // NOOP - only yet, here is probably place for notification to get new WriteTransaction } + + private void recreateTxChain() { + txChainFactory.close(); + createTxChain(dataBroker); + synchronized (txLock) { + wTx = null; + } + } + + private WriteTransaction getTransactionSafely() { + if (wTx == null && !TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) { + synchronized (txLock) { + if (wTx == null) { + wTx = txChainFactory.newWriteOnlyTransaction(); + } + } + } + return wTx; + } + + @VisibleForTesting + void enableSubmit() { + submitIsEnabled = true; + } + + @Override + public void close() { + LOG.debug("Removing node {} from operational DS.", nodeII); + synchronized (txLock) { + final WriteTransaction writeTx = getTransactionSafely(); + this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN; + writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodeII); + LOG.debug("Delete node {} from operational DS put to write transaction.", nodeII); + CheckedFuture submitsFuture = writeTx.submit(); + LOG.debug("Delete node {} from operational DS write transaction submitted.", nodeII); + Futures.addCallback(submitsFuture, new FutureCallback() { + @Override + public void onSuccess(final Void aVoid) { + LOG.debug("Removing node {} from operational DS successful .", nodeII); + notifyReadyForNewTransactionChainAndCloseFactory(); + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.info("Attempt to close transaction chain factory failed.", throwable); + notifyReadyForNewTransactionChainAndCloseFactory(); + } + }); + wTx = null; + } + } + + private void notifyReadyForNewTransactionChainAndCloseFactory() { + synchronized (this) { + try { + LOG.debug("Closing registration in manager."); + managerRegistration.close(); + } catch (Exception e) { + LOG.warn("Failed to close transaction chain manager's registration.", e); + } + managerRegistration = null; + if (null != readyForNewTransactionChainHandler) { + readyForNewTransactionChainHandler.onReadyForNewTransactionChain(); + } + } + txChainFactory.close(); + LOG.debug("Transaction chain factory closed."); + } + + public enum TransactionChainManagerStatus { + WORKING, SHUTTING_DOWN; + } + }