From a7223085cb1b53fbe96ab5508b5bdcbff2a04224 Mon Sep 17 00:00:00 2001 From: Basheeruddin Ahmed Date: Mon, 11 Aug 2014 23:31:45 -0700 Subject: [PATCH] Handling ShardTransaction Failure Scenarios and corresponding test cases Note: Cohort failures handling is being worked on and will be taken care in separate commit Change-Id: I7ef165b9cb8c91088b4f20ae6c19418f92edb37c Signed-off-by: Basheeruddin Ahmed --- .../controller/cluster/datastore/Shard.java | 5 +- .../datastore/ShardReadTransaction.java | 5 + .../datastore/ShardReadWriteTransaction.java | 9 + .../cluster/datastore/ShardTransaction.java | 35 +- .../datastore/ShardWriteTransaction.java | 9 + .../ShardTransactionFailureTest.java | 303 ++++++++++++++++++ 6 files changed, 351 insertions(+), 15 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 308589e7f0..21fea96320 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -144,7 +144,7 @@ public class Shard extends RaftActor { } else if (message instanceof PeerAddressResolved) { PeerAddressResolved resolved = (PeerAddressResolved) message; setPeerAddress(resolved.getPeerId(), resolved.getPeerAddress()); - } else { + } else{ super.onReceiveCommand(message); } } @@ -240,8 +240,7 @@ public class Shard extends RaftActor { } catch (InterruptedException | ExecutionException e) { shardMBean.incrementFailedTransactionsCount(); - // FIXME : Handle this properly - LOG.error(e, "An exception happened when committing"); + sender.tell(new akka.actor.Status.Failure(e),self); } } }, getContext().dispatcher()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java index f78935b5e7..7a18fca100 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java @@ -55,4 +55,9 @@ public class ShardReadTransaction extends ShardTransaction { getSelf().tell(PoisonPill.getInstance(), getSelf()); } + //default scope test method to check if we get correct exception + void forUnitTestOnlyExplicitTransactionClose(){ + transaction.close(); + } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java index 6733bcfb9f..92fb393083 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java @@ -65,4 +65,13 @@ public class ShardReadWriteTransaction extends ShardTransaction { getSender().tell(new CloseTransactionReply().toSerializable(), getSelf()); getSelf().tell(PoisonPill.getInstance(), getSelf()); } + + /** + * The following method is used in unit testing only + * hence the default scope. + * This is done to test out failure cases. + */ + public void forUnitTestOnlyExplicitTransactionClose() { + transaction.close(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java index 3a916bda2c..312ec9a4ff 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java @@ -32,6 +32,7 @@ import org.opendaylight.controller.cluster.datastore.modification.ImmutableCompo import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; @@ -41,8 +42,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; -import java.util.concurrent.ExecutionException; - /** * The ShardTransaction Actor represents a remote transaction *

@@ -197,10 +196,9 @@ public abstract class ShardTransaction extends AbstractUntypedActor { } else { sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self); } - } catch (InterruptedException | ExecutionException e) { - log.error(e, - "An exception happened when reading data from path : " - + path.toString()); + } catch (Exception e) { + sender.tell(new akka.actor.Status.Failure(new ReadFailedException( "An Exception occurred when reading data from path : " + + path.toString(),e)),self); } } @@ -212,22 +210,35 @@ public abstract class ShardTransaction extends AbstractUntypedActor { modification.addModification( new WriteModification(message.getPath(), message.getData(),schemaContext)); LOG.debug("writeData at path : " + message.getPath().toString()); - transaction.write(message.getPath(), message.getData()); - getSender().tell(new WriteDataReply().toSerializable(), getSelf()); + + try { + transaction.write(message.getPath(), message.getData()); + getSender().tell(new WriteDataReply().toSerializable(), getSelf()); + }catch(Exception e){ + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } } protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) { modification.addModification( new MergeModification(message.getPath(), message.getData(), schemaContext)); LOG.debug("mergeData at path : " + message.getPath().toString()); - transaction.merge(message.getPath(), message.getData()); - getSender().tell(new MergeDataReply().toSerializable(), getSelf()); + try { + transaction.merge(message.getPath(), message.getData()); + getSender().tell(new MergeDataReply().toSerializable(), getSelf()); + }catch(Exception e){ + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } } protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) { modification.addModification(new DeleteModification(message.getPath())); - transaction.delete(message.getPath()); - getSender().tell(new DeleteDataReply().toSerializable(), getSelf()); + try { + transaction.delete(message.getPath()); + getSender().tell(new DeleteDataReply().toSerializable(), getSelf()); + }catch(Exception e){ + getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + } } protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 2a5429ba81..91e578b46d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -63,4 +63,13 @@ public class ShardWriteTransaction extends ShardTransaction { getSender().tell(new CloseTransactionReply().toSerializable(), getSelf()); getSelf().tell(PoisonPill.getInstance(), getSelf()); } + + /** + * The following method is used in unit testing only + * hence the default scope. + * This is done to test out failure cases. + */ + public void forUnitTestOnlyExplicitTransactionClose() { + transaction.close(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java new file mode 100644 index 0000000000..86016a677b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -0,0 +1,303 @@ +/* + * + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + * + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.TestActorRef; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.junit.Assert; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.messages.DeleteData; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Covers negative test cases + * @author Basheeruddin Ahmed + */ +public class ShardTransactionFailureTest extends AbstractActorTest { + private static ListeningExecutorService storeExecutor = + MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()); + + private static final InMemoryDOMDataStore store = + new InMemoryDOMDataStore("OPER", storeExecutor, + MoreExecutors.sameThreadExecutor()); + + private static final SchemaContext testSchemaContext = + TestModel.createTestContext(); + + static { + store.onGlobalContextUpdated(testSchemaContext); + } + + + @Test + public void testNegativePerformingWriteOperationOnReadTransaction() + throws Exception { + try { + + final ActorRef + shard = getSystem() + .actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final Props props = + ShardTransaction + .props(store.newReadOnlyTransaction(), shard, TestModel + .createTestContext()); + final TestActorRef subject = TestActorRef.apply(props, getSystem()); + + subject + .receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), + ActorRef.noSender()); + Assert.assertFalse(true); + + + } catch (Exception cs) { + assertEquals(cs.getClass().getSimpleName(), + Exception.class.getSimpleName()); + assertTrue(cs.getMessage().startsWith( + "ShardTransaction:handleRecieve received an unknown message")); + } + } + + @Test(expected = ReadFailedException.class) + public void testNegativeReadWithReadOnlyTransactionClosed() + throws Throwable { + + final ActorRef shard = + getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final Props props = + ShardTransaction.props(store.newReadOnlyTransaction(), shard, + TestModel.createTestContext()); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativeReadWithReadOnlyTransactionClosed"); + + ShardTransactionMessages.ReadData readData = + ShardTransactionMessages.ReadData.newBuilder() + .setInstanceIdentifierPathArguments( + NormalizedNodeMessages.InstanceIdentifier.newBuilder() + .build() + ).build(); + Future future = + akka.pattern.Patterns.ask(subject, readData, 3000); + assertTrue(future.isCompleted()); + Await.result(future, Duration.Zero()); + + ((ShardReadTransaction) subject.underlyingActor()) + .forUnitTestOnlyExplicitTransactionClose(); + + future = akka.pattern.Patterns.ask(subject, readData, 3000); + Await.result(future, Duration.Zero()); + + + } + + + @Test(expected = ReadFailedException.class) + public void testNegativeReadWithReadWriteOnlyTransactionClosed() + throws Throwable { + + final ActorRef shard = + getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard, + TestModel.createTestContext()); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativeReadWithReadWriteOnlyTransactionClosed"); + + ShardTransactionMessages.ReadData readData = + ShardTransactionMessages.ReadData.newBuilder() + .setInstanceIdentifierPathArguments( + NormalizedNodeMessages.InstanceIdentifier.newBuilder() + .build() + ).build(); + Future future = + akka.pattern.Patterns.ask(subject, readData, 3000); + assertTrue(future.isCompleted()); + Await.result(future, Duration.Zero()); + + ((ShardReadWriteTransaction) subject.underlyingActor()) + .forUnitTestOnlyExplicitTransactionClose(); + + future = akka.pattern.Patterns.ask(subject, readData, 3000); + Await.result(future, Duration.Zero()); + + + } + + + @Test(expected = IllegalStateException.class) + public void testNegativeWriteWithTransactionReady() throws Exception { + + + final ActorRef shard = + getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final Props props = + ShardTransaction.props(store.newWriteOnlyTransaction(), shard, + TestModel.createTestContext()); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativeWriteWithTransactionReady"); + + ShardTransactionMessages.ReadyTransaction readyTransaction = + ShardTransactionMessages.ReadyTransaction.newBuilder().build(); + + Future future = + akka.pattern.Patterns.ask(subject, readyTransaction, 3000); + assertTrue(future.isCompleted()); + Await.result(future, Duration.Zero()); + + ShardTransactionMessages.WriteData writeData = + ShardTransactionMessages.WriteData.newBuilder() + .setInstanceIdentifierPathArguments( + NormalizedNodeMessages.InstanceIdentifier.newBuilder() + .build()).setNormalizedNode( + NormalizedNodeMessages.Node.newBuilder().build() + + ).build(); + + future = akka.pattern.Patterns.ask(subject, writeData, 3000); + assertTrue(future.isCompleted()); + Await.result(future, Duration.Zero()); + + + } + + + @Test(expected = IllegalStateException.class) + public void testNegativeReadWriteWithTransactionReady() throws Exception { + + + final ActorRef shard = + getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard, + TestModel.createTestContext()); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativeReadWriteWithTransactionReady"); + + ShardTransactionMessages.ReadyTransaction readyTransaction = + ShardTransactionMessages.ReadyTransaction.newBuilder().build(); + + Future future = + akka.pattern.Patterns.ask(subject, readyTransaction, 3000); + assertTrue(future.isCompleted()); + Await.result(future, Duration.Zero()); + + ShardTransactionMessages.WriteData writeData = + ShardTransactionMessages.WriteData.newBuilder() + .setInstanceIdentifierPathArguments( + NormalizedNodeMessages.InstanceIdentifier.newBuilder() + .build()).setNormalizedNode( + NormalizedNodeMessages.Node.newBuilder().build() + + ).build(); + + future = akka.pattern.Patterns.ask(subject, writeData, 3000); + assertTrue(future.isCompleted()); + Await.result(future, Duration.Zero()); + + + } + + @Test(expected = IllegalStateException.class) + public void testNegativeMergeTransactionReady() throws Exception { + + + final ActorRef shard = + getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard, + TestModel.createTestContext()); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, "testNegativeMergeTransactionReady"); + + ShardTransactionMessages.ReadyTransaction readyTransaction = + ShardTransactionMessages.ReadyTransaction.newBuilder().build(); + + Future future = + akka.pattern.Patterns.ask(subject, readyTransaction, 3000); + assertTrue(future.isCompleted()); + Await.result(future, Duration.Zero()); + + ShardTransactionMessages.MergeData mergeData = + ShardTransactionMessages.MergeData.newBuilder() + .setInstanceIdentifierPathArguments( + NormalizedNodeMessages.InstanceIdentifier.newBuilder() + .build()).setNormalizedNode( + NormalizedNodeMessages.Node.newBuilder().build() + + ).build(); + + future = akka.pattern.Patterns.ask(subject, mergeData, 3000); + assertTrue(future.isCompleted()); + Await.result(future, Duration.Zero()); + + + } + + + @Test(expected = IllegalStateException.class) + public void testNegativeDeleteDataWhenTransactionReady() throws Exception { + + + final ActorRef shard = + getSystem().actorOf(Shard.props("config", Collections.EMPTY_MAP)); + final Props props = + ShardTransaction.props(store.newReadWriteTransaction(), shard, + TestModel.createTestContext()); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativeDeleteDataWhenTransactionReady"); + + ShardTransactionMessages.ReadyTransaction readyTransaction = + ShardTransactionMessages.ReadyTransaction.newBuilder().build(); + + Future future = + akka.pattern.Patterns.ask(subject, readyTransaction, 3000); + assertTrue(future.isCompleted()); + Await.result(future, Duration.Zero()); + + ShardTransactionMessages.DeleteData deleteData = + ShardTransactionMessages.DeleteData.newBuilder() + .setInstanceIdentifierPathArguments( + NormalizedNodeMessages.InstanceIdentifier.newBuilder() + .build()).build(); + + future = akka.pattern.Patterns.ask(subject, deleteData, 3000); + assertTrue(future.isCompleted()); + Await.result(future, Duration.Zero()); + + + } +} -- 2.36.6