X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=7165581d8103fc64148e745e477bc1ec7f2278d6;hb=refs%2Fchanges%2F87%2F38587%2F3;hp=9cb015cfaf35796bdf98692d8d98b9b67dd86a13;hpb=f276ae33b951d173b51c467bb7bb1a5f5cf9a1e6;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 9cb015cfaf..7165581d81 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -27,7 +27,6 @@ import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MessageTracker; import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; import org.opendaylight.controller.cluster.common.actor.MeteringBehavior; -import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry; import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier; @@ -82,6 +81,7 @@ import scala.concurrent.duration.FiniteDuration; *

*/ public class Shard extends RaftActor { + @VisibleForTesting static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() { @Override @@ -269,6 +269,9 @@ public class Shard extends RaftActor { context().parent().forward(message, context()); } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { messageRetrySupport.onTimerMessage(message); + } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) { + commitCoordinator.processCohortRegistryCommand(getSender(), + (DataTreeCohortActorRegistry.CohortRegistryCommand) message); } else { super.handleNonRaftCommand(message); } @@ -334,9 +337,9 @@ public class Shard extends RaftActor { private void handleCommitTransaction(final CommitTransaction commit) { if (isLeader()) { - if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) { - shardMBean.incrementFailedTransactionsCount(); - } + if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) { + shardMBean.incrementFailedTransactionsCount(); + } } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -354,7 +357,7 @@ public class Shard extends RaftActor { try { try { - cohortEntry.commit(); + cohortEntry.commit(); } catch(ExecutionException e) { // We may get a "store tree and candidate base differ" IllegalStateException from commit under // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last @@ -432,7 +435,7 @@ public class Shard extends RaftActor { LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID()); if (isLeader()) { - commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); + commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -447,7 +450,7 @@ public class Shard extends RaftActor { protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) { try { - commitCoordinator.handleBatchedModifications(batched, sender, this); + commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext()); } catch (Exception e) { LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), batched.getTransactionID(), e); @@ -516,7 +519,7 @@ public class Shard extends RaftActor { boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { try { - commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); + commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext()); } catch (Exception e) { LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), message.getTransactionID(), e); @@ -540,7 +543,8 @@ public class Shard extends RaftActor { boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { - commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); + commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this, + store.getSchemaContext()); } else { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) {