Merge "Removed unused fields"
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
index 4224b109bbedc6c0fa116bf721f03a3da7086367..4706cc1df8d789a24605306c3a32c3563d5fc6eb 100644 (file)
@@ -10,10 +10,9 @@ package org.opendaylight.openflowplugin.impl.device;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import javax.annotation.Nonnull;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
@@ -22,116 +21,207 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * openflowplugin-impl
  * org.opendaylight.openflowplugin.impl.device
- *
+ * <p/>
  * Package protected class for controlling {@link WriteTransaction} life cycle. It is
  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
  *
  * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *
- * Created: Apr 2, 2015
+ *         </p>
+ *         Created: Apr 2, 2015
  */
-@VisibleForTesting
-class TransactionChainManager implements TransactionChainListener {
+class TransactionChainManager implements TransactionChainListener, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
 
-    private static Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
+    private final Object txLock = new Object();
 
-    private final HashedWheelTimer hashedWheelTimer;
     private final DataBroker dataBroker;
-    private final long maxTx;
-    private final long timerValue;
-    private BindingTransactionChain txChainFactory;
     private WriteTransaction wTx;
-    private Timeout submitTaskTime;
-    private long nrOfActualTx;
-    private boolean counterIsEnabled;
+    private BindingTransactionChain txChainFactory;
+    private boolean submitIsEnabled;
+
+    public TransactionChainManagerStatus getTransactionChainManagerStatus() {
+        return transactionChainManagerStatus;
+    }
+
+    private TransactionChainManagerStatus transactionChainManagerStatus;
+    private ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler;
+    private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
+    private Registration managerRegistration;
 
     TransactionChainManager(@Nonnull final DataBroker dataBroker,
-                            @Nonnull final HashedWheelTimer hashedWheelTimer,
-                            final long maxTx,
-                            final long timerValue) {
+                            @Nonnull final KeyedInstanceIdentifier<Node, NodeKey> nodeII,
+                            @Nonnull final Registration managerRegistration) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
-        this.hashedWheelTimer = Preconditions.checkNotNull(hashedWheelTimer);
-        this.maxTx = maxTx;
+        this.nodeII = Preconditions.checkNotNull(nodeII);
+        this.managerRegistration = Preconditions.checkNotNull(managerRegistration);
+        this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
+        createTxChain(dataBroker);
+        LOG.debug("created txChainManager");
+    }
+
+    private void createTxChain(final DataBroker dataBroker) {
         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
-        nrOfActualTx = 0L;
-        this.timerValue = timerValue;
-        LOG.debug("created txChainManager with operation limit {}", maxTx);
     }
 
