Bug 1435: CDS: Added support for custom commit cohort.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 9cb015cfaf35796bdf98692d8d98b9b67dd86a13..23ad747d4cf3d4c7247271c09f5c64fa9fc44c2e 100644 (file)
@@ -82,6 +82,7 @@ import scala.concurrent.duration.FiniteDuration;
  * </p>
  */
 public class Shard extends RaftActor {
+
     @VisibleForTesting
     static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
         @Override
@@ -269,6 +270,9 @@ public class Shard extends RaftActor {
                 context().parent().forward(message, context());
             } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
                 messageRetrySupport.onTimerMessage(message);
+            } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+                commitCoordinator.processCohortRegistryCommand(getSender(),
+                        (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
             } else {
                 super.handleNonRaftCommand(message);
             }
@@ -334,9 +338,9 @@ public class Shard extends RaftActor {
 
     private void handleCommitTransaction(final CommitTransaction commit) {
         if (isLeader()) {
-            if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
-                shardMBean.incrementFailedTransactionsCount();
-            }
+        if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+            shardMBean.incrementFailedTransactionsCount();
+        }
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -354,7 +358,7 @@ public class Shard extends RaftActor {
 
         try {
             try {
-                cohortEntry.commit();
+            cohortEntry.commit();
             } catch(ExecutionException e) {
                 // We may get a "store tree and candidate base differ" IllegalStateException from commit under
                 // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
@@ -432,7 +436,7 @@ public class Shard extends RaftActor {
         LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
 
         if (isLeader()) {
-            commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+        commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
         } else {
             ActorSelection leader = getLeader();
             if (leader == null) {
@@ -447,7 +451,7 @@ public class Shard extends RaftActor {
 
     protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
         try {
-            commitCoordinator.handleBatchedModifications(batched, sender, this);
+            commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext());
         } catch (Exception e) {
             LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
                     batched.getTransactionID(), e);
@@ -516,7 +520,7 @@ public class Shard extends RaftActor {
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
             try {
-                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext());
             } catch (Exception e) {
                 LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
                         message.getTransactionID(), e);
@@ -540,7 +544,8 @@ public class Shard extends RaftActor {
 
         boolean isLeaderActive = isLeaderActive();
         if (isLeader() && isLeaderActive) {
-            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+                    store.getSchemaContext());
         } else {
             ActorSelection leader = getLeader();
             if (!isLeaderActive || leader == null) {