Do not leak internal state
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
index 189525f71a6af76809bff0dffab06f65f6988aaf..657aebaf072d119a63cfbccdfcdd057efdd57713 100644 (file)
@@ -8,26 +8,35 @@
 
 package org.opendaylight.openflowplugin.impl.device;
 
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
@@ -54,30 +63,23 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     private final Object txLock = new Object();
 
     private final DataBroker dataBroker;
-//    private final DeviceState deviceState;
+    private final DeviceState deviceState;
     @GuardedBy("txLock")
     private WriteTransaction wTx;
     @GuardedBy("txLock")
     private BindingTransactionChain txChainFactory;
     private boolean submitIsEnabled;
 
-    public TransactionChainManagerStatus getTransactionChainManagerStatus() {
-        return transactionChainManagerStatus;
-    }
-
     @GuardedBy("txLock")
     private TransactionChainManagerStatus transactionChainManagerStatus;
     private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
-    private volatile Registration managerRegistration;
 
     TransactionChainManager(@Nonnull final DataBroker dataBroker,
-                            @Nonnull final KeyedInstanceIdentifier<Node, NodeKey> nodeII,
-                            @Nonnull final Registration managerRegistration) {
+                            @Nonnull final DeviceState deviceState) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
-        this.nodeII = Preconditions.checkNotNull(nodeII);
-        this.managerRegistration = Preconditions.checkNotNull(managerRegistration);
+        this.deviceState = Preconditions.checkNotNull(deviceState);
+        this.nodeII = Preconditions.checkNotNull(deviceState.getNodeInstanceIdentifier());
         this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
-        createTxChain();
         LOG.debug("created txChainManager");
     }
 
@@ -98,21 +100,18 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
      * transactions. Call this method for MASTER role only.
-     * @param enableSubmit - marker to be sure a WriteTransaction submit is not blocking
-     *            (Blocking write is used for initialization part only)
      */
-    public void activateTransactionManager(final boolean enableSubmit) {
-//        LOG.trace("activetTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), enableSubmit);
+    public void activateTransactionManager() {
+        LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), this.submitIsEnabled);
         synchronized (txLock) {
             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
-//                LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
+                LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
                 createTxChain();
-                this.submitIsEnabled = enableSubmit;
             } else {
-//                LOG.debug("Transaction is active {}", deviceState.getNodeId());
+                LOG.debug("Transaction is active {}", deviceState.getNodeId());
             }
         }
     }
@@ -121,19 +120,36 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
      * Call this method for SLAVE only.
+     * @return Future
      */
-    public void deactivateTransactionManager() {
+    public ListenableFuture<Void> deactivateTransactionManager() {
+        final ListenableFuture<Void> future;
         synchronized (txLock) {
             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
-//                LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId());
-                submitWriteTransaction();
-                Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
-//                LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId());
+                LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId());
                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
-                txChainFactory.close();
-                txChainFactory = null;
+                future = txChainShuttingDown();
+                Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
+                LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId());
+                Futures.addCallback(future, new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(final Void result) {
+                        txChainFactory.close();
+                        txChainFactory = null;
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable t) {
+                        txChainFactory.close();
+                        txChainFactory = null;
+                    }
+                });
+            } else {
+                // TODO : ignoring redundant deactivate invocation
+                future = Futures.immediateCheckedFuture(null);
             }
         }
