X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=23ad747d4cf3d4c7247271c09f5c64fa9fc44c2e;hb=d4fa6758d6b94aad894854c0fe6fcd82e7bbefd6;hp=ccd0b85c36d94e4586ff0df9bdd7dfc596da24d7;hpb=e836005e1d0674e4d260b33d91511cd97d36928c;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index ccd0b85c36..23ad747d4c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -83,10 +83,21 @@ import scala.concurrent.duration.FiniteDuration; */ public class Shard extends RaftActor { - protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; + @VisibleForTesting + static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() { + @Override + public String toString() { + return "txCommitTimeoutCheck"; + } + }; @VisibleForTesting - static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage"; + static final Object GET_SHARD_MBEAN_MESSAGE = new Object() { + @Override + public String toString() { + return "getShardMBeanMessage"; + } + }; // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant. public static final String DEFAULT_NAME = "default"; @@ -208,16 +219,14 @@ public class Shard extends RaftActor { } @Override - protected void handleCommand(final Object message) { - - final MessageTracker.Context context = appendEntriesReplyTracker.received(message); - final Optional maybeError = context.error(); - if (maybeError.isPresent()) { - LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(), - maybeError.get()); - } + protected void handleNonRaftCommand(final Object message) { + try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { + final Optional maybeError = context.error(); + if (maybeError.isPresent()) { + LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(), + maybeError.get()); + } - try { if (CreateTransaction.isSerializedType(message)) { handleCreateTransaction(message); } else if (message instanceof BatchedModifications) { @@ -261,11 +270,12 @@ public class Shard extends RaftActor { context().parent().forward(message, context()); } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { messageRetrySupport.onTimerMessage(message); + } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) { + commitCoordinator.processCohortRegistryCommand(getSender(), + (DataTreeCohortActorRegistry.CohortRegistryCommand) message); } else { - super.handleCommand(message); + super.handleNonRaftCommand(message); } - } finally { - context.done(); } } @@ -328,9 +338,9 @@ public class Shard extends RaftActor { private void handleCommitTransaction(final CommitTransaction commit) { if (isLeader()) { - if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) { - shardMBean.incrementFailedTransactionsCount(); - } + if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) { + shardMBean.incrementFailedTransactionsCount(); + } } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -348,7 +358,7 @@ public class Shard extends RaftActor { try { try { - cohortEntry.commit(); + cohortEntry.commit(); } catch(ExecutionException e) { // We may get a "store tree and candidate base differ" IllegalStateException from commit under // certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last @@ -426,7 +436,7 @@ public class Shard extends RaftActor { LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID()); if (isLeader()) { - commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); + commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this); } else { ActorSelection leader = getLeader(); if (leader == null) { @@ -441,7 +451,7 @@ public class Shard extends RaftActor { protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) { try { - commitCoordinator.handleBatchedModifications(batched, sender, this); + commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext()); } catch (Exception e) { LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), batched.getTransactionID(), e); @@ -510,7 +520,7 @@ public class Shard extends RaftActor { boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { try { - commitCoordinator.handleReadyLocalTransaction(message, getSender(), this); + commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext()); } catch (Exception e) { LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), message.getTransactionID(), e); @@ -534,7 +544,8 @@ public class Shard extends RaftActor { boolean isLeaderActive = isLeaderActive(); if (isLeader() && isLeaderActive) { - commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this); + commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this, + store.getSchemaContext()); } else { ActorSelection leader = getLeader(); if (!isLeaderActive || leader == null) {