X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=5ea9b30c63e7d3f9b44e9800eddb2b5e42f08657;hp=7d67e0856fe69da8721cd7173c63ec79d294a83e;hb=f5a373c5378af41f62a2c36ced4046fbdb77e00b;hpb=139937c2e646894af6a9b2b8a8a1047c6ef82485 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7d67e0856f..5ea9b30c63 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -31,6 +31,7 @@ import org.opendaylight.controller.cluster.DataPersistenceProvider; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; +import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; @@ -374,7 +375,8 @@ public class Shard extends RaftActor { } private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) { - LOG.debug("Readying transaction {}", ready.getTransactionID()); + LOG.debug("Readying transaction {}, client version {}", ready.getTransactionID(), + ready.getTxnClientVersion()); // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the // commitCoordinator in preparation for the subsequent three phase commit initiated by @@ -382,12 +384,22 @@ public class Shard extends RaftActor { commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(), ready.getModification()); - // Return our actor path as we'll handle the three phase commit. - ReadyTransactionReply readyTransactionReply = - new ReadyTransactionReply(Serialization.serializedActorPath(self())); - getSender().tell( - ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : readyTransactionReply, - getSelf()); + // Return our actor path as we'll handle the three phase commit, except if the Tx client + // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version + // node. In that case, the subsequent 3-phase commit messages won't contain the + // transactionId so to maintain backwards compatibility, we create a separate cohort actor + // to provide the compatible behavior. + ActorRef replyActorPath = self(); + if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) { + LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort"); + replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( + ready.getTransactionID())); + } + + ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply( + Serialization.serializedActorPath(replyActorPath)); + getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : + readyTransactionReply, getSelf()); } private void handleAbortTransaction(AbortTransaction abort) { @@ -465,10 +477,8 @@ public class Shard extends RaftActor { } } - private ActorRef createTypedTransactionActor( - int transactionType, - ShardTransactionIdentifier transactionId, - String transactionChainId ) { + private ActorRef createTypedTransactionActor(int transactionType, + ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) { DOMStoreTransactionFactory factory = store; @@ -492,7 +502,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(), schemaContext,datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) { @@ -501,7 +512,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) { @@ -511,7 +523,8 @@ public class Shard extends RaftActor { return getContext().actorOf( ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(), schemaContext, datastoreContext, shardMBean, - transactionId.getRemoteTransactionId()), transactionId.toString()); + transactionId.getRemoteTransactionId(), clientVersion), + transactionId.toString()); } else { throw new IllegalArgumentException( "Shard="+name + ":CreateTransaction message has unidentified transaction type=" @@ -521,10 +534,12 @@ public class Shard extends RaftActor { private void createTransaction(CreateTransaction createTransaction) { createTransaction(createTransaction.getTransactionType(), - createTransaction.getTransactionId(), createTransaction.getTransactionChainId()); + createTransaction.getTransactionId(), createTransaction.getTransactionChainId(), + createTransaction.getVersion()); } - private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) { + private ActorRef createTransaction(int transactionType, String remoteTransactionId, + String transactionChainId, int clientVersion) { ShardTransactionIdentifier transactionId = ShardTransactionIdentifier.builder() @@ -533,8 +548,8 @@ public class Shard extends RaftActor { if(LOG.isDebugEnabled()) { LOG.debug("Creating transaction : {} ", transactionId); } - ActorRef transactionActor = - createTypedTransactionActor(transactionType, transactionId, transactionChainId); + ActorRef transactionActor = createTypedTransactionActor(transactionType, transactionId, + transactionChainId, clientVersion); getSender() .tell(new CreateTransactionReply( @@ -599,7 +614,7 @@ public class Shard extends RaftActor { LOG.debug("registerDataChangeListener sending reply, listenerRegistrationPath = {} ", listenerRegistration.path()); - getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()),getSelf()); + getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); } private ListenerRegistration