/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.remote.rpc.registry.gossip;

import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;

43 * Gossiper that syncs bucket store across nodes in the cluster.
45 * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions
46 * to a randomly selected remote gossiper.
48 * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions.
49 * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a
50 * gossip status is sent to remote gossiper so that it can send the newer buckets.
52 * When a bucket is received from a remote gossiper, its sent to the bucket store for update.
56 public class Gossiper extends UntypedActor {
58 final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
60 Cluster cluster = Cluster.get(getContext().system());
63 * ActorSystem's address for the current cluster node.
65 private Address selfAddress = cluster.selfAddress();
68 * All known cluster members
70 private List<Address> clusterMembers = new ArrayList<>();
72 private Cancellable gossipTask;
74 private Boolean autoStartGossipTicks = true;
80 * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent.
82 public Gossiper(Boolean autoStartGossipTicks){
83 this.autoStartGossipTicks = autoStartGossipTicks;
87 public void preStart(){
89 cluster.subscribe(getSelf(),
90 ClusterEvent.initialStateAsEvents(),
91 ClusterEvent.MemberEvent.class,
92 ClusterEvent.UnreachableMember.class);
94 if (autoStartGossipTicks) {
95 gossipTask = getContext().system().scheduler().schedule(
96 new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
97 new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
99 new Messages.GossiperMessages.GossipTick(), //message
100 getContext().dispatcher(), //execution context
107 public void postStop(){
109 cluster.unsubscribe(getSelf());
110 if (gossipTask != null)
115 public void onReceive(Object message) throws Exception {
117 log.debug("Received message: node[{}], message[{}]", selfAddress, message);
119 //Usually sent by self via gossip task defined above. But its not enforced.
120 //These ticks can be sent by another actor as well which is esp. useful while testing
121 if (message instanceof GossipTick)
124 //Message from remote gossiper with its bucket versions
125 else if (message instanceof GossipStatus)
126 receiveGossipStatus((GossipStatus) message);
128 //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
129 //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
130 //message with its local versions
131 else if (message instanceof GossipEnvelope)
132 receiveGossip((GossipEnvelope) message);
134 else if (message instanceof ClusterEvent.MemberUp) {
135 receiveMemberUp(((ClusterEvent.MemberUp) message).member());
137 } else if (message instanceof ClusterEvent.MemberRemoved) {
138 receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
140 } else if ( message instanceof ClusterEvent.UnreachableMember){
141 receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
148 * Remove member from local copy of member list. If member down is self, then stop the actor
150 * @param member who went down
152 void receiveMemberRemoveOrUnreachable(Member member) {
153 //if its self, then stop itself
154 if (selfAddress.equals(member.address())){
155 getContext().stop(getSelf());
159 clusterMembers.remove(member.address());
160 log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
164 * Add member to the local copy of member list if it doesnt already
167 void receiveMemberUp(Member member) {
169 if (selfAddress.equals(member.address()))
170 return; //ignore up notification for self
172 if (!clusterMembers.contains(member.address()))
173 clusterMembers.add(member.address());
175 log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
179 * Sends Gossip status to other members in the cluster. <br/>
180 * 1. If there are no member, ignore the tick. </br>
181 * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
182 * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
184 void receiveGossipTick(){
185 if (clusterMembers.size() == 0) return; //no members to send gossip status to
187 Address remoteMemberToGossipTo = null;
189 if (clusterMembers.size() == 1)
190 remoteMemberToGossipTo = clusterMembers.get(0);
192 Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
193 remoteMemberToGossipTo = clusterMembers.get(randomIndex);
196 log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
197 getLocalStatusAndSendTo(remoteMemberToGossipTo);
201 * Process gossip status received from a remote gossiper. Remote versions are compared with
202 * the local copy. <p>
206 * <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
207 * <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
208 * <li>If both are same, noop</li>
211 * @param status bucket versions from a remote member
213 void receiveGossipStatus(GossipStatus status){
214 //Dont want to accept messages from non-members
215 if (!clusterMembers.contains(status.from()))
218 final ActorRef sender = getSender();
220 Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
222 futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
227 * Sends the received buckets in the envelope to the parent Bucket store.
229 * @param envelope contains buckets from a remote gossiper
231 void receiveGossip(GossipEnvelope envelope){
232 //TODO: Add more validations
233 if (!selfAddress.equals(envelope.to())) {
234 log.info("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
237 if (envelope.getBuckets() == null)
238 return; //nothing to do
240 updateRemoteBuckets(envelope.getBuckets());
245 * Helper to send received buckets to bucket store
249 void updateRemoteBuckets(Map<Address, Bucket> buckets) {
251 if (buckets == null || buckets.isEmpty())
252 return; //nothing to merge
254 UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
256 getContext().parent().tell(updateRemoteBuckets, getSelf());
260 * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
262 * @param remote remote node to send Buckets to
263 * @param addresses node addresses whose buckets needs to be sent
265 void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
267 Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
269 futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
274 * Gets bucket versions from bucket store and sends to the supplied address
276 * @param remoteActorSystemAddress remote gossiper to send to
278 void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
280 //Get local status from bucket store and send to remote
281 Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
283 ActorSelection remoteRef = getContext().system().actorSelection(
284 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
286 log.debug("Sending bucket versions to [{}]", remoteRef);
288 futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
293 * Helper to send bucket versions received from local store
294 * @param remote remote gossiper to send versions to
295 * @param localVersions bucket versions received from local store
297 void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
299 GossipStatus status = new GossipStatus(selfAddress, localVersions);
300 remote.tell(status, getSelf());
303 void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
305 GossipStatus status = new GossipStatus(selfAddress, localVersions);
306 remote.tell(status, getSelf());
310 /// Private factories to create mappers
313 private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
315 return new Mapper<Object, Void>() {
317 public Void apply(Object replyMessage) {
318 if (replyMessage instanceof GetBucketVersionsReply) {
319 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
320 Map<Address, Long> localVersions = reply.getVersions();
322 sendGossipStatusTo(remote, localVersions);
331 * Process bucket versions received from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
332 * Then this method compares remote bucket versions with local bucket versions.
334 * <li>The buckets that are newer locally, send
335 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} to remote
336 * <li>The buckets that are older locally, send
337 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} to remote so that
338 * remote sends GossipEnvelop.
341 * @param sender the remote member
342 * @param status bucket versions from a remote member
343 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
346 private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
348 final Map<Address, Long> remoteVersions = status.getVersions();
350 return new Mapper<Object, Void>() {
352 public Void apply(Object replyMessage) {
353 if (replyMessage instanceof GetBucketVersionsReply) {
354 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
355 Map<Address, Long> localVersions = reply.getVersions();
357 //diff between remote list and local
358 Set<Address> localIsOlder = new HashSet<>();
359 localIsOlder.addAll(remoteVersions.keySet());
360 localIsOlder.removeAll(localVersions.keySet());
362 //diff between local list and remote
363 Set<Address> localIsNewer = new HashSet<>();
364 localIsNewer.addAll(localVersions.keySet());
365 localIsNewer.removeAll(remoteVersions.keySet());
368 for (Address address : remoteVersions.keySet()){
370 if (localVersions.get(address) == null || remoteVersions.get(address) == null)
371 continue; //this condition is taken care of by above diffs
372 if (localVersions.get(address) < remoteVersions.get(address))
373 localIsOlder.add(address);
374 else if (localVersions.get(address) > remoteVersions.get(address))
375 localIsNewer.add(address);
380 if (!localIsOlder.isEmpty())
381 sendGossipStatusTo(sender, localVersions );
383 if (!localIsNewer.isEmpty())
384 sendGossipTo(sender, localIsNewer);//send newer buckets to remote
393 * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} that contains
394 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. These buckets are sent to a remote member encapsulated
395 * in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
397 * @param sender the remote member that sent
398 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
399 * in reply to which bucket is being sent back
400 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
403 private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
405 return new Mapper<Object, Void>() {
407 public Void apply(Object msg) {
408 if (msg instanceof GetBucketsByMembersReply) {
409 Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
410 log.info("Buckets to send from {}: {}", selfAddress, buckets);
411 GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
412 sender.tell(envelope, getSelf());
422 List<Address> getClusterMembers() {
423 return clusterMembers;
426 void setClusterMembers(List<Address> clusterMembers) {
427 this.clusterMembers = clusterMembers;
430 Address getSelfAddress() {
434 void setSelfAddress(Address selfAddress) {
435 this.selfAddress = selfAddress;