X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FCohortEntry.java;h=6c0c13b3abfd6dec5ca89b1014f095e4e87c468a;hb=987e2e706d0b343304142626bc870f3e8c909b51;hp=767749af29900a2c298758ec78e54e26768b8dc7;hpb=a47dd7a5d21ca68804a6d0e2e3ca765f223c2ef4;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java index 767749af29..6c0c13b3ab 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java @@ -7,12 +7,15 @@ */ package org.opendaylight.controller.cluster.datastore; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; -import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; +import java.util.List; +import java.util.Optional; +import java.util.SortedSet; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -21,25 +24,27 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification final class CohortEntry { private final ReadWriteShardDataTreeTransaction transaction; - private final TransactionIdentifier transactionID; + private final TransactionIdentifier transactionId; private final short clientVersion; private RuntimeException lastBatchedModificationsException; private int totalBatchedModificationsReceived; + private int totalOperationsProcessed; private ShardDataTreeCohort cohort; private boolean doImmediateCommit; private ActorRef replySender; private Shard shard; private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) { - this.transaction = Preconditions.checkNotNull(transaction); - this.transactionID = transaction.getId(); + this.cohort = null; + this.transaction = requireNonNull(transaction); + this.transactionId = transaction.getIdentifier(); this.clientVersion = clientVersion; } private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) { - this.cohort = Preconditions.checkNotNull(cohort); - this.transactionID = cohort.getIdentifier(); + this.cohort = requireNonNull(cohort); + this.transactionId = cohort.getIdentifier(); this.transaction = null; this.clientVersion = clientVersion; } @@ -52,8 +57,8 @@ final class CohortEntry { return new CohortEntry(cohort, clientVersion); } - TransactionIdentifier getTransactionID() { - return transactionID; + TransactionIdentifier getTransactionId() { + return transactionId; } short getClientVersion() { @@ -76,20 +81,26 @@ final class CohortEntry { return totalBatchedModificationsReceived; } + int getTotalOperationsProcessed() { + return totalOperationsProcessed; + } + RuntimeException getLastBatchedModificationsException() { return lastBatchedModificationsException; } - void applyModifications(final Iterable modifications) { + @SuppressWarnings("checkstyle:IllegalCatch") + void applyModifications(final List modifications) { totalBatchedModificationsReceived++; - if(lastBatchedModificationsException == null) { + if (lastBatchedModificationsException == null) { + totalOperationsProcessed += modifications.size(); for (Modification modification : modifications) { - try { - modification.apply(transaction.getSnapshot()); - } catch (RuntimeException e) { - lastBatchedModificationsException = e; - throw e; - } + try { + modification.apply(transaction.getSnapshot()); + } catch (RuntimeException e) { + lastBatchedModificationsException = e; + throw e; + } } } } @@ -106,21 +117,29 @@ final class CohortEntry { cohort.commit(callback); } - void abort() throws InterruptedException, ExecutionException, TimeoutException { - cohort.abort().get(); + void abort(final FutureCallback callback) { + cohort.abort(callback); } - void ready(final CohortDecorator cohortDecorator) { - Preconditions.checkState(cohort == null, "cohort was already set"); + void ready(final Optional> participatingShardNames, final CohortDecorator cohortDecorator) { + checkState(cohort == null, "cohort was already set"); - cohort = transaction.ready(); + cohort = transaction.ready(participatingShardNames); - if(cohortDecorator != null) { + if (cohortDecorator != null) { // Call the hook for unit tests. - cohort = cohortDecorator.decorate(transactionID, cohort); + cohort = cohortDecorator.decorate(transactionId, cohort); } } + boolean isSealed() { + return cohort != null; + } + + Optional> getParticipatingShardNames() { + return cohort != null ? cohort.getParticipatingShardNames() : Optional.empty(); + } + boolean isDoImmediateCommit() { return doImmediateCommit; } @@ -148,8 +167,8 @@ final class CohortEntry { @Override public String toString() { final StringBuilder builder = new StringBuilder(); - builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=") + builder.append("CohortEntry [transactionId=").append(transactionId).append(", doImmediateCommit=") .append(doImmediateCommit).append("]"); return builder.toString(); } -} \ No newline at end of file +}