BUG-7901: fix unsynchronized transaction access
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / device / TransactionChainManager.java
index 189525f71a6af76809bff0dffab06f65f6988aaf..aded9b26d9ee3d76f6222f9ae172934ddf872885 100644 (file)
@@ -8,29 +8,31 @@
 
 package org.opendaylight.openflowplugin.impl.device;
 
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CancellationException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
-import org.opendaylight.yangtools.concepts.Registration;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,51 +44,47 @@ import org.slf4j.LoggerFactory;
  * a {@link TransactionChainListener} and provide package protected methods for writeToTransaction
  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
  * and submitTransaction method (wrapped {@link WriteTransaction#submit()})
- *
- * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *         </p>
- *         Created: Apr 2, 2015
  */
 class TransactionChainManager implements TransactionChainListener, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
+    private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
 
     private final Object txLock = new Object();
-
     private final DataBroker dataBroker;
-//    private final DeviceState deviceState;
+    private final String nodeId;
+    private LifecycleService lifecycleService;
+
     @GuardedBy("txLock")
     private WriteTransaction wTx;
     @GuardedBy("txLock")
     private BindingTransactionChain txChainFactory;
+    @GuardedBy("txLock")
     private boolean submitIsEnabled;
+    @GuardedBy("txLock")
+    private ListenableFuture<Void> lastSubmittedFuture;
 
-    public TransactionChainManagerStatus getTransactionChainManagerStatus() {
-        return transactionChainManagerStatus;
-    }
+    private volatile boolean initCommit;
 
     @GuardedBy("txLock")
-    private TransactionChainManagerStatus transactionChainManagerStatus;
-    private final KeyedInstanceIdentifier<Node, NodeKey> nodeII;
-    private volatile Registration managerRegistration;
+    private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
 
     TransactionChainManager(@Nonnull final DataBroker dataBroker,
-                            @Nonnull final KeyedInstanceIdentifier<Node, NodeKey> nodeII,
-                            @Nonnull final Registration managerRegistration) {
-        this.dataBroker = Preconditions.checkNotNull(dataBroker);
-        this.nodeII = Preconditions.checkNotNull(nodeII);
-        this.managerRegistration = Preconditions.checkNotNull(managerRegistration);
-        this.transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
-        createTxChain();
-        LOG.debug("created txChainManager");
+                            @Nonnull final DeviceInfo deviceInfo) {
+        this.dataBroker = dataBroker;
+        this.nodeId = deviceInfo.getNodeInstanceIdentifier().getKey().getId().getValue();
+        this.lastSubmittedFuture = Futures.immediateFuture(null);
     }
 
     @GuardedBy("txLock")
     private void createTxChain() {
-        if (txChainFactory != null) {
-            txChainFactory.close();
-        }
+        BindingTransactionChain txChainFactoryTemp = txChainFactory;
         txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
+        Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
+    }
+
+    public void setLifecycleService(final LifecycleService lifecycleService) {
+        this.lifecycleService = lifecycleService;
     }
 
     void initialSubmitWriteTransaction() {
@@ -98,21 +96,19 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * Method change status for TxChainManager to {@link TransactionChainManagerStatus#WORKING} and it has to make
      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
      * transactions. Call this method for MASTER role only.
-     * @param enableSubmit - marker to be sure a WriteTransaction submit is not blocking
-     *            (Blocking write is used for initialization part only)
      */
-    public void activateTransactionManager(final boolean enableSubmit) {
-//        LOG.trace("activetTransactionManager for node {} transaction submit is set to {}", deviceState.getNodeId(), enableSubmit);
+    void activateTransactionManager() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("activateTransactionManager for node {} transaction submit is set to {}", this.nodeId, submitIsEnabled);
+        }
         synchronized (txLock) {
-            if (TransactionChainManagerStatus.SLEEPING.equals(transactionChainManagerStatus)) {
-//                LOG.debug("Transaction Factory create {}", deviceState.getNodeId());
+            if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
                 Preconditions.checkState(txChainFactory == null, "TxChainFactory survive last close.");
                 Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
                 this.transactionChainManagerStatus = TransactionChainManagerStatus.WORKING;
+                this.submitIsEnabled = false;
+                this.initCommit = true;
                 createTxChain();
-                this.submitIsEnabled = enableSubmit;
-            } else {
-//                LOG.debug("Transaction is active {}", deviceState.getNodeId());
             }
         }
     }
@@ -121,87 +117,126 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * Method change status for TxChainManger to {@link TransactionChainManagerStatus#SLEEPING} and it unregisters
      * this class instance as {@link TransactionChainListener} so it broke a possibility to write something to DS.
      * Call this method for SLAVE only.
+     * @return Future
      */
-    public void deactivateTransactionManager() {
+    ListenableFuture<Void> deactivateTransactionManager() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
+        }
+        final ListenableFuture<Void> future;
         synchronized (txLock) {
-            if (TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus)) {
-//                LOG.debug("Submitting all transactions if we were in status WORKING for Node", deviceState.getNodeId());
-                submitWriteTransaction();
-                Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
-//                LOG.debug("Transaction Factory delete for Node {}", deviceState.getNodeId());
+            if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
                 transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
-                txChainFactory.close();
-                txChainFactory = null;
+                future = txChainShuttingDown();
+                Preconditions.checkState(wTx == null, "We have some unexpected WriteTransaction.");
+                Futures.addCallback(future, new FutureCallback<Void>() {
+                    @Override
+                    public void onSuccess(final Void result) {
+                        removeTxChainFactory();
+                    }
+
+                    @Override
+                    public void onFailure(final Throwable t) {
+                        removeTxChainFactory();
+                    }
+                });
+            } else {
+                // TODO : ignoring redundant deactivate invocation
+                future = Futures.immediateCheckedFuture(null);
             }
         }
+        return future;
+    }
+
+    private void removeTxChainFactory() {
+        Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close);
+        txChainFactory = null;
     }
 
     boolean submitWriteTransaction() {
-        if (!submitIsEnabled) {
-            LOG.trace("transaction not committed - submit block issued");
-            return false;
-        }
         synchronized (txLock) {
-            Preconditions.checkState(TransactionChainManagerStatus.WORKING.equals(transactionChainManagerStatus),
-                    "we have here Uncompleted Transaction for node {} and we are not MASTER", nodeII);
-            if (wTx == null) {
-                LOG.trace("nothing to commit - submit returns true");
+            if (!submitIsEnabled) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("transaction not committed - submit block issued");
+                }
+                return false;
+            }
+            if (Objects.isNull(wTx)) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("nothing to commit - submit returns true");
+                }
                 return true;
             }
