2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterActorRefProvider;
17 import akka.cluster.ClusterEvent;
18 import akka.cluster.Member;
19 import akka.dispatch.Mapper;
20 import akka.pattern.Patterns;
21 import com.google.common.base.Preconditions;
22 import java.util.ArrayList;
23 import java.util.HashSet;
24 import java.util.List;
27 import java.util.concurrent.ThreadLocalRandom;
28 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
30 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
33 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import scala.concurrent.Future;
42 import scala.concurrent.duration.FiniteDuration;
45 * Gossiper that syncs bucket store across nodes in the cluster.
47 * It keeps a local scheduler that periodically sends Gossip ticks to
48 * itself to send bucket store's bucket versions to a randomly selected remote
51 * When bucket versions are received from a remote gossiper, it is compared
52 * with bucket store's bucket versions. Whichever buckets are newer
53 * locally, are sent to remote gossiper. If any bucket is older in bucket store,
54 * a gossip status is sent to remote gossiper so that it can send the newer buckets.
56 * When a bucket is received from a remote gossiper, it is sent to the bucket store
61 public class Gossiper extends AbstractUntypedActorWithMetering {
// SLF4J logger bound to the runtime class of this actor instance.
63 private final Logger log = LoggerFactory.getLogger(getClass());
// Cluster extension handle. Assigned in preStart() only when the actor ref provider is a
// ClusterActorRefProvider, so it can remain null.
65 private Cluster cluster;
68 * ActorSystem's address for the current cluster node.
// Assigned in preStart() from the provider default address; can also be set via setSelfAddress().
70 private Address selfAddress;
73 * All known cluster members
// Updated by receiveMemberUp() and receiveMemberRemoveOrUnreachable(); replaceable via setClusterMembers().
75 private List<Address> clusterMembers = new ArrayList<>();
// Handle of the periodic GossipTick schedule created in preStart(); stays null when auto-start is off.
77 private Cancellable gossipTask;
// When true (the default), preStart() schedules periodic gossip ticks.
79 private Boolean autoStartGossipTicks = true;
// Supplies the gossip tick interval and the timeout used for ask() calls to the parent actor.
81 private final RemoteRpcProviderConfig config;
/**
 * Creates a gossiper that starts its gossip ticks automatically.
 *
 * @param config remote RPC provider configuration; must not be null
 */
public Gossiper(RemoteRpcProviderConfig config) {
    // Fail fast on a missing configuration before storing it.
    Preconditions.checkNotNull(config);
    this.config = config;
}
89 * @param autoStartGossipTicks used for turning off gossip ticks during testing.
90 * Gossip tick can be manually sent.
// Constructor intended for tests: stores the supplied flag, replacing the field default of true,
// so that gossip ticks can be switched off and sent manually instead.
// NOTE(review): one short line between the signature and the assignment is missing from this
// listing. The final field config has to be initialised there (presumably by delegating to the
// single-argument constructor) - confirm against the full source.
92 public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config){
94 this.autoStartGossipTicks = autoStartGossipTicks;
// Lifecycle hook: records the address of this node, subscribes to cluster membership events
// when the actor system uses the cluster provider, and optionally schedules the gossip tick.
98 public void preStart(){
99 ActorRefProvider provider = getContext().provider();
100 selfAddress = provider.getDefaultAddress();
// The cluster extension is only looked up when the provider is the cluster-aware one;
// otherwise the cluster field stays null and no membership events are received.
102 if ( provider instanceof ClusterActorRefProvider ) {
103 cluster = Cluster.get(getContext().system());
// Subscribes this actor to member events and unreachable-member events.
// initialStateAsEvents() requests the current cluster state as individual events (see Akka Cluster docs).
104 cluster.subscribe(getSelf(),
105 ClusterEvent.initialStateAsEvents(),
106 ClusterEvent.MemberEvent.class,
107 ClusterEvent.UnreachableMember.class);
// Periodic tick: first fired after one second, then at the configured gossip tick interval.
// NOTE(review): the schedule(...) argument list is incomplete in this listing (the target and
// sender arguments are not visible) - confirm against the full source.
110 if (autoStartGossipTicks) {
111 gossipTask = getContext().system().scheduler().schedule(
112 new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
113 config.getGossipTickInterval(), //interval
115 new Messages.GossiperMessages.GossipTick(), //message
116 getContext().dispatcher(), //execution context
// Lifecycle hook: releases what preStart() acquired.
123 public void postStop(){
// cluster is null when the actor system does not use the cluster provider.
124 if (cluster != null) {
125 cluster.unsubscribe(getSelf());
// gossipTask is null when automatic gossip ticks were disabled.
// NOTE(review): the body of this check is not visible in this listing; presumably it cancels
// the scheduled tick - confirm against the full source.
127 if (gossipTask != null) {
// Dispatches an incoming message to the matching handler based on its runtime type.
// NOTE(review): short lines are missing from this listing - the body of the GossipTick branch
// (presumably a call to receiveGossipTick()) and whatever follows the last branch are not
// visible; confirm against the full source.
133 protected void handleReceive(Object message) throws Exception {
134 //Usually sent by self via gossip task defined above. But it is not enforced.
135 //These ticks can be sent by another actor as well which is esp. useful while testing
136 if (message instanceof GossipTick) {
138 } else if (message instanceof GossipStatus) {
139 // Message from remote gossiper with its bucket versions
140 receiveGossipStatus((GossipStatus) message);
141 } else if (message instanceof GossipEnvelope) {
142 // Message from remote gossiper with buckets. This is usually in response to GossipStatus
143 // message. The contained buckets are newer as determined by the remote gossiper by
144 // comparing the GossipStatus message with its local versions.
145 receiveGossip((GossipEnvelope) message);
146 } else if (message instanceof ClusterEvent.MemberUp) {
// A member that came up becomes a candidate gossip peer.
147 receiveMemberUp(((ClusterEvent.MemberUp) message).member());
149 } else if (message instanceof ClusterEvent.MemberRemoved) {
// Removal and unreachability are handled by the same routine.
150 receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
152 } else if ( message instanceof ClusterEvent.UnreachableMember){
153 receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
161 * Remove member from local copy of member list. If member down is self, then stop the actor
163 * @param member who went down
// Handles a member that was removed or became unreachable: stops this actor when the member
// is this node itself, and drops the member address from the local member list.
// NOTE(review): short lines are missing from this listing after the stop() call, so it cannot
// be seen here whether the method returns early for self (skipping the removal and the log
// statement below) - confirm against the full source.
165 void receiveMemberRemoveOrUnreachable(Member member) {
166 //if it is self, then stop itself
167 if (selfAddress.equals(member.address())){
168 getContext().stop(getSelf());
// remove(Object) is a no-op when the address is not in the list.
172 clusterMembers.remove(member.address());
173 if(log.isDebugEnabled()) {
174 log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
179 * Add member to the local copy of member list if it doesnt already
182 void receiveMemberUp(Member member) {
184 if (selfAddress.equals(member.address())) {
185 return; //ignore up notification for self
188 if (!clusterMembers.contains(member.address())) {
189 clusterMembers.add(member.address());
191 if(log.isDebugEnabled()) {
192 log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
197 * Sends Gossip status to other members in the cluster. <br/>
198 * 1. If there are no member, ignore the tick. </br>
199 * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
200 * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
202 void receiveGossipTick(){
203 if (clusterMembers.size() == 0) {
204 return; //no members to send gossip status to
207 Address remoteMemberToGossipTo;
209 if (clusterMembers.size() == 1) {
210 remoteMemberToGossipTo = clusterMembers.get(0);
212 Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
213 remoteMemberToGossipTo = clusterMembers.get(randomIndex);
215 if(log.isTraceEnabled()) {
216 log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
218 getLocalStatusAndSendTo(remoteMemberToGossipTo);
222 * Process gossip status received from a remote gossiper. Remote versions are compared with
223 * the local copy. <p>
227 * <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
228 * <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
229 * <li>If both are same, noop</li>
232 * @param status bucket versions from a remote member
// Handles bucket versions sent by a remote gossiper: asks the parent actor for the local bucket
// versions and, once that reply arrives, compares both sides through the mapper built by
// getMapperToProcessRemoteStatus().
// NOTE(review): the body of the membership check below is not visible in this listing;
// presumably it returns early for non-members - confirm against the full source.
234 void receiveGossipStatus(GossipStatus status){
235 //Don't accept messages from non-members
236 if (!clusterMembers.contains(status.from())) {
// The sender is captured now because the reply is processed later, in a future callback.
240 final ActorRef sender = getSender();
// ask() is bounded by the configured ask duration.
241 Future<Object> futureReply =
242 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
// The mapper is applied to the reply on this actor context dispatcher.
244 futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
249 * Sends the received buckets in the envelope to the parent Bucket store.
251 * @param envelope contains buckets from a remote gossiper
// Handles buckets sent by a remote gossiper and passes them on through updateRemoteBuckets().
// Envelopes whose recipient is not this node are reported at trace level.
// NOTE(review): lines after the trace statement are missing from this listing; presumably the
// method returns without updating for such envelopes - confirm against the full source.
253 void receiveGossip(GossipEnvelope envelope){
254 //TODO: Add more validations
255 if (!selfAddress.equals(envelope.to())) {
256 if(log.isTraceEnabled()) {
257 log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
262 updateRemoteBuckets(envelope.getBuckets());
267 * Helper to send received buckets to bucket store
271 void updateRemoteBuckets(Map<Address, Bucket> buckets) {
273 UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
274 getContext().parent().tell(updateRemoteBuckets, getSelf());
278 * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
280 * @param remote remote node to send Buckets to
281 * @param addresses node addresses whose buckets needs to be sent
283 void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
285 Future<Object> futureReply =
286 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
287 futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
291 * Gets bucket versions from bucket store and sends to the supplied address
293 * @param remoteActorSystemAddress remote gossiper to send to
295 void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
297 //Get local status from bucket store and send to remote
298 Future<Object> futureReply =
299 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
301 //Find gossiper on remote system
302 ActorSelection remoteRef = getContext().system().actorSelection(
303 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
305 if(log.isTraceEnabled()) {
306 log.trace("Sending bucket versions to [{}]", remoteRef);
309 futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
314 * Helper to send bucket versions received from local store
315 * @param remote remote gossiper to send versions to
316 * @param localVersions bucket versions received from local store
318 void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
320 GossipStatus status = new GossipStatus(selfAddress, localVersions);
321 remote.tell(status, getSelf());
324 void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
326 GossipStatus status = new GossipStatus(selfAddress, localVersions);
327 remote.tell(status, getSelf());
331 /// Private factories to create mappers
334 private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
336 return new Mapper<Object, Void>() {
338 public Void apply(Object replyMessage) {
339 if (replyMessage instanceof GetBucketVersionsReply) {
340 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
341 Map<Address, Long> localVersions = reply.getVersions();
343 sendGossipStatusTo(remote, localVersions);
352 * Process bucket versions received from
353 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
354 * Then this method compares remote bucket versions with local bucket versions.
356 * <li>The buckets that are newer locally, send
357 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
359 * <li>The buckets that are older locally, send
360 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
361 * to remote so that remote sends GossipEnvelop.
364 * @param sender the remote member
365 * @param status bucket versions from a remote member
366 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
369 private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
371 final Map<Address, Long> remoteVersions = status.getVersions();
373 return new Mapper<Object, Void>() {
375 public Void apply(Object replyMessage) {
376 if (replyMessage instanceof GetBucketVersionsReply) {
377 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
378 Map<Address, Long> localVersions = reply.getVersions();
380 //diff between remote list and local
381 Set<Address> localIsOlder = new HashSet<>();
382 localIsOlder.addAll(remoteVersions.keySet());
383 localIsOlder.removeAll(localVersions.keySet());
385 //diff between local list and remote
386 Set<Address> localIsNewer = new HashSet<>();
387 localIsNewer.addAll(localVersions.keySet());
388 localIsNewer.removeAll(remoteVersions.keySet());
391 for (Address address : remoteVersions.keySet()){
393 if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
394 continue; //this condition is taken care of by above diffs
396 if (localVersions.get(address) < remoteVersions.get(address)) {
397 localIsOlder.add(address);
398 } else if (localVersions.get(address) > remoteVersions.get(address)) {
399 localIsNewer.add(address);
403 if (!localIsOlder.isEmpty()) {
404 sendGossipStatusTo(sender, localVersions );
407 if (!localIsNewer.isEmpty()) {
408 sendGossipTo(sender, localIsNewer);//send newer buckets to remote
418 * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
419 * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
420 * These buckets are sent to a remote member encapsulated in
421 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
423 * @param sender the remote member that sent
424 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
425 * in reply to which bucket is being sent back
426 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
429 private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
431 return new Mapper<Object, Void>() {
433 public Void apply(Object msg) {
434 if (msg instanceof GetBucketsByMembersReply) {
435 Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
436 if(log.isTraceEnabled()) {
437 log.trace("Buckets to send from {}: {}", selfAddress, buckets);
439 GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
440 sender.tell(envelope, getSelf());
// Package-private accessor: returns the live member list itself, not a copy, so changes made
// by the caller are visible to this actor.
450 List<Address> getClusterMembers() {
451 return clusterMembers;
// Package-private mutator: replaces the member list with the supplied list instance (no copy is made).
454 void setClusterMembers(List<Address> clusterMembers) {
455 this.clusterMembers = clusterMembers;
// Package-private accessor for the address of this node.
// NOTE(review): the body is not visible in this listing; presumably it returns selfAddress - confirm.
458 Address getSelfAddress() {
// Package-private mutator: overrides the address that preStart() otherwise takes from the provider.
462 void setSelfAddress(Address selfAddress) {
463 this.selfAddress = selfAddress;