X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=51d8d5caec18e0c94a22520fb23d1b9708acdbf4;hb=dd16edd5a758f0e51727de511f9868c72b2a1dd0;hp=57c5b1de11fca6ad0496f9c4a3b06eb873048fdf;hpb=9f5e0e3f4d467e95ab6b93c2f3c90c0cbd312f50;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 57c5b1de11..51d8d5caec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; @@ -136,41 +135,22 @@ class ShardCommitCoordinator { ready.getTransactionID(), ready.getTxnClientVersion()); ShardDataTreeCohort cohort = ready.getTransaction().ready(); - CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort); + CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion()); cohortCache.put(ready.getTransactionID(), cohortEntry); if(!queueCohortEntry(cohortEntry, sender, shard)) { return; } - if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) { - // Return our actor path as we'll handle the three phase commit except if the Tx client - // version < Helium-1 version which means the Tx was initiated by a base Helium version node. - // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to - // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior. - ActorRef replyActorPath = shard.self(); - if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { - log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name); - replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( - ready.getTransactionID())); - } - - ReadyTransactionReply readyTransactionReply = - new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath), - ready.getTxnClientVersion()); - sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : - readyTransactionReply, shard.self()); + if(ready.isDoImmediateCommit()) { + cohortEntry.setDoImmediateCommit(true); + cohortEntry.setReplySender(sender); + cohortEntry.setShard(shard); + handleCanCommit(cohortEntry); } else { - if(ready.isDoImmediateCommit()) { - cohortEntry.setDoImmediateCommit(true); - cohortEntry.setReplySender(sender); - cohortEntry.setShard(shard); - handleCanCommit(cohortEntry); - } else { - // The caller does not want immediate commit - the 3-phase commit will be coordinated by the - // front-end so send back a ReadyTransactionReply with our actor path. - sender.tell(readyTransactionReply(shard), shard.self()); - } + // The caller does not want immediate commit - the 3-phase commit will be coordinated by the + // front-end so send back a ReadyTransactionReply with our actor path. + sender.tell(readyTransactionReply(shard), shard.self()); } } @@ -189,7 +169,7 @@ class ShardCommitCoordinator { if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), dataTree.newReadWriteTransaction(batched.getTransactionID(), - batched.getTransactionChainID())); + batched.getTransactionChainID()), batched.getVersion()); cohortCache.put(batched.getTransactionID(), cohortEntry); } @@ -247,7 +227,8 @@ class ShardCommitCoordinator { void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) { final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(), message.getTransactionID()); - final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort); + final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, + DataStoreVersions.CURRENT_VERSION); cohortCache.put(message.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); @@ -339,9 +320,11 @@ class ShardCommitCoordinator { "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self()); } } else { + // FIXME - use caller's version cohortEntry.getReplySender().tell( - canCommit ? CanCommitTransactionReply.YES.toSerializable() : - CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self()); + canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() : + CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(), + cohortEntry.getShard().self()); } } catch (Exception e) { log.debug("{}: An exception occurred during canCommit", name, e); @@ -440,7 +423,7 @@ class ShardCommitCoordinator { shard.getShardMBean().incrementAbortTransactionsCount(); if(sender != null) { - sender.tell(new AbortTransactionReply().toSerializable(), self); + sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); } } catch (Exception e) { log.error("{}: An exception happened during abort", name, e); @@ -606,16 +589,19 @@ class ShardCommitCoordinator { private final Stopwatch lastAccessTimer = Stopwatch.createStarted(); private int totalBatchedModificationsReceived; private boolean aborted; + private final short clientVersion; - CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { + CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) { this.transaction = Preconditions.checkNotNull(transaction); this.transactionID = transactionID; + this.clientVersion = clientVersion; } - CohortEntry(String transactionID, ShardDataTreeCohort cohort) { + CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) { this.transactionID = transactionID; this.cohort = cohort; this.transaction = null; + this.clientVersion = clientVersion; } void updateLastAccessTime() { @@ -627,6 +613,10 @@ class ShardCommitCoordinator { return transactionID; } + short getClientVersion() { + return clientVersion; + } + DataTreeCandidate getCandidate() { return cohort.getCandidate(); }