From: Devin Avery Date: Tue, 19 Aug 2014 14:23:14 +0000 (+0000) Subject: Merge "Moved the resources to a separate plugin so they can be used by others." X-Git-Tag: release/helium~268 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=3d7027237aa124800753b9438ec625b73000a0d9;hp=eed379a371f49c487aab762c55443fad18613417 Merge "Moved the resources to a separate plugin so they can be used by others." --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java index ac01f42a7f..b258c4466a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java @@ -18,7 +18,7 @@ public abstract class AbstractUntypedActor extends UntypedActor { Logging.getLogger(getContext().system(), this); - public AbstractUntypedActor(){ + public AbstractUntypedActor() { LOG.debug("Actor created {}", getSelf()); getContext(). system(). @@ -29,16 +29,18 @@ public abstract class AbstractUntypedActor extends UntypedActor { @Override public void onReceive(Object message) throws Exception { LOG.debug("Received message {}", message.getClass().getSimpleName()); handleReceive(message); - LOG.debug("Done handling message {}", message.getClass().getSimpleName()); + LOG.debug("Done handling message {}", + message.getClass().getSimpleName()); } protected abstract void handleReceive(Object message) throws Exception; - protected void ignoreMessage(Object message){ + protected void ignoreMessage(Object message) { LOG.debug("Unhandled message {} ", message); } - protected void unknownMessage(Object message) throws Exception{ + protected void unknownMessage(Object message) throws Exception { + LOG.debug("Received unhandled message {}", message); unhandled(message); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c329a10c04..75f540ade0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -17,6 +17,8 @@ import akka.japi.Creator; import akka.serialization.Serialization; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; @@ -111,21 +113,27 @@ public class Shard extends RaftActor { } - private static Map mapPeerAddresses(Map peerAddresses){ - Map map = new HashMap<>(); + private static Map mapPeerAddresses( + Map peerAddresses) { + Map map = new HashMap<>(); - for(Map.Entry entry : peerAddresses.entrySet()){ + for (Map.Entry entry : peerAddresses + .entrySet()) { map.put(entry.getKey().toString(), entry.getValue()); } return map; } + + + public static Props props(final ShardIdentifier name, final Map peerAddresses, final InMemoryDOMDataStoreConfigProperties dataStoreProperties) { Preconditions.checkNotNull(name, "name should not be null"); - Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null"); + Preconditions + .checkNotNull(peerAddresses, "peerAddresses should not be null"); return Props.create(new Creator() { @@ -164,14 +172,16 @@ public class Shard extends RaftActor { } } else if (message instanceof PeerAddressResolved) { PeerAddressResolved resolved = (PeerAddressResolved) message; - setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress()); + setPeerAddress(resolved.getPeerId().toString(), + resolved.getPeerAddress()); } else { super.onReceiveCommand(message); } } private ActorRef createTypedTransactionActor( - CreateTransaction createTransaction, ShardTransactionIdentifier transactionId) { + CreateTransaction createTransaction, + ShardTransactionIdentifier transactionId) { if (createTransaction.getTransactionType() == TransactionProxy.TransactionType.READ_ONLY.ordinal()) { @@ -203,24 +213,26 @@ public class Shard extends RaftActor { .props(store.newWriteOnlyTransaction(), getSelf(), schemaContext), transactionId.toString()); } else { - // FIXME: This does not seem right throw new IllegalArgumentException( - "CreateTransaction message has unidentified transaction type=" + "Shard="+name + ":CreateTransaction message has unidentified transaction type=" + createTransaction.getTransactionType()); } } private void createTransaction(CreateTransaction createTransaction) { - ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder().remoteTransactionId(createTransaction.getTransactionId()).build(); + ShardTransactionIdentifier transactionId = + ShardTransactionIdentifier.builder() + .remoteTransactionId(createTransaction.getTransactionId()) + .build(); LOG.debug("Creating transaction : {} ", transactionId); ActorRef transactionActor = createTypedTransactionActor(createTransaction, transactionId); getSender() .tell(new CreateTransactionReply( - Serialization.serializedActorPath(transactionActor), - createTransaction.getTransactionId()).toSerializable(), + Serialization.serializedActorPath(transactionActor), + createTransaction.getTransactionId()).toSerializable(), getSelf()); } @@ -255,22 +267,21 @@ public class Shard extends RaftActor { final ListenableFuture future = cohort.commit(); final ActorRef self = getSelf(); - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - sender - .tell(new CommitTransactionReply().toSerializable(), - self); - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(new Date()); - } catch (InterruptedException | ExecutionException e) { - shardMBean.incrementFailedTransactionsCount(); - sender.tell(new akka.actor.Status.Failure(e),self); - } + + Futures.addCallback(future, new FutureCallback() { + public void onSuccess(Void v) { + sender.tell(new CommitTransactionReply().toSerializable(),self); + shardMBean.incrementCommittedTransactionCount(); + shardMBean.setLastCommittedTransactionTime(new Date()); } - }, getContext().dispatcher()); + + public void onFailure(Throwable t) { + LOG.error(t, "An exception happened during commit"); + shardMBean.incrementFailedTransactionsCount(); + sender.tell(new akka.actor.Status.Failure(t), self); + } + }); + } private void handleForwardedCommit(ForwardedCommitTransaction message) { @@ -329,7 +340,7 @@ public class Shard extends RaftActor { LOG.debug( "registerDataChangeListener sending reply, listenerRegistrationPath = {} " - , listenerRegistration.path().toString()); + , listenerRegistration.path().toString()); getSender() .tell(new RegisterChangeListenerReply(listenerRegistration.path()), @@ -370,7 +381,7 @@ public class Shard extends RaftActor { // Update stats ReplicatedLogEntry lastLogEntry = getLastLogEntry(); - if(lastLogEntry != null){ + if (lastLogEntry != null) { shardMBean.setLastLogIndex(lastLogEntry.getIndex()); shardMBean.setLastLogTerm(lastLogEntry.getTerm()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 5fce64e248..3396eb5564 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -290,10 +290,12 @@ public class ShardManager extends AbstractUntypedActor { @Override public SupervisorStrategy supervisorStrategy() { + return new OneForOneStrategy(10, Duration.create("1 minute"), new Function() { @Override public SupervisorStrategy.Directive apply(Throwable t) { + LOG.warning("Supervisor Strategy of resume applied {}",t); return SupervisorStrategy.resume(); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java index 500b73ce9d..25705bff41 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java @@ -14,6 +14,8 @@ import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -26,8 +28,6 @@ import org.opendaylight.controller.cluster.datastore.messages.PreCommitTransacti import org.opendaylight.controller.cluster.datastore.modification.CompositeModification; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import java.util.concurrent.ExecutionException; - public class ThreePhaseCommitCohort extends AbstractUntypedActor { private final DOMStoreThreePhaseCommitCohort cohort; private final ActorRef shardActor; @@ -58,13 +58,17 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { @Override public void handleReceive(Object message) throws Exception { - if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { + if (message.getClass() + .equals(CanCommitTransaction.SERIALIZABLE_CLASS)) { canCommit(new CanCommitTransaction()); - } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) { + } else if (message.getClass() + .equals(PreCommitTransaction.SERIALIZABLE_CLASS)) { preCommit(new PreCommitTransaction()); - } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) { + } else if (message.getClass() + .equals(CommitTransaction.SERIALIZABLE_CLASS)) { commit(new CommitTransaction()); - } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) { + } else if (message.getClass() + .equals(AbortTransaction.SERIALIZABLE_CLASS)) { abort(new AbortTransaction()); } else { unknownMessage(message); @@ -76,17 +80,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { final ActorRef sender = getSender(); final ActorRef self = getSelf(); - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - sender.tell(new AbortTransactionReply().toSerializable(), self); - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when aborting"); - } + Futures.addCallback(future, new FutureCallback() { + public void onSuccess(Void v) { + sender + .tell(new AbortTransactionReply().toSerializable(), + self); + } + + public void onFailure(Throwable t) { + LOG.error(t, "An exception happened during abort"); + sender + .tell(new akka.actor.Status.Failure(t), getSelf()); } - }, getContext().dispatcher()); + }); } private void commit(CommitTransaction message) { @@ -103,18 +109,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { final ListenableFuture future = cohort.preCommit(); final ActorRef sender = getSender(); final ActorRef self = getSelf(); + Futures.addCallback(future, new FutureCallback() { + public void onSuccess(Void v) { + sender + .tell(new PreCommitTransactionReply().toSerializable(), + self); + } - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - sender.tell(new PreCommitTransactionReply().toSerializable(), self); - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when preCommitting"); - } + public void onFailure(Throwable t) { + LOG.error(t, "An exception happened during pre-commit"); + sender + .tell(new akka.actor.Status.Failure(t), getSelf()); } - }, getContext().dispatcher()); + }); } @@ -122,18 +129,19 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor { final ListenableFuture future = cohort.canCommit(); final ActorRef sender = getSender(); final ActorRef self = getSelf(); + Futures.addCallback(future, new FutureCallback() { + public void onSuccess(Boolean canCommit) { + sender.tell(new CanCommitTransactionReply(canCommit) + .toSerializable(), self); + } - future.addListener(new Runnable() { - @Override - public void run() { - try { - Boolean canCommit = future.get(); - sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self); - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when checking canCommit"); - } + public void onFailure(Throwable t) { + LOG.error(t, "An exception happened during canCommit"); + sender + .tell(new akka.actor.Status.Failure(t), getSelf()); } - }, getContext().dispatcher()); + }); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java index 2c23afca12..16b73040a5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java @@ -33,6 +33,7 @@ import static org.junit.Assert.assertTrue; /** * Covers negative test cases + * * @author Basheeruddin Ahmed */ public class ShardTransactionFailureTest extends AbstractActorTest { @@ -48,7 +49,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { private static final ShardIdentifier SHARD_IDENTIFIER = ShardIdentifier.builder().memberName("member-1") - .shardName("inventory").type("config").build(); + .shardName("inventory").type("operational").build(); static { store.onGlobalContextUpdated(testSchemaContext); @@ -95,7 +96,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { throws Throwable { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -129,7 +130,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { throws Throwable { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -164,7 +165,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext()); @@ -203,7 +204,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -241,7 +242,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); @@ -279,7 +280,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest { final ActorRef shard = - getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null)); + getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java new file mode 100644 index 0000000000..870889b350 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java @@ -0,0 +1,232 @@ +/* + * + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + * + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.testkit.TestActorRef; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import org.junit.Test; +import org.mockito.Mockito; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction; +import org.opendaylight.controller.cluster.datastore.modification.CompositeModification; +import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException; +import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; +import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; +import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; +import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages; +import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + + +public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest { + + private static ListeningExecutorService storeExecutor = + MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor()); + + private static final InMemoryDOMDataStore store = + new InMemoryDOMDataStore("OPER", storeExecutor, + MoreExecutors.sameThreadExecutor()); + + private static final SchemaContext testSchemaContext = + TestModel.createTestContext(); + + private static final ShardIdentifier SHARD_IDENTIFIER = + ShardIdentifier.builder().memberName("member-1") + .shardName("inventory").type("config").build(); + + static { + store.onGlobalContextUpdated(testSchemaContext); + } + + private FiniteDuration ASK_RESULT_DURATION = Duration.create(3000, TimeUnit.MILLISECONDS); + + + @Test(expected = TestException.class) + public void testNegativeAbortResultsInException() throws Exception { + + final ActorRef shard = + getSystem() + .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); + final DOMStoreThreePhaseCommitCohort mockCohort = Mockito + .mock(DOMStoreThreePhaseCommitCohort.class); + final CompositeModification mockComposite = + Mockito.mock(CompositeModification.class); + final Props props = + ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativeAbortResultsInException"); + + when(mockCohort.abort()).thenReturn( + Futures.immediateFailedFuture(new TestException())); + + Future future = + akka.pattern.Patterns.ask(subject, + ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder() + .build(), 3000); + assertTrue(future.isCompleted()); + + Await.result(future, ASK_RESULT_DURATION); + + + + } + + + @Test(expected = OptimisticLockFailedException.class) + public void testNegativeCanCommitResultsInException() throws Exception { + + final ActorRef shard = + getSystem() + .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); + final DOMStoreThreePhaseCommitCohort mockCohort = Mockito + .mock(DOMStoreThreePhaseCommitCohort.class); + final CompositeModification mockComposite = + Mockito.mock(CompositeModification.class); + final Props props = + ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativeCanCommitResultsInException"); + + when(mockCohort.canCommit()).thenReturn( + Futures + .immediateFailedFuture( + new OptimisticLockFailedException("some exception"))); + + Future future = + akka.pattern.Patterns.ask(subject, + ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder() + .build(), 3000); + + + Await.result(future, ASK_RESULT_DURATION); + + } + + + @Test(expected = TestException.class) + public void testNegativePreCommitResultsInException() throws Exception { + + final ActorRef shard = + getSystem() + .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null)); + final DOMStoreThreePhaseCommitCohort mockCohort = Mockito + .mock(DOMStoreThreePhaseCommitCohort.class); + final CompositeModification mockComposite = + Mockito.mock(CompositeModification.class); + final Props props = + ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite); + + final TestActorRef subject = TestActorRef + .create(getSystem(), props, + "testNegativePreCommitResultsInException"); + + when(mockCohort.preCommit()).thenReturn( + Futures + .immediateFailedFuture( + new TestException())); + + Future future = + akka.pattern.Patterns.ask(subject, + ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder() + .build(), 3000); + + Await.result(future, ASK_RESULT_DURATION); + + } + + @Test(expected = TestException.class) + public void testNegativeCommitResultsInException() throws Exception { + + final TestActorRef subject = TestActorRef + .create(getSystem(), + Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null), + "testNegativeCommitResultsInException"); + + final ActorRef shardTransaction = + getSystem().actorOf( + ShardTransaction.props(store.newReadWriteTransaction(), subject, + TestModel.createTestContext())); + + ShardTransactionMessages.WriteData writeData = + ShardTransactionMessages.WriteData.newBuilder() + .setInstanceIdentifierPathArguments( + NormalizedNodeMessages.InstanceIdentifier.newBuilder() + .build()).setNormalizedNode( + NormalizedNodeMessages.Node.newBuilder().build() + + ).build(); + + //This is done so that Modification list is updated which is used during commit + Future future = + akka.pattern.Patterns.ask(shardTransaction, writeData, 3000); + + //ready transaction creates the cohort so that we get into the + //block where in commmit is done + ShardTransactionMessages.ReadyTransaction readyTransaction = + ShardTransactionMessages.ReadyTransaction.newBuilder().build(); + + future = + akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000); + + //but when the message is sent it will have the MockCommit object + //so that we can simulate throwing of exception + ForwardedCommitTransaction mockForwardCommitTransaction = + Mockito.mock(ForwardedCommitTransaction.class); + DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction = + Mockito.mock(DOMStoreThreePhaseCommitCohort.class); + when(mockForwardCommitTransaction.getCohort()) + .thenReturn(mockThreePhaseCommitTransaction); + when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures + .immediateFailedFuture( + new TestException())); + Modification mockModification = Mockito.mock( + Modification.class); + when(mockForwardCommitTransaction.getModification()) + .thenReturn(mockModification); + + when(mockModification.toSerializable()).thenReturn( + PersistentMessages.CompositeModification.newBuilder().build()); + + future = + akka.pattern.Patterns.ask(subject, + mockForwardCommitTransaction + , 3000); + Await.result(future, ASK_RESULT_DURATION); + + + } + + private class TestException extends Exception { + } + + +}