Code Review
/
controller.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Merge "Fix bug 2554 - XSQL getting stuck on some models"
[controller.git]
/
opendaylight
/
md-sal
/
sal-remoterpc-connector
/
src
/
main
/
java
/
org
/
opendaylight
/
controller
/
remote
/
rpc
/
registry
/
gossip
/
Gossiper.java
diff --git
a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
index a8bc25c40ba14b2ecf8c45926f7871f189dbdafa..1bbcc69f5ed4d5fa6d7d8ea773823c97c9bb6e05 100644
(file)
--- a/
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
+++ b/
opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java
@@
-12,7
+12,6
@@
import akka.actor.ActorRefProvider;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
-import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
@@
-21,6
+20,8
@@
import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
+import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@
-58,7
+59,7
@@
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go
*
*/
*
*/
-public class Gossiper extends
UntypedActor
{
+public class Gossiper extends
AbstractUntypedActorWithMetering
{
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@@
-78,7
+79,11
@@
public class Gossiper extends UntypedActor {
private Boolean autoStartGossipTicks = true;
private Boolean autoStartGossipTicks = true;
- public Gossiper(){}
+ private RemoteRpcProviderConfig config;
+
+ public Gossiper(){
+ config = new RemoteRpcProviderConfig(getContext().system().settings().config());
+ }
/**
* Helpful for testing
/**
* Helpful for testing
@@
-105,7
+110,7
@@
public class Gossiper extends UntypedActor {
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
-
new FiniteDuration(500, TimeUnit.MILLISECONDS),
//interval
+
config.getGossipTickInterval(),
//interval
getSelf(), //target
new Messages.GossiperMessages.GossipTick(), //message
getContext().dispatcher(), //execution context
getSelf(), //target
new Messages.GossiperMessages.GossipTick(), //message
getContext().dispatcher(), //execution context
@@
-123,22
+128,19
@@
public class Gossiper extends UntypedActor {
}
@Override
}
@Override
- public void onReceive(Object message) throws Exception {
-
- log.debug("Received message: node[{}], message[{}]", selfAddress, message);
-
+ protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
if (message instanceof GossipTick)
receiveGossipTick();
//Usually sent by self via gossip task defined above. But its not enforced.
//These ticks can be sent by another actor as well which is esp. useful while testing
if (message instanceof GossipTick)
receiveGossipTick();
- //Message from remote gossiper with its bucket versions
+
//Message from remote gossiper with its bucket versions
else if (message instanceof GossipStatus)
receiveGossipStatus((GossipStatus) message);
else if (message instanceof GossipStatus)
receiveGossipStatus((GossipStatus) message);
- //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
- //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
- //message with its local versions
+
//Message from remote gossiper with buckets. This is usually in response to GossipStatus message
+
//The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
+
//message with its local versions
else if (message instanceof GossipEnvelope)
receiveGossip((GossipEnvelope) message);
else if (message instanceof GossipEnvelope)
receiveGossip((GossipEnvelope) message);
@@
-168,7
+170,9
@@
public class Gossiper extends UntypedActor {
}
clusterMembers.remove(member.address());
}
clusterMembers.remove(member.address());
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ if(log.isDebugEnabled()) {
+ log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
}
/**
}
/**
@@
-182,8
+186,9
@@
public class Gossiper extends UntypedActor {
if (!clusterMembers.contains(member.address()))
clusterMembers.add(member.address());
if (!clusterMembers.contains(member.address()))
clusterMembers.add(member.address());
-
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ if(log.isDebugEnabled()) {
+ log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
+ }
}
/**
}
/**
@@
-195,7
+200,7
@@
public class Gossiper extends UntypedActor {
void receiveGossipTick(){
if (clusterMembers.size() == 0) return; //no members to send gossip status to
void receiveGossipTick(){
if (clusterMembers.size() == 0) return; //no members to send gossip status to
- Address remoteMemberToGossipTo
= null
;
+ Address remoteMemberToGossipTo;
if (clusterMembers.size() == 1)
remoteMemberToGossipTo = clusterMembers.get(0);
if (clusterMembers.size() == 1)
remoteMemberToGossipTo = clusterMembers.get(0);
@@
-203,8
+208,9
@@
public class Gossiper extends UntypedActor {
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
-
- log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+ if(log.isDebugEnabled()) {
+ log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
+ }
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
@@
-227,7
+233,9
@@
public class Gossiper extends UntypedActor {
return;
final ActorRef sender = getSender();
return;
final ActorRef sender = getSender();
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
+
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
}
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
}
@@
-240,7
+248,9
@@
public class Gossiper extends UntypedActor {
void receiveGossip(GossipEnvelope envelope){
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
void receiveGossip(GossipEnvelope envelope){
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ if(log.isDebugEnabled()) {
+ log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ }
return;
}
return;
}
@@
-267,7
+277,8
@@
public class Gossiper extends UntypedActor {
*/
void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
*/
void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
}
futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
}
@@
-279,11
+290,16
@@
public class Gossiper extends UntypedActor {
void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
//Get local status from bucket store and send to remote
void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
//Get local status from bucket store and send to remote
- Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
+ Future<Object> futureReply =
+ Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
+
+ //Find gossiper on remote system
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
- log.debug("Sending bucket versions to [{}]", remoteRef);
+ if(log.isDebugEnabled()) {
+ log.debug("Sending bucket versions to [{}]", remoteRef);
+ }
futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
@@
-375,8
+391,6
@@
public class Gossiper extends UntypedActor {
localIsOlder.add(address);
else if (localVersions.get(address) > remoteVersions.get(address))
localIsNewer.add(address);
localIsOlder.add(address);
else if (localVersions.get(address) > remoteVersions.get(address))
localIsNewer.add(address);
- else
- continue;
}
if (!localIsOlder.isEmpty())
}
if (!localIsOlder.isEmpty())
@@
-410,7
+424,9
@@
public class Gossiper extends UntypedActor {
public Void apply(Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
public Void apply(Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+ if(log.isDebugEnabled()) {
+ log.debug("Buckets to send from {}: {}", selfAddress, buckets);
+ }
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}