Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
BUG-5626: split out CohortEntry
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
Shard.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index a647554806034f5b21d52e04dd8530dd6bb4d51d..7165581d8103fc64148e745e477bc1ec7f2278d6 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@
-27,7
+27,6
@@
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
@@
-82,6
+81,7
@@
import scala.concurrent.duration.FiniteDuration;
* </p>
*/
public class Shard extends RaftActor {
* </p>
*/
public class Shard extends RaftActor {
+
@VisibleForTesting
static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
@Override
@VisibleForTesting
static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
@Override
@@
-218,7
+218,7
@@
public class Shard extends RaftActor {
}
@Override
}
@Override
- protected void handleCommand(final Object message) {
+ protected void handle
NonRaft
Command(final Object message) {
try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional<Error> maybeError = context.error();
if (maybeError.isPresent()) {
try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional<Error> maybeError = context.error();
if (maybeError.isPresent()) {
@@
-269,8
+269,11
@@
public class Shard extends RaftActor {
context().parent().forward(message, context());
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
context().parent().forward(message, context());
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+ commitCoordinator.processCohortRegistryCommand(getSender(),
+ (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else {
} else {
- super.handleCommand(message);
+ super.handle
NonRaft
Command(message);
}
}
}
}
}
}
@@
-334,9
+337,9
@@
public class Shard extends RaftActor {
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
-
if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
-
shardMBean.incrementFailedTransactionsCount();
-
}
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+ shardMBean.incrementFailedTransactionsCount();
+ }
} else {
ActorSelection leader = getLeader();
if (leader == null) {
} else {
ActorSelection leader = getLeader();
if (leader == null) {
@@
-354,7
+357,7
@@
public class Shard extends RaftActor {
try {
try {
try {
try {
-
cohortEntry.commit();
+ cohortEntry.commit();
} catch(ExecutionException e) {
// We may get a "store tree and candidate base differ" IllegalStateException from commit under
// certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
} catch(ExecutionException e) {
// We may get a "store tree and candidate base differ" IllegalStateException from commit under
// certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
@@
-432,7
+435,7
@@
public class Shard extends RaftActor {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
-
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
} else {
ActorSelection leader = getLeader();
if (leader == null) {
@@
-447,7
+450,7
@@
public class Shard extends RaftActor {
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
- commitCoordinator.handleBatchedModifications(batched, sender, this);
+ commitCoordinator.handleBatchedModifications(batched, sender, this
, store.getSchemaContext()
);
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
@@
-516,7
+519,7
@@
public class Shard extends RaftActor {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
try {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
try {
- commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this
, store.getSchemaContext()
);
} catch (Exception e) {
LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
} catch (Exception e) {
LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
@@
-540,7
+543,8
@@
public class Shard extends RaftActor {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
- commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+ store.getSchemaContext());
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {