X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=51d8d5caec18e0c94a22520fb23d1b9708acdbf4;hb=89274c9c31212a1d0f13aeb90384442e72221029;hp=fd55ceeed34c14d47143e6fb863900b64e596916;hpb=204f45f8b3233dbea87e2c8065914f0d2a0ded07;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index fd55ceeed3..51d8d5caec 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -8,19 +8,20 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; -import akka.actor.Status; +import akka.actor.Status.Failure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply; @@ -116,7 +117,7 @@ class ShardCommitCoordinator { " capacity %d has been reached.", name, cohortEntry.getTransactionID(), queueCapacity)); log.error(ex.getMessage()); - sender.tell(new Status.Failure(ex), shard.self()); + sender.tell(new Failure(ex), shard.self()); return false; } } @@ -134,41 +135,22 @@ class ShardCommitCoordinator { ready.getTransactionID(), ready.getTxnClientVersion()); ShardDataTreeCohort cohort = ready.getTransaction().ready(); - CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort); + CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion()); cohortCache.put(ready.getTransactionID(), cohortEntry); if(!queueCohortEntry(cohortEntry, sender, shard)) { return; } - if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) { - // Return our actor path as we'll handle the three phase commit except if the Tx client - // version < Helium-1 version which means the Tx was initiated by a base Helium version node. - // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to - // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior. - ActorRef replyActorPath = shard.self(); - if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) { - log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name); - replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props( - ready.getTransactionID())); - } - - ReadyTransactionReply readyTransactionReply = - new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath), - ready.getTxnClientVersion()); - sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() : - readyTransactionReply, shard.self()); + if(ready.isDoImmediateCommit()) { + cohortEntry.setDoImmediateCommit(true); + cohortEntry.setReplySender(sender); + cohortEntry.setShard(shard); + handleCanCommit(cohortEntry); } else { - if(ready.isDoImmediateCommit()) { - cohortEntry.setDoImmediateCommit(true); - cohortEntry.setReplySender(sender); - cohortEntry.setShard(shard); - handleCanCommit(cohortEntry); - } else { - // The caller does not want immediate commit - the 3-phase commit will be coordinated by the - // front-end so send back a ReadyTransactionReply with our actor path. - sender.tell(readyTransactionReply(shard), shard.self()); - } + // The caller does not want immediate commit - the 3-phase commit will be coordinated by the + // front-end so send back a ReadyTransactionReply with our actor path. + sender.tell(readyTransactionReply(shard), shard.self()); } } @@ -187,7 +169,7 @@ class ShardCommitCoordinator { if(cohortEntry == null) { cohortEntry = new CohortEntry(batched.getTransactionID(), dataTree.newReadWriteTransaction(batched.getTransactionID(), - batched.getTransactionChainID())); + batched.getTransactionChainID()), batched.getVersion()); cohortCache.put(batched.getTransactionID(), cohortEntry); } @@ -245,7 +227,8 @@ class ShardCommitCoordinator { void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) { final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(), message.getTransactionID()); - final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort); + final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, + DataStoreVersions.CURRENT_VERSION); cohortCache.put(message.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); @@ -312,7 +295,7 @@ class ShardCommitCoordinator { IllegalStateException ex = new IllegalStateException( String.format("%s: No cohort entry found for transaction %s", name, transactionID)); log.error(ex.getMessage()); - sender.tell(new Status.Failure(ex), shard.self()); + sender.tell(new Failure(ex), shard.self()); return; } @@ -333,13 +316,15 @@ class ShardCommitCoordinator { if(canCommit) { doCommit(cohortEntry); } else { - cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException( + cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException( "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self()); } } else { + // FIXME - use caller's version cohortEntry.getReplySender().tell( - canCommit ? CanCommitTransactionReply.YES.toSerializable() : - CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self()); + canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() : + CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(), + cohortEntry.getShard().self()); } } catch (Exception e) { log.debug("{}: An exception occurred during canCommit", name, e); @@ -349,7 +334,7 @@ class ShardCommitCoordinator { failure = e.getCause(); } - cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self()); + cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); } finally { if(!canCommit) { // Remove the entry from the cache now. @@ -379,7 +364,7 @@ class ShardCommitCoordinator { } catch (Exception e) { log.error("{} An exception occurred while preCommitting transaction {}", name, cohortEntry.getTransactionID(), e); - cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self()); + cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self()); currentTransactionComplete(cohortEntry.getTransactionID(), true); } @@ -406,7 +391,7 @@ class ShardCommitCoordinator { String.format("%s: Cannot commit transaction %s - it is not the current transaction", name, transactionID)); log.error(ex.getMessage()); - sender.tell(new akka.actor.Status.Failure(ex), shard.self()); + sender.tell(new Failure(ex), shard.self()); return false; } @@ -438,13 +423,49 @@ class ShardCommitCoordinator { shard.getShardMBean().incrementAbortTransactionsCount(); if(sender != null) { - sender.tell(new AbortTransactionReply().toSerializable(), self); + sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); } } catch (Exception e) { log.error("{}: An exception happened during abort", name, e); if(sender != null) { - sender.tell(new akka.actor.Status.Failure(e), self); + sender.tell(new Failure(e), self); + } + } + } + + void checkForExpiredTransactions(final long timeout, final Shard shard) { + CohortEntry cohortEntry = getCurrentCohortEntry(); + if(cohortEntry != null) { + if(cohortEntry.isExpired(timeout)) { + log.warn("{}: Current transaction {} has timed out after {} ms - aborting", + name, cohortEntry.getTransactionID(), timeout); + + handleAbort(cohortEntry.getTransactionID(), null, shard); + } + } + + cleanupExpiredCohortEntries(); + } + + void abortPendingTransactions(final String reason, final Shard shard) { + if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) { + return; + } + + List cohortEntries = new ArrayList<>(); + + if(currentCohortEntry != null) { + cohortEntries.add(currentCohortEntry); + currentCohortEntry = null; + } + + cohortEntries.addAll(queuedCohortEntries); + queuedCohortEntries.clear(); + + for(CohortEntry cohortEntry: cohortEntries) { + if(cohortEntry.getReplySender() != null) { + cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self()); } } } @@ -457,7 +478,7 @@ class ShardCommitCoordinator { * @return the current CohortEntry or null if the given transaction ID does not match the * current entry. */ - public CohortEntry getCohortEntryIfCurrent(String transactionID) { + CohortEntry getCohortEntryIfCurrent(String transactionID) { if(isCurrentTransaction(transactionID)) { return currentCohortEntry; } @@ -465,15 +486,15 @@ class ShardCommitCoordinator { return null; } - public CohortEntry getCurrentCohortEntry() { + CohortEntry getCurrentCohortEntry() { return currentCohortEntry; } - public CohortEntry getAndRemoveCohortEntry(String transactionID) { + CohortEntry getAndRemoveCohortEntry(String transactionID) { return cohortCache.remove(transactionID); } - public boolean isCurrentTransaction(String transactionID) { + boolean isCurrentTransaction(String transactionID) { return currentCohortEntry != null && currentCohortEntry.getTransactionID().equals(transactionID); } @@ -487,7 +508,7 @@ class ShardCommitCoordinator { * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from * the cache. */ - public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) { + void currentTransactionComplete(String transactionID, boolean removeCohortEntry) { if(removeCohortEntry) { cohortCache.remove(transactionID); } @@ -568,16 +589,19 @@ class ShardCommitCoordinator { private final Stopwatch lastAccessTimer = Stopwatch.createStarted(); private int totalBatchedModificationsReceived; private boolean aborted; + private final short clientVersion; - CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { + CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) { this.transaction = Preconditions.checkNotNull(transaction); this.transactionID = transactionID; + this.clientVersion = clientVersion; } - CohortEntry(String transactionID, ShardDataTreeCohort cohort) { + CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) { this.transactionID = transactionID; this.cohort = cohort; this.transaction = null; + this.clientVersion = clientVersion; } void updateLastAccessTime() { @@ -589,6 +613,10 @@ class ShardCommitCoordinator { return transactionID; } + short getClientVersion() { + return clientVersion; + } + DataTreeCandidate getCandidate() { return cohort.getCandidate(); }