+            Preconditions.checkState(TransactionChainManagerStatus.WORKING == transactionChainManagerStatus,
+                    "we have here Uncompleted Transaction for node {} and we are not MASTER", this.nodeId);
             final CheckedFuture<Void, TransactionCommitFailedException> submitFuture = wTx.submit();
+            lastSubmittedFuture = submitFuture;
+            wTx = null;
+
             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
                 @Override
-                public void onSuccess(Void result) {
-                    //no action required
+                public void onSuccess(final Void result) {
+                    initCommit = false;
                 }
 
                 @Override
-                public void onFailure(Throwable t) {
+                public void onFailure(final Throwable t) {
                     if (t instanceof TransactionCommitFailedException) {
-                        LOG.error("Transaction commit failed. {}", t);
+                        LOG.error("Transaction commit failed. ", t);
                     } else {
-                        LOG.error("Exception during transaction submitting. {}", t);
+                        if (t instanceof CancellationException) {
+                            LOG.warn("Submit task was canceled");
+                            LOG.trace("Submit exception: ", t);
+                        } else {
+                            LOG.error("Exception during transaction submitting. ", t);
+                        }
+                    }
+                    if (initCommit) {
+                        Optional.ofNullable(lifecycleService).ifPresent(LifecycleService::closeConnection);
                     }
                 }
             });
