BUG-8027: do not break actor encapsulation 47/53747/2
authorRobert Varga <rovarga@cisco.com>
Thu, 23 Mar 2017 17:10:58 +0000 (18:10 +0100)
committerRobert Varga <rovarga@cisco.com>
Thu, 23 Mar 2017 17:17:21 +0000 (18:17 +0100)
Invoking abort() from ShardTransaction means we are executing code
from one actor in the context of another one and since the code path
involves persistence, this breaks Akka rather thoroughly.

Introduce a dedicated method for the required upcall and send a request
to persist separately.

Change-Id: Ic994b5e5963e8c602844e283f34df8bfa3726705
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PersistAbortTransactionPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java

index c19cb9a0802c8ce285ce0cdc368ee824a4906d24..1a5b968741c45711f014d293f1b6e2594836a43d 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
@@ -75,6 +76,20 @@ abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
         parent.abortTransaction(this, callback);
     }
 
+    /**
+     * This method is exposed for sake of {@link ShardTransaction}, which is an actor. We need to ensure that
+     * the parent is updated to reflect the transaction has been closed, but no journal actions may be invoked.
+     *
+     * <p>
+     * ShardTransaction is responsible for additionally sending a request to persist an {@link AbortTransactionPayload}
+     * via a message to the Shard actor.
+     */
+    final void abortFromTransactionActor() {
+        if (close()) {
+            parent.abortFromTransactionActor(this);
+        }
+    }
+
     @Override
     public final String toString() {
         return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot)
index 11e74d5adad0eb09cae2af3a0a645fdbf96fbd0c..19a2151d2fc925284a39d65960e80819d425e154 100644 (file)
@@ -67,11 +67,13 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
@@ -334,6 +336,9 @@ public class Shard extends RaftActor {
             } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
                 store.processCohortRegistryCommand(getSender(),
                         (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
+            } else if (message instanceof PersistAbortTransactionPayload) {
+                final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
+                persistPayload(txId, AbortTransactionPayload.create(txId), true);
             } else {
                 super.handleNonRaftCommand(message);
             }
index 3b15b656b35f64fd0441da8a9c28489f2dc4a8dc..20cee65c37ca50e62baf09cd3c4db25539242f8c 100644 (file)
@@ -659,6 +659,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         replicatePayload(id, AbortTransactionPayload.create(id), callback);
     }
 
+    @Override
+    void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
+        // No-op for free-standing transactions
+
+    }
+
     @Override
     ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
         final DataTreeModification snapshot = transaction.getSnapshot();
index e620e52eb38026197953acfa01d0ae185a4834ca..794c6d82187d94f1c29a4f3bd8e25bd598cb5ca8 100644 (file)
@@ -72,14 +72,18 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
     }
 
     @Override
-    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+    void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
         if (transaction instanceof ReadWriteShardDataTreeTransaction) {
             Preconditions.checkState(openTransaction != null,
                     "Attempted to abort transaction %s while none is outstanding", transaction);
             LOG.debug("Aborted open transaction {}", transaction);
             openTransaction = null;
         }
+    }
 
+    @Override
+    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+        abortFromTransactionActor(transaction);
         dataTree.abortTransaction(transaction, callback);
     }
 
index 21c8c3bba2c667df3e131e46a1dc109f1faa8d40..a280a2229111082c4abac657042a468971c957cc 100644 (file)
@@ -12,10 +12,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 
 abstract class ShardDataTreeTransactionParent {
 
+    abstract void abortFromTransactionActor(AbstractShardDataTreeTransaction<?> transaction);
+
     abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction, Runnable callback);
 
     abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
 
     abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
-
 }
index b4fee8e9c859cb5a0a076806e1bf9f32f9aa38d3..dfd60afa21b84c7462a5459bc8752d4548276e57 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -77,7 +78,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     }
 
     private void closeTransaction(final boolean sendReply) {
-        getDOMStoreTransaction().abort(null);
+        getDOMStoreTransaction().abortFromTransactionActor();
+        shardActor.tell(new PersistAbortTransactionPayload(transactionId), ActorRef.noSender());
 
         if (sendReply && returnCloseTransactionReply()) {
             getSender().tell(new CloseTransactionReply(), getSelf());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PersistAbortTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PersistAbortTransactionPayload.java
new file mode 100644 (file)
index 0000000..b581ddb
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A request sent from {@link org.opendaylight.controller.cluster.datastore.ShardTransaction} to
+ * {@link org.opendaylight.controller.cluster.datastore.Shard} to persist an
+ * {@link org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload} after the transaction has
+ * been closed by the frontend and internal backend state has been updated.
+ *
+ * <p>
+ * Since the two are actors, we cannot perform a direct upcall, as that breaks actor containment and wreaks havoc into
+ * Akka itself. This class does not need to be serializable, as both actors are guaranteed to be co-located.
+ *
+ * @author Robert Varga
+ */
+public final class PersistAbortTransactionPayload {
+    private final TransactionIdentifier txId;
+
+    public PersistAbortTransactionPayload(final TransactionIdentifier txId) {
+        this.txId = Preconditions.checkNotNull(txId);
+    }
+
+    public TransactionIdentifier getTransactionId() {
+        return txId;
+    }
+}
index 00087e37dfeefafc9d1852d9934ea9c34a39ce5d..233ecb8cf344ed6f97583aea213b4169c9c7d3fc 100644 (file)
@@ -70,7 +70,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort(null);
+        subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
 
         future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
                 DataStoreVersions.CURRENT_VERSION), 3000);
@@ -92,7 +92,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort(null);
+        subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
 
         future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
                 DataStoreVersions.CURRENT_VERSION), 3000);
@@ -113,7 +113,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort(null);
+        subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
 
         future = akka.pattern.Patterns.ask(subject,
                 new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);