Bug 2318: Follow-up changes for previous patch 12535 82/12582/4
authortpantelis <tpanteli@brocade.com>
Sat, 1 Nov 2014 22:17:33 +0000 (18:17 -0400)
committertpantelis <tpanteli@brocade.com>
Sun, 2 Nov 2014 11:51:07 +0000 (06:51 -0500)
Addressed some comments and other minor changes as a follow-up to patch 12535.

Enhanced the comments in TransactionChainProxy.

Modified Shard to send a failure reply if an runtime exception occurs in
createTransaction. This allows the error to be propagated to the caller
instead of being thrown back to akka.

Modified TransactionChainProxy to throw an IllegalStateException if the
next chain Tx is created before the previous one is readied. Prior, the
Shard would throw an IllegalStateException but from the Future returned
from submit and not on new*Transaction as the API docs state. So this make
it consistent with API contract. This was implemented by introducing a
volatile State field with implemnetations, Idle, Allocated and Closed.

Also, modified TransactionChainProxy to throw TransactionChainClosedException
if the chain is already closed when on Tx create. Again. this makes it
consistent with the API docs.

Added unit tests in DistributedDataStoreIntegrationTest to verify failure
in both cases.

Change-Id: I1b1ed06ceb1d4599f3f6c5443ca24ac1441ac38f
Signed-off-by: tpantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/utils/MockDataChangeListener.java

index 5ea9b30c63e7d3f9b44e9800eddb2b5e42f08657..4b130950f27d9f40585cc432eedf4b31e71e3a9b 100644 (file)
@@ -27,6 +27,13 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
@@ -78,14 +85,6 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import javax.annotation.Nonnull;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
 /**
  * A Shard represents a portion of the logical data tree <br/>
  * <p>
@@ -491,8 +490,8 @@ public class Shard extends RaftActor {
             }
         }
 
-        if(this.schemaContext == null){
-            throw new NullPointerException("schemaContext should not be null");
+        if(this.schemaContext == null) {
+            throw new IllegalStateException("SchemaContext is not set");
         }
 
         if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
@@ -533,9 +532,16 @@ public class Shard extends RaftActor {
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
-        createTransaction(createTransaction.getTransactionType(),
-            createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
-            createTransaction.getVersion());
+        try {
+            ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
+                createTransaction.getTransactionId(), createTransaction.getTransactionChainId(),
+                createTransaction.getVersion());
+
+            getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
+                    createTransaction.getTransactionId()).toSerializable(), getSelf());
+        } catch (Exception e) {
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
     }
 
     private ActorRef createTransaction(int transactionType, String remoteTransactionId,
@@ -545,18 +551,14 @@ public class Shard extends RaftActor {
             ShardTransactionIdentifier.builder()
                 .remoteTransactionId(remoteTransactionId)
                 .build();
+
         if(LOG.isDebugEnabled()) {
             LOG.debug("Creating transaction : {} ", transactionId);
         }
+
         ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId,
                 transactionChainId, clientVersion);
 
-        getSender()
-            .tell(new CreateTransactionReply(
-                    Serialization.serializedActorPath(transactionActor),
-                    remoteTransactionId).toSerializable(),
-                getSelf());
-
         return transactionActor;
     }
 
index 93f9e6b7de1e2085f01d36c74a37b84ff7ecb4d2..92de88e1126c19c38718f0f7c5c8cd51ad8430c0 100644 (file)
@@ -10,10 +10,14 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
 import java.util.AbstractMap.SimpleEntry;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
@@ -24,10 +28,70 @@ import scala.concurrent.Promise;
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
  */
