import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
* </p>
*/
public class Shard extends RaftActor {
+
@VisibleForTesting
static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
@Override
context().parent().forward(message, context());
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+ commitCoordinator.processCohortRegistryCommand(getSender(),
+ (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else {
super.handleNonRaftCommand(message);
}
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
- if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
- shardMBean.incrementFailedTransactionsCount();
- }
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+ shardMBean.incrementFailedTransactionsCount();
+ }
} else {
ActorSelection leader = getLeader();
if (leader == null) {
try {
try {
- cohortEntry.commit();
+ cohortEntry.commit();
} catch(ExecutionException e) {
// We may get a "store tree and candidate base differ" IllegalStateException from commit under
// certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
- commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
- commitCoordinator.handleBatchedModifications(batched, sender, this);
+ commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext());
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
try {
- commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext());
} catch (Exception e) {
LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
- commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+ store.getSchemaContext());
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {