Do not allow overrides of onReceive{Command,Recover}
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 38bf34fd3be258c19589b1aadf305b1367f48870..76131e257b893a1671f74f01b1e508e2689ca731 100644 (file)
@@ -14,6 +14,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -30,6 +32,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.slf4j.Logger;
@@ -135,7 +138,7 @@ class ShardCommitCoordinator {
                 ready.getTransactionID(), ready.getTxnClientVersion());
 
         ShardDataTreeCohort cohort = ready.getTransaction().ready();
-        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort);
+        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
         cohortCache.put(ready.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
@@ -169,7 +172,7 @@ class ShardCommitCoordinator {
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
-                        batched.getTransactionChainID()));
+                        batched.getTransactionChainID()), batched.getVersion());
             cohortCache.put(batched.getTransactionID(), cohortEntry);
         }
 
@@ -227,7 +230,8 @@ class ShardCommitCoordinator {
     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
                 message.getTransactionID());
-        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
+                DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(message.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
@@ -246,6 +250,36 @@ class ShardCommitCoordinator {
         }
     }
 
+    Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
+            final int maxModificationsPerBatch) {
+        CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
+        if(cohortEntry == null || cohortEntry.getTransaction() == null) {
+            return Collections.singletonList(from);
+        }
+
+        cohortEntry.applyModifications(from.getModifications());
+
+        final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+        cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
+            @Override
+            protected BatchedModifications getModifications() {
+                if(newModifications.isEmpty() ||
+                        newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+                    newModifications.add(new BatchedModifications(from.getTransactionID(),
+                            from.getVersion(), from.getTransactionChainID()));
+                }
+
+                return newModifications.getLast();
+            }
+        });
+
+        BatchedModifications last = newModifications.getLast();
+        last.setDoCommitOnReady(from.isDoCommitOnReady());
+        last.setReady(from.isReady());
+        last.setTotalMessagesSent(newModifications.size());
+        return newModifications;
+    }
+
     private void handleCanCommit(CohortEntry cohortEntry) {
         String transactionID = cohortEntry.getTransactionID();
 
@@ -319,9 +353,11 @@ class ShardCommitCoordinator {
                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
                 }
             } else {
+                // FIXME - use caller's version
                 cohortEntry.getReplySender().tell(
-                        canCommit ? CanCommitTransactionReply.YES.toSerializable() :
-                            CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
+                        canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
+                            CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
+                        cohortEntry.getShard().self());
             }
         } catch (Exception e) {
             log.debug("{}: An exception occurred during canCommit", name, e);
@@ -420,7 +456,7 @@ class ShardCommitCoordinator {
             shard.getShardMBean().incrementAbortTransactionsCount();
 
             if(sender != null) {
-                sender.tell(new AbortTransactionReply().toSerializable(), self);
+                sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
             }
         } catch (Exception e) {
             log.error("{}: An exception happened during abort", name, e);
@@ -586,16 +622,19 @@ class ShardCommitCoordinator {
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
         private int totalBatchedModificationsReceived;
         private boolean aborted;
+        private final short clientVersion;
 
-        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
+        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
             this.transaction = Preconditions.checkNotNull(transaction);
             this.transactionID = transactionID;
+            this.clientVersion = clientVersion;
         }
 
-        CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
+        CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
             this.transactionID = transactionID;
             this.cohort = cohort;
             this.transaction = null;
+            this.clientVersion = clientVersion;
         }
 
         void updateLastAccessTime() {
@@ -607,10 +646,18 @@ class ShardCommitCoordinator {
             return transactionID;
         }
 
+        short getClientVersion() {
+            return clientVersion;
+        }
+
         DataTreeCandidate getCandidate() {
             return cohort.getCandidate();
         }
 
+        ReadWriteShardDataTreeTransaction getTransaction() {
+            return transaction;
+        }
+
         int getTotalBatchedModificationsReceived() {
             return totalBatchedModificationsReceived;
         }