txCommitTimeoutCheckSchedule.cancel();
}
+ commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
+
shardMBean.unregisterMBean();
}
setPeerAddress(resolved.getPeerId().toString(),
resolved.getPeerAddress());
} else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
- handleTransactionCommitTimeoutCheck();
+ commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
} else if(message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
} else if(message instanceof RegisterRoleChangeListener){
updateConfigParams(datastoreContext.getShardRaftConfig());
}
- private void handleTransactionCommitTimeoutCheck() {
- CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
- if(cohortEntry != null) {
- if(cohortEntry.isExpired(transactionCommitTimeout)) {
- LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
- persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
-
- doAbortTransaction(cohortEntry.getTransactionID(), null);
- }
- }
-
- commitCoordinator.cleanupExpiredCohortEntries();
- }
-
private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
}
}
store.closeAllTransactionChains();
+
+ commitCoordinator.abortPendingTransactions(
+ "The transacton was aborted due to inflight leadership change.", this);
}
if(hasLeader && !isIsolatedLeader()) {
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorRef;
-import akka.actor.Status;
+import akka.actor.Status.Failure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
" capacity %d has been reached.",
name, cohortEntry.getTransactionID(), queueCapacity));
log.error(ex.getMessage());
- sender.tell(new Status.Failure(ex), shard.self());
+ sender.tell(new Failure(ex), shard.self());
return false;
}
}
IllegalStateException ex = new IllegalStateException(
String.format("%s: No cohort entry found for transaction %s", name, transactionID));
log.error(ex.getMessage());
- sender.tell(new Status.Failure(ex), shard.self());
+ sender.tell(new Failure(ex), shard.self());
return;
}
if(canCommit) {
doCommit(cohortEntry);
} else {
- cohortEntry.getReplySender().tell(new Status.Failure(new TransactionCommitFailedException(
+ cohortEntry.getReplySender().tell(new Failure(new TransactionCommitFailedException(
"Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
}
} else {
failure = e.getCause();
}
- cohortEntry.getReplySender().tell(new Status.Failure(failure), cohortEntry.getShard().self());
+ cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
} finally {
if(!canCommit) {
// Remove the entry from the cache now.
} catch (Exception e) {
log.error("{} An exception occurred while preCommitting transaction {}",
name, cohortEntry.getTransactionID(), e);
- cohortEntry.getReplySender().tell(new akka.actor.Status.Failure(e), cohortEntry.getShard().self());
+ cohortEntry.getReplySender().tell(new Failure(e), cohortEntry.getShard().self());
currentTransactionComplete(cohortEntry.getTransactionID(), true);
}
String.format("%s: Cannot commit transaction %s - it is not the current transaction",
name, transactionID));
log.error(ex.getMessage());
- sender.tell(new akka.actor.Status.Failure(ex), shard.self());
+ sender.tell(new Failure(ex), shard.self());
return false;
}
log.error("{}: An exception happened during abort", name, e);
if(sender != null) {
- sender.tell(new akka.actor.Status.Failure(e), self);
+ sender.tell(new Failure(e), self);
+ }
+ }
+ }
+
+ void checkForExpiredTransactions(final long timeout, final Shard shard) {
+ CohortEntry cohortEntry = getCurrentCohortEntry();
+ if(cohortEntry != null) {
+ if(cohortEntry.isExpired(timeout)) {
+ log.warn("{}: Current transaction {} has timed out after {} ms - aborting",
+ name, cohortEntry.getTransactionID(), timeout);
+
+ handleAbort(cohortEntry.getTransactionID(), null, shard);
+ }
+ }
+
+ cleanupExpiredCohortEntries();
+ }
+
+ void abortPendingTransactions(final String reason, final Shard shard) {
+ if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+ return;
+ }
+
+ List<CohortEntry> cohortEntries = new ArrayList<>();
+
+ if(currentCohortEntry != null) {
+ cohortEntries.add(currentCohortEntry);
+ currentCohortEntry = null;
+ }
+
+ cohortEntries.addAll(queuedCohortEntries);
+ queuedCohortEntries.clear();
+
+ for(CohortEntry cohortEntry: cohortEntries) {
+ if(cohortEntry.getReplySender() != null) {
+ cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
}
}
}
* @return the current CohortEntry or null if the given transaction ID does not match the
* current entry.
*/
- public CohortEntry getCohortEntryIfCurrent(String transactionID) {
+ CohortEntry getCohortEntryIfCurrent(String transactionID) {
if(isCurrentTransaction(transactionID)) {
return currentCohortEntry;
}
return null;
}
- public CohortEntry getCurrentCohortEntry() {
+ CohortEntry getCurrentCohortEntry() {
return currentCohortEntry;
}
- public CohortEntry getAndRemoveCohortEntry(String transactionID) {
+ CohortEntry getAndRemoveCohortEntry(String transactionID) {
return cohortCache.remove(transactionID);
}
- public boolean isCurrentTransaction(String transactionID) {
+ boolean isCurrentTransaction(String transactionID) {
return currentCohortEntry != null &&
currentCohortEntry.getTransactionID().equals(transactionID);
}
* @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
* the cache.
*/
- public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+ void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
if(removeCohortEntry) {
cohortCache.remove(transactionID);
}