Merge "Moved the resources to a separate plugin so they can be used by others."
authorDevin Avery <devin.avery@brocade.com>
Tue, 19 Aug 2014 14:23:14 +0000 (14:23 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Tue, 19 Aug 2014 14:23:14 +0000 (14:23 +0000)
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java [new file with mode: 0644]

index ac01f42a7fb017a9e33b330c64e6b5178aa95ff2..b258c4466a7d1939aacaa5c3ac32159539bf2485 100644 (file)
@@ -18,7 +18,7 @@ public abstract class AbstractUntypedActor extends UntypedActor {
         Logging.getLogger(getContext().system(), this);
 
 
-    public AbstractUntypedActor(){
+    public AbstractUntypedActor() {
         LOG.debug("Actor created {}", getSelf());
         getContext().
             system().
@@ -29,16 +29,18 @@ public abstract class AbstractUntypedActor extends UntypedActor {
     @Override public void onReceive(Object message) throws Exception {
         LOG.debug("Received message {}", message.getClass().getSimpleName());
         handleReceive(message);
-        LOG.debug("Done handling message {}", message.getClass().getSimpleName());
+        LOG.debug("Done handling message {}",
+            message.getClass().getSimpleName());
     }
 
     protected abstract void handleReceive(Object message) throws Exception;
 
-    protected void ignoreMessage(Object message){
+    protected void ignoreMessage(Object message) {
         LOG.debug("Unhandled message {} ", message);
     }
 
-    protected void unknownMessage(Object message) throws Exception{
+    protected void unknownMessage(Object message) throws Exception {
+        LOG.debug("Received unhandled message {}", message);
         unhandled(message);
     }
 }
index c329a10c0408072677bb39d9994c5ce158c74cb0..75f540ade088e6bb45b10e9b92bc4af8789b0218 100644 (file)
@@ -17,6 +17,8 @@ import akka.japi.Creator;
 import akka.serialization.Serialization;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
@@ -111,21 +113,27 @@ public class Shard extends RaftActor {
 
     }
 
-    private static Map<String, String> mapPeerAddresses(Map<ShardIdentifier, String> peerAddresses){
-        Map<String , String> map = new HashMap<>();
+    private static Map<String, String> mapPeerAddresses(
+        Map<ShardIdentifier, String> peerAddresses) {
+        Map<String, String> map = new HashMap<>();
 
-        for(Map.Entry<ShardIdentifier, String> entry : peerAddresses.entrySet()){
+        for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
+            .entrySet()) {
             map.put(entry.getKey().toString(), entry.getValue());
         }
 
         return map;
     }
 
+
+
+
     public static Props props(final ShardIdentifier name,
         final Map<ShardIdentifier, String> peerAddresses,
         final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
         Preconditions.checkNotNull(name, "name should not be null");
-        Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+        Preconditions
+            .checkNotNull(peerAddresses, "peerAddresses should not be null");
 
         return Props.create(new Creator<Shard>() {
 
@@ -164,14 +172,16 @@ public class Shard extends RaftActor {
             }
         } else if (message instanceof PeerAddressResolved) {
             PeerAddressResolved resolved = (PeerAddressResolved) message;
-            setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress());
+            setPeerAddress(resolved.getPeerId().toString(),
+                resolved.getPeerAddress());
         } else {
             super.onReceiveCommand(message);
         }
     }
 
     private ActorRef createTypedTransactionActor(
-        CreateTransaction createTransaction, ShardTransactionIdentifier transactionId) {
+        CreateTransaction createTransaction,
+        ShardTransactionIdentifier transactionId) {
         if (createTransaction.getTransactionType()
             == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
 
@@ -203,24 +213,26 @@ public class Shard extends RaftActor {
                     .props(store.newWriteOnlyTransaction(), getSelf(),
                         schemaContext), transactionId.toString());
         } else {
-            // FIXME: This does not seem right
             throw new IllegalArgumentException(
-                "CreateTransaction message has unidentified transaction type="
+                "Shard="+name + ":CreateTransaction message has unidentified transaction type="
                     + createTransaction.getTransactionType());
         }
     }
 
     private void createTransaction(CreateTransaction createTransaction) {
 
-        ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder().remoteTransactionId(createTransaction.getTransactionId()).build();
+        ShardTransactionIdentifier transactionId =
+            ShardTransactionIdentifier.builder()
+                .remoteTransactionId(createTransaction.getTransactionId())
+                .build();
         LOG.debug("Creating transaction : {} ", transactionId);
         ActorRef transactionActor =
             createTypedTransactionActor(createTransaction, transactionId);
 
         getSender()
             .tell(new CreateTransactionReply(
-                Serialization.serializedActorPath(transactionActor),
-                createTransaction.getTransactionId()).toSerializable(),
+                    Serialization.serializedActorPath(transactionActor),
+                    createTransaction.getTransactionId()).toSerializable(),
                 getSelf());
     }
 
@@ -255,22 +267,21 @@ public class Shard extends RaftActor {
 
         final ListenableFuture<Void> future = cohort.commit();
         final ActorRef self = getSelf();
-        future.addListener(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    future.get();
-                        sender
-                            .tell(new CommitTransactionReply().toSerializable(),
-                                self);
-                        shardMBean.incrementCommittedTransactionCount();
-                        shardMBean.setLastCommittedTransactionTime(new Date());
-                } catch (InterruptedException | ExecutionException e) {
-                    shardMBean.incrementFailedTransactionsCount();
-                    sender.tell(new akka.actor.Status.Failure(e),self);
-                }
+
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            public void onSuccess(Void v) {
+               sender.tell(new CommitTransactionReply().toSerializable(),self);
+               shardMBean.incrementCommittedTransactionCount();
+               shardMBean.setLastCommittedTransactionTime(new Date());
             }
-        }, getContext().dispatcher());
+
+            public void onFailure(Throwable t) {
+                LOG.error(t, "An exception happened during commit");
+                shardMBean.incrementFailedTransactionsCount();
+                sender.tell(new akka.actor.Status.Failure(t), self);
+            }
+        });
+
     }
 
     private void handleForwardedCommit(ForwardedCommitTransaction message) {
@@ -329,7 +340,7 @@ public class Shard extends RaftActor {
 
         LOG.debug(
             "registerDataChangeListener sending reply, listenerRegistrationPath = {} "
-                , listenerRegistration.path().toString());
+            , listenerRegistration.path().toString());
 
         getSender()
             .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
@@ -370,7 +381,7 @@ public class Shard extends RaftActor {
         // Update stats
         ReplicatedLogEntry lastLogEntry = getLastLogEntry();
 
-        if(lastLogEntry != null){
+        if (lastLogEntry != null) {
             shardMBean.setLastLogIndex(lastLogEntry.getIndex());
             shardMBean.setLastLogTerm(lastLogEntry.getTerm());
         }
index 5fce64e248447284b48d1f3237bc9e2252472002..3396eb556456116005459540c66a78119ca64767 100644 (file)
@@ -290,10 +290,12 @@ public class ShardManager extends AbstractUntypedActor {
 
     @Override
     public SupervisorStrategy supervisorStrategy() {
+
         return new OneForOneStrategy(10, Duration.create("1 minute"),
             new Function<Throwable, SupervisorStrategy.Directive>() {
                 @Override
                 public SupervisorStrategy.Directive apply(Throwable t) {
+                    LOG.warning("Supervisor Strategy of resume applied {}",t);
                     return SupervisorStrategy.resume();
                 }
             }
index 500b73ce9de6531c3a1d60df3e192dea18dc4606..25705bff418740c873af8334091b3961612c3f1e 100644 (file)
@@ -14,6 +14,8 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
@@ -26,8 +28,6 @@ import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransacti
 import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 
-import java.util.concurrent.ExecutionException;
-
 public class ThreePhaseCommitCohort extends AbstractUntypedActor {
     private final DOMStoreThreePhaseCommitCohort cohort;
     private final ActorRef shardActor;
@@ -58,13 +58,17 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
 
     @Override
     public void handleReceive(Object message) throws Exception {
-        if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+        if (message.getClass()
+            .equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
             canCommit(new CanCommitTransaction());
-        } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+        } else if (message.getClass()
+            .equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
             preCommit(new PreCommitTransaction());
-        } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+        } else if (message.getClass()
+            .equals(CommitTransaction.SERIALIZABLE_CLASS)) {
             commit(new CommitTransaction());
-        } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+        } else if (message.getClass()
+            .equals(AbortTransaction.SERIALIZABLE_CLASS)) {
             abort(new AbortTransaction());
         } else {
             unknownMessage(message);
@@ -76,17 +80,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
 
-        future.addListener(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    future.get();
-                    sender.tell(new AbortTransactionReply().toSerializable(), self);
-                } catch (InterruptedException | ExecutionException e) {
-                    log.error(e, "An exception happened when aborting");
-                }
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            public void onSuccess(Void v) {
+                sender
+                    .tell(new AbortTransactionReply().toSerializable(),
+                        self);
+            }
+
+            public void onFailure(Throwable t) {
+                LOG.error(t, "An exception happened during abort");
+                sender
+                    .tell(new akka.actor.Status.Failure(t), getSelf());
             }
-        }, getContext().dispatcher());
+        });
     }
 
     private void commit(CommitTransaction message) {
@@ -103,18 +109,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final ListenableFuture<Void> future = cohort.preCommit();
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
+        Futures.addCallback(future, new FutureCallback<Void>() {
+            public void onSuccess(Void v) {
+                sender
+                    .tell(new PreCommitTransactionReply().toSerializable(),
+                        self);
+            }
 
-        future.addListener(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    future.get();
-                    sender.tell(new PreCommitTransactionReply().toSerializable(), self);
-                } catch (InterruptedException | ExecutionException e) {
-                    log.error(e, "An exception happened when preCommitting");
-                }
+            public void onFailure(Throwable t) {
+                LOG.error(t, "An exception happened during pre-commit");
+                sender
+                    .tell(new akka.actor.Status.Failure(t), getSelf());
             }
-        }, getContext().dispatcher());
+        });
 
     }
 
@@ -122,18 +129,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final ListenableFuture<Boolean> future = cohort.canCommit();
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
+        Futures.addCallback(future, new FutureCallback<Boolean>() {
+            public void onSuccess(Boolean canCommit) {
+                sender.tell(new CanCommitTransactionReply(canCommit)
+                    .toSerializable(), self);
+            }
 
-        future.addListener(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    Boolean canCommit = future.get();
-                    sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self);
-                } catch (InterruptedException | ExecutionException e) {
-                    log.error(e, "An exception happened when checking canCommit");
-                }
+            public void onFailure(Throwable t) {
+                LOG.error(t, "An exception happened during canCommit");
+                sender
+                    .tell(new akka.actor.Status.Failure(t), getSelf());
             }
-        }, getContext().dispatcher());
+        });
+
 
     }
 }