-    synchronized <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
-            final InstanceIdentifier<T> path, final T data) {
-        if (wTx == null) {
-            wTx = txChainFactory.newWriteOnlyTransaction();
-        }
-        wTx.put(store, path, data);
-        if ( ! counterIsEnabled) {
-            return;
-        }
-        countTxInAndCommit();
+    void initialSubmitWriteTransaction() {
+        enableSubmit();
+        submitWriteTransaction();
     }
 
-    synchronized <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
-                                                                final InstanceIdentifier<T> path) {
-        if (wTx == null) {
-            wTx = txChainFactory.newWriteOnlyTransaction();
-        }
-        wTx.delete(store, path);
-        if ( ! counterIsEnabled) {
-            return;
+    public synchronized boolean attemptToRegisterHandler(final ReadyForNewTransactionChainHandler readyForNewTransactionChainHandler) {
+        if (TransactionChainManagerStatus.SHUTTING_DOWN.equals(this.transactionChainManagerStatus)
+                && null == this.readyForNewTransactionChainHandler) {
+            this.readyForNewTransactionChainHandler = readyForNewTransactionChainHandler;
+            if (managerRegistration == null) {
+                this.readyForNewTransactionChainHandler.onReadyForNewTransactionChain();
+            }
+            return true;
+        } else {
+            return false;
         }
-        countTxInAndCommit();
     }
 
-    private void countTxInAndCommit() {
-        nrOfActualTx += 1L;
-        if (nrOfActualTx >= maxTx) {
-            submitTransaction();
+    boolean submitWriteTransaction() {
+        if (!submitIsEnabled) {
+            LOG.trace("transaction not committed - submit block issued");
+            return false;
         }
-    }
+        synchronized (txLock) {
+            if (wTx == null) {
+                LOG.trace("nothing to commit - submit returns true");
+                return true;
+            }
+            final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
+            Futures.addCallback(submitFuture, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(Void result) {
+                    //no action required
+                }
 
-    synchronized void submitTransaction() {
-        if (wTx != null) {
-            LOG.trace("submitting transaction, counter: {}", nrOfActualTx);
-            wTx.submit();
+                @Override
+                public void onFailure(Throwable t) {
+                    if (t instanceof TransactionCommitFailedException) {
+                        LOG.error("Transaction commit failed. {}", t);
+                    } else {
+                        LOG.error("Exception during transaction submitting. {}", t);
+                    }
+                }
+            });
             wTx = null;
-            nrOfActualTx = 0L;
-        }
-        if (submitTaskTime != null && ! submitTaskTime.isExpired()) {
-            submitTaskTime.cancel();
         }
-        submitTaskTime = hashedWheelTimer.newTimeout(new TimerTask() {
-            @Override
-            public void run(final Timeout timeout) throws Exception {
-                submitTransaction();
-            }
-        }, timerValue, TimeUnit.MILLISECONDS);
+        return true;
     }
 
-    synchronized void enableCounter() {
-        counterIsEnabled = true;
+    <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
+                                                             final InstanceIdentifier<T> path) {
+        final WriteTransaction writeTx = getTransactionSafely();
+        writeTx.delete(store, path);
+    }
+
+    <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
+                                                   final InstanceIdentifier<T> path, final T data) {
+        final WriteTransaction writeTx = getTransactionSafely();
+        writeTx.put(store, path, data);
     }
 
     @Override
     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
-            final AsyncTransaction<?, ?> transaction, final Throwable cause) {
-        LOG.debug("txChain failed -> recreating");
-        LOG.trace("reason", cause);
-        txChainFactory.close();
-        txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
+                                         final AsyncTransaction<?, ?> transaction, final Throwable cause) {
+        LOG.warn("txChain failed -> recreating", cause);
+        recreateTxChain();
     }
 
     @Override
     public void onTransactionChainSuccessful(final TransactionChain<?, ?> chain) {
         // NOOP - only yet, here is probably place for notification to get new WriteTransaction
     }
+
+    private void recreateTxChain() {
+        txChainFactory.close();
+        createTxChain(dataBroker);
+        synchronized (txLock) {
+            wTx = null;
+        }
+    }
+
+    private WriteTransaction getTransactionSafely() {
+        if (wTx == null && !TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) {
+            synchronized (txLock) {
+                if (wTx == null) {
+                    wTx = txChainFactory.newWriteOnlyTransaction();
+                }
+            }
+        }
+        return wTx;
+    }
+
+    @VisibleForTesting
+    void enableSubmit() {
+        submitIsEnabled = true;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("Removing node {} from operational DS.", nodeII);
+        synchronized (txLock) {
+            final WriteTransaction writeTx = getTransactionSafely();
+            this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
+            writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodeII);
+            LOG.debug("Delete node {} from operational DS put to write transaction.", nodeII);
+            CheckedFuture<Void, TransactionCommitFailedException> submitsFuture = writeTx.submit();
+            LOG.debug("Delete node {} from operational DS write transaction submitted.", nodeII);
+            Futures.addCallback(submitsFuture, new FutureCallback<Void>() {
+                @Override
+                public void onSuccess(final Void aVoid) {
+                    LOG.debug("Removing node {} from operational DS successful .", nodeII);
+                    notifyReadyForNewTransactionChainAndCloseFactory();
+                }
+
+                @Override
+                public void onFailure(final Throwable throwable) {
+                    LOG.info("Attempt to close transaction chain factory failed.", throwable);
+                    notifyReadyForNewTransactionChainAndCloseFactory();
+                }
+            });
+            wTx = null;
+        }
+    }
+
+    private void notifyReadyForNewTransactionChainAndCloseFactory() {
+        synchronized (this) {
+            try {
+                LOG.debug("Closing registration in manager.");
+                managerRegistration.close();
+            } catch (Exception e) {
+                LOG.warn("Failed to close transaction chain manager's registration.", e);
+            }
+            managerRegistration = null;
+            if (null != readyForNewTransactionChainHandler) {
+                readyForNewTransactionChainHandler.onReadyForNewTransactionChain();
+            }
+        }
+        txChainFactory.close();
+        LOG.debug("Transaction chain factory closed.");
+    }
+
+    public enum TransactionChainManagerStatus {
+        WORKING, SHUTTING_DOWN;
+    }
+
 }