Merge "Removed unused fields"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
index bc40b88b387bbff230f6d50917cc94953fb722d9..4706cc1df8d789a24605306c3a32c3563d5fc6eb 100644 (file)
@@ -13,10 +13,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -26,8 +22,12 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,135 +41,110 @@ import org.slf4j.LoggerFactory;
  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
  *
  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *         <p/>
+ *         </p>
  *         Created: Apr 2, 2015
  */
-@VisibleForTesting
-class TransactionChainManager implements TransactionChainListener {
+class TransactionChainManager implements TransactionChainListener, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
 
-    private final HashedWheelTimer hashedWheelTimer;
+    private final Object txLock = new Object();
+
     private final DataBroker dataBroker;
-    private final long maxTx;
-    private final long timerValue;
-    private BindingTransactionChain txChainFactory;
     private WriteTransaction wTx;
-    private Timeout submitTaskTime;
-    private long nrOfActualTx;
+    private BindingTransactionChain txChainFactory;
     private boolean submitIsEnabled;
 
-    TransactionChainManager(@Nonnull final DataBroker dataBroker,
-                            @Nonnull final HashedWheelTimer hashedWheelTimer,
-                            final long maxTx,
-                            final long timerValue) {
-        this.dataBroker = Preconditions.checkNotNull(dataBroker);
-        this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
-        this.maxTx = maxTx;
-        txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
-        nrOfActualTx = 0L;
-        this.timerValue = timerValue;
-        LOG.debug("created txChainManager with operation limit {}", maxTx);
+    public TransactionChainManagerStatus getTransactionChainManagerStatus() {
+        return transactionChainManagerStatus;
     }
 
+    private TransactionChainManagerStatus transactionChainManagerStatus;
+    private ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler;
+    private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
+    private Registration managerRegistration;
 
-    public void commitOperationsGatheredInOneTransaction(){
-        enableSubmit();
-        submitTransaction();
-    }
-    public void startGatheringOperationsToOneTransaction(){
-        submitIsEnabled = false;
-    }
-
-    synchronized <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
-                                                                final InstanceIdentifier<T> path, final T data) {
-        if (wTx == null) {
-            wTx = txChainFactory.newWriteOnlyTransaction();
-        }
-        wTx.put(store, path, data);
-        countTxInAndCommit();
+    TransactionChainManager(@Nonnull final DataBroker dataBroker,
+                            @Nonnull final KeyedInstanceIdentifier<Node, NodeKey> nodeII,
+                            @Nonnull final Registration managerRegistration) {
+        this.dataBroker = Preconditions.checkNotNull(dataBroker);
+        this.nodeII = Preconditions.checkNotNull(nodeII);
+        this.managerRegistration = Preconditions.checkNotNull(managerRegistration);
+        this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
+        createTxChain(dataBroker);
+        LOG.debug("created txChainManager");
     }
 
-    synchronized <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
-                                                                          final InstanceIdentifier<T> path) {
-        if (wTx == null) {
-            wTx = txChainFactory.newWriteOnlyTransaction();
-        }
-        wTx.delete(store, path);
-        countTxInAndCommit();
+    private void createTxChain(final DataBroker dataBroker) {
+        txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
     }
 
-    private void countTxInAndCommit() {
-        nrOfActualTx += 1L;
-        if (nrOfActualTx >= maxTx) {
-            submitTransaction();
-        }
+    void initialSubmitWriteTransaction() {
+        enableSubmit();
+        submitWriteTransaction();
     }
 
-    synchronized void submitScheduledTransaction(Timeout timeout) {
-        if (timeout.isCancelled()) {
-            // zombie timer executed
-            return;
-        }
-
-        if (submitIsEnabled) {
-            submitTransaction();
+    public synchronized boolean attemptToRegisterHandler(final ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler) {
+        if (TransactionChainManagerStatus.SHUTTING_DOWN.equals(this.transactionChainManagerStatus)
+                && null == this.readyForNewTransactionChainHandler) {
+            this.readyForNewTransactionChainHandler = readyForNewTransactionChainHandler;
+            if (managerRegistration == null) {
+                this.readyForNewTransactionChainHandler.onReadyForNewTransactionChain();
+            }
+            return true;
         } else {
-            LOG.info("transaction submit task will not be scheduled - submit block issued.");
+            return false;
         }
     }
 
-    synchronized void submitTransaction() {
-        if (submitIsEnabled) {
-            if (wTx != null && nrOfActualTx > 0) {
-                LOG.trace("submitting transaction, counter: {}", nrOfActualTx);
-                CheckedFuture<Void, TransactionCommitFailedException> submitResult = wTx.submit();
-                hookTimeExpenseCounter(submitResult, String.valueOf(wTx.getIdentifier()));
-                wTx = null;
-                nrOfActualTx = 0L;
-            }
-            if (submitTaskTime != null) {
-                // if possible then cancel current timer (even if being executed via timer)
-                submitTaskTime.cancel();
+    boolean submitWriteTransaction() {
+        if (!submitIsEnabled) {
+            LOG.trace("transaction not committed - submit block issued");
+            return false;
+        }
+        synchronized (txLock) {
+            if (wTx == null) {
+                LOG.trace("nothing to commit - submit returns true");
+                return true;
             }
-            submitTaskTime = hashedWheelTimer.newTimeout(new TimerTask() {
+            final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
+            Futures.addCallback(submitFuture, new FutureCallback<Void>() {
                 @Override
-                public void run(final Timeout timeout) throws Exception {
-                    submitScheduledTransaction(timeout);
+                public void onSuccess(Void result) {
+                    //no action required
                 }
-            }, timerValue, TimeUnit.MILLISECONDS);
 
-        } else {
-            LOG.debug("transaction not committed - submit block issued");
+                @Override
+                public void onFailure(Throwable t) {
+                    if (t instanceof TransactionCommitFailedException) {
+                        LOG.error("Transaction commit failed. {}", t);
+                    } else {
+                        LOG.error("Exception during transaction submitting. {}", t);
+                    }
+                }
+            });
+            wTx = null;
         }
+        return true;
     }
 
-    private void hookTimeExpenseCounter(CheckedFuture<Void, TransactionCommitFailedException> submitResult, final String name) {
-        final long submitFiredTime = System.currentTimeMillis();
-        LOG.debug("submit of {} fired", name);
-        Futures.addCallback(submitResult, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(Void result) {
-                LOG.debug("submit of {} finished in {} ms", name, System.currentTimeMillis() - submitFiredTime);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOG.warn("transaction submit failed: {}", t.getMessage());
-            }
-        });
+    <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
+                                                             final InstanceIdentifier<T> path) {
+        final WriteTransaction writeTx = getTransactionSafely();
+        writeTx.delete(store, path);
     }
 
-    synchronized void enableSubmit() {
-        submitIsEnabled = true;
+    <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
+                                                   final InstanceIdentifier<T> path, final T data) {
+        final WriteTransaction writeTx = getTransactionSafely();
+        writeTx.put(store, path, data);
     }
 
     @Override
     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
         LOG.warn("txChain failed -> recreating", cause);
-        txChainFactory.close();
-        txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
+        recreateTxChain();
     }
 
     @Override
@@ -177,4 +152,76 @@ class TransactionChainManager implements TransactionChainListener {
         // NOOP - only yet, here is probably place for notification to get new WriteTransaction
     }
 
+    private void recreateTxChain() {
+        txChainFactory.close();
+        createTxChain(dataBroker);
+        synchronized (txLock) {
+            wTx = null;
+        }
+    }
+
+    private WriteTransaction getTransactionSafely() {
+        if (wTx == null && !TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) {
+            synchronized (txLock) {
+                if (wTx == null) {
+                    wTx = txChainFactory.newWriteOnlyTransaction();
+                }
+            }
+        }
+        return wTx;
+    }
+
+    @VisibleForTesting
+    void enableSubmit() {
+        submitIsEnabled = true;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("Removing node {} from operational DS.", nodeII);
+        synchronized (txLock) {
+            final WriteTransaction writeTx = getTransactionSafely();
+            this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
+            writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodeII);
+            LOG.debug("Delete node {} from operational DS put to write transaction.", nodeII);
+            CheckedFuture<Void, TransactionCommitFailedException> submitsFuture = writeTx.submit();
+            LOG.debug("Delete node {} from operational DS write transaction submitted.", nodeII);
+            Futures.addCallback(submitsFuture, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(final Void aVoid) {
+                    LOG.debug("Removing node {} from operational DS successful .", nodeII);
+                    notifyReadyForNewTransactionChainAndCloseFactory();
+                }
+
+                @Override
+                public void onFailure(final Throwable throwable) {
+                    LOG.info("Attempt to close transaction chain factory failed.", throwable);
+                    notifyReadyForNewTransactionChainAndCloseFactory();
+                }
+            });
+            wTx = null;
+        }
+    }
+
+    private void notifyReadyForNewTransactionChainAndCloseFactory() {
+        synchronized (this) {
+            try {
+                LOG.debug("Closing registration in manager.");
+                managerRegistration.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close transaction chain manager's registration.", e);
+            }
+            managerRegistration = null;
+            if (null != readyForNewTransactionChainHandler) {
+                readyForNewTransactionChainHandler.onReadyForNewTransactionChain();
+            }
+        }
+        txChainFactory.close();
+        LOG.debug("Transaction chain factory closed.");
+    }
+
+    public enum TransactionChainManagerStatus {
+        WORKING, SHUTTING_DOWN;
+    }
+
 }