BUG-8027: do not break actor encapsulation 47/53747/2
authorRobert Varga <rovarga@cisco.com>
Thu, 23 Mar 2017 17:10:58 +0000 (18:10 +0100)
committerRobert Varga <rovarga@cisco.com>
Thu, 23 Mar 2017 17:17:21 +0000 (18:17 +0100)
Invoking abort() from ShardTransaction means we are executing code
from one actor in the context of another one and since the code path
involves persistence, this breaks Akka rather thoroughly.

Introduce a dedicated method for the required upcall and send a request
to persist separately.

Change-Id: Ic994b5e5963e8c602844e283f34df8bfa3726705
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PersistAbortTransactionPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java

index c19cb9a..1a5b968 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.slf4j.Logger;
@@ -75,6 +76,20 @@ abstract class AbstractShardDataTreeTransaction<T extends DataTreeSnapshot>
         parent.abortTransaction(this, callback);
     }
 
+    /**
+     * This method is exposed for sake of {@link ShardTransaction}, which is an actor. We need to ensure that
+     * the parent is updated to reflect the transaction has been closed, but no journal actions may be invoked.
+     *
+     * <p>
+     * ShardTransaction is responsible for additionally sending a request to persist an {@link AbortTransactionPayload}
+     * via a message to the Shard actor.
+     */
+    final void abortFromTransactionActor() {
+        if (close()) {
+            parent.abortFromTransactionActor(this);
+        }
+    }
+
     @Override
     public final String toString() {
         return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot)
index 11e74d5..19a2151 100644 (file)
@@ -67,11 +67,13 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
@@ -334,6 +336,9 @@ public class Shard extends RaftActor {
             } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
                 store.processCohortRegistryCommand(getSender(),
                         (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
+            } else if (message instanceof PersistAbortTransactionPayload) {
+                final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
+                persistPayload(txId, AbortTransactionPayload.create(txId), true);
             } else {
                 super.handleNonRaftCommand(message);
             }
index 3b15b65..20cee65 100644 (file)
@@ -659,6 +659,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         replicatePayload(id, AbortTransactionPayload.create(id), callback);
     }
 
+    @Override
+    void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
+        // No-op for free-standing transactions
+
+    }
+
     @Override
     ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
         final DataTreeModification snapshot = transaction.getSnapshot();
index e620e52..794c6d8 100644 (file)
@@ -72,14 +72,18 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
     }
 
     @Override
-    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+    void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
         if (transaction instanceof ReadWriteShardDataTreeTransaction) {
             Preconditions.checkState(openTransaction != null,
                     "Attempted to abort transaction %s while none is outstanding", transaction);
             LOG.debug("Aborted open transaction {}", transaction);
             openTransaction = null;
         }
+    }
 
+    @Override
+    void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+        abortFromTransactionActor(transaction);
         dataTree.abortTransaction(transaction, callback);
     }
 
index 21c8c3b..a280a22 100644 (file)
@@ -12,10 +12,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification
 
 abstract class ShardDataTreeTransactionParent {
 
+    abstract void abortFromTransactionActor(AbstractShardDataTreeTransaction<?> transaction);
+
     abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction, Runnable callback);
 
     abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
 
     abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
-
 }
index b4fee8e..dfd60af 100644 (file)
@@ -23,6 +23,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
@@ -77,7 +78,8 @@ public abstract class ShardTransaction extends AbstractUntypedActorWithMetering
     }
 
     private void closeTransaction(final boolean sendReply) {
-        getDOMStoreTransaction().abort(null);
+        getDOMStoreTransaction().abortFromTransactionActor();
+        shardActor.tell(new PersistAbortTransactionPayload(transactionId), ActorRef.noSender());
 
         if (sendReply && returnCloseTransactionReply()) {
             getSender().tell(new CloseTransactionReply(), getSelf());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PersistAbortTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/PersistAbortTransactionPayload.java
new file mode 100644 (file)
index 0000000..b581ddb
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A request sent from {@link org.opendaylight.controller.cluster.datastore.ShardTransaction} to
+ * {@link org.opendaylight.controller.cluster.datastore.Shard} to persist an
+ * {@link org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload} after the transaction has
+ * been closed by the frontend and internal backend state has been updated.
+ *
+ * <p>
+ * Since the two are actors, we cannot perform a direct upcall, as that breaks actor containment and wreaks havoc into
+ * Akka itself. This class does not need to be serializable, as both actors are guaranteed to be co-located.
+ *
+ * @author Robert Varga
+ */
+public final class PersistAbortTransactionPayload {
+    private final TransactionIdentifier txId;
+
+    public PersistAbortTransactionPayload(final TransactionIdentifier txId) {
+        this.txId = Preconditions.checkNotNull(txId);
+    }
+
+    public TransactionIdentifier getTransactionId() {
+        return txId;
+    }
+}
index 00087e3..233ecb8 100644 (file)
@@ -70,7 +70,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort(null);
+        subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
 
         future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
                 DataStoreVersions.CURRENT_VERSION), 3000);
@@ -92,7 +92,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort(null);
+        subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
 
         future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
                 DataStoreVersions.CURRENT_VERSION), 3000);
@@ -113,7 +113,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
         Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        subject.underlyingActor().getDOMStoreTransaction().abort(null);
+        subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
 
         future = akka.pattern.Patterns.ask(subject,
                 new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.