-public class TransactionChainProxy implements DOMStoreTransactionChain{
+public class TransactionChainProxy implements DOMStoreTransactionChain {
+    private interface State {
+        boolean isReady();
+
+        SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures();
+
+        void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures);
+    }
+
+    private static class Allocated implements State {
+        private volatile SimpleEntry<Object, List<Future<ActorSelection>>> readyFutures;
+
+        @Override
+        public boolean isReady() {
+            return readyFutures != null;
+        }
+
+        @Override
+        public SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures() {
+            return readyFutures != null ? readyFutures : EMPTY_READY_FUTURES;
+        }
+
+        @Override
+        public void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures) {
+            this.readyFutures = new SimpleEntry<>(txIdentifier, readyFutures);
+        }
+    }
+
+    private static abstract class AbstractDefaultState implements State {
+        @Override
+        public SimpleEntry<Object, List<Future<ActorSelection>>> getReadyFutures() {
+            return EMPTY_READY_FUTURES;
+        }
+
+        @Override
+        public void setReadyFutures(Object txIdentifier, List<Future<ActorSelection>> readyFutures) {
+            throw new IllegalStateException("No transaction is allocated");
+        }
+    }
+
+    private static final State IDLE_STATE = new AbstractDefaultState() {
+        @Override
+        public boolean isReady() {
+            return true;
+        }
+    };
+
+    private static final State CLOSED_STATE = new AbstractDefaultState() {
+        @Override
+        public boolean isReady() {
+            throw new TransactionChainClosedException("Transaction chain has been closed");
+        }
+    };
+
+    private static final SimpleEntry<Object, List<Future<ActorSelection>>> EMPTY_READY_FUTURES =
+            new SimpleEntry<Object, List<Future<ActorSelection>>>("",
+                    Collections.<Future<ActorSelection>>emptyList());
+
+    private static final AtomicReferenceFieldUpdater<TransactionChainProxy, State> STATE_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(TransactionChainProxy.class, State.class, "state");
+
     private final ActorContext actorContext;
     private final String transactionChainId;
