Handling ShardTransaction Failure Scenarios 81/9881/1
authorBasheeruddin Ahmed <syedbahm@cisco.com>
Tue, 12 Aug 2014 06:31:45 +0000 (23:31 -0700)
committerBasheeruddin Ahmed <syedbahm@cisco.com>
Tue, 12 Aug 2014 06:34:03 +0000 (23:34 -0700)
and corresponding test cases

Note: Cohort failures handling is being worked on
and will be taken care in separate commit

Change-Id: I7ef165b9cb8c91088b4f20ae6c19418f92edb37c
Signed-off-by: Basheeruddin Ahmed <syedbahm@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java [new file with mode: 0644]

index 308589e7f0b9be1ea41eb6679e2218cab07762d1..21fea96320f30754baa2c300877c768c895b4d02 100644 (file)
@@ -144,7 +144,7 @@ public class Shard extends RaftActor {
         } else if (message instanceof PeerAddressResolved) {
             PeerAddressResolved resolved = (PeerAddressResolved) message;
             setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress());
-        } else {
+        } else{
             super.onReceiveCommand(message);
         }
     }
@@ -240,8 +240,7 @@ public class Shard extends RaftActor {
 
                 } catch (InterruptedException | ExecutionException e) {
                     shardMBean.incrementFailedTransactionsCount();
-                    // FIXME : Handle this properly
-                    LOG.error(e, "An exception happened when committing");
+                    sender.tell(new akka.actor.Status.Failure(e),self);
                 }
             }
         }, getContext().dispatcher());
index f78935b5e72b6c79070efe5b8cbd315fa535ac12..7a18fca100f027b2bfad23b0433eeb91eab06484 100644 (file)
@@ -55,4 +55,9 @@ public class ShardReadTransaction extends ShardTransaction {
     getSelf().tell(PoisonPill.getInstance(), getSelf());
   }
 
+  //default scope test method to check if we get correct exception
+  void forUnitTestOnlyExplicitTransactionClose(){
+      transaction.close();
+  }
+
 }
index 6733bcfb9f6e323be99a849195bddac637f11588..92fb39308357cd63acf2d7810a321b1081b3ace1 100644 (file)
@@ -65,4 +65,13 @@ public class ShardReadWriteTransaction extends ShardTransaction {
     getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
     getSelf().tell(PoisonPill.getInstance(), getSelf());
   }
+
+    /**
+     * The following method is used in unit testing only
+     * hence the default scope.
+     * This is done to test out failure cases.
+     */
+    public void forUnitTestOnlyExplicitTransactionClose() {
+        transaction.close();
+    }
 }
index 3a916bda2c6e163d75c4d39a0c4d80b94b823fb2..312ec9a4ff97f861b1fcbc23220dc44e6920b61a 100644 (file)
@@ -32,6 +32,7 @@ import org.opendaylight.controller.cluster.datastore.modification.ImmutableCompo
 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