+        return future;
     }
 
     boolean submitWriteTransaction() {
@@ -142,21 +158,21 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
             return false;
         }
         synchronized (txLock) {
-            Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
-                    "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
             if (wTx == null) {
                 LOG.trace("nothing to commit - submit returns true");
                 return true;
             }
+            Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
+                    "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(Void result) {
+                public void onSuccess(final Void result) {
                     //no action required
                 }
 
                 @Override
-                public void onFailure(Throwable t) {
+                public void onFailure(final Throwable t) {
                     if (t instanceof TransactionCommitFailedException) {
                         LOG.error("Transaction commit failed. {}", t);
                     } else {
@@ -169,13 +185,6 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         return true;
     }
 
-    @Deprecated
-    public void cancelWriteTransaction() {
-        // there is no cancel txn in ping-pong broker. So we need to drop the chain and recreate it.
-        // since the chain is created per device, there won't be any other txns other than ones we created.
-        recreateTxChain();
-    }
-
     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
                                                              final InstanceIdentifier<T> path) {
         final WriteTransaction writeTx = getTransactionSafely();
@@ -199,8 +208,8 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     @Override
     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
-        LOG.warn("txChain failed -> recreating", cause);
         if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
+            LOG.warn("txChain failed -> recreating", cause);
             recreateTxChain();
         }
     }
@@ -219,10 +228,12 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
 
     @Nullable
     private WriteTransaction getTransactionSafely() {
-        if (wTx == null && !TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) {
+        if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
             synchronized (txLock) {
-                if (wTx == null && txChainFactory != null) {
-                    wTx = txChainFactory.newWriteOnlyTransaction();
+                if (wTx == null && TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
+                    if (wTx == null && txChainFactory != null) {
+                        wTx = txChainFactory.newWriteOnlyTransaction();
+                    }
                 }
             }
         }
@@ -234,92 +245,100 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         submitIsEnabled = true;
     }
 
-    /**
-     * @deprecated will be removed
-     * @param removeDSNode
-     */
-    @Deprecated
-    public void cleanupPostClosure(final boolean removeDSNode) {
+    ListenableFuture<Void> shuttingDown() {
+        LOG.debug("TxManager is going SUTTING_DOWN for node {}", nodeII);
+        ListenableFuture<Void> future;
         synchronized (txLock) {
-            if (removeDSNode) {
-                LOG.info("Removing from operational DS, node {} ", nodeII);
-                final WriteTransaction writeTx = getTransactionSafely();
-                this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
-                writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodeII);
-                LOG.debug("Delete from operational DS put to write transaction. node {} ", nodeII);
-                final CheckedFuture<Void, TransactionCommitFailedException> submitsFuture = writeTx.submit();
-                LOG.info("Delete from operational DS write transaction submitted. node {} ", nodeII);
-                Futures.addCallback(submitsFuture, new FutureCallback<Void>() {
-                    @Override
-                    public void onSuccess(final Void aVoid) {
-                        LOG.debug("Removing from operational DS successful . node {} ", nodeII);
-                        notifyReadyForNewTransactionChainAndCloseFactory();
-                    }
-
-                    @Override
-                    public void onFailure(final Throwable throwable) {
-                        LOG.info("Attempt to close transaction chain factory failed.", throwable);
-                        notifyReadyForNewTransactionChainAndCloseFactory();
-                    }
-                });
-                wTx = null;
-            } else {
-                if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WAITING_TO_BE_SHUT)) {
-                    LOG.info("This is a disconnect, but not the last node,transactionChainManagerStatus={}, node:{}",
-                            transactionChainManagerStatus, nodeII);
-                    // a disconnect has happened, but this is not the last node in the cluster, so just close the chain
-                    this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
-                    notifyReadyForNewTransactionChainAndCloseFactory();
-                    wTx = null;
-                } else {
-                    LOG.trace("This is not a disconnect, hence we are not closing txnChainMgr,transactionChainManagerStatus={}, node:{}",
-                            transactionChainManagerStatus, nodeII);
-                }
+            this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
+            future = txChainShuttingDown();
+        }
+        return future;
+    }
 
+    private ListenableFuture<Void> txChainShuttingDown() {
+        ListenableFuture<Void> future;
+        if (txChainFactory == null) {
+            // stay with actual thread
+            future = Futures.immediateCheckedFuture(null);
+        } else {
+            // hijack md-sal thread
+            if (wTx == null) {
+                wTx = txChainFactory.newWriteOnlyTransaction();
             }
+            final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId());
+            wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
+            future = wTx.submit();
+            wTx = null;
+
+            future = Futures.withFallback(future, new FutureFallback<Void>() {
+
+                @Override
+                public ListenableFuture<Void> create(final Throwable t) throws Exception {
+                    LOG.debug("Last ShuttingDown Transaction for node {} fail. Put empty FlowCapableNode",
+                            deviceState.getNodeId());
+                    final ReadOnlyTransaction readWriteTx = dataBroker.newReadOnlyTransaction();
+                    final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readFlowNode = readWriteTx
+                            .read(LogicalDatastoreType.OPERATIONAL, nodeII.augmentation(FlowCapableNode.class));
+                    return Futures.transform(readFlowNode, new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+
+                        @Override
+                        public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) {
+                            if (input.isPresent()) {
+                                final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
+                                nodeBuilder.addAugmentation(FlowCapableNode.class, new FlowCapableNodeBuilder().build());
+                                delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
+                                return delWtx.submit();
+                            }
+                            return Futures.immediateFuture(null);
+                        }
+                    });
+                }
+            });
         }
+        return future;
     }
 
     /**
-     * @deprecated will be removed
+     * Transaction could be close if we are not submit anything. We have property submitIsEnable what
+     * could protect us for check it is NEW transaction from chain and we are able close everything
+     * safely.
      */
-    @Deprecated
-    private void notifyReadyForNewTransactionChainAndCloseFactory() {
-        synchronized (this) {
-            try {
-                LOG.info("Closing registration in manager.node:{} ", nodeII);
-                if (managerRegistration != null) {
-                    managerRegistration.close();
-                }
-            } catch (final Exception e) {
-                LOG.warn("Failed to close transaction chain manager's registration..node:{} ", nodeII, e);
+    void clearUnsubmittedTransaction() {
+        LOG.debug("Cleaning unsubmited Transaction for Device {}", deviceState.getNodeId());
+        Verify.verify(!submitIsEnabled, "We are not able clean TxChain {}", deviceState.getNodeId());
+        synchronized (txLock) {
+            if (wTx != null) {
+                wTx.cancel();
+                wTx = null;
             }
-            managerRegistration = null;
+            if (txChainFactory != null) {
+                txChainFactory.close();
+                txChainFactory = null;
+            }
+            transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
         }
-        txChainFactory.close();
-        LOG.info("Transaction chain factory closed. node:{} ", nodeII);
     }
 
     @Override
     public void close() {
-        LOG.info("Setting transactionChainManagerStatus to WAITING_TO_BE_SHUT, will wait for ownershipservice to notify", nodeII);
-        // we can finish in initial phase
-        initialSubmitWriteTransaction();
+        LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN, will wait for ownershipservice to notify", nodeII);
+        Preconditions.checkState(TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus));
+        Preconditions.checkState(wTx == null);
         synchronized (txLock) {
             if (txChainFactory != null) {
                 txChainFactory.close();
                 txChainFactory = null;
             }
-            this.transactionChainManagerStatus = TransactionChainManagerStatus.WAITING_TO_BE_SHUT;
         }
-        Preconditions.checkState(wTx == null);
         Preconditions.checkState(txChainFactory == null);
     }
 
-    public enum TransactionChainManagerStatus {
-        WORKING, SLEEPING, WAITING_TO_BE_SHUT, SHUTTING_DOWN;
+    private enum TransactionChainManagerStatus {
+        /** txChainManager is sleeping - is not active (SLAVE or default init value) */
+        WORKING,
+        /** txChainManager is working - is active (MASTER) */
+        SLEEPING,
+        /** txChainManager is trying to be closed - device disconnecting */
+        SHUTTING_DOWN;
     }
-
-
-
 }