Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
BUG-5414 introduce EOS inJeopardy flag
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
Shard.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
index a47e437e32cf3f694b7497ebec35761f35425cbb..23ad747d4cf3d4c7247271c09f5c64fa9fc44c2e 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
@@
-83,10
+83,21
@@
import scala.concurrent.duration.FiniteDuration;
*/
public class Shard extends RaftActor {
*/
public class Shard extends RaftActor {
- protected static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
+ @VisibleForTesting
+ static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "txCommitTimeoutCheck";
+ }
+ };
@VisibleForTesting
@VisibleForTesting
- static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
+ static final Object GET_SHARD_MBEAN_MESSAGE = new Object() {
+ @Override
+ public String toString() {
+ return "getShardMBeanMessage";
+ }
+ };
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
@@
-208,7
+219,7
@@
public class Shard extends RaftActor {
}
@Override
}
@Override
- protected void handleCommand(final Object message) {
+ protected void handle
NonRaft
Command(final Object message) {
try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional<Error> maybeError = context.error();
if (maybeError.isPresent()) {
try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional<Error> maybeError = context.error();
if (maybeError.isPresent()) {
@@
-259,8
+270,11
@@
public class Shard extends RaftActor {
context().parent().forward(message, context());
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
context().parent().forward(message, context());
} else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
messageRetrySupport.onTimerMessage(message);
+ } else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
+ commitCoordinator.processCohortRegistryCommand(getSender(),
+ (DataTreeCohortActorRegistry.CohortRegistryCommand) message);
} else {
} else {
- super.handleCommand(message);
+ super.handle
NonRaft
Command(message);
}
}
}
}
}
}
@@
-324,9
+338,9
@@
public class Shard extends RaftActor {
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
-
if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
-
shardMBean.incrementFailedTransactionsCount();
-
}
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
+ shardMBean.incrementFailedTransactionsCount();
+ }
} else {
ActorSelection leader = getLeader();
if (leader == null) {
} else {
ActorSelection leader = getLeader();
if (leader == null) {
@@
-344,7
+358,7
@@
public class Shard extends RaftActor {
try {
try {
try {
try {
-
cohortEntry.commit();
+ cohortEntry.commit();
} catch(ExecutionException e) {
// We may get a "store tree and candidate base differ" IllegalStateException from commit under
// certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
} catch(ExecutionException e) {
// We may get a "store tree and candidate base differ" IllegalStateException from commit under
// certain edge case scenarios so we'll try to re-apply the candidate from scratch as a last
@@
-422,7
+436,7
@@
public class Shard extends RaftActor {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
-
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
} else {
ActorSelection leader = getLeader();
if (leader == null) {
@@
-437,7
+451,7
@@
public class Shard extends RaftActor {
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
- commitCoordinator.handleBatchedModifications(batched, sender, this);
+ commitCoordinator.handleBatchedModifications(batched, sender, this
, store.getSchemaContext()
);
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
@@
-506,7
+520,7
@@
public class Shard extends RaftActor {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
try {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
try {
- commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this
, store.getSchemaContext()
);
} catch (Exception e) {
LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
} catch (Exception e) {
LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
@@
-530,7
+544,8
@@
public class Shard extends RaftActor {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
- commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this,
+ store.getSchemaContext());
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {
} else {
ActorSelection leader = getLeader();
if (!isLeaderActive || leader == null) {