X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=6da9fe947a0651e5d96981b87a274bde814f902a;hp=c2cc1ac05b5dd0bb51a363b66d27d5398042f401;hb=103ceecd0195cca6c87fbd62a687d8addf128784;hpb=e1010a51ae6637126e23c38302fae288e0d766e1 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c2cc1ac05b..6da9fe947a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -140,7 +140,7 @@ public class Shard extends RaftActor { } commitCoordinator = new ShardCommitCoordinator(store, - TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES), + datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(), datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name); setTransactionCommitTimeout(); @@ -160,7 +160,7 @@ public class Shard extends RaftActor { private void setTransactionCommitTimeout() { transactionCommitTimeout = TimeUnit.MILLISECONDS.convert( - datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS); + datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2; } public static Props props(final ShardIdentifier name, @@ -302,14 +302,15 @@ public class Shard extends RaftActor { private void handleTransactionCommitTimeoutCheck() { CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry(); if(cohortEntry != null) { - long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime(); - if(elapsed > transactionCommitTimeout) { + if(cohortEntry.isExpired(transactionCommitTimeout)) { LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting", persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout); doAbortTransaction(cohortEntry.getTransactionID(), null); } } + + commitCoordinator.cleanupExpiredCohortEntries(); } private static boolean isEmptyCommit(final DataTreeCandidate candidate) { @@ -323,9 +324,9 @@ public class Shard extends RaftActor { // or if cohortEntry has no modifications // we can apply modification to the state immediately if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { - applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate); + applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate); } else { - Shard.this.persistData(getSender(), cohortEntry.getTransactionID(), + Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), DataTreeCandidatePayload.create(candidate)); } }