Bug 7295 - Incorrect handling of device transactions in clustered setting 06/52006/1
authorJakub Morvay <jmorvay@cisco.com>
Tue, 14 Feb 2017 16:57:34 +0000 (17:57 +0100)
committerJakub Morvay <jmorvay@cisco.com>
Fri, 17 Feb 2017 15:17:50 +0000 (15:17 +0000)
Transactions on mounted device in clustered setting are delegated to
leader node and leader node actually configures the device.

Implementation, hovewer, depens only on one transaction at a time.
Delegated calls can interleave together, for example one node can submit
transaction of the other node.

Explicitly open a new tranasction before applying any operations to it.

Change-Id: I0c5715a9cd089dd809b071d411884e71e9793849
Signed-off-by: Jakub Morvay <jmorvay@cisco.com>
(cherry picked from commit da599396d0581c113dba72b33f811c2a0d18ec9b)

netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/NetconfDOMTransaction.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/api/RemoteOperationTxProcessor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/MasterSalFacade.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/RemoteOperationTxProcessorImpl.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/actors/NetconfNodeActor.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfMasterDOMTransaction.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfProxyDOMTransaction.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/tx/NetconfWriteOnlyTransaction.java
netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/OpenTransaction.java [new file with mode: 0644]
netconf/netconf-topology-singleton/src/test/java/org/opendaylight/netconf/topology/singleton/impl/tx/WriteOnlyTransactionTest.java

index ec9c717440b6c6b97071584d65fef709db49365b..58fe9b38f37d166e08465df6d747929bfa5d9338 100644 (file)
@@ -17,8 +17,21 @@ import scala.concurrent.Future;
 /**
  * Provides API for all operations of read and write transactions
  */
