*/
package org.opendaylight.controller.cluster.datastore;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
-import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.util.List;
import java.util.Optional;
import java.util.SortedSet;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
final class CohortEntry {
private final ReadWriteShardDataTreeTransaction transaction;
private RuntimeException lastBatchedModificationsException;
private int totalBatchedModificationsReceived;
+ private int totalOperationsProcessed;
private ShardDataTreeCohort cohort;
private boolean doImmediateCommit;
private ActorRef replySender;
private Shard shard;
private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
- this.transaction = Preconditions.checkNotNull(transaction);
- this.transactionId = transaction.getIdentifier();
+ cohort = null;
+ this.transaction = requireNonNull(transaction);
+ transactionId = transaction.getIdentifier();
this.clientVersion = clientVersion;
}
private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) {
- this.cohort = Preconditions.checkNotNull(cohort);
- this.transactionId = cohort.getIdentifier();
- this.transaction = null;
+ this.cohort = requireNonNull(cohort);
+ transactionId = cohort.getIdentifier();
+ transaction = null;
this.clientVersion = clientVersion;
}
return totalBatchedModificationsReceived;
}
+ int getTotalOperationsProcessed() {
+ return totalOperationsProcessed;
+ }
+
RuntimeException getLastBatchedModificationsException() {
return lastBatchedModificationsException;
}
@SuppressWarnings("checkstyle:IllegalCatch")
- void applyModifications(final Iterable<Modification> modifications) {
+ @SuppressFBWarnings(value = "THROWS_METHOD_THROWS_RUNTIMEEXCEPTION", justification = "Re-thrown")
+ void applyModifications(final List<Modification> modifications) {
totalBatchedModificationsReceived++;
if (lastBatchedModificationsException == null) {
+ totalOperationsProcessed += modifications.size();
for (Modification modification : modifications) {
try {
modification.apply(transaction.getSnapshot());
}
}
- void canCommit(final FutureCallback<Void> callback) {
+ void canCommit(final FutureCallback<Empty> callback) {
cohort.canCommit(callback);
}
cohort.commit(callback);
}
- void abort(final FutureCallback<Void> callback) {
+ void abort(final FutureCallback<Empty> callback) {
cohort.abort(callback);
}
void ready(final Optional<SortedSet<String>> participatingShardNames, final CohortDecorator cohortDecorator) {
- Preconditions.checkState(cohort == null, "cohort was already set");
+ checkState(cohort == null, "cohort was already set");
cohort = transaction.ready(participatingShardNames);
}
}
+ boolean isSealed() {
+ return cohort != null;
+ }
+
Optional<SortedSet<String>> getParticipatingShardNames() {
return cohort != null ? cohort.getParticipatingShardNames() : Optional.empty();
}