package org.opendaylight.controller.cluster.datastore;
-import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.dispatch.OnComplete;
LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier,
remoteTransactionPaths.size());
}
- List<Future<ActorPath>> cohortPathFutures = Lists.newArrayList();
+ List<Future<ActorSelection>> cohortFutures = Lists.newArrayList();
for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
LOG.debug("Tx {} Readying transaction for shard {}", identifier,
transactionContext.getShardName());
}
- cohortPathFutures.add(transactionContext.readyTransaction());
+ cohortFutures.add(transactionContext.readyTransaction());
}
if(transactionChainProxy != null){
- transactionChainProxy.onTransactionReady(cohortPathFutures);
+ transactionChainProxy.onTransactionReady(cohortFutures);
}
- return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures,
+ return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures,
identifier.toString());
}
void closeTransaction();
- Future<ActorPath> readyTransaction();
+ Future<ActorSelection> readyTransaction();
void writeData(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
return actor;
}
- private String getResolvedCohortPath(String cohortPath) {
- return actorContext.resolvePath(actorPath, cohortPath);
- }
-
@Override
public void closeTransaction() {
if(LOG.isDebugEnabled()) {
}
@Override
- public Future<ActorPath> readyTransaction() {
+ public Future<ActorSelection> readyTransaction() {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending",
identifier, recordedOperationFutures.size());
// Transform the combined Future into a Future that returns the cohort actor path from
// the ReadyTransactionReply. That's the end result of the ready operation.
- return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorPath>() {
+ return combinedFutures.transform(new AbstractFunction1<Iterable<Object>, ActorSelection>() {
@Override
- public ActorPath apply(Iterable<Object> notUsed) {
+ public ActorSelection apply(Iterable<Object> notUsed) {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded",
identifier);
if(serializedReadyReply.getClass().equals(
ReadyTransactionReply.SERIALIZABLE_CLASS)) {
ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable(
- actorContext.getActorSystem(), serializedReadyReply);
-
- String resolvedCohortPath = getResolvedCohortPath(
- reply.getCohortPath().toString());
+ serializedReadyReply);
- if(LOG.isDebugEnabled()) {
- LOG.debug("Tx {} readyTransaction: resolved cohort path {}",
- identifier, resolvedCohortPath);
- }
- return actorContext.actorFor(resolvedCohortPath);
+ return actorContext.actorSelection(reply.getCohortPath());
} else {
// Throwing an exception here will fail the Future.
}
@Override
- public Future<ActorPath> readyTransaction() {
+ public Future<ActorSelection> readyTransaction() {
if(LOG.isDebugEnabled()) {
LOG.debug("Tx {} readyTransaction called", identifier);
}