/**
* Gossiper that syncs bucket store across nodes in the cluster.
- * <p>
- * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions
- * to a randomly selected remote gossiper.
- * <p>
- * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions.
- * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a
- * gossip status is sent to remote gossiper so that it can send the newer buckets.
- * <p>
- * When a bucket is received from a remote gossiper, its sent to the bucket store for update.
+ * <p/>
+ * It keeps a local scheduler that periodically sends Gossip ticks to
+ * itself to send bucket store's bucket versions to a randomly selected remote
+ * gossiper.
+ * <p/>
+ * When bucket versions are received from a remote gossiper, it is compared
+ * with bucket store's bucket versions. Which ever buckets are newer
+ * locally, are sent to remote gossiper. If any bucket is older in bucket store,
+ * a gossip status is sent to remote gossiper so that it can send the newer buckets.
+ * <p/>
+ * When a bucket is received from a remote gossiper, its sent to the bucket store
+ * for update.
*
*/
/**
* Helpful for testing
- * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent.
+ * @param autoStartGossipTicks used for turning off gossip ticks during testing.
+ * Gossip tick can be manually sent.
*/
public Gossiper(Boolean autoStartGossipTicks){
this.autoStartGossipTicks = autoStartGossipTicks;
if (autoStartGossipTicks) {
gossipTask = getContext().system().scheduler().schedule(
new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
- new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
+ new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
getSelf(), //target
new Messages.GossiperMessages.GossipTick(), //message
getContext().dispatcher(), //execution context
* @param status bucket versions from a remote member
*/
void receiveGossipStatus(GossipStatus status){
- //Dont want to accept messages from non-members
+ //Don't accept messages from non-members
if (!clusterMembers.contains(status.from()))
return;
final ActorRef sender = getSender();
-
Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
-
futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
}
void receiveGossip(GossipEnvelope envelope){
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- log.info("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
return;
}
- if (envelope.getBuckets() == null)
- return; //nothing to do
updateRemoteBuckets(envelope.getBuckets());
*/
void updateRemoteBuckets(Map<Address, Bucket> buckets) {
- if (buckets == null || buckets.isEmpty())
- return; //nothing to merge
-
UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
-
getContext().parent().tell(updateRemoteBuckets, getSelf());
}
void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
-
futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
-
}
/**
//Get local status from bucket store and send to remote
Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
-
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
}
/**
- * Process bucket versions received from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
+ * Process bucket versions received from
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
* Then this method compares remote bucket versions with local bucket versions.
* <ul>
* <li>The buckets that are newer locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} to remote
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
+ * to remote
* <li>The buckets that are older locally, send
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} to remote so that
- * remote sends GossipEnvelop.
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
+ * to remote so that remote sends GossipEnvelop.
* </ul>
*
* @param sender the remote member
}
/**
- * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} that contains
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. These buckets are sent to a remote member encapsulated
- * in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
+ * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
+ * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
+ * These buckets are sent to a remote member encapsulated in
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
*
* @param sender the remote member that sent
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
public Void apply(Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- log.info("Buckets to send from {}: {}", selfAddress, buckets);
+ log.debug("Buckets to send from {}: {}", selfAddress, buckets);
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}