-            wTx = null;
         }
         return true;
     }
 
-    @Deprecated
-    public void cancelWriteTransaction() {
-        // there is no cancel txn in ping-pong broker. So we need to drop the chain and recreate it.
-        // since the chain is created per device, there won't be any other txns other than ones we created.
-        recreateTxChain();
-    }
-
     <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
-                                                             final InstanceIdentifier<T> path) {
-        final WriteTransaction writeTx = getTransactionSafely();
-        if (writeTx != null) {
-            writeTx.delete(store, path);
-        } else {
-            LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", nodeII, path);
+                                                             final InstanceIdentifier<T> path){
+        synchronized (txLock) {
+            ensureTransaction();
+            if (wTx == null) {
+                LOG.debug("WriteTx is null for node {}. Delete {} was not realized.", this.nodeId, path);
+                throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
+            }
+
+            wTx.delete(store, path);
         }
     }
 
     <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
-                                                   final InstanceIdentifier<T> path, final T data) {
-        final WriteTransaction writeTx = getTransactionSafely();
-        if (writeTx != null) {
-            writeTx.put(store, path, data);
-        } else {
-            LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", nodeII, path);
+                                                   final InstanceIdentifier<T> path,
+                                                   final T data,
+                                                   final boolean createParents){
+        synchronized (txLock) {
+            ensureTransaction();
+            if (wTx == null) {
+                LOG.debug("WriteTx is null for node {}. Write data for {} was not realized.", this.nodeId, path);
+                throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
+            }
+
+            wTx.put(store, path, data, createParents);
         }
     }
 
     @Override
     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
-        LOG.warn("txChain failed -> recreating", cause);
-        if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WORKING)) {
-            recreateTxChain();
+        synchronized (txLock) {
+            if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
+                LOG.warn("Transaction chain failed, recreating chain due to ", cause);
+                createTxChain();
+                wTx = null;
+            }
         }
     }
 
@@ -210,116 +245,70 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         // NOOP
     }
 
-    private void recreateTxChain() {
-        synchronized (txLock) {
-            createTxChain();
-            wTx = null;
-        }
-    }
-
+    @GuardedBy("txLock")
     @Nullable
