X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FTransactionProxy.java;h=ec198510d3586f88dee40d04f9294a3fb370a9ae;hb=c66e710cd61119c4779784571e59cdc3b490673f;hp=6cf16b44268c6c16e26e0658632f61994ee33971;hpb=7225f60c394a26143f8421b0f99f2585699fa306;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java index 6cf16b4426..ec198510d3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore; -import akka.actor.ActorPath; import akka.actor.ActorSelection; import akka.dispatch.OnComplete; @@ -22,6 +21,7 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException; import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; @@ -156,7 +156,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(remoteTransactionActorsMB.get()) { for(ActorSelection actor : remoteTransactionActors) { LOG.trace("Sending CloseTransaction to {}", actor); - actorContext.sendRemoteOperationAsync(actor, + actorContext.sendOperationAsync(actor, new CloseTransaction().toSerializable()); } } @@ -314,7 +314,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} Trying to get {} transactions ready for commit", identifier, remoteTransactionPaths.size()); } - List> cohortPathFutures = Lists.newArrayList(); + List> cohortFutures = Lists.newArrayList(); for(TransactionContext transactionContext : remoteTransactionPaths.values()) { @@ -322,14 +322,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { LOG.debug("Tx {} Readying transaction for shard {}", identifier, transactionContext.getShardName()); } - cohortPathFutures.add(transactionContext.readyTransaction()); + cohortFutures.add(transactionContext.readyTransaction()); } if(transactionChainProxy != null){ - transactionChainProxy.onTransactionReady(cohortPathFutures); + transactionChainProxy.onTransactionReady(cohortFutures); } - return new ThreePhaseCommitCohortProxy(actorContext, cohortPathFutures, + return new ThreePhaseCommitCohortProxy(actorContext, cohortFutures, identifier.toString()); } @@ -379,9 +379,14 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } try { - Object response = actorContext.executeShardOperation(shardName, - new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), - getTransactionChainId()).toSerializable()); + Optional primaryShard = actorContext.findPrimaryShard(shardName); + if (!primaryShard.isPresent()) { + throw new PrimaryNotFoundException("Primary could not be found for shard " + shardName); + } + + Object response = actorContext.executeOperation(primaryShard.get(), + new CreateTransaction(identifier.toString(), this.transactionType.ordinal(), + getTransactionChainId()).toSerializable()); if (response.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) { CreateTransactionReply reply = CreateTransactionReply.fromSerializable(response); @@ -433,7 +438,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { void closeTransaction(); - Future readyTransaction(); + Future readyTransaction(); void writeData(YangInstanceIdentifier path, NormalizedNode data); @@ -493,27 +498,23 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { return actor; } - private String getResolvedCohortPath(String cohortPath) { - return actorContext.resolvePath(actorPath, cohortPath); - } - @Override public void closeTransaction() { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} closeTransaction called", identifier); } - actorContext.sendRemoteOperationAsync(getActor(), new CloseTransaction().toSerializable()); + actorContext.sendOperationAsync(getActor(), new CloseTransaction().toSerializable()); } @Override - public Future readyTransaction() { + public Future readyTransaction() { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} readyTransaction called with {} previous recorded operations pending", identifier, recordedOperationFutures.size()); } // Send the ReadyTransaction message to the Tx actor. - final Future replyFuture = actorContext.executeRemoteOperationAsync(getActor(), + final Future replyFuture = actorContext.executeOperationAsync(getActor(), new ReadyTransaction().toSerializable()); // Combine all the previously recorded put/merge/delete operation reply Futures and the @@ -532,9 +533,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { // Transform the combined Future into a Future that returns the cohort actor path from // the ReadyTransactionReply. That's the end result of the ready operation. - return combinedFutures.transform(new AbstractFunction1, ActorPath>() { + return combinedFutures.transform(new AbstractFunction1, ActorSelection>() { @Override - public ActorPath apply(Iterable notUsed) { + public ActorSelection apply(Iterable notUsed) { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} readyTransaction: pending recorded operations succeeded", identifier); @@ -551,16 +552,9 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(serializedReadyReply.getClass().equals( ReadyTransactionReply.SERIALIZABLE_CLASS)) { ReadyTransactionReply reply = ReadyTransactionReply.fromSerializable( - actorContext.getActorSystem(), serializedReadyReply); + serializedReadyReply); - String resolvedCohortPath = getResolvedCohortPath( - reply.getCohortPath().toString()); - - if(LOG.isDebugEnabled()) { - LOG.debug("Tx {} readyTransaction: resolved cohort path {}", - identifier, resolvedCohortPath); - } - return actorContext.actorFor(resolvedCohortPath); + return actorContext.actorSelection(reply.getCohortPath()); } else { // Throwing an exception here will fail the Future. @@ -576,8 +570,8 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} deleteData called path = {}", identifier, path); } - recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), - new DeleteData(path).toSerializable() )); + recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), + new DeleteData(path).toSerializable())); } @Override @@ -585,7 +579,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} mergeData called path = {}", identifier, path); } - recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), + recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), new MergeData(path, data, schemaContext).toSerializable())); } @@ -594,7 +588,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} writeData called path = {}", identifier, path); } - recordedOperationFutures.add(actorContext.executeRemoteOperationAsync(getActor(), + recordedOperationFutures.add(actorContext.executeOperationAsync(getActor(), new WriteData(path, data, schemaContext).toSerializable())); } @@ -686,7 +680,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } }; - Future readFuture = actorContext.executeRemoteOperationAsync(getActor(), + Future readFuture = actorContext.executeOperationAsync(getActor(), new ReadData(path).toSerializable()); readFuture.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } @@ -773,7 +767,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } }; - Future future = actorContext.executeRemoteOperationAsync(getActor(), + Future future = actorContext.executeOperationAsync(getActor(), new DataExists(path).toSerializable()); future.onComplete(onComplete, actorContext.getActorSystem().dispatcher()); } @@ -799,7 +793,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction { } @Override - public Future readyTransaction() { + public Future readyTransaction() { if(LOG.isDebugEnabled()) { LOG.debug("Tx {} readyTransaction called", identifier); }