X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FCohortEntry.java;h=80ba952abb926fff45d9864bf521d59925aea911;hb=a6ffbcf79601fb8dc03e90300711acb63ed674b6;hp=84535b593eebb5da9b4df5f7353336ed5b287cd2;hpb=057b787289f7b909d7013c22ac73a1c91c860af8;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java index 84535b593e..80ba952abb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java @@ -11,8 +11,9 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.List; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -26,14 +27,16 @@ final class CohortEntry { private RuntimeException lastBatchedModificationsException; private int totalBatchedModificationsReceived; + private int totalOperationsProcessed; private ShardDataTreeCohort cohort; private boolean doImmediateCommit; private ActorRef replySender; private Shard shard; private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) { + this.cohort = null; this.transaction = Preconditions.checkNotNull(transaction); - this.transactionId = transaction.getId(); + this.transactionId = transaction.getIdentifier(); this.clientVersion = clientVersion; } @@ -76,14 +79,19 @@ final class CohortEntry { return totalBatchedModificationsReceived; } + int getTotalOperationsProcessed() { + return totalOperationsProcessed; + } + RuntimeException getLastBatchedModificationsException() { return lastBatchedModificationsException; } @SuppressWarnings("checkstyle:IllegalCatch") - void applyModifications(final Iterable modifications) { + void applyModifications(final List modifications) { totalBatchedModificationsReceived++; if (lastBatchedModificationsException == null) { + totalOperationsProcessed += modifications.size(); for (Modification modification : modifications) { try { modification.apply(transaction.getSnapshot()); @@ -107,14 +115,14 @@ final class CohortEntry { cohort.commit(callback); } - void abort() throws InterruptedException, ExecutionException, TimeoutException { - cohort.abort().get(); + void abort(final FutureCallback callback) { + cohort.abort(callback); } - void ready(final CohortDecorator cohortDecorator) { + void ready(final Optional> participatingShardNames, final CohortDecorator cohortDecorator) { Preconditions.checkState(cohort == null, "cohort was already set"); - cohort = transaction.ready(); + cohort = transaction.ready(participatingShardNames); if (cohortDecorator != null) { // Call the hook for unit tests. @@ -122,6 +130,14 @@ final class CohortEntry { } } + boolean isSealed() { + return cohort != null; + } + + Optional> getParticipatingShardNames() { + return cohort != null ? cohort.getParticipatingShardNames() : Optional.empty(); + } + boolean isDoImmediateCommit() { return doImmediateCommit; }