Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Use TransactionCommitOperationTimeout when commiting a Local transaction
[controller.git]
/
opendaylight
/
md-sal
/
sal-distributed-datastore
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
cluster
/
datastore
/
ShardManager.java
diff --git
a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index f5baf699c8afdacac163fa65c66e15d6e631584b..f4fa7b3a97e8617f8ac474b494a08deb330a0b6d 100644
(file)
--- a/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@
-24,6
+24,7
@@
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
@@
-54,17
+55,20
@@
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.Sha
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
@@
-185,8
+189,8
@@
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
} else if(message instanceof ShardNotInitializedTimeout) {
onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
} else if(message instanceof ShardNotInitializedTimeout) {
onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
- } else if(message instanceof LeaderStateChanged) {
- onLeaderStateChanged((LeaderStateChanged)message);
+ } else if(message instanceof
Shard
LeaderStateChanged) {
+ onLeaderStateChanged((
Shard
LeaderStateChanged)message);
} else {
unknownMessage(message);
}
} else {
unknownMessage(message);
}
@@
-202,11
+206,12
@@
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
}
}
}
}
- private void onLeaderStateChanged(LeaderStateChanged leaderStateChanged) {
+ private void onLeaderStateChanged(
Shard
LeaderStateChanged leaderStateChanged) {
LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
if(shardInformation != null) {
LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
if(shardInformation != null) {
+ shardInformation.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
checkReady();
} else {
shardInformation.setLeaderId(leaderStateChanged.getLeaderId());
checkReady();
} else {
@@
-508,6
+513,7
@@
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
final String shardName = message.getShardName();
LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
final String shardName = message.getShardName();
+ final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
// First see if the there is a local replica for the shard
final ShardInformation info = localShards.get(shardName);
@@
-515,7
+521,10
@@
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
@Override
public Object get() {
- Object found = new PrimaryFound(info.getSerializedLeaderActor());
+ String primaryPath = info.getSerializedLeaderActor();
+ Object found = canReturnLocalShardState && info.isLeader() ?
+ new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().get()) :
+ new RemotePrimaryShardFound(primaryPath);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
@@
-535,7
+544,8
@@
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
shardName, path);
LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
shardName, path);
- getContext().actorSelection(path).forward(message, getContext());
+ getContext().actorSelection(path).forward(new RemoteFindPrimary(shardName,
+ message.isWaitUntilReady()), getContext());
return;
}
}
return;
}
}
@@
-663,6
+673,7
@@
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private ActorRef actor;
private ActorPath actorPath;
private final Map<String, String> peerAddresses;
private ActorRef actor;
private ActorPath actorPath;
private final Map<String, String> peerAddresses;
+ private Optional<DataTree> localShardDataTree;
// flag that determines if the actor is ready for business
private boolean actorInitialized = false;
// flag that determines if the actor is ready for business
private boolean actorInitialized = false;
@@
-701,6
+712,14
@@
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
return shardId;
}
return shardId;
}
+ void setLocalDataTree(Optional<DataTree> localShardDataTree) {
+ this.localShardDataTree = localShardDataTree;
+ }
+
+ Optional<DataTree> getLocalShardDataTree() {
+ return localShardDataTree;
+ }
+
Map<String, String> getPeerAddresses() {
return peerAddresses;
}
Map<String, String> getPeerAddresses() {
return peerAddresses;
}