Fix transaction manager closing.
[openflowplugin.git] / openflowplugin-common / src / main / java / org / opendaylight / openflowplugin / common / txchain / TransactionChainManager.java
similarity index 73%
rename from openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/TransactionChainManager.java
rename to openflowplugin-common/src/main/java/org/opendaylight/openflowplugin/common/txchain/TransactionChainManager.java
index e06fbd2f2910ed6224ee8cd3119443282e6a3a15..4a2051cfe263319aca4448e8a43dac9ac33ad4dc 100644 (file)
@@ -6,10 +6,10 @@
  * and is available at http://www.eclipse.org/legal/epl-v10.html
  */
 
-package org.opendaylight.openflowplugin.impl.device;
+package org.opendaylight.openflowplugin.common.txchain;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -24,14 +24,15 @@ import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.md.sal.binding.api.BindingTransactionChain;
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.ReadWriteTransaction;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChain;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
@@ -44,7 +45,7 @@ import org.slf4j.LoggerFactory;
  * method (wrapped {@link WriteTransaction#put(LogicalDatastoreType, InstanceIdentifier, DataObject)})
  * and submitTransaction method (wrapped {@link WriteTransaction#submit()}).
  */
-class TransactionChainManager implements TransactionChainListener, AutoCloseable {
+public class TransactionChainManager implements TransactionChainListener, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(TransactionChainManager.class);
     private static final String CANNOT_WRITE_INTO_TRANSACTION = "Cannot write into transaction.";
@@ -54,9 +55,9 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     private final String nodeId;
 
     @GuardedBy("txLock")
-    private WriteTransaction writeTx;
+    private ReadWriteTransaction writeTx;
     @GuardedBy("txLock")
-    private BindingTransactionChain txChainFactory;
+    private BindingTransactionChain transactionChain;
     @GuardedBy("txLock")
     private boolean submitIsEnabled;
     @GuardedBy("txLock")
@@ -67,23 +68,23 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     @GuardedBy("txLock")
     private TransactionChainManagerStatus transactionChainManagerStatus = TransactionChainManagerStatus.SLEEPING;
 
-    TransactionChainManager(@Nonnull final DataBroker dataBroker,
-                            @Nonnull final DeviceInfo deviceInfo) {
+    public TransactionChainManager(@Nonnull final DataBroker dataBroker,
+                                   @Nonnull final String deviceIdentifier) {
         this.dataBroker = dataBroker;
-        this.nodeId = deviceInfo.toString();
+        this.nodeId = deviceIdentifier;
         this.lastSubmittedFuture = Futures.immediateFuture(null);
     }
 
     @GuardedBy("txLock")
     private void createTxChain() {
-        BindingTransactionChain txChainFactoryTemp = txChainFactory;
-        txChainFactory = dataBroker.createTransactionChain(TransactionChainManager.this);
+        BindingTransactionChain txChainFactoryTemp = transactionChain;
+        transactionChain = dataBroker.createTransactionChain(TransactionChainManager.this);
         Optional.ofNullable(txChainFactoryTemp).ifPresent(TransactionChain::close);
     }
 
-    boolean initialSubmitWriteTransaction() {
+    public boolean initialSubmitWriteTransaction() {
         enableSubmit();
-        return submitWriteTransaction();
+        return submitTransaction();
     }
 
     /**
@@ -91,14 +92,14 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * registration for this class instance as {@link TransactionChainListener} to provide possibility a make DS
      * transactions. Call this method for MASTER role only.
      */
-    void activateTransactionManager() {
+    public void activateTransactionManager() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("activateTransactionManager for node {} transaction submit is set to {}",
                     this.nodeId, submitIsEnabled);
         }
         synchronized (txLock) {
             if (TransactionChainManagerStatus.SLEEPING == transactionChainManagerStatus) {
-                Preconditions.checkState(txChainFactory == null,
+                Preconditions.checkState(transactionChain == null,
                         "TxChainFactory survive last close.");
                 Preconditions.checkState(writeTx == null,
                         "We have some unexpected WriteTransaction.");
@@ -116,7 +117,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
      * Call this method for SLAVE only.
      * @return Future
      */
-    ListenableFuture<Void> deactivateTransactionManager() {
+    public ListenableFuture<Void> deactivateTransactionManager() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("deactivateTransactionManager for node {}", this.nodeId);
         }
@@ -130,12 +131,12 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
                 Futures.addCallback(future, new FutureCallback<Void>() {
                     @Override
                     public void onSuccess(final Void result) {
-                        removeTxChainFactory();
+                        closeTransactionChain();
                     }
 
                     @Override
-                    public void onFailure(final Throwable throwable) {
-                        removeTxChainFactory();
+                    public void onFailure(@Nonnull final Throwable t) {
+                        closeTransactionChain();
                     }
                 });
             } else {
@@ -146,12 +147,16 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         return future;
     }
 
-    private void removeTxChainFactory() {
-        Optional.ofNullable(txChainFactory).ifPresent(TransactionChain::close);
-        txChainFactory = null;
+    private void closeTransactionChain() {
+        if (writeTx != null) {
+            writeTx.cancel();
+            writeTx = null;
+        }
+        Optional.ofNullable(transactionChain).ifPresent(TransactionChain::close);
+        transactionChain = null;
     }
 
-    boolean submitWriteTransaction() {
+    public boolean submitTransaction() {
         synchronized (txLock) {
             if (!submitIsEnabled) {
                 if (LOG.isTraceEnabled()) {
@@ -207,8 +212,8 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         return true;
     }
 
-    <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
-                                                             final InstanceIdentifier<T> path) {
+    public <T extends DataObject> void addDeleteOperationTotTxChain(final LogicalDatastoreType store,
+                                                                    final InstanceIdentifier<T> path){
         synchronized (txLock) {
             ensureTransaction();
             if (writeTx == null) {
@@ -220,10 +225,10 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         }
     }
 
-    <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
-                                                   final InstanceIdentifier<T> path,
-                                                   final T data,
-                                                   final boolean createParents) {
+    public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
+                                                          final InstanceIdentifier<T> path,
+                                                          final T data,
+                                                          final boolean createParents){
         synchronized (txLock) {
             ensureTransaction();
             if (writeTx == null) {
@@ -235,12 +240,43 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         }
     }
 
+    public <T extends DataObject> void mergeToTransaction(final LogicalDatastoreType store,
+                                                          final InstanceIdentifier<T> path,
+                                                          final T data,
+                                                          final boolean createParents){
+        synchronized (txLock) {
+            ensureTransaction();
+            if (writeTx == null) {
+                LOG.debug("WriteTx is null for node {}. Merge data for {} was not realized.", this.nodeId, path);
+                throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
+            }
+
+            writeTx.merge(store, path, data, createParents);
+        }
+    }
+
+    public <T extends DataObject> CheckedFuture<com.google.common.base.Optional<T>, ReadFailedException>
+    readFromTransaction(final LogicalDatastoreType store,
+                        final InstanceIdentifier<T> path){
+        synchronized (txLock) {
+            ensureTransaction();
+            if (writeTx == null) {
+                LOG.debug("WriteTx is null for node {}. Read data for {} was not realized.", this.nodeId, path);
+                throw new TransactionChainClosedException(CANNOT_WRITE_INTO_TRANSACTION);
+            }
+
+            return writeTx.read(store, path);
+        }
+    }
+
     @Override
     public void onTransactionChainFailed(final TransactionChain<?, ?> chain,
                                          final AsyncTransaction<?, ?> transaction, final Throwable cause) {
         synchronized (txLock) {
-            if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus) {
+            if (TransactionChainManagerStatus.WORKING == transactionChainManagerStatus &&
+                    chain.equals(this.transactionChain)) {
                 LOG.warn("Transaction chain failed, recreating chain due to ", cause);
+                closeTransactionChain();
                 createTxChain();
                 writeTx = null;
             }
@@ -253,22 +289,21 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
     }
 
     @GuardedBy("txLock")
-    private void ensureTransaction() {
+   private void ensureTransaction() {
         if (writeTx == null && TransactionChainManagerStatus.WORKING == transactionChainManagerStatus
-                && txChainFactory != null) {
-            writeTx = txChainFactory.newWriteOnlyTransaction();
+            && transactionChain != null) {
+                writeTx = transactionChain.newReadWriteTransaction();
         }
     }
 
-    @VisibleForTesting
-    void enableSubmit() {
+    private void enableSubmit() {
         synchronized (txLock) {
-            /* !!!IMPORTANT: never set true without txChainFactory */
-            submitIsEnabled = txChainFactory != null;
+            /* !!!IMPORTANT: never set true without transactionChain */
+            submitIsEnabled = transactionChain != null;
         }
     }
 
-    ListenableFuture<Void> shuttingDown() {
+    public ListenableFuture<Void> shuttingDown() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("TxManager is going SHUTTING_DOWN for node {}", this.nodeId);
         }
@@ -284,7 +319,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
         submitIsEnabled = false;
         ListenableFuture<Void> future;
 
-        if (!wasSubmitEnabled || txChainFactory == null) {
+        if (!wasSubmitEnabled || transactionChain == null) {
             // stay with actual thread
             future = Futures.immediateCheckedFuture(null);
 
@@ -313,7 +348,7 @@ class TransactionChainManager implements TransactionChainListener, AutoCloseable
             LOG.debug("Setting transactionChainManagerStatus to SHUTTING_DOWN for {}", this.nodeId);
         }
         synchronized (txLock) {
-            removeTxChainFactory();
+            closeTransactionChain();
         }
     }