-    private volatile SimpleEntry<Object, List<Future<ActorSelection>>> previousTxReadyFutures;
+    private volatile State state = IDLE_STATE;
 
     public TransactionChainProxy(ActorContext actorContext) {
         this.actorContext = actorContext;
@@ -36,27 +100,40 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
+        checkReadyState();
         return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
+        return allocateWriteTransaction(TransactionProxy.TransactionType.READ_WRITE);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new ChainedTransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
+        return allocateWriteTransaction(TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
     public void close() {
+        state = CLOSED_STATE;
+
         // Send a close transaction chain request to each and every shard
         actorContext.broadcast(new CloseTransactionChain(transactionChainId));
     }
 
-    public String getTransactionChainId() {
-        return transactionChainId;
+    private ChainedTransactionProxy allocateWriteTransaction(TransactionProxy.TransactionType type) {
+        checkReadyState();
+
+        ChainedTransactionProxy txProxy = new ChainedTransactionProxy(actorContext, type);
+        STATE_UPDATER.compareAndSet(this, IDLE_STATE, new Allocated());
+
+        return txProxy;
+    }
+
+    private void checkReadyState() {
+        Preconditions.checkState(state.isReady(), "Previous transaction %s is not ready yet",
+                state.getReadyFutures().getKey());
     }
 
     private class ChainedTransactionProxy extends TransactionProxy {
@@ -66,12 +143,8 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{
         }
 
         @Override
-        protected void onTransactionReady(List<Future<ActorSelection>> cohortFutures) {
-            if(!cohortFutures.isEmpty()) {
-                previousTxReadyFutures = new SimpleEntry<>(getIdentifier(), cohortFutures);
-            } else {
-                previousTxReadyFutures = null;
-            }
+        protected void onTransactionReady(List<Future<ActorSelection>> readyFutures) {
+            state.setReadyFutures(getIdentifier(), readyFutures);
         }
 
         /**
@@ -82,18 +155,34 @@ public class TransactionChainProxy implements DOMStoreTransactionChain{
         @Override
         protected Future<Object> sendCreateTransaction(final ActorSelection shard,
                 final Object serializedCreateMessage) {
-            // Check if there are any previous ready Futures. Also make sure the previous ready
-            // Futures aren't for this Tx as deadlock would occur if tried to wait on our own
-            // Futures. This may happen b/c the shard Tx creates are done async so it's possible
-            // for the client to ready this Tx before we've even attempted to create a shard Tx.
-            if(previousTxReadyFutures == null ||
-                    previousTxReadyFutures.getKey().equals(getIdentifier())) {
+
+            // Check if there are any previous ready Futures, otherwise let the super class handle it.
+            // The second check is done to ensure the the previous ready Futures aren't for this
+            // Tx instance as deadlock would occur if we tried to wait on our own Futures. This can
+            // occur in this scenario:
+            //
+            //     - the TransactionProxy is created and the client does a write.
+            //
+            //     - the TransactionProxy then attempts to create the shard Tx. However it first
+            //       sends a FindPrimaryShard message to the shard manager to find the local shard
+            //       This call is done async.
+            //
+            //     - the client submits the Tx and the TransactionProxy is readied and we cache
+            //       the ready Futures here.
+            //
+            //     - then the FindPrimaryShard call completes and this method is called to create
+            //       the shard Tx. However the cached Futures were from the ready on this Tx. If we
+            //       tried to wait on them, it would cause a form of deadlock as the ready Future
+            //       would be waiting on the Tx create Future and vice versa.
+            SimpleEntry<Object, List<Future<ActorSelection>>> readyFuturesEntry = state.getReadyFutures();
+            List<Future<ActorSelection>> readyFutures = readyFuturesEntry.getValue();
+            if(readyFutures.isEmpty() || getIdentifier().equals(readyFuturesEntry.getKey())) {
                 return super.sendCreateTransaction(shard, serializedCreateMessage);
             }
 
             // Combine the ready Futures into 1.
             Future<Iterable<ActorSelection>> combinedFutures = akka.dispatch.Futures.sequence(
-                    previousTxReadyFutures.getValue(), actorContext.getActorSystem().dispatcher());
+                    readyFutures, actorContext.getActorSystem().dispatcher());
 
             // Add a callback for completion of the combined Futures.
             final Promise<Object> createTxPromise = akka.dispatch.Futures.promise();
index 12c566d33de786db62bfad3ec86a56939edbe1c2..4f1a02e43557298c11c78f37a35c16078df4d0d7 100644 (file)
@@ -2,12 +2,14 @@ package org.opendaylight.controller.cluster.datastore;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -25,6 +27,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -601,7 +604,7 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
             // 7. Create a new read Tx from the chain to read the data from the last RW Tx to
             // verify it is visible.
 
-            readTx = txChain.newReadOnlyTransaction();
+            readTx = txChain.newReadWriteTransaction();
             optional = readTx.read(TestModel.OUTER_LIST_PATH).get(5, TimeUnit.SECONDS);
             assertEquals("isPresent", true, optional.isPresent());
             assertEquals("Data node", outerNode, optional.get());
@@ -630,6 +633,42 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         }};
     }
 
+    @Test
+    public void testCreateChainedTransactionWhenPreviousNotReady() throws Throwable {
+        new IntegrationTestKit(getSystem()) {{
+            DistributedDataStore dataStore = setupDistributedDataStore(
+                    "testCreateChainedTransactionWhenPreviousNotReady", "test-1");
+
+            final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+            DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+            assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+            writeTx.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+            // Try to create another Tx of each type - each should fail b/c the previous Tx wasn't
+            // readied.
+
+            assertExceptionOnTxChainCreates(txChain, IllegalStateException.class);
+        }};
+    }
+
+    @Test
+    public void testCreateChainedTransactionAfterClose() throws Throwable {
+        new IntegrationTestKit(getSystem()) {{
+            DistributedDataStore dataStore = setupDistributedDataStore(
+                    "testCreateChainedTransactionAfterClose", "test-1");
+
+            DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
+
+            txChain.close();
+
+            // Try to create another Tx of each type - should fail b/c the previous Tx was closed.
+
+            assertExceptionOnTxChainCreates(txChain, TransactionChainClosedException.class);
+        }};
+    }
+
     @Test
     public void testChangeListenerRegistration() throws Exception{
         new IntegrationTestKit(getSystem()) {{
@@ -761,6 +800,43 @@ public class DistributedDataStoreIntegrationTest extends AbstractActorTest {
         void cleanup(DistributedDataStore dataStore) {
             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
         }
+
+        void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
+                throws Exception {
+            try {
+                callable.call();
+                fail("Expected " + expType.getSimpleName());
+            } catch(Exception e) {
+                assertEquals("Exception type", expType, e.getClass());
+            }
+        }
+
+        void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
+                Class<? extends Exception> expType) throws Exception {
+            assertExceptionOnCall(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    txChain.newWriteOnlyTransaction();
+                    return null;
+                }
+            }, expType);
+
+            assertExceptionOnCall(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    txChain.newReadWriteTransaction();
+                    return null;
+                }
+            }, expType);
+
+            assertExceptionOnCall(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    txChain.newReadOnlyTransaction();
+                    return null;
+                }
+            }, expType);
+        }
     }
 
 }
index 5bbdcae93c2241cd8b9e7a87613735b69841973d..efd58620a23501b114e80a15084406a004ed4179 100644 (file)
@@ -59,8 +59,6 @@ public class MockDataChangeListener implements
                     expChangeEventCount, (expChangeEventCount - changeLatch.getCount())));
         }
 
-        assertEquals("Change notifications complete", true, done);
-
         for(int i = 0; i < expPaths.length; i++) {
             assertTrue(String.format("Change %d does not contain %s", (i+1), expPaths[i]),
                     changeList.get(i).getCreatedData().containsKey(expPaths[i]));