/*
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.remote.rpc.registry.gossip;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
/**
* Gossiper that syncs bucket store across nodes in the cluster.
*
* It keeps a local scheduler that periodically sends Gossip ticks to
* itself to send bucket store's bucket versions to a randomly selected remote
* gossiper.
*
* When bucket versions are received from a remote gossiper, they are compared
* with the bucket store's bucket versions. Whichever buckets are newer
* locally are sent to the remote gossiper. If any bucket is older in the bucket store,
* a gossip status is sent to the remote gossiper so that it can send the newer buckets.
*
* When a bucket is received from a remote gossiper, it is sent to the bucket store
* for update.
*
*/
public class Gossiper extends UntypedActor {
/**
 * Logger bound to this actor and its actor system.
 */
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

/**
 * Cluster extension of the actor system this gossiper runs in.
 */
Cluster cluster = Cluster.get(getContext().system());

/**
 * ActorSystem's address for the current cluster node.
 */
private Address selfAddress = cluster.selfAddress();

/**
 * Addresses of all known cluster members.
 *
 * Declared with an explicit element type because entries are stored from
 * {@code member.address()} and read back as {@link Address} when picking a gossip target;
 * a raw list would hand back {@code Object} there.
 */
private List<Address> clusterMembers = new ArrayList<>();

/**
 * Handle to the scheduled gossip tick. It is only assigned when gossip ticks are
 * started automatically and is cancelled when the actor stops.
 */
private Cancellable gossipTask;

/**
 * Whether the periodic gossip tick is scheduled when the actor starts. Defaults to true.
 */
private Boolean autoStartGossipTicks = true;
/**
 * Creates a gossiper that keeps the default setting of starting gossip ticks automatically.
 */
public Gossiper() {
    //nothing to initialise beyond the field defaults
}
/**
 * Creates a gossiper with explicit control over the periodic gossip ticks.
 * Mainly helpful for testing.
 *
 * @param autoStartGossipTicks whether gossip ticks are scheduled when the actor starts;
 *                             when turned off, gossip ticks can be sent manually
 */
public Gossiper(Boolean autoStartGossipTicks) {
    this.autoStartGossipTicks = autoStartGossipTicks;
}
/**
 * Subscribes this actor to cluster membership and reachability events and, when
 * automatic ticks are enabled, schedules the periodic gossip tick to itself.
 */
@Override
public void preStart() {
    //ReachableMember is subscribed alongside UnreachableMember: a member that recovers
    //from being unreachable is announced with ReachableMember, not with a new MemberUp.
    cluster.subscribe(getSelf(),
            ClusterEvent.initialStateAsEvents(),
            ClusterEvent.MemberEvent.class,
            ClusterEvent.ReachableMember.class,
            ClusterEvent.UnreachableMember.class);

    if (autoStartGossipTicks) {
        gossipTask = getContext().system().scheduler().schedule(
                new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
                new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
                getSelf(),                                      //target
                new Messages.GossiperMessages.GossipTick(),     //message
                getContext().dispatcher(),                      //execution context
                getSelf()                                       //sender
        );
    }
}

/**
 * Cancels the cluster subscription and the scheduled gossip tick, if present.
 */
@Override
public void postStop() {
    if (cluster != null) {
        cluster.unsubscribe(getSelf());
    }
    if (gossipTask != null) {
        gossipTask.cancel();
    }
}

/**
 * Dispatches an incoming message to the matching handler. Gossip ticks, gossip status,
 * gossip envelopes and cluster member up/reachable/removed/unreachable events are
 * handled; everything else is reported as unhandled.
 *
 * @param message the received message
 * @throws Exception declared by the overridden actor method
 */
@Override
public void onReceive(Object message) throws Exception {
    log.debug("Received message: node[{}], message[{}]", selfAddress, message);

    if (message instanceof GossipTick) {
        //Usually sent by self via gossip task defined above. But its not enforced.
        //These ticks can be sent by another actor as well which is esp. useful while testing
        receiveGossipTick();
    } else if (message instanceof GossipStatus) {
        //Message from remote gossiper with its bucket versions
        receiveGossipStatus((GossipStatus) message);
    } else if (message instanceof GossipEnvelope) {
        //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
        //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
        //message with its local versions
        receiveGossip((GossipEnvelope) message);
    } else if (message instanceof ClusterEvent.MemberUp) {
        receiveMemberUp(((ClusterEvent.MemberUp) message).member());
    } else if (message instanceof ClusterEvent.ReachableMember) {
        //A member dropped on UnreachableMember is put back once it is reachable again;
        //without this it would stay out of the member list after a transient outage.
        receiveMemberUp(((ClusterEvent.ReachableMember) message).member());
    } else if (message instanceof ClusterEvent.MemberRemoved) {
        receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
    } else if (message instanceof ClusterEvent.UnreachableMember) {
        receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
    } else {
        unhandled(message);
    }
}
/**
 * Handles a member that was removed from the cluster or became unreachable.
 * If that member is this node, the actor stops itself; otherwise the member's
 * address is dropped from the local copy of the member list.
 *
 * @param member who went down
 */
void receiveMemberRemoveOrUnreachable(Member member) {
    final Address departed = member.address();

    if (!selfAddress.equals(departed)) {
        clusterMembers.remove(departed);
        log.debug("Removed member [{}], Active member list [{}]", departed, clusterMembers);
        return;
    }

    //the event is about this very node, so stop itself
    getContext().stop(getSelf());
}
/**
 * Adds the member's address to the local copy of the member list unless it is
 * already present. Notifications about this node itself are ignored.
 *
 * @param member the member that came up
 */
void receiveMemberUp(Member member) {
    final Address joined = member.address();

    if (selfAddress.equals(joined)) {
        return; //ignore up notification for self
    }

    final boolean alreadyKnown = clusterMembers.contains(joined);
    if (!alreadyKnown) {
        clusterMembers.add(joined);
    }

    log.debug("Added member [{}], Active member list [{}]", joined, clusterMembers);
}
/**
 * Sends gossip status (bucket versions) to one other member of the cluster.
 * With no known members the tick is ignored; with exactly one member that member
 * is used; with several members one of them is picked at random.
 */
void receiveGossipTick() {
    final int memberCount = clusterMembers.size();
    if (memberCount == 0) {
        return; //no members to send gossip status to
    }

    //a single member needs no random draw
    final int pick = (memberCount == 1)
            ? 0
            : ThreadLocalRandom.current().nextInt(0, memberCount);
    final Address remoteMemberToGossipTo = clusterMembers.get(pick);

    log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
    getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
/**
* Process gossip status received from a remote gossiper. Remote versions are compared with
* the local copy.
*
* For each bucket:
* <ul>
* <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
* <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
* <li>If both are same, noop</li>
* </ul>
*
* @param status bucket versions from a remote member
*/
void receiveGossipStatus(GossipStatus status){
//Don't accept messages from non-members
if (!clusterMembers.contains(status.from()))
return;
final ActorRef sender = getSender();
Future