+// TODO we should separate between read tx and write tx
 public interface NetconfDOMTransaction {
 
+
+    /**
+     * Opens a new transaction. Transactions have to be opened before applying
+     * any operations on them. Previous transaction has to be either submitted
+     * ({@link #submit()} was invoked) or canceled ({@link #cancel()} was
+     * invoked.
+     *
+     * @throws IllegalStateException
+     *             if the previous transaction was not SUBMITTED or CANCELLED.
+     */
+    void openTransaction();
+
     /**
      * Read data from particular data-store
      * @param store data-store type
index 9023847da30718106a9f49ae363fec10f2912393..5d1c893f48aebf3863044606bd06503adf2b01ab 100644 (file)
@@ -19,6 +19,11 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  */
 public interface RemoteOperationTxProcessor {
 
+    /**
+     * Opens a new transaction.
+     */
+    void doOpenTransaction(ActorRef recipient, ActorRef sender);
+
     /**
      * Delete node in particular data-store in path
      * @param store data-store type
index e4911acbf72a974251ddcc8f566c1b3c4c5a8286..36ab07bbea40f111e952b4a3bca7ac23eab352e8 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.netconf.sal.connect.netconf.sal.NetconfDeviceSalProvider
 import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
 import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
 import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfMasterDOMTransaction;
+import org.opendaylight.netconf.topology.singleton.impl.tx.NetconfProxyDOMTransaction;
 import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
 import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
 import org.opendaylight.yangtools.yang.common.SimpleDateFormatUtil;
@@ -137,8 +138,13 @@ class MasterSalFacade implements AutoCloseable, RemoteDeviceHandler<NetconfSessi
                 new NetconfMasterDOMTransaction(id, remoteSchemaContext, deviceRpc, netconfSessionPreferences);
         deviceDataBroker =
                 new NetconfDOMDataBroker(actorSystem, id, masterDOMTransactions);
+        // We need to create NetconfProxyDOMTransaction so accessing mountpoint
+        // on leader node would be same as on follower node
+        final NetconfDOMTransaction proxyDOMTransation =
+                new NetconfProxyDOMTransaction(id, actorSystem, masterActorRef);
+        final NetconfDOMDataBroker proxyDataBroker = new NetconfDOMDataBroker(actorSystem, id, proxyDOMTransation);
         salProvider.getMountInstance()
-                .onTopologyDeviceConnected(remoteSchemaContext, deviceDataBroker, deviceRpc, notificationService);
+                .onTopologyDeviceConnected(remoteSchemaContext, proxyDataBroker, deviceRpc, notificationService);
     }
 
     private Future<Object> sendInitialDataToActor() {
index db5e3e91b5a2181a9194fcc760e58e89c711a249..f19d0d535b98bf5afffe37bdaaa01979024afdbc 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.netconf.topology.singleton.impl;
 
 import akka.actor.ActorRef;
+import akka.actor.Status;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
@@ -40,12 +41,28 @@ public class RemoteOperationTxProcessorImpl implements RemoteOperationTxProcesso
     private DOMDataWriteTransaction writeTx;
     private DOMDataReadOnlyTransaction readTx;
 
+    private ActorRef currentUser = null;
+
     public RemoteOperationTxProcessorImpl(final DOMDataBroker dataBroker, final RemoteDeviceId id) {
         this.dataBroker = dataBroker;
         this.id = id;
         this.readTx = dataBroker.newReadOnlyTransaction();
     }
 
+    @Override
+    public void doOpenTransaction(ActorRef recipient, ActorRef sender) {
+        if (currentUser != null) {
+            LOG.error("{}: Opening a new transaction for {} failed.", id, recipient);
+            recipient.tell(new Status.Failure(
+                    new IllegalStateException("Transaction is already opened for another user")), recipient);
+            return;
+        }
+
+        LOG.debug("{}: Opening a new transaction for {}", id, recipient);
+        currentUser = recipient;
+        recipient.tell(new Status.Success(null), sender);
+    }
+
     @Override
     public void doDelete(final LogicalDatastoreType store, final YangInstanceIdentifier path) {
         if (writeTx == null) {
@@ -56,6 +73,7 @@ public class RemoteOperationTxProcessorImpl implements RemoteOperationTxProcesso
 
     @Override
     public void doSubmit(final ActorRef recipient, final ActorRef sender) {
+        currentUser = null;
         if (writeTx != null) {
             CheckedFuture<Void, TransactionCommitFailedException> submitFuture = writeTx.submit();
             Futures.addCallback(submitFuture, new FutureCallback<Void>() {
@@ -77,11 +95,13 @@ public class RemoteOperationTxProcessorImpl implements RemoteOperationTxProcesso
 
     @Override
     public void doCancel(final ActorRef recipient, final ActorRef sender) {
+        currentUser = null;
         boolean cancel = false;
         if (writeTx != null) {
             cancel = writeTx.cancel();
         }
         recipient.tell(cancel, sender);
+
     }
 
     @Override
@@ -148,6 +168,7 @@ public class RemoteOperationTxProcessorImpl implements RemoteOperationTxProcesso
 
     @Override
     public void close() throws Exception {
+        currentUser = null;
         if (readTx != null) {
             readTx.close();
         }
index e0fe6cc387ada90a917ae8278a440c1def3caebf..edbfe1be5151177a3b207399ea24dd8e180d905a 100644 (file)
@@ -46,6 +46,7 @@ import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteR
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
@@ -148,7 +149,9 @@ public class NetconfNodeActor extends UntypedActor {
     }
 
     private void resolveProxyCalls(final Object message, final ActorRef recipient, final ActorRef futureSender) {
-        if (message instanceof ReadRequest) {
+        if (message instanceof OpenTransaction) {
+            operationsProcessor.doOpenTransaction(recipient, futureSender);
+        } else if (message instanceof ReadRequest) {
 
             final ReadRequest readRequest = (ReadRequest) message;
             operationsProcessor.doRead(readRequest.getStore(), readRequest.getPath(), recipient, futureSender);
index 33680f2dfc2701212e17ae068b54a82f04528d74..4df91d9dd80afc844c043e83758b76fb097126f0 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.netconf.topology.singleton.impl.tx;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -58,6 +59,14 @@ public class NetconfMasterDOMTransaction implements NetconfDOMTransaction {
         readTx = delegateBroker.newReadOnlyTransaction();
     }
 
+    @Override
+    public void openTransaction() {
+        // TODO We don't have to do anything here since
+        // NetconfProxyDOMTransactions and RemoteOperationTxProcessor do all
+        // the work regarding opening transactions. But maybe we should check
+        // for open transaction here instead in RemoteOperationTxProcessor
+    }
+
     @Override
     public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
                                                         final YangInstanceIdentifier path) {
index 26f25fae1d5b296e395d2d9a70c8a184e2a406f9..4298f2c8e83a5ccc07e8e8db7bc0c44d42b57b48 100644 (file)
@@ -13,6 +13,7 @@ import akka.actor.ActorSystem;
 import akka.dispatch.OnComplete;
 import akka.pattern.Patterns;
 import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
 import org.opendaylight.controller.config.util.xml.DocumentedException;
 import org.opendaylight.controller.config.util.xml.DocumentedException.ErrorSeverity;
 import org.opendaylight.controller.config.util.xml.DocumentedException.ErrorTag;
@@ -27,6 +28,7 @@ import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteR
 import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.OpenTransaction;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
 import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
@@ -55,6 +57,24 @@ public class NetconfProxyDOMTransaction implements NetconfDOMTransaction {
         this.masterContextRef = masterContextRef;
     }
 
+    @Override
+    public void openTransaction() {
+        // TODO we can do some checking for already opened transaction also
+        // here in this class. We can track open transaction at least for this
+        // node.
+        LOG.debug("{}: Requesting leader {} to open new transaction", id, masterContextRef);
+        final Future<Object> openTxFuture =
+                Patterns.ask(masterContextRef, new OpenTransaction(), NetconfTopologyUtils.TIMEOUT);
+        try {
+            // we have to wait here so we can see if tx can be opened
+            Await.result(openTxFuture, NetconfTopologyUtils.TIMEOUT.duration());
+            LOG.debug("{}: New transaction opened successfully", id);
+        } catch (final Exception e) {
+            LOG.error("{}: Failed to open new transaction", id, e);
+            Throwables.propagate(e);
+        }
+    }
+
     @Override
     public Future<Optional<NormalizedNodeMessage>> read(final LogicalDatastoreType store,
                                                         final YangInstanceIdentifier path) {
index f938748cbfacda14e00829ae3f484e52a036b7c4..422cf7aaa954bfcd8745f7138dd9f3af1e8c9dd5 100644 (file)
@@ -45,6 +45,8 @@ public class NetconfWriteOnlyTransaction implements DOMDataWriteTransaction {
         this.id = id;
         this.delegate = delegate;
         this.actorSystem = actorSystem;
+
+        this.delegate.openTransaction();
     }
 
     @Override
diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/OpenTransaction.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/messages/transactions/OpenTransaction.java
new file mode 100644 (file)
index 0000000..87622ca
--- /dev/null
@@ -0,0 +1,16 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+/**
+ * A message sent to MountPoint leader to open new transaction
+ */
+public class OpenTransaction implements TransactionRequest {
+    private static final long serialVersionUID = 1L;
+}
index 5007657c8a97e282c5cefeab959c6e31ccec6886..f5fbe31e9d9a08707236e2cf64ba15dbd29131ad 100644 (file)
@@ -148,21 +148,28 @@ public class WriteOnlyTransactionTest {
         // Test of invoking put on master through slave proxy
 
         doNothing().when(writeTx).put(storeType, instanceIdentifier, testNode);
-        slaveDataBroker.newWriteOnlyTransaction().put(storeType, instanceIdentifier, testNode);
+
+        DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+        wTx.put(storeType, instanceIdentifier, testNode);
 
         verify(writeTx, times(1)).put(storeType, instanceIdentifier, testNode);
 
+        wTx.cancel();
         // Test of invoking merge on master through slave proxy
 
         doNothing().when(writeTx).merge(storeType, instanceIdentifier, testNode);
-        slaveDataBroker.newWriteOnlyTransaction().merge(storeType, instanceIdentifier, testNode);
+        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        wTx.merge(storeType, instanceIdentifier, testNode);
 
         verify(writeTx, times(1)).merge(storeType, instanceIdentifier, testNode);
 
+        wTx.cancel();
         // Test of invoking delete on master through slave proxy
 
         doNothing().when(writeTx).delete(storeType, instanceIdentifier);
-        slaveDataBroker.newWriteOnlyTransaction().delete(storeType, instanceIdentifier);
+        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        wTx.delete(storeType, instanceIdentifier);
+        wTx.cancel();
 
         verify(writeTx, times(1)).delete(storeType, instanceIdentifier);
 
@@ -177,33 +184,33 @@ public class WriteOnlyTransactionTest {
 
         // Without Tx
 
+        DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
         final CheckedFuture<Void,TransactionCommitFailedException> resultSubmit = Futures.immediateCheckedFuture(null);
         doReturn(resultSubmit).when(writeTx).submit();
 
-        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse =
-                slaveDataBroker.newWriteOnlyTransaction().submit();
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitResponse = wTx.submit();
 
-        final Object result= resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
+        final Object result = resultSubmitResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
 
         assertNull(result);
 
         // With Tx
-
+        wTx = slaveDataBroker.newWriteOnlyTransaction();
         doNothing().when(writeTx).delete(any(), any());
-        slaveDataBroker.newWriteOnlyTransaction().delete(LogicalDatastoreType.CONFIGURATION,
+        wTx.delete(LogicalDatastoreType.CONFIGURATION,
                 YangInstanceIdentifier.EMPTY);
 
         final CheckedFuture<Void,TransactionCommitFailedException> resultSubmitTx = Futures.immediateCheckedFuture(null);
         doReturn(resultSubmitTx).when(writeTx).submit();
 
-        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse =
-                slaveDataBroker.newWriteOnlyTransaction().submit();
+        final CheckedFuture<Void, TransactionCommitFailedException> resultSubmitTxResponse = wTx.submit();
 
         final Object resultTx = resultSubmitTxResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
 
         assertNull(resultTx);
 
-        slaveDataBroker.newWriteOnlyTransaction().delete(LogicalDatastoreType.CONFIGURATION,
+        wTx = slaveDataBroker.newWriteOnlyTransaction();
+        wTx.delete(LogicalDatastoreType.CONFIGURATION,
                 YangInstanceIdentifier.EMPTY);
 
         final TransactionCommitFailedException throwable = new TransactionCommitFailedException("Fail", null);
@@ -213,7 +220,7 @@ public class WriteOnlyTransactionTest {
         doReturn(resultThrowable).when(writeTx).submit();
 
         final CheckedFuture<Void, TransactionCommitFailedException> resultThrowableResponse =
-                slaveDataBroker.newWriteOnlyTransaction().submit();
+                wTx.submit();
 
         exception.expect(TransactionCommitFailedException.class);
         resultThrowableResponse.checkedGet(TIMEOUT_SEC, TimeUnit.SECONDS);
@@ -228,23 +235,24 @@ public class WriteOnlyTransactionTest {
 
         // Without Tx
 
-        final Boolean resultFalseNoTx = slaveDataBroker.newWriteOnlyTransaction().cancel();
+        DOMDataWriteTransaction wTx = slaveDataBroker.newWriteOnlyTransaction();
+        final Boolean resultFalseNoTx = wTx.cancel();
         assertEquals(false, resultFalseNoTx);
 
         // With Tx, readWriteTx test
 
+        wTx = slaveDataBroker.newWriteOnlyTransaction();
         doNothing().when(writeTx).delete(any(), any());
-        slaveDataBroker.newReadWriteTransaction().delete(LogicalDatastoreType.CONFIGURATION,
+        wTx.delete(LogicalDatastoreType.CONFIGURATION,
                 YangInstanceIdentifier.EMPTY);
 
         doReturn(true).when(writeTx).cancel();
-
-        final Boolean resultTrue = slaveDataBroker.newWriteOnlyTransaction().cancel();
+        final Boolean resultTrue = wTx.cancel();
         assertEquals(true, resultTrue);
 
         doReturn(false).when(writeTx).cancel();
 
-        final Boolean resultFalse = slaveDataBroker.newWriteOnlyTransaction().cancel();
+        final Boolean resultFalse = wTx.cancel();
         assertEquals(false, resultFalse);
 
     }