2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.UntypedActor;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import akka.dispatch.Mapper;
21 import akka.event.Logging;
22 import akka.event.LoggingAdapter;
23 import akka.pattern.Patterns;
24 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
25 import scala.concurrent.Future;
26 import scala.concurrent.duration.FiniteDuration;
28 import java.util.ArrayList;
29 import java.util.HashSet;
30 import java.util.List;
33 import java.util.concurrent.ThreadLocalRandom;
34 import java.util.concurrent.TimeUnit;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
40 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
41 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
42 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
43 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
/**
 * Gossiper that syncs bucket store across nodes in the cluster.
 * <p>
 * It keeps a local scheduler that periodically sends Gossip ticks to
 * itself to send bucket store's bucket versions to a randomly selected remote
 * gossiper.
 * <p>
 * When bucket versions are received from a remote gossiper, they are compared
 * with bucket store's bucket versions. Whichever buckets are newer
 * locally are sent to the remote gossiper. If any bucket is older in the bucket store,
 * a gossip status is sent to the remote gossiper so that it can send the newer buckets.
 * <p>
 * When a bucket is received from a remote gossiper, it is sent to the bucket store.
 */
62 public class Gossiper extends UntypedActor {
64 final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
66 private Cluster cluster;
69 * ActorSystem's address for the current cluster node.
71 private Address selfAddress;
74 * All known cluster members
76 private List<Address> clusterMembers = new ArrayList<>();
78 private Cancellable gossipTask;
80 private Boolean autoStartGossipTicks = true;
/**
 * Creates a gossiper whose automatic gossip tick schedule can be switched off.
 *
 * @param autoStartGossipTicks used for turning off gossip ticks during testing.
 *                             Gossip tick can be manually sent.
 */
public Gossiper(Boolean autoStartGossipTicks){
    this.autoStartGossipTicks = autoStartGossipTicks;
}
94 public void preStart(){
95 ActorRefProvider provider = getContext().provider();
96 selfAddress = provider.getDefaultAddress();
98 if ( provider instanceof ClusterActorRefProvider ) {
99 cluster = Cluster.get(getContext().system());
100 cluster.subscribe(getSelf(),
101 ClusterEvent.initialStateAsEvents(),
102 ClusterEvent.MemberEvent.class,
103 ClusterEvent.UnreachableMember.class);
106 if (autoStartGossipTicks) {
107 gossipTask = getContext().system().scheduler().schedule(
108 new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
109 ActorUtil.GOSSIP_TICK_INTERVAL, //interval
111 new Messages.GossiperMessages.GossipTick(), //message
112 getContext().dispatcher(), //execution context
119 public void postStop(){
121 cluster.unsubscribe(getSelf());
122 if (gossipTask != null)
127 public void onReceive(Object message) throws Exception {
129 log.debug("Received message: node[{}], message[{}]", selfAddress, message);
131 //Usually sent by self via gossip task defined above. But its not enforced.
132 //These ticks can be sent by another actor as well which is esp. useful while testing
133 if (message instanceof GossipTick)
136 //Message from remote gossiper with its bucket versions
137 else if (message instanceof GossipStatus)
138 receiveGossipStatus((GossipStatus) message);
140 //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
141 //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
142 //message with its local versions
143 else if (message instanceof GossipEnvelope)
144 receiveGossip((GossipEnvelope) message);
146 else if (message instanceof ClusterEvent.MemberUp) {
147 receiveMemberUp(((ClusterEvent.MemberUp) message).member());
149 } else if (message instanceof ClusterEvent.MemberRemoved) {
150 receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
152 } else if ( message instanceof ClusterEvent.UnreachableMember){
153 receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
160 * Remove member from local copy of member list. If member down is self, then stop the actor
162 * @param member who went down
164 void receiveMemberRemoveOrUnreachable(Member member) {
165 //if its self, then stop itself
166 if (selfAddress.equals(member.address())){
167 getContext().stop(getSelf());
171 clusterMembers.remove(member.address());
172 log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
176 * Add member to the local copy of member list if it doesnt already
179 void receiveMemberUp(Member member) {
181 if (selfAddress.equals(member.address()))
182 return; //ignore up notification for self
184 if (!clusterMembers.contains(member.address()))
185 clusterMembers.add(member.address());
187 log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
191 * Sends Gossip status to other members in the cluster. <br/>
192 * 1. If there are no member, ignore the tick. </br>
193 * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
194 * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
196 void receiveGossipTick(){
197 if (clusterMembers.size() == 0) return; //no members to send gossip status to
199 Address remoteMemberToGossipTo = null;
201 if (clusterMembers.size() == 1)
202 remoteMemberToGossipTo = clusterMembers.get(0);
204 Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
205 remoteMemberToGossipTo = clusterMembers.get(randomIndex);
208 log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
209 getLocalStatusAndSendTo(remoteMemberToGossipTo);
213 * Process gossip status received from a remote gossiper. Remote versions are compared with
214 * the local copy. <p>
218 * <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
219 * <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
220 * <li>If both are same, noop</li>
223 * @param status bucket versions from a remote member
225 void receiveGossipStatus(GossipStatus status){
226 //Don't accept messages from non-members
227 if (!clusterMembers.contains(status.from()))
230 final ActorRef sender = getSender();
231 Future<Object> futureReply =
232 Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
234 futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
239 * Sends the received buckets in the envelope to the parent Bucket store.
241 * @param envelope contains buckets from a remote gossiper
243 void receiveGossip(GossipEnvelope envelope){
244 //TODO: Add more validations
245 if (!selfAddress.equals(envelope.to())) {
246 log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
250 updateRemoteBuckets(envelope.getBuckets());
255 * Helper to send received buckets to bucket store
259 void updateRemoteBuckets(Map<Address, Bucket> buckets) {
261 UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
262 getContext().parent().tell(updateRemoteBuckets, getSelf());
266 * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
268 * @param remote remote node to send Buckets to
269 * @param addresses node addresses whose buckets needs to be sent
271 void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
273 Future<Object> futureReply =
274 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis());
275 futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
279 * Gets bucket versions from bucket store and sends to the supplied address
281 * @param remoteActorSystemAddress remote gossiper to send to
283 void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
285 //Get local status from bucket store and send to remote
286 Future<Object> futureReply =
287 Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
289 //Find gossiper on remote system
290 ActorSelection remoteRef = getContext().system().actorSelection(
291 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
293 log.debug("Sending bucket versions to [{}]", remoteRef);
295 futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
300 * Helper to send bucket versions received from local store
301 * @param remote remote gossiper to send versions to
302 * @param localVersions bucket versions received from local store
304 void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
306 GossipStatus status = new GossipStatus(selfAddress, localVersions);
307 remote.tell(status, getSelf());
310 void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
312 GossipStatus status = new GossipStatus(selfAddress, localVersions);
313 remote.tell(status, getSelf());
317 /// Private factories to create mappers
320 private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
322 return new Mapper<Object, Void>() {
324 public Void apply(Object replyMessage) {
325 if (replyMessage instanceof GetBucketVersionsReply) {
326 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
327 Map<Address, Long> localVersions = reply.getVersions();
329 sendGossipStatusTo(remote, localVersions);
338 * Process bucket versions received from
339 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
340 * Then this method compares remote bucket versions with local bucket versions.
342 * <li>The buckets that are newer locally, send
343 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
345 * <li>The buckets that are older locally, send
346 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
347 * to remote so that remote sends GossipEnvelop.
350 * @param sender the remote member
351 * @param status bucket versions from a remote member
352 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
355 private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
357 final Map<Address, Long> remoteVersions = status.getVersions();
359 return new Mapper<Object, Void>() {
361 public Void apply(Object replyMessage) {
362 if (replyMessage instanceof GetBucketVersionsReply) {
363 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
364 Map<Address, Long> localVersions = reply.getVersions();
366 //diff between remote list and local
367 Set<Address> localIsOlder = new HashSet<>();
368 localIsOlder.addAll(remoteVersions.keySet());
369 localIsOlder.removeAll(localVersions.keySet());
371 //diff between local list and remote
372 Set<Address> localIsNewer = new HashSet<>();
373 localIsNewer.addAll(localVersions.keySet());
374 localIsNewer.removeAll(remoteVersions.keySet());
377 for (Address address : remoteVersions.keySet()){
379 if (localVersions.get(address) == null || remoteVersions.get(address) == null)
380 continue; //this condition is taken care of by above diffs
381 if (localVersions.get(address) < remoteVersions.get(address))
382 localIsOlder.add(address);
383 else if (localVersions.get(address) > remoteVersions.get(address))
384 localIsNewer.add(address);
389 if (!localIsOlder.isEmpty())
390 sendGossipStatusTo(sender, localVersions );
392 if (!localIsNewer.isEmpty())
393 sendGossipTo(sender, localIsNewer);//send newer buckets to remote
402 * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
403 * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
404 * These buckets are sent to a remote member encapsulated in
405 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
407 * @param sender the remote member that sent
408 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
409 * in reply to which bucket is being sent back
410 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
413 private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
415 return new Mapper<Object, Void>() {
417 public Void apply(Object msg) {
418 if (msg instanceof GetBucketsByMembersReply) {
419 Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
420 log.debug("Buckets to send from {}: {}", selfAddress, buckets);
421 GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
422 sender.tell(envelope, getSelf());
432 List<Address> getClusterMembers() {
433 return clusterMembers;
436 void setClusterMembers(List<Address> clusterMembers) {
437 this.clusterMembers = clusterMembers;
440 Address getSelfAddress() {
444 void setSelfAddress(Address selfAddress) {
445 this.selfAddress = selfAddress;