import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.japi.Creator;
-
+import akka.serialization.Serialization;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.CheckedFuture;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-
-
import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
ActorRef cohortActor = getContext().actorOf(
ThreePhaseCommitCohort.props(cohort, shardActor, modification, shardStats), "cohort");
- getSender()
- .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
+ getSender().tell(new ReadyTransactionReply(
+ Serialization.serializedActorPath(cohortActor)).toSerializable(), getSelf());
}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;
private static final Logger LOG = LoggerFactory.getLogger(ThreePhaseCommitCohortProxy.class);
private final ActorContext actorContext;
- private final List<Future<ActorPath>> cohortPathFutures;
- private volatile List<ActorPath> cohortPaths;
+ private final List<Future<ActorSelection>> cohortFutures;
+ private volatile List<ActorSelection> cohorts;
private final String transactionId;
public ThreePhaseCommitCohortProxy(ActorContext actorContext,
- List<Future<ActorPath>> cohortPathFutures, String transactionId) {
+ List<Future<ActorSelection>> cohortFutures, String transactionId) {
this.actorContext = actorContext;
- this.cohortPathFutures = cohortPathFutures;
+ this.cohortFutures = cohortFutures;
this.transactionId = transactionId;
}
- private Future<Void> buildCohortPathsList() {
+ private Future<Void> buildCohortList() {
- Future<Iterable<ActorPath>> combinedFutures = Futures.sequence(cohortPathFutures,
+ Future<Iterable<ActorSelection>> combinedFutures = Futures.sequence(cohortFutures,
actorContext.getActorSystem().dispatcher());
- return combinedFutures.transform(new AbstractFunction1<Iterable<ActorPath>, Void>() {
+ return combinedFutures.transform(new AbstractFunction1<Iterable<ActorSelection>, Void>() {
@Override
- public Void apply(Iterable<ActorPath> paths) {
- cohortPaths = Lists.newArrayList(paths);
+ public Void apply(Iterable<ActorSelection> actorSelections) {
+ cohorts = Lists.newArrayList(actorSelections);
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} successfully built cohort path list: {}",
- transactionId, cohortPaths);
+ transactionId, cohorts);
}
return null;
}
// extracted from ReadyTransactionReply messages by the Futures that were obtained earlier
// and passed to us from upstream processing. If any one fails then we'll fail canCommit.
- buildCohortPathsList().onComplete(new OnComplete<Void>() {
+ buildCohortList().onComplete(new OnComplete<Void>() {
@Override
public void onComplete(Throwable failure, Void notUsed) throws Throwable {
if(failure != null) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: a cohort path Future failed: {}", transactionId, failure);
+ LOG.debug("Tx {}: a cohort Future failed: {}", transactionId, failure);
}
returnFuture.setException(failure);
} else {
}
private Future<Iterable<Object>> invokeCohorts(Object message) {
- List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohortPaths.size());
- for(ActorPath actorPath : cohortPaths) {
+ List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
+ for(ActorSelection cohort : cohorts) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, actorPath);
+ LOG.debug("Tx {}: Sending {} to cohort {}", transactionId, message, cohort);
}
- ActorSelection cohort = actorContext.actorSelection(actorPath);
futureList.add(actorContext.executeOperationAsync(cohort, message));
}
// The cohort actor list should already be built at this point by the canCommit phase but,
// if not for some reason, we'll try to build it here.
- if(cohortPaths != null) {
+ if(cohorts != null) {
finishVoidOperation(operationName, message, expectedResponseClass, propagateException,
returnFuture);
} else {
- buildCohortPathsList().onComplete(new OnComplete<Void>() {
+ buildCohortList().onComplete(new OnComplete<Void>() {
@Override
public void onComplete(Throwable failure, Void notUsed) throws Throwable {
if(failure != null) {
}
@VisibleForTesting
- List<Future<ActorPath>> getCohortPathFutures() {
- return Collections.unmodifiableList(cohortPathFutures);
+ List<Future<ActorSelection>> getCohortFutures() {
+ return Collections.unmodifiableList(cohortFutures);
}
}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
+import akka.actor.ActorSelection;
import akka.dispatch.Futures;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
public class TransactionChainProxy implements DOMStoreTransactionChain{
private final ActorContext actorContext;
private final String transactionChainId;
- private volatile List<Future<ActorPath>> cohortPathFutures = Collections.emptyList();
+ private volatile List<Future<ActorSelection>> cohortFutures = Collections.emptyList();
public TransactionChainProxy(ActorContext actorContext) {
this.actorContext = actorContext;
return transactionChainId;
}
- public void onTransactionReady(List<Future<ActorPath>> cohortPathFutures){
- this.cohortPathFutures = cohortPathFutures;
+ public void onTransactionReady(List<Future<ActorSelection>> cohortFutures){
+ this.cohortFutures = cohortFutures;
}
public void waitTillCurrentTransactionReady(){
try {
Await.result(Futures
- .sequence(this.cohortPathFutures, actorContext.getActorSystem().dispatcher()),
+ .sequence(this.cohortFutures, actorContext.getActorSystem().dispatcher()),
actorContext.getOperationDuration());
} catch (Exception e) {
throw new IllegalStateException("Failed when waiting for transaction on a chain to become ready", e);
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
remoteTransactionPaths.size());
}
- List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
LOG.debug("Tx {} Readying transaction for shard {}", identifier,
transactionContext.getShardName());
}
- cohortPathFutures.add(transactionContext.readyTransaction());
+ cohortFutures.add(transactionContext.readyTransaction());
}
if(transactionChainProxy != null){
- transactionChainProxy.onTransactionReady(cohortPathFutures);
+ transactionChainProxy.onTransactionReady(cohortFutures);
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
identifier.toString());
}
void closeTransaction();
- Future<ActorPath> readyTransaction();
+ Future<ActorSelection> readyTransaction();
void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
return actor;
}
- private String getResolvedCohortPath(String cohortPath) {
- return actorContext.resolvePath(actorPath, cohortPath);
- }
-
@Override
public void closeTransaction() {
if(LOG.isDebugEnabled()) {
}
@Override
- public Future<ActorPath> readyTransaction() {
+ public Future<ActorSelection> readyTransaction() {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
identifier, recordedOperationFutures.size());
// Transform the combined Future into a Future that returns the cohort actor path from
// the ReadyTransactionReply. That's the end result of the ready operation.
- return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+ return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorSelection>() {
@Override
- public ActorPath apply(Iterable<Object> notUsed) {
+ public ActorSelection apply(Iterable<Object> notUsed) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
identifier);
if(serializedReadyReply.getClass().equals(
ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
- actorContext.getActorSystem(), serializedReadyReply);
-
- String resolvedCohortPath = getResolvedCohortPath(
- reply.getCohortPath().toString());
+ serializedReadyReply);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
- identifier, resolvedCohortPath);
- }
- return actorContext.actorFor(resolvedCohortPath);
+ return actorContext.actorSelection(reply.getCohortPath());
} else {
// Throwing an exception here will fail the Future.
}
@Override
- public Future<ActorPath> readyTransaction() {
+ public Future<ActorSelection> readyTransaction() {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction called", identifier);
}
package org.opendaylight.controller.cluster.datastore.messages;
-import akka.actor.ActorPath;
-import akka.actor.ActorSystem;
import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
public class ReadyTransactionReply implements SerializableMessage {
public static final Class SERIALIZABLE_CLASS = ShardTransactionMessages.ReadyTransactionReply.class;
- private final ActorPath cohortPath;
+ private final String cohortPath;
- public ReadyTransactionReply(ActorPath cohortPath) {
+ public ReadyTransactionReply(String cohortPath) {
this.cohortPath = cohortPath;
}
- public ActorPath getCohortPath() {
+ public String getCohortPath() {
return cohortPath;
}
@Override
public ShardTransactionMessages.ReadyTransactionReply toSerializable() {
return ShardTransactionMessages.ReadyTransactionReply.newBuilder()
- .setActorPath(cohortPath.toString()).build();
+ .setActorPath(cohortPath).build();
}
- public static ReadyTransactionReply fromSerializable(ActorSystem actorSystem,Object serializable){
- ShardTransactionMessages.ReadyTransactionReply o = (ShardTransactionMessages.ReadyTransactionReply) serializable;
- return new ReadyTransactionReply(
- actorSystem.actorFor(o.getActorPath()).path());
+ public static ReadyTransactionReply fromSerializable(Object serializable) {
+ ShardTransactionMessages.ReadyTransactionReply o =
+ (ShardTransactionMessages.ReadyTransactionReply) serializable;
+
+ return new ReadyTransactionReply(o.getActorPath());
+
}
}
actorSystem.shutdown();
}
- /**
- * @deprecated Need to stop using this method. There are ways to send a
- * remote ActorRef as a string which should be used instead of this hack
- *
- * @param primaryPath
- * @param localPathOfRemoteActor
- * @return
- */
- @Deprecated
- public String resolvePath(final String primaryPath,
- final String localPathOfRemoteActor) {
- StringBuilder builder = new StringBuilder();
- String[] primaryPathElements = primaryPath.split("/");
- builder.append(primaryPathElements[0]).append("//")
- .append(primaryPathElements[1]).append(primaryPathElements[2]);
- String[] remotePathElements = localPathOfRemoteActor.split("/");
- for (int i = 3; i < remotePathElements.length; i++) {
- builder.append("/").append(remotePathElements[i]);
- }
-
- return builder.toString();
-
- }
-
- public ActorPath actorFor(String path){
- return actorSystem.actorFor(path).path();
- }
-
public String getCurrentMemberName(){
return clusterWrapper.getCurrentMemberName();
}
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
@Override
protected ActorSelection match(Object in) {
if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
- ActorPath cohortPath =
- ReadyTransactionReply.fromSerializable(getSystem(),in)
+ String cohortPath =
+ ReadyTransactionReply.fromSerializable(in)
.getCohortPath();
- return getSystem()
- .actorSelection(cohortPath);
+ return getSystem().actorSelection(cohortPath);
} else {
throw noMatch();
}
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
-
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
-
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.opendaylight.controller.cluster.datastore.messages.SerializableMessage;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
-
import scala.concurrent.Future;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
@SuppressWarnings("serial")
doReturn(getSystem()).when(actorContext).getActorSystem();
}
- private Future<ActorPath> newCohortPath() {
+ private Future<ActorSelection> newCohort() {
ActorPath path = getSystem().actorOf(Props.create(DoNothingActor.class)).path();
- doReturn(mock(ActorSelection.class)).when(actorContext).actorSelection(path);
- return Futures.successful(path);
+ ActorSelection actorSelection = getSystem().actorSelection(path);
+ return Futures.successful(actorSelection);
}
private final ThreePhaseCommitCohortProxy setupProxy(int nCohorts) throws Exception {
- List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
for(int i = 1; i <= nCohorts; i++) {
- cohortPathFutures.add(newCohortPath());
+ cohortFutures.add(newCohort());
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
}
private ThreePhaseCommitCohortProxy setupProxyWithFailedCohortPath()
throws Exception {
- List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
- cohortPathFutures.add(newCohortPath());
- cohortPathFutures.add(Futures.<ActorPath>failed(new TestException()));
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
+ cohortFutures.add(newCohort());
+ cohortFutures.add(Futures.<ActorSelection>failed(new TestException()));
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, "txn-1");
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, "txn-1");
}
private void setupMockActorContext(Class<?> requestType, Object... responses) {
package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
+import com.google.common.util.concurrent.CheckedFuture;
+
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.dispatch.Futures;
import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
return argThat(matcher);
}
- private Future<Object> readyTxReply(ActorPath path) {
+ private Future<Object> readyTxReply(String path) {
return Futures.successful((Object)new ReadyTransactionReply(path).toSerializable());
}
doReturn(createTransactionReply(actorRef)).when(mockActorContext).
executeOperation(eq(getSystem().actorSelection(actorRef.path())),
eqCreateTransaction(memberName, type));
-
- doReturn(actorRef.path().toString()).when(mockActorContext).resolvePath(
- anyString(), eq(actorRef.path().toString()));
-
- doReturn(actorRef.path()).when(mockActorContext).actorFor(actorRef.path().toString());
-
return actorRef;
}
DeleteDataReply.SERIALIZABLE_CLASS);
}
- private void verifyCohortPathFutures(ThreePhaseCommitCohortProxy proxy,
- Object... expReplies) throws Exception {
+ private void verifyCohortFutures(ThreePhaseCommitCohortProxy proxy,
+ Object... expReplies) throws Exception {
assertEquals("getReadyOperationFutures size", expReplies.length,
- proxy.getCohortPathFutures().size());
+ proxy.getCohortFutures().size());
int i = 0;
- for( Future<ActorPath> future: proxy.getCohortPathFutures()) {
+ for( Future<ActorSelection> future: proxy.getCohortFutures()) {
assertNotNull("Ready operation Future is null", future);
Object expReply = expReplies[i++];
- if(expReply instanceof ActorPath) {
- ActorPath actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
- assertEquals("Cohort actor path", expReply, actual);
+ if(expReply instanceof ActorSelection) {
+ ActorSelection actual = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
+ assertEquals("Cohort actor path", (ActorSelection) expReply, actual);
} else {
// Expecting exception.
try {
doReturn(writeDataReply()).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
WriteDataReply.SERIALIZABLE_CLASS);
- verifyCohortPathFutures(proxy, actorRef.path());
+ verifyCohortFutures(proxy, getSystem().actorSelection(actorRef.path()));
}
@SuppressWarnings("unchecked")
doReturn(Futures.failed(new TestException())).when(mockActorContext).
executeOperationAsync(eq(actorSelection(actorRef)), eqWriteData(nodeToWrite));
- doReturn(readyTxReply(actorRef.path())).when(mockActorContext).executeOperationAsync(
+ doReturn(readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS));
TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS, TestException.class);
- verifyCohortPathFutures(proxy, TestException.class);
+ verifyCohortFutures(proxy, TestException.class);
}
@SuppressWarnings("unchecked")
verifyRecordingOperationFutures(transactionProxy.getRecordedOperationFutures(),
MergeDataReply.SERIALIZABLE_CLASS);
- verifyCohortPathFutures(proxy, TestException.class);
+ verifyCohortFutures(proxy, TestException.class);
}
@Test
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortPathFutures(proxy, PrimaryNotFoundException.class);
+ verifyCohortFutures(proxy, PrimaryNotFoundException.class);
}
@SuppressWarnings("unchecked")
ThreePhaseCommitCohortProxy proxy = (ThreePhaseCommitCohortProxy) ready;
- verifyCohortPathFutures(proxy, IllegalArgumentException.class);
+ verifyCohortFutures(proxy, IllegalArgumentException.class);
}
@Test
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Creator;
import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
+
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import static org.mockito.Mockito.mock;
public class ActorContextTest extends AbstractActorTest{
- @Test
- public void testResolvePathForRemoteActor(){
- ActorContext actorContext =
- new ActorContext(mock(ActorSystem.class), mock(ActorRef.class),mock(
- ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka.tcp://system@127.0.0.1:2550/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void testResolvePathForLocalActor(){
- ActorContext actorContext =
- new ActorContext(getSystem(), mock(ActorRef.class), mock(ClusterWrapper.class),
- mock(Configuration.class));
-
- String actual = actorContext.resolvePath(
- "akka://system/user/shardmanager/shard",
- "akka://system/user/shardmanager/shard/transaction");
-
- String expected = "akka://system/user/shardmanager/shard/transaction";
-
- assertEquals(expected, actual);
-
- System.out.println(actorContext
- .actorFor("akka://system/user/shardmanager/shard/transaction"));
- }
-
private static class MockShardManager extends UntypedActor {