import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
}
}
+ Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
+ final int maxModificationsPerBatch) {
+ CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
+ if(cohortEntry == null || cohortEntry.getTransaction() == null) {
+ return Collections.singletonList(from);
+ }
+
+ cohortEntry.applyModifications(from.getModifications());
+
+ final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+ cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
+ @Override
+ protected BatchedModifications getModifications() {
+ if(newModifications.isEmpty() ||
+ newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+ newModifications.add(new BatchedModifications(from.getTransactionID(),
+ from.getVersion(), from.getTransactionChainID()));
+ }
+
+ return newModifications.getLast();
+ }
+ });
+
+ BatchedModifications last = newModifications.getLast();
+ last.setDoCommitOnReady(from.isDoCommitOnReady());
+ last.setReady(from.isReady());
+ last.setTotalMessagesSent(newModifications.size());
+ return newModifications;
+ }
+
private void handleCanCommit(CohortEntry cohortEntry) {
String transactionID = cohortEntry.getTransactionID();
return cohort.getCandidate();
}
+ ReadWriteShardDataTreeTransaction getTransaction() {
+ return transaction;
+ }
+
int getTotalBatchedModificationsReceived() {
return totalBatchedModificationsReceived;
}