X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fremote%2Frpc%2Fregistry%2Fgossip%2FGossiper.java;h=8af1c83c558a0a6b4842b9cb7accfa0a6e68a79b;hp=1bbcc69f5ed4d5fa6d7d8ea773823c97c9bb6e05;hb=e4c11407593914ed4520253909d0d7669e51cfac;hpb=e3f75aebce9e773237e5a2e85a75496110698728 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java index 1bbcc69f5e..8af1c83c55 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/src/main/java/org/opendaylight/controller/remote/rpc/registry/gossip/Gossiper.java @@ -17,14 +17,7 @@ import akka.cluster.ClusterActorRefProvider; import akka.cluster.ClusterEvent; import akka.cluster.Member; import akka.dispatch.Mapper; -import akka.event.Logging; -import akka.event.LoggingAdapter; import akka.pattern.Patterns; -import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; -import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -32,15 +25,20 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; - -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; -import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering; +import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus; +import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; /** * Gossiper that syncs bucket store across nodes in the cluster. @@ -61,7 +59,7 @@ import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.Go public class Gossiper extends AbstractUntypedActorWithMetering { - final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + private final Logger log = LoggerFactory.getLogger(getClass()); private Cluster cluster; @@ -121,30 +119,29 @@ public class Gossiper extends AbstractUntypedActorWithMetering { @Override public void postStop(){ - if (cluster != null) + if (cluster != null) { cluster.unsubscribe(getSelf()); - if (gossipTask != null) + } + if (gossipTask != null) { gossipTask.cancel(); + } } @Override protected void handleReceive(Object message) throws Exception { //Usually sent by self via gossip task defined above. But its not enforced. //These ticks can be sent by another actor as well which is esp. useful while testing - if (message instanceof GossipTick) + if (message instanceof GossipTick) { receiveGossipTick(); - - //Message from remote gossiper with its bucket versions - else if (message instanceof GossipStatus) + } else if (message instanceof GossipStatus) { + // Message from remote gossiper with its bucket versions receiveGossipStatus((GossipStatus) message); - - //Message from remote gossiper with buckets. This is usually in response to GossipStatus message - //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus - //message with its local versions - else if (message instanceof GossipEnvelope) + } else if (message instanceof GossipEnvelope) { + // Message from remote gossiper with buckets. This is usually in response to GossipStatus + // message. The contained buckets are newer as determined by the remote gossiper by + // comparing the GossipStatus message with its local versions. receiveGossip((GossipEnvelope) message); - - else if (message instanceof ClusterEvent.MemberUp) { + } else if (message instanceof ClusterEvent.MemberUp) { receiveMemberUp(((ClusterEvent.MemberUp) message).member()); } else if (message instanceof ClusterEvent.MemberRemoved) { @@ -153,8 +150,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { } else if ( message instanceof ClusterEvent.UnreachableMember){ receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member()); - } else + } else { unhandled(message); + } } /** @@ -181,11 +179,13 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ void receiveMemberUp(Member member) { - if (selfAddress.equals(member.address())) + if (selfAddress.equals(member.address())) { return; //ignore up notification for self + } - if (!clusterMembers.contains(member.address())) + if (!clusterMembers.contains(member.address())) { clusterMembers.add(member.address()); + } if(log.isDebugEnabled()) { log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers); } @@ -198,13 +198,15 @@ public class Gossiper extends AbstractUntypedActorWithMetering { * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it. */ void receiveGossipTick(){ - if (clusterMembers.size() == 0) return; //no members to send gossip status to + if (clusterMembers.size() == 0) { + return; //no members to send gossip status to + } Address remoteMemberToGossipTo; - if (clusterMembers.size() == 1) + if (clusterMembers.size() == 1) { remoteMemberToGossipTo = clusterMembers.get(0); - else { + } else { Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size()); remoteMemberToGossipTo = clusterMembers.get(randomIndex); } @@ -229,8 +231,9 @@ public class Gossiper extends AbstractUntypedActorWithMetering { */ void receiveGossipStatus(GossipStatus status){ //Don't accept messages from non-members - if (!clusterMembers.contains(status.from())) + if (!clusterMembers.contains(status.from())) { return; + } final ActorRef sender = getSender(); Future futureReply = @@ -385,19 +388,23 @@ public class Gossiper extends AbstractUntypedActorWithMetering { for (Address address : remoteVersions.keySet()){ - if (localVersions.get(address) == null || remoteVersions.get(address) == null) + if (localVersions.get(address) == null || remoteVersions.get(address) == null) { continue; //this condition is taken care of by above diffs - if (localVersions.get(address) < remoteVersions.get(address)) + } + if (localVersions.get(address) < remoteVersions.get(address)) { localIsOlder.add(address); - else if (localVersions.get(address) > remoteVersions.get(address)) + } else if (localVersions.get(address) > remoteVersions.get(address)) { localIsNewer.add(address); + } } - if (!localIsOlder.isEmpty()) + if (!localIsOlder.isEmpty()) { sendGossipStatusTo(sender, localVersions ); + } - if (!localIsNewer.isEmpty()) + if (!localIsNewer.isEmpty()) { sendGossipTo(sender, localIsNewer);//send newer buckets to remote + } } return null;