import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* and submitTransaction method (wrapped {@link WriteTransaction#submit()})
*
* @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- * <p/>
+ * </p>
* Created: Apr 2, 2015
*/
-@VisibleForTesting
-class TransactionChainManager implements TransactionChainListener {
+class TransactionChainManager implements TransactionChainListener, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
- private final HashedWheelTimer hashedWheelTimer;
+ private final Object txLock = new Object();
+
private final DataBroker dataBroker;
- private final long maxTx;
- private final long timerValue;
- private BindingTransactionChain txChainFactory;
private WriteTransaction wTx;
- private Timeout submitTaskTime;
- private long nrOfActualTx;
+ private BindingTransactionChain txChainFactory;
private boolean submitIsEnabled;
- TransactionChainManager(@Nonnull final DataBroker dataBroker,
- @Nonnull final HashedWheelTimer hashedWheelTimer,
- final long maxTx,
- final long timerValue) {
- this.dataBroker = Preconditions.checkNotNull(dataBroker);
- this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
- this.maxTx = maxTx;
- txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
- nrOfActualTx = 0L;
- this.timerValue = timerValue;
- LOG.debug("created txChainManager with operation limit {}", maxTx);
+ public TransactionChainManagerStatus getTransactionChainManagerStatus() {
+ return transactionChainManagerStatus;
}
+ private TransactionChainManagerStatus transactionChainManagerStatus;
+ private ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler;
+ private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
+ private Registration managerRegistration;
- public void commitOperationsGatheredInOneTransaction(){
- enableSubmit();
- submitTransaction();
- }
- public void startGatheringOperationsToOneTransaction(){
- submitIsEnabled = false;
- }
-
- synchronized <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path, final T data) {
- if (wTx == null) {
- wTx = txChainFactory.newWriteOnlyTransaction();
- }
- wTx.put(store, path, data);
- countTxInAndCommit();
+ TransactionChainManager(@Nonnull final DataBroker dataBroker,
+ @Nonnull final KeyedInstanceIdentifier<Node, NodeKey> nodeII,
+ @Nonnull final Registration managerRegistration) {
+ this.dataBroker = Preconditions.checkNotNull(dataBroker);
+ this.nodeII = Preconditions.checkNotNull(nodeII);
+ this.managerRegistration = Preconditions.checkNotNull(managerRegistration);
+ this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
+ createTxChain(dataBroker);
+ LOG.debug("created txChainManager");
}
- synchronized <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
- final InstanceIdentifier<T> path) {
- if (wTx == null) {
- wTx = txChainFactory.newWriteOnlyTransaction();
- }
- wTx.delete(store, path);
- countTxInAndCommit();
+ private void createTxChain(final DataBroker dataBroker) {
+ txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
}
- private void countTxInAndCommit() {
- nrOfActualTx += 1L;
- if (nrOfActualTx >= maxTx) {
- submitTransaction();
- }
+ void initialSubmitWriteTransaction() {
+ enableSubmit();
+ submitWriteTransaction();
}
- synchronized void submitScheduledTransaction(Timeout timeout) {
- if (timeout.isCancelled()) {
- // zombie timer executed
- return;
- }
-
- if (submitIsEnabled) {
- submitTransaction();
+ public synchronized boolean attemptToRegisterHandler(final ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler) {
+ if (TransactionChainManagerStatus.SHUTTING_DOWN.equals(this.transactionChainManagerStatus)
+ && null == this.readyForNewTransactionChainHandler) {
+ this.readyForNewTransactionChainHandler = readyForNewTransactionChainHandler;
+ if (managerRegistration == null) {
+ this.readyForNewTransactionChainHandler.onReadyForNewTransactionChain();
+ }
+ return true;
} else {
- LOG.info("transaction submit task will not be scheduled - submit block issued.");
+ return false;
}
}
- synchronized void submitTransaction() {
- if (submitIsEnabled) {
- if (wTx != null && nrOfActualTx > 0) {
- LOG.trace("submitting transaction, counter: {}", nrOfActualTx);
- CheckedFuture<Void, TransactionCommitFailedException> submitResult = wTx.submit();
- hookTimeExpenseCounter(submitResult, String.valueOf(wTx.getIdentifier()));
- wTx = null;
- nrOfActualTx = 0L;
- }
- if (submitTaskTime != null) {
- // if possible then cancel current timer (even if being executed via timer)
- submitTaskTime.cancel();
+ boolean submitWriteTransaction() {
+ if (!submitIsEnabled) {
+ LOG.trace("transaction not committed - submit block issued");
+ return false;
+ }
+ synchronized (txLock) {
+ if (wTx == null) {
+ LOG.trace("nothing to commit - submit returns true");
+ return true;
}
- submitTaskTime = hashedWheelTimer.newTimeout(new TimerTask() {
+ final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
+ Futures.addCallback(submitFuture, new FutureCallback<Void>() {
@Override
- public void run(final Timeout timeout) throws Exception {
- submitScheduledTransaction(timeout);
+ public void onSuccess(Void result) {
+ //no action required
}
- }, timerValue, TimeUnit.MILLISECONDS);
- } else {
- LOG.debug("transaction not committed - submit block issued");
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof TransactionCommitFailedException) {
+ LOG.error("Transaction commit failed. {}", t);
+ } else {
+ LOG.error("Exception during transaction submitting. {}", t);
+ }
+ }
+ });
+ wTx = null;
}
+ return true;
}
- private void hookTimeExpenseCounter(CheckedFuture<Void, TransactionCommitFailedException> submitResult, final String name) {
- final long submitFiredTime = System.currentTimeMillis();
- LOG.debug("submit of {} fired", name);
- Futures.addCallback(submitResult, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- LOG.debug("submit of {} finished in {} ms", name, System.currentTimeMillis() - submitFiredTime);
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.warn("transaction submit failed: {}", t.getMessage());
- }
- });
+ <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path) {
+ final WriteTransaction writeTx = getTransactionSafely();
+ writeTx.delete(store, path);
}
- synchronized void enableSubmit() {
- submitIsEnabled = true;
+ <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
+ final InstanceIdentifier<T> path, final T data) {
+ final WriteTransaction writeTx = getTransactionSafely();
+ writeTx.put(store, path, data);
}
@Override
public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
final AsyncTransaction<?, ?> transaction, final Throwable cause) {
LOG.warn("txChain failed -> recreating", cause);
- txChainFactory.close();
- txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
+ recreateTxChain();
}
@Override
// NOOP - only yet, here is probably place for notification to get new WriteTransaction
}
+ private void recreateTxChain() {
+ txChainFactory.close();
+ createTxChain(dataBroker);
+ synchronized (txLock) {
+ wTx = null;
+ }
+ }
+
+ private WriteTransaction getTransactionSafely() {
+ if (wTx == null && !TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) {
+ synchronized (txLock) {
+ if (wTx == null) {
+ wTx = txChainFactory.newWriteOnlyTransaction();
+ }
+ }
+ }
+ return wTx;
+ }
+
+ @VisibleForTesting
+ void enableSubmit() {
+ submitIsEnabled = true;
+ }
+
+ @Override
+ public void close() {
+ LOG.debug("Removing node {} from operational DS.", nodeII);
+ synchronized (txLock) {
+ final WriteTransaction writeTx = getTransactionSafely();
+ this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
+ writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodeII);
+ LOG.debug("Delete node {} from operational DS put to write transaction.", nodeII);
+ CheckedFuture<Void, TransactionCommitFailedException> submitsFuture = writeTx.submit();
+ LOG.debug("Delete node {} from operational DS write transaction submitted.", nodeII);
+ Futures.addCallback(submitsFuture, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void aVoid) {
+ LOG.debug("Removing node {} from operational DS successful .", nodeII);
+ notifyReadyForNewTransactionChainAndCloseFactory();
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.info("Attempt to close transaction chain factory failed.", throwable);
+ notifyReadyForNewTransactionChainAndCloseFactory();
+ }
+ });
+ wTx = null;
+ }
+ }
+
+ private void notifyReadyForNewTransactionChainAndCloseFactory() {
+ synchronized (this) {
+ try {
+ LOG.debug("Closing registration in manager.");
+ managerRegistration.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close transaction chain manager's registration.", e);
+ }
+ managerRegistration = null;
+ if (null != readyForNewTransactionChainHandler) {
+ readyForNewTransactionChainHandler.onReadyForNewTransactionChain();
+ }
+ }
+ txChainFactory.close();
+ LOG.debug("Transaction chain factory closed.");
+ }
+
+ public enum TransactionChainManagerStatus {
+ WORKING, SHUTTING_DOWN;
+ }
+
}