Logging.getLogger(getContext().system(), this);
- public AbstractUntypedActor(){
+ public AbstractUntypedActor() {
LOG.debug("Actor created {}", getSelf());
getContext().
system().
@Override public void onReceive(Object message) throws Exception {
LOG.debug("Received message {}", message.getClass().getSimpleName());
handleReceive(message);
- LOG.debug("Done handling message {}", message.getClass().getSimpleName());
+ LOG.debug("Done handling message {}",
+ message.getClass().getSimpleName());
}
protected abstract void handleReceive(Object message) throws Exception;
- protected void ignoreMessage(Object message){
+ protected void ignoreMessage(Object message) {
LOG.debug("Unhandled message {} ", message);
}
- protected void unknownMessage(Object message) throws Exception{
+ protected void unknownMessage(Object message) throws Exception {
+ LOG.debug("Received unhandled message {}", message);
unhandled(message);
}
}
import akka.serialization.Serialization;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
}
- private static Map<String, String> mapPeerAddresses(Map<ShardIdentifier, String> peerAddresses){
- Map<String , String> map = new HashMap<>();
+ private static Map<String, String> mapPeerAddresses(
+ Map<ShardIdentifier, String> peerAddresses) {
+ Map<String, String> map = new HashMap<>();
- for(Map.Entry<ShardIdentifier, String> entry : peerAddresses.entrySet()){
+ for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
+ .entrySet()) {
map.put(entry.getKey().toString(), entry.getValue());
}
return map;
}
+
+
+
public static Props props(final ShardIdentifier name,
final Map<ShardIdentifier, String> peerAddresses,
final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
Preconditions.checkNotNull(name, "name should not be null");
- Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+ Preconditions
+ .checkNotNull(peerAddresses, "peerAddresses should not be null");
return Props.create(new Creator<Shard>() {
}
} else if (message instanceof PeerAddressResolved) {
PeerAddressResolved resolved = (PeerAddressResolved) message;
- setPeerAddress(resolved.getPeerId().toString(), resolved.getPeerAddress());
+ setPeerAddress(resolved.getPeerId().toString(),
+ resolved.getPeerAddress());
} else {
super.onReceiveCommand(message);
}
}
private ActorRef createTypedTransactionActor(
- CreateTransaction createTransaction, ShardTransactionIdentifier transactionId) {
+ CreateTransaction createTransaction,
+ ShardTransactionIdentifier transactionId) {
if (createTransaction.getTransactionType()
== TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
.props(store.newWriteOnlyTransaction(), getSelf(),
schemaContext), transactionId.toString());
} else {
- // FIXME: This does not seem right
throw new IllegalArgumentException(
- "CreateTransaction message has unidentified transaction type="
+ "Shard="+name + ":CreateTransaction message has unidentified transaction type="
+ createTransaction.getTransactionType());
}
}
private void createTransaction(CreateTransaction createTransaction) {
- ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder().remoteTransactionId(createTransaction.getTransactionId()).build();
+ ShardTransactionIdentifier transactionId =
+ ShardTransactionIdentifier.builder()
+ .remoteTransactionId(createTransaction.getTransactionId())
+ .build();
LOG.debug("Creating transaction : {} ", transactionId);
ActorRef transactionActor =
createTypedTransactionActor(createTransaction, transactionId);
getSender()
.tell(new CreateTransactionReply(
- Serialization.serializedActorPath(transactionActor),
- createTransaction.getTransactionId()).toSerializable(),
+ Serialization.serializedActorPath(transactionActor),
+ createTransaction.getTransactionId()).toSerializable(),
getSelf());
}
final ListenableFuture<Void> future = cohort.commit();
final ActorRef self = getSelf();
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- sender
- .tell(new CommitTransactionReply().toSerializable(),
- self);
- shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(new Date());
- } catch (InterruptedException | ExecutionException e) {
- shardMBean.incrementFailedTransactionsCount();
- sender.tell(new akka.actor.Status.Failure(e),self);
- }
+
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ public void onSuccess(Void v) {
+ sender.tell(new CommitTransactionReply().toSerializable(),self);
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(new Date());
}
- }, getContext().dispatcher());
+
+ public void onFailure(Throwable t) {
+ LOG.error(t, "An exception happened during commit");
+ shardMBean.incrementFailedTransactionsCount();
+ sender.tell(new akka.actor.Status.Failure(t), self);
+ }
+ });
+
}
private void handleForwardedCommit(ForwardedCommitTransaction message) {
LOG.debug(
"registerDataChangeListener sending reply, listenerRegistrationPath = {} "
- , listenerRegistration.path().toString());
+ , listenerRegistration.path().toString());
getSender()
.tell(new RegisterChangeListenerReply(listenerRegistration.path()),
// Update stats
ReplicatedLogEntry lastLogEntry = getLastLogEntry();
- if(lastLogEntry != null){
+ if (lastLogEntry != null) {
shardMBean.setLastLogIndex(lastLogEntry.getIndex());
shardMBean.setLastLogTerm(lastLogEntry.getTerm());
}
@Override
public SupervisorStrategy supervisorStrategy() {
+
return new OneForOneStrategy(10, Duration.create("1 minute"),
new Function<Throwable, SupervisorStrategy.Directive>() {
@Override
public SupervisorStrategy.Directive apply(Throwable t) {
+ LOG.warning("Supervisor Strategy of resume applied {}",t);
return SupervisorStrategy.resume();
}
}
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import java.util.concurrent.ExecutionException;
-
public class ThreePhaseCommitCohort extends AbstractUntypedActor {
private final DOMStoreThreePhaseCommitCohort cohort;
private final ActorRef shardActor;
@Override
public void handleReceive(Object message) throws Exception {
- if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ if (message.getClass()
+ .equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
canCommit(new CanCommitTransaction());
- } else if (message.getClass().equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (message.getClass()
+ .equals(PreCommitTransaction.SERIALIZABLE_CLASS)) {
preCommit(new PreCommitTransaction());
- } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (message.getClass()
+ .equals(CommitTransaction.SERIALIZABLE_CLASS)) {
commit(new CommitTransaction());
- } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ } else if (message.getClass()
+ .equals(AbortTransaction.SERIALIZABLE_CLASS)) {
abort(new AbortTransaction());
} else {
unknownMessage(message);
final ActorRef sender = getSender();
final ActorRef self = getSelf();
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- sender.tell(new AbortTransactionReply().toSerializable(), self);
- } catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when aborting");
- }
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ public void onSuccess(Void v) {
+ sender
+ .tell(new AbortTransactionReply().toSerializable(),
+ self);
+ }
+
+ public void onFailure(Throwable t) {
+ LOG.error(t, "An exception happened during abort");
+ sender
+ .tell(new akka.actor.Status.Failure(t), getSelf());
}
- }, getContext().dispatcher());
+ });
}
private void commit(CommitTransaction message) {
final ListenableFuture<Void> future = cohort.preCommit();
final ActorRef sender = getSender();
final ActorRef self = getSelf();
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ public void onSuccess(Void v) {
+ sender
+ .tell(new PreCommitTransactionReply().toSerializable(),
+ self);
+ }
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- future.get();
- sender.tell(new PreCommitTransactionReply().toSerializable(), self);
- } catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when preCommitting");
- }
+ public void onFailure(Throwable t) {
+ LOG.error(t, "An exception happened during pre-commit");
+ sender
+ .tell(new akka.actor.Status.Failure(t), getSelf());
}
- }, getContext().dispatcher());
+ });
}
final ListenableFuture<Boolean> future = cohort.canCommit();
final ActorRef sender = getSender();
final ActorRef self = getSelf();
+ Futures.addCallback(future, new FutureCallback<Boolean>() {
+ public void onSuccess(Boolean canCommit) {
+ sender.tell(new CanCommitTransactionReply(canCommit)
+ .toSerializable(), self);
+ }
- future.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- Boolean canCommit = future.get();
- sender.tell(new CanCommitTransactionReply(canCommit).toSerializable(), self);
- } catch (InterruptedException | ExecutionException e) {
- log.error(e, "An exception happened when checking canCommit");
- }
+ public void onFailure(Throwable t) {
+ LOG.error(t, "An exception happened during canCommit");
+ sender
+ .tell(new akka.actor.Status.Failure(t), getSelf());
}
- }, getContext().dispatcher());
+ });
+
}
}
/**
* Covers negative test cases
+ *
* @author Basheeruddin Ahmed <syedbahm@cisco.com>
*/
public class ShardTransactionFailureTest extends AbstractActorTest {
private static final ShardIdentifier SHARD_IDENTIFIER =
ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
+ .shardName("inventory").type("operational").build();
static {
store.onGlobalContextUpdated(testSchemaContext);
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
throws Throwable {
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
final ActorRef shard =
- getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
+ getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
final Props props =
ShardTransaction.props(store.newReadWriteTransaction(), shard,
TestModel.createTestContext());
--- /dev/null
+/*
+ *
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ *
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.ForwardedCommitTransaction;
+import org.opendaylight.controller.cluster.datastore.modification.CompositeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages;
+import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
+import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessages;
+import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+
+public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
+
+ private static ListeningExecutorService storeExecutor =
+ MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+ private static final InMemoryDOMDataStore store =
+ new InMemoryDOMDataStore("OPER", storeExecutor,
+ MoreExecutors.sameThreadExecutor());
+
+ private static final SchemaContext testSchemaContext =
+ TestModel.createTestContext();
+
+ private static final ShardIdentifier SHARD_IDENTIFIER =
+ ShardIdentifier.builder().memberName("member-1")
+ .shardName("inventory").type("config").build();
+
+ static {
+ store.onGlobalContextUpdated(testSchemaContext);
+ }
+
+ private FiniteDuration ASK_RESULT_DURATION = Duration.create(3000, TimeUnit.MILLISECONDS);
+
+
+ @Test(expected = TestException.class)
+ public void testNegativeAbortResultsInException() throws Exception {
+
+ final ActorRef shard =
+ getSystem()
+ .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+ final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
+ .mock(DOMStoreThreePhaseCommitCohort.class);
+ final CompositeModification mockComposite =
+ Mockito.mock(CompositeModification.class);
+ final Props props =
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
+
+ final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativeAbortResultsInException");
+
+ when(mockCohort.abort()).thenReturn(
+ Futures.<Void>immediateFailedFuture(new TestException()));
+
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject,
+ ThreePhaseCommitCohortMessages.AbortTransaction.newBuilder()
+ .build(), 3000);
+ assertTrue(future.isCompleted());
+
+ Await.result(future, ASK_RESULT_DURATION);
+
+
+
+ }
+
+
+ @Test(expected = OptimisticLockFailedException.class)
+ public void testNegativeCanCommitResultsInException() throws Exception {
+
+ final ActorRef shard =
+ getSystem()
+ .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+ final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
+ .mock(DOMStoreThreePhaseCommitCohort.class);
+ final CompositeModification mockComposite =
+ Mockito.mock(CompositeModification.class);
+ final Props props =
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
+
+ final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativeCanCommitResultsInException");
+
+ when(mockCohort.canCommit()).thenReturn(
+ Futures
+ .<Boolean>immediateFailedFuture(
+ new OptimisticLockFailedException("some exception")));
+
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject,
+ ThreePhaseCommitCohortMessages.CanCommitTransaction.newBuilder()
+ .build(), 3000);
+
+
+ Await.result(future, ASK_RESULT_DURATION);
+
+ }
+
+
+ @Test(expected = TestException.class)
+ public void testNegativePreCommitResultsInException() throws Exception {
+
+ final ActorRef shard =
+ getSystem()
+ .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+ final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
+ .mock(DOMStoreThreePhaseCommitCohort.class);
+ final CompositeModification mockComposite =
+ Mockito.mock(CompositeModification.class);
+ final Props props =
+ ThreePhaseCommitCohort.props(mockCohort, shard, mockComposite);
+
+ final TestActorRef<ThreePhaseCommitCohort> subject = TestActorRef
+ .create(getSystem(), props,
+ "testNegativePreCommitResultsInException");
+
+ when(mockCohort.preCommit()).thenReturn(
+ Futures
+ .<Void>immediateFailedFuture(
+ new TestException()));
+
+ Future<Object> future =
+ akka.pattern.Patterns.ask(subject,
+ ThreePhaseCommitCohortMessages.PreCommitTransaction.newBuilder()
+ .build(), 3000);
+
+ Await.result(future, ASK_RESULT_DURATION);
+
+ }
+
+ @Test(expected = TestException.class)
+ public void testNegativeCommitResultsInException() throws Exception {
+
+ final TestActorRef<Shard> subject = TestActorRef
+ .create(getSystem(),
+ Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null),
+ "testNegativeCommitResultsInException");
+
+ final ActorRef shardTransaction =
+ getSystem().actorOf(
+ ShardTransaction.props(store.newReadWriteTransaction(), subject,
+ TestModel.createTestContext()));
+
+ ShardTransactionMessages.WriteData writeData =
+ ShardTransactionMessages.WriteData.newBuilder()
+ .setInstanceIdentifierPathArguments(
+ NormalizedNodeMessages.InstanceIdentifier.newBuilder()
+ .build()).setNormalizedNode(
+ NormalizedNodeMessages.Node.newBuilder().build()
+
+ ).build();
+
+ //This is done so that Modification list is updated which is used during commit
+ Future future =
+ akka.pattern.Patterns.ask(shardTransaction, writeData, 3000);
+
+ //ready transaction creates the cohort so that we get into the
+ //block where in commmit is done
+ ShardTransactionMessages.ReadyTransaction readyTransaction =
+ ShardTransactionMessages.ReadyTransaction.newBuilder().build();
+
+ future =
+ akka.pattern.Patterns.ask(shardTransaction, readyTransaction, 3000);
+
+ //but when the message is sent it will have the MockCommit object
+ //so that we can simulate throwing of exception
+ ForwardedCommitTransaction mockForwardCommitTransaction =
+ Mockito.mock(ForwardedCommitTransaction.class);
+ DOMStoreThreePhaseCommitCohort mockThreePhaseCommitTransaction =
+ Mockito.mock(DOMStoreThreePhaseCommitCohort.class);
+ when(mockForwardCommitTransaction.getCohort())
+ .thenReturn(mockThreePhaseCommitTransaction);
+ when(mockThreePhaseCommitTransaction.commit()).thenReturn(Futures
+ .<Void>immediateFailedFuture(
+ new TestException()));
+ Modification mockModification = Mockito.mock(
+ Modification.class);
+ when(mockForwardCommitTransaction.getModification())
+ .thenReturn(mockModification);
+
+ when(mockModification.toSerializable()).thenReturn(
+ PersistentMessages.CompositeModification.newBuilder().build());
+
+ future =
+ akka.pattern.Patterns.ask(subject,
+ mockForwardCommitTransaction
+ , 3000);
+ Await.result(future, ASK_RESULT_DURATION);
+
+
+ }
+
+ private class TestException extends Exception {
+ }
+
+
+}