@@ -41,8 +42,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
-import java.util.concurrent.ExecutionException;
-
 /**
  * The ShardTransaction Actor represents a remote transaction
  * <p>
@@ -197,10 +196,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
           } else {
             sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
           }
-        } catch (InterruptedException | ExecutionException e) {
-          log.error(e,
-              "An exception happened when reading data from path : "
-                  + path.toString());
+        } catch (Exception e) {
+            sender.tell(new akka.actor.Status.Failure(new ReadFailedException( "An Exception occurred  when reading data from path : "
+                + path.toString(),e)),self);
         }
 
       }
@@ -212,22 +210,35 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
     modification.addModification(
         new WriteModification(message.getPath(), message.getData(),schemaContext));
     LOG.debug("writeData at path : " + message.getPath().toString());
-    transaction.write(message.getPath(), message.getData());
-    getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+
+    try {
+        transaction.write(message.getPath(), message.getData());
+        getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+    }catch(Exception e){
+        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+    }
   }
 
   protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
     modification.addModification(
         new MergeModification(message.getPath(), message.getData(), schemaContext));
     LOG.debug("mergeData at path : " + message.getPath().toString());
-    transaction.merge(message.getPath(), message.getData());
-    getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+    try {
+        transaction.merge(message.getPath(), message.getData());
+        getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+    }catch(Exception e){
+        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+    }
   }
 
   protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
     modification.addModification(new DeleteModification(message.getPath()));
-    transaction.delete(message.getPath());
-    getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+    try {
+        transaction.delete(message.getPath());
+        getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+    }catch(Exception e){
+        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+    }
   }
 
   protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
index 2a5429ba81c4b0025e0799631da1ba914956ac3c..91e578b46d1f6de8f2e841f7d7f459541a9c3bdb 100644 (file)
@@ -63,4 +63,13 @@ public class ShardWriteTransaction extends ShardTransaction {
     getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
     getSelf().tell(PoisonPill.getInstance(), getSelf());
   }
+
+    /**
+     * The following method is used in unit testing only
+     * hence the default scope.
+     * This is done to test out failure cases.
+     */
+    public void forUnitTestOnlyExplicitTransactionClose() {
+        transaction.close();
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
new file mode 100644 (file)
index 0000000..86016a6
--- /dev/null
@@ -0,0 +1,303 @@
+/*
+ *
+ *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ *  and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Covers negative test cases
+ * @author Basheeruddin Ahmed <syedbahm@cisco.com>
+ */
+public class ShardTransactionFailureTest extends AbstractActorTest {
+    private static ListeningExecutorService storeExecutor =
+        MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+    private static final InMemoryDOMDataStore store =
+        new InMemoryDOMDataStore("OPER", storeExecutor,
+            MoreExecutors.sameThreadExecutor());
+
+    private static final SchemaContext testSchemaContext =
+        TestModel.createTestContext();
+
+    static {
+        store.onGlobalContextUpdated(testSchemaContext);
+    }
+
+
+    @Test
+    public void testNegativePerformingWriteOperationOnReadTransaction()
+        throws Exception {
+        try {
+
+            final ActorRef
+                shard = getSystem()
+                .actorOf(Shard.props("config", Collections.EMPTY_MAP));
+            final Props props =
+                ShardTransaction
+                    .props(store.newReadOnlyTransaction(), shard, TestModel
+                        .createTestContext());
+            final TestActorRef subject = TestActorRef.apply(props, getSystem());
+
+            subject
+                .receive(new DeleteData(TestModel.TEST_PATH).toSerializable(),
+                    ActorRef.noSender());
+            Assert.assertFalse(true);
+
+
+        } catch (Exception cs) {
+            assertEquals(cs.getClass().getSimpleName(),
+                Exception.class.getSimpleName());
+            assertTrue(cs.getMessage().startsWith(
+                "ShardTransaction:handleRecieve received an unknown message"));
+        }
+    }
+
+    @Test(expected = ReadFailedException.class)
+    public void testNegativeReadWithReadOnlyTransactionClosed()
+        throws Throwable {
+
+        final ActorRef shard =
+            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+        final Props props =
+            ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                TestModel.createTestContext());
+
+        final TestActorRef<ShardTransaction> subject = TestActorRef
+            .create(getSystem(), props,
+                "testNegativeReadWithReadOnlyTransactionClosed");
+
+        ShardTransactionMessages.ReadData readData =
+            ShardTransactionMessages.ReadData.newBuilder()
+                .setInstanceIdentifierPathArguments(
+                    NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+                        .build()
+                ).build();
+        Future<Object> future =
+            akka.pattern.Patterns.ask(subject, readData, 3000);
+        assertTrue(future.isCompleted());
+        Await.result(future, Duration.Zero());
+
+        ((ShardReadTransaction) subject.underlyingActor())
+            .forUnitTestOnlyExplicitTransactionClose();
+
+        future = akka.pattern.Patterns.ask(subject, readData, 3000);
+        Await.result(future, Duration.Zero());
+
+
+    }
+
+
+    @Test(expected = ReadFailedException.class)
+    public void testNegativeReadWithReadWriteOnlyTransactionClosed()
+        throws Throwable {
+
+        final ActorRef shard =
+            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+        final Props props =
+            ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                TestModel.createTestContext());
+
+        final TestActorRef<ShardTransaction> subject = TestActorRef
+            .create(getSystem(), props,
+                "testNegativeReadWithReadWriteOnlyTransactionClosed");
+
+        ShardTransactionMessages.ReadData readData =
+            ShardTransactionMessages.ReadData.newBuilder()
+                .setInstanceIdentifierPathArguments(
+                    NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+                        .build()
+                ).build();
+        Future<Object> future =
+            akka.pattern.Patterns.ask(subject, readData, 3000);
+        assertTrue(future.isCompleted());
+        Await.result(future, Duration.Zero());
+
+        ((ShardReadWriteTransaction) subject.underlyingActor())
+            .forUnitTestOnlyExplicitTransactionClose();
+
+        future = akka.pattern.Patterns.ask(subject, readData, 3000);
+        Await.result(future, Duration.Zero());
+
+
+    }
+
+
+    @Test(expected = IllegalStateException.class)
+    public void testNegativeWriteWithTransactionReady() throws Exception {
+
+
+        final ActorRef shard =
+            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+        final Props props =
+            ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                TestModel.createTestContext());
+
+        final TestActorRef<ShardTransaction> subject = TestActorRef
+            .create(getSystem(), props,
+                "testNegativeWriteWithTransactionReady");
+
+        ShardTransactionMessages.ReadyTransaction readyTransaction =
+            ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+        Future<Object> future =
+            akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
+        assertTrue(future.isCompleted());
+        Await.result(future, Duration.Zero());
+
+        ShardTransactionMessages.WriteData writeData =
+            ShardTransactionMessages.WriteData.newBuilder()
+                .setInstanceIdentifierPathArguments(
+                    NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+                        .build()).setNormalizedNode(
+                NormalizedNodeMessages.Node.newBuilder().build()
+
+            ).build();
+
+        future = akka.pattern.Patterns.ask(subject, writeData, 3000);
+        assertTrue(future.isCompleted());
+        Await.result(future, Duration.Zero());
+
+
+    }
+
+
+    @Test(expected = IllegalStateException.class)
+    public void testNegativeReadWriteWithTransactionReady() throws Exception {
+
+
+        final ActorRef shard =
+            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+        final Props props =
+            ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                TestModel.createTestContext());
+
+        final TestActorRef<ShardTransaction> subject = TestActorRef
+            .create(getSystem(), props,
+                "testNegativeReadWriteWithTransactionReady");
+
+        ShardTransactionMessages.ReadyTransaction readyTransaction =
+            ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+        Future<Object> future =
+            akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
+        assertTrue(future.isCompleted());
+        Await.result(future, Duration.Zero());
+
+        ShardTransactionMessages.WriteData writeData =
+            ShardTransactionMessages.WriteData.newBuilder()
+                .setInstanceIdentifierPathArguments(
+                    NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+                        .build()).setNormalizedNode(
+                NormalizedNodeMessages.Node.newBuilder().build()
+
+            ).build();
+
+        future = akka.pattern.Patterns.ask(subject, writeData, 3000);
+        assertTrue(future.isCompleted());
+        Await.result(future, Duration.Zero());
+
+
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testNegativeMergeTransactionReady() throws Exception {
+
+
+        final ActorRef shard =
+            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+        final Props props =
+            ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                TestModel.createTestContext());
+
+        final TestActorRef<ShardTransaction> subject = TestActorRef
+            .create(getSystem(), props, "testNegativeMergeTransactionReady");
+
+        ShardTransactionMessages.ReadyTransaction readyTransaction =
+            ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+        Future<Object> future =
+            akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
+        assertTrue(future.isCompleted());
+        Await.result(future, Duration.Zero());
+
+        ShardTransactionMessages.MergeData mergeData =
+            ShardTransactionMessages.MergeData.newBuilder()
+                .setInstanceIdentifierPathArguments(
+                    NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+                        .build()).setNormalizedNode(
+                NormalizedNodeMessages.Node.newBuilder().build()
+
+            ).build();
+
+        future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
+        assertTrue(future.isCompleted());
+        Await.result(future, Duration.Zero());
+
+
+    }
+
+
+    @Test(expected = IllegalStateException.class)
+    public void testNegativeDeleteDataWhenTransactionReady() throws Exception {
+
+
+        final ActorRef shard =
+            getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP));
+        final Props props =
+            ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                TestModel.createTestContext());
+
+        final TestActorRef<ShardTransaction> subject = TestActorRef
+            .create(getSystem(), props,
+                "testNegativeDeleteDataWhenTransactionReady");
+
+        ShardTransactionMessages.ReadyTransaction readyTransaction =
+            ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+        Future<Object> future =
+            akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
+        assertTrue(future.isCompleted());
+        Await.result(future, Duration.Zero());
+
+        ShardTransactionMessages.DeleteData deleteData =
+            ShardTransactionMessages.DeleteData.newBuilder()
+                .setInstanceIdentifierPathArguments(
+                    NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+                        .build()).build();
+
+        future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
+        assertTrue(future.isCompleted());
+        Await.result(future, Duration.Zero());
+
+
+    }
+}