index 2c23afca127dd17ec150b2811da0a6b64b91f137..16b73040a5b6e5375a24570fb5b1617240eadb04 100644 (file)
@@ -33,6 +33,7 @@ import static org.junit.Assert.assertTrue;
 
 /**
  * Covers negative test cases
+ *
  * @author Basheeruddin Ahmed <syedbahm@cisco.com>
  */
 public class ShardTransactionFailureTest extends AbstractActorTest {
@@ -48,7 +49,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
     private static final ShardIdentifier SHARD_IDENTIFIER =
         ShardIdentifier.builder().memberName("member-1")
-            .shardName("inventory").type("config").build();
+            .shardName("inventory").type("operational").build();
 
     static {
         store.onGlobalContextUpdated(testSchemaContext);
@@ -95,7 +96,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         throws Throwable {
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
         final Props props =
             ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 TestModel.createTestContext());
@@ -129,7 +130,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         throws Throwable {
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
         final Props props =
             ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 TestModel.createTestContext());
@@ -164,7 +165,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
         final Props props =
             ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
                 TestModel.createTestContext());
@@ -203,7 +204,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
         final Props props =
             ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 TestModel.createTestContext());
@@ -241,7 +242,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
         final Props props =
             ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 TestModel.createTestContext());
@@ -279,7 +280,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
         final Props props =
             ShardTransaction.props(store.newReadWriteTransaction(), shard,
                 TestModel.createTestContext());
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java
new file mode 100644 (file)
index 0000000..870889b
--- /dev/null
@@ -0,0 +1,232 @@
+/*
+ *
+ *  Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ *  This program and the accompanying materials are made available under the
+ *  terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ *  and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+
+public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
+
+    private static ListeningExecutorService storeExecutor =
+        MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+    private static final InMemoryDOMDataStore store =
+        new InMemoryDOMDataStore("OPER", storeExecutor,
+            MoreExecutors.sameThreadExecutor());
+
+    private static final SchemaContext testSchemaContext =
+        TestModel.createTestContext();
+
+    private static final ShardIdentifier SHARD_IDENTIFIER =
+        ShardIdentifier.builder().memberName("member-1")
+            .shardName("inventory").type("config").build();
+
+    static {
+        store.onGlobalContextUpdated(testSchemaContext);
+    }
+
+    private FiniteDuration ASK_RESULT_DURATION = Duration.create(3000, TimeUnit.MILLISECONDS);
+
+
+    @Test(expected = TestException.class)
+    public void testNegativeAbortResultsInException() throws Exception {
+
+        final ActorRef shard =
+            getSystem()
+                .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+        final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
+            .mock(DOMStoreThreePhaseCommitCohort.class);
+        final CompositeModification mockComposite =
+            Mockito.mock(CompositeModification.class);
+        final Props props =
+            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
+
+        final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
+            .create(getSystem(), props,
+                "testNegativeAbortResultsInException");
+
+        when(mockCohort.abort()).thenReturn(
+            Futures.<Void>immediateFailedFuture(new TestException()));
+
+        Future<Object> future =
+            akka.pattern.Patterns.ask(subject,
+                ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
+                    .build(), 3000);
+        assertTrue(future.isCompleted());
+
+        Await.result(future, ASK_RESULT_DURATION);
+
+
+
+    }
+
+
+    @Test(expected = OptimisticLockFailedException.class)
+    public void testNegativeCanCommitResultsInException() throws Exception {
+
+        final ActorRef shard =
+            getSystem()
+                .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+        final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
+            .mock(DOMStoreThreePhaseCommitCohort.class);
+        final CompositeModification mockComposite =
+            Mockito.mock(CompositeModification.class);
+        final Props props =
+            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
+
+        final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
+            .create(getSystem(), props,
+                "testNegativeCanCommitResultsInException");
+
+        when(mockCohort.canCommit()).thenReturn(
+            Futures
+                .<Boolean>immediateFailedFuture(
+                    new OptimisticLockFailedException("some exception")));
+
+        Future<Object> future =
+            akka.pattern.Patterns.ask(subject,
+                ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
+                    .build(), 3000);
+
+
+        Await.result(future, ASK_RESULT_DURATION);
+
+    }
+
+
+    @Test(expected = TestException.class)
+    public void testNegativePreCommitResultsInException() throws Exception {
+
+        final ActorRef shard =
+            getSystem()
+                .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+        final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
+            .mock(DOMStoreThreePhaseCommitCohort.class);
+        final CompositeModification mockComposite =
+            Mockito.mock(CompositeModification.class);
+        final Props props =
+            ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
+
+        final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
+            .create(getSystem(), props,
+                "testNegativePreCommitResultsInException");
+
+        when(mockCohort.preCommit()).thenReturn(
+            Futures
+                .<Void>immediateFailedFuture(
+                    new TestException()));
+
+        Future<Object> future =
+            akka.pattern.Patterns.ask(subject,
+                ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
+                    .build(), 3000);
+
+        Await.result(future, ASK_RESULT_DURATION);
+
+    }
+
+    @Test(expected = TestException.class)
+    public void testNegativeCommitResultsInException() throws Exception {
+
+        final TestActorRef<Shard> subject = TestActorRef
+            .create(getSystem(),
+                Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null),
+                "testNegativeCommitResultsInException");
+
+        final ActorRef shardTransaction =
+            getSystem().actorOf(
+                ShardTransaction.props(store.newReadWriteTransaction(), subject,
+                    TestModel.createTestContext()));
+
+        ShardTransactionMessages.WriteData writeData =
+            ShardTransactionMessages.WriteData.newBuilder()
+                .setInstanceIdentifierPathArguments(
+                    NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+                        .build()).setNormalizedNode(
+                NormalizedNodeMessages.Node.newBuilder().build()
+
+            ).build();
+
+        //This is done so that Modification list is updated which is used during commit
+        Future future =
+            akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
+
+        //ready transaction creates the cohort so that we get into the
+        //block where in commmit is done
+        ShardTransactionMessages.ReadyTransaction readyTransaction =
+            ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+        future =
+            akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
+
+        //but when the message is sent it will have the MockCommit object
+        //so that we can simulate throwing of exception
+        ForwardedCommitTransaction mockForwardCommitTransaction =
+            Mockito.mock(ForwardedCommitTransaction.class);
+        DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
+            Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
+        when(mockForwardCommitTransaction.getCohort())
+            .thenReturn(mockThreePhaseCommitTransaction);
+        when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
+            .<Void>immediateFailedFuture(
+                new TestException()));
+        Modification mockModification = Mockito.mock(
+            Modification.class);
+        when(mockForwardCommitTransaction.getModification())
+            .thenReturn(mockModification);
+
+        when(mockModification.toSerializable()).thenReturn(
+            PersistentMessages.CompositeModification.newBuilder().build());
+
+        future =
+            akka.pattern.Patterns.ask(subject,
+                mockForwardCommitTransaction
+                , 3000);
+        Await.result(future, ASK_RESULT_DURATION);
+
+
+    }
+
+    private class TestException extends Exception {
+    }
+
+
+}