-    private WriteTransaction getTransactionSafely() {
-        if (wTx == null && !TransactionChainManagerStatus.SHUTTING_DOWN.equals(transactionChainManagerStatus)) {
-            synchronized (txLock) {
-                if (wTx == null && txChainFactory != null) {
-                    wTx = txChainFactory.newWriteOnlyTransaction();
-                }
-            }
+    private void ensureTransaction() {
+        if (wTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
+            && txChainFactory != null) {
+                wTx = txChainFactory.newWriteOnlyTransaction();
         }
-        return wTx;
     }
 
     @VisibleForTesting
     void enableSubmit() {
-        submitIsEnabled = true;
+        synchronized (txLock) {
+            /* !!!IMPORTANT: never set true without txChainFactory */
+            submitIsEnabled = txChainFactory != null;
+        }
     }
 
-    /**
-     * @deprecated will be removed
-     * @param removeDSNode
-     */
-    @Deprecated
-    public void cleanupPostClosure(final boolean removeDSNode) {
+    ListenableFuture<Void> shuttingDown() {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
+        }
         synchronized (txLock) {
-            if (removeDSNode) {
-                LOG.info("Removing from operational DS, node {} ", nodeII);
-                final WriteTransaction writeTx = getTransactionSafely();
-                this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
-                writeTx.delete(LogicalDatastoreType.OPERATIONAL, nodeII);
-                LOG.debug("Delete from operational DS put to write transaction. node {} ", nodeII);
-                final CheckedFuture<Void, TransactionCommitFailedException> submitsFuture = writeTx.submit();
-                LOG.info("Delete from operational DS write transaction submitted. node {} ", nodeII);
-                Futures.addCallback(submitsFuture, new FutureCallback<Void>() {
-                    @Override
-                    public void onSuccess(final Void aVoid) {
-                        LOG.debug("Removing from operational DS successful . node {} ", nodeII);
-                        notifyReadyForNewTransactionChainAndCloseFactory();
-                    }
-
-                    @Override
-                    public void onFailure(final Throwable throwable) {
-                        LOG.info("Attempt to close transaction chain factory failed.", throwable);
-                        notifyReadyForNewTransactionChainAndCloseFactory();
-                    }
-                });
-                wTx = null;
-            } else {
-                if (transactionChainManagerStatus.equals(TransactionChainManagerStatus.WAITING_TO_BE_SHUT)) {
-                    LOG.info("This is a disconnect, but not the last node,transactionChainManagerStatus={}, node:{}",
-                            transactionChainManagerStatus, nodeII);
-                    // a disconnect has happened, but this is not the last node in the cluster, so just close the chain
-                    this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
-                    notifyReadyForNewTransactionChainAndCloseFactory();
-                    wTx = null;
-                } else {
-                    LOG.trace("This is not a disconnect, hence we are not closing txnChainMgr,transactionChainManagerStatus={}, node:{}",
-                            transactionChainManagerStatus, nodeII);
-                }
-
-            }
+            this.transactionChainManagerStatus = TransactionChainManagerStatus.SHUTTING_DOWN;
+            return txChainShuttingDown();
         }
     }
 
-    /**
-     * @deprecated will be removed
-     */
-    @Deprecated
-    private void notifyReadyForNewTransactionChainAndCloseFactory() {
-        synchronized (this) {
-            try {
-                LOG.info("Closing registration in manager.node:{} ", nodeII);
-                if (managerRegistration != null) {
-                    managerRegistration.close();
-                }
-            } catch (final Exception e) {
-                LOG.warn("Failed to close transaction chain manager's registration..node:{} ", nodeII, e);
+    @GuardedBy("txLock")
+    private ListenableFuture<Void> txChainShuttingDown() {
+        submitIsEnabled = false;
+        ListenableFuture<Void> future;
+        if (txChainFactory == null) {
+            // stay with actual thread
+            future = Futures.immediateCheckedFuture(null);
+        } else if (wTx == null) {
+            // hijack md-sal thread
+            future = lastSubmittedFuture;
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Submitting all transactions for Node {}", this.nodeId);
             }
-            managerRegistration = null;
+            // hijack md-sal thread
+            future = wTx.submit();
+            wTx = null;
         }
-        txChainFactory.close();
-        LOG.info("Transaction chain factory closed. node:{} ", nodeII);
+        return future;
     }
 
     @Override
     public void close() {
-        LOG.info("Setting transactionChainManagerStatus to WAITING_TO_BE_SHUT, will wait for ownershipservice to notify", nodeII);
-        // we can finish in initial phase
-        initialSubmitWriteTransaction();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
+        }
         synchronized (txLock) {
-            if (txChainFactory != null) {
-                txChainFactory.close();
-                txChainFactory = null;
-            }
-            this.transactionChainManagerStatus = TransactionChainManagerStatus.WAITING_TO_BE_SHUT;
+            removeTxChainFactory();
         }
-        Preconditions.checkState(wTx == null);
-        Preconditions.checkState(txChainFactory == null);
     }
 
-    public enum TransactionChainManagerStatus {
-        WORKING, SLEEPING, WAITING_TO_BE_SHUT, SHUTTING_DOWN;
+    private enum TransactionChainManagerStatus {
+        /** txChainManager is sleeping - is not active (SLAVE or default init value) */
+        WORKING,
+        /** txChainManager is working - is active (MASTER) */
+        SLEEPING,
+        /** txChainManager is trying to be closed - device disconnecting */
+        SHUTTING_DOWN;
     }
-
-
-
 }