Do not leak internal state
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
index 80e8dae4a7daf15dab23429fee5305d00d6afcaa..657aebaf072d119a63cfbccdfcdd057efdd57713 100644 (file)
@@ -9,22 +9,31 @@
 package org.opendaylight.openflowplugin.impl.device;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.FutureFallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
@@ -61,10 +70,6 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     private BindingTransactionChain txChainFactory;
     private boolean submitIsEnabled;
 
-    public TransactionChainManagerStatus getTransactionChainManagerStatus() {
-        return transactionChainManagerStatus;
-    }
-
     @GuardedBy("txLock")
     private TransactionChainManagerStatus transactionChainManagerStatus;
     private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
@@ -97,7 +102,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * transactions. Call this method for MASTER role only.
      */
     public void activateTransactionManager() {
-        LOG.trace("activetTransactionManaager for node {} transaction submit is set to {}", deviceState.getNodeId());
+        LOG.trace("activateTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), this.submitIsEnabled);
         synchronized (txLock) {
             if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
                 LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
@@ -115,19 +120,36 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
      * Call this method for SLAVE only.
+     * @return Future
      */
-    public void deactivateTransactionManager() {
+    public ListenableFuture<Void> deactivateTransactionManager() {
+        final ListenableFuture<Void> future;
         synchronized (txLock) {
             if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
                 LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId());
-                submitWriteTransaction();
+                transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
+                future = txChainShuttingDown();
                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
                 LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId());
-                transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
-                txChainFactory.close();
-                txChainFactory = null;
+                Futures.addCallback(future, new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(final Void result) {
+                        txChainFactory.close();
+                        txChainFactory = null;
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable t) {
+                        txChainFactory.close();
+                        txChainFactory = null;
+                    }
+                });
+            } else {
+                // TODO : ignoring redundant deactivate invocation
+                future = Futures.immediateCheckedFuture(null);
             }
         }
+        return future;
     }
 
     boolean submitWriteTransaction() {
@@ -223,26 +245,78 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         submitIsEnabled = true;
     }
 
-    CheckedFuture<Void, TransactionCommitFailedException> shuttingDown() {
+    ListenableFuture<Void> shuttingDown() {
         LOG.debug("TxManager is going SUTTING_DOWN for node {}", nodeII);
-        CheckedFuture<Void, TransactionCommitFailedException> future;
+        ListenableFuture<Void> future;
         synchronized (txLock) {
             this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
-            if (txChainFactory == null) {
-                // stay with actual thread
-                future = Futures.immediateCheckedFuture(null);
-            } else {
-                // hijack md-sal thread
-                if (wTx == null) {
-                    wTx = txChainFactory.newWriteOnlyTransaction();
+            future = txChainShuttingDown();
+        }
+        return future;
+    }
+
+    private ListenableFuture<Void> txChainShuttingDown() {
+        ListenableFuture<Void> future;
+        if (txChainFactory == null) {
+            // stay with actual thread
+            future = Futures.immediateCheckedFuture(null);
+        } else {
+            // hijack md-sal thread
+            if (wTx == null) {
+                wTx = txChainFactory.newWriteOnlyTransaction();
+            }
+            final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId());
+            wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
+            future = wTx.submit();
+            wTx = null;
+
+            future = Futures.withFallback(future, new FutureFallback<Void>() {
+
+                @Override
+                public ListenableFuture<Void> create(final Throwable t) throws Exception {
+                    LOG.debug("Last ShuttingDown Transaction for node {} fail. Put empty FlowCapableNode",
+                            deviceState.getNodeId());
+                    final ReadOnlyTransaction readWriteTx = dataBroker.newReadOnlyTransaction();
+                    final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readFlowNode = readWriteTx
+                            .read(LogicalDatastoreType.OPERATIONAL, nodeII.augmentation(FlowCapableNode.class));
+                    return Futures.transform(readFlowNode, new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+
+                        @Override
+                        public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) {
+                            if (input.isPresent()) {
+                                final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction();
+                                nodeBuilder.addAugmentation(FlowCapableNode.class, new FlowCapableNodeBuilder().build());
+                                delWtx.put(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
+                                return delWtx.submit();
+                            }
+                            return Futures.immediateFuture(null);
+                        }
+                    });
                 }
-                final NodeBuilder nodeBuilder = new NodeBuilder().setId(deviceState.getNodeId());
-                wTx.merge(LogicalDatastoreType.OPERATIONAL, nodeII, nodeBuilder.build());
-                future = wTx.submit();
+            });
+        }
+        return future;
+    }
+
+    /**
+     * Transaction could be close if we are not submit anything. We have property submitIsEnable what
+     * could protect us for check it is NEW transaction from chain and we are able close everything
+     * safely.
+     */
+    void clearUnsubmittedTransaction() {
+        LOG.debug("Cleaning unsubmited Transaction for Device {}", deviceState.getNodeId());
+        Verify.verify(!submitIsEnabled, "We are not able clean TxChain {}", deviceState.getNodeId());
+        synchronized (txLock) {
+            if (wTx != null) {
+                wTx.cancel();
                 wTx = null;
             }
+            if (txChainFactory != null) {
+                txChainFactory.close();
+                txChainFactory = null;
+            }
+            transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
         }
-        return future;
     }
 
     @Override
@@ -259,7 +333,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         Preconditions.checkState(txChainFactory == null);
     }
 
-    public enum TransactionChainManagerStatus {
+    private enum TransactionChainManagerStatus {
         /** txChainManager is sleeping - is not active (SLAVE or default init value) */
         WORKING,
         /** txChainManager is working - is active (MASTER) */