/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.controller.remote.rpc.registry.gossip;

import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

44 * Gossiper that syncs bucket store across nodes in the cluster.
46 * It keeps a local scheduler that periodically sends Gossip ticks to
47 * itself to send bucket store's bucket versions to a randomly selected remote
50 * When bucket versions are received from a remote gossiper, it is compared
51 * with bucket store's bucket versions. Which ever buckets are newer
52 * locally, are sent to remote gossiper. If any bucket is older in bucket store,
53 * a gossip status is sent to remote gossiper so that it can send the newer buckets.
55 * When a bucket is received from a remote gossiper, its sent to the bucket store
60 public class Gossiper extends AbstractUntypedActorWithMetering {
62 private final Logger log = LoggerFactory.getLogger(getClass());
64 private Cluster cluster;
67 * ActorSystem's address for the current cluster node.
69 private Address selfAddress;
72 * All known cluster members
74 private List<Address> clusterMembers = new ArrayList<>();
76 private Cancellable gossipTask;
78 private Boolean autoStartGossipTicks = true;
80 private RemoteRpcProviderConfig config;
83 config = new RemoteRpcProviderConfig(getContext().system().settings().config());
88 * @param autoStartGossipTicks used for turning off gossip ticks during testing.
89 * Gossip tick can be manually sent.
91 public Gossiper(Boolean autoStartGossipTicks){
92 this.autoStartGossipTicks = autoStartGossipTicks;
96 public void preStart(){
97 ActorRefProvider provider = getContext().provider();
98 selfAddress = provider.getDefaultAddress();
100 if ( provider instanceof ClusterActorRefProvider ) {
101 cluster = Cluster.get(getContext().system());
102 cluster.subscribe(getSelf(),
103 ClusterEvent.initialStateAsEvents(),
104 ClusterEvent.MemberEvent.class,
105 ClusterEvent.UnreachableMember.class);
108 if (autoStartGossipTicks) {
109 gossipTask = getContext().system().scheduler().schedule(
110 new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
111 config.getGossipTickInterval(), //interval
113 new Messages.GossiperMessages.GossipTick(), //message
114 getContext().dispatcher(), //execution context
121 public void postStop(){
122 if (cluster != null) {
123 cluster.unsubscribe(getSelf());
125 if (gossipTask != null) {
131 protected void handleReceive(Object message) throws Exception {
132 //Usually sent by self via gossip task defined above. But its not enforced.
133 //These ticks can be sent by another actor as well which is esp. useful while testing
134 if (message instanceof GossipTick) {
136 } else if (message instanceof GossipStatus) {
137 // Message from remote gossiper with its bucket versions
138 receiveGossipStatus((GossipStatus) message);
139 } else if (message instanceof GossipEnvelope) {
140 // Message from remote gossiper with buckets. This is usually in response to GossipStatus
141 // message. The contained buckets are newer as determined by the remote gossiper by
142 // comparing the GossipStatus message with its local versions.
143 receiveGossip((GossipEnvelope) message);
144 } else if (message instanceof ClusterEvent.MemberUp) {
145 receiveMemberUp(((ClusterEvent.MemberUp) message).member());
147 } else if (message instanceof ClusterEvent.MemberRemoved) {
148 receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
150 } else if ( message instanceof ClusterEvent.UnreachableMember){
151 receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
159 * Remove member from local copy of member list. If member down is self, then stop the actor
161 * @param member who went down
163 void receiveMemberRemoveOrUnreachable(Member member) {
164 //if its self, then stop itself
165 if (selfAddress.equals(member.address())){
166 getContext().stop(getSelf());
170 clusterMembers.remove(member.address());
171 if(log.isDebugEnabled()) {
172 log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
177 * Add member to the local copy of member list if it doesnt already
180 void receiveMemberUp(Member member) {
182 if (selfAddress.equals(member.address())) {
183 return; //ignore up notification for self
186 if (!clusterMembers.contains(member.address())) {
187 clusterMembers.add(member.address());
189 if(log.isDebugEnabled()) {
190 log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
195 * Sends Gossip status to other members in the cluster. <br/>
196 * 1. If there are no member, ignore the tick. </br>
197 * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
198 * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
200 void receiveGossipTick(){
201 if (clusterMembers.size() == 0) {
202 return; //no members to send gossip status to
205 Address remoteMemberToGossipTo;
207 if (clusterMembers.size() == 1) {
208 remoteMemberToGossipTo = clusterMembers.get(0);
210 Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
211 remoteMemberToGossipTo = clusterMembers.get(randomIndex);
213 if(log.isDebugEnabled()) {
214 log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
216 getLocalStatusAndSendTo(remoteMemberToGossipTo);
220 * Process gossip status received from a remote gossiper. Remote versions are compared with
221 * the local copy. <p>
225 * <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
226 * <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
227 * <li>If both are same, noop</li>
230 * @param status bucket versions from a remote member
232 void receiveGossipStatus(GossipStatus status){
233 //Don't accept messages from non-members
234 if (!clusterMembers.contains(status.from())) {
238 final ActorRef sender = getSender();
239 Future<Object> futureReply =
240 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
242 futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
247 * Sends the received buckets in the envelope to the parent Bucket store.
249 * @param envelope contains buckets from a remote gossiper
251 void receiveGossip(GossipEnvelope envelope){
252 //TODO: Add more validations
253 if (!selfAddress.equals(envelope.to())) {
254 if(log.isDebugEnabled()) {
255 log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
260 updateRemoteBuckets(envelope.getBuckets());
265 * Helper to send received buckets to bucket store
269 void updateRemoteBuckets(Map<Address, Bucket> buckets) {
271 UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
272 getContext().parent().tell(updateRemoteBuckets, getSelf());
276 * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
278 * @param remote remote node to send Buckets to
279 * @param addresses node addresses whose buckets needs to be sent
281 void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
283 Future<Object> futureReply =
284 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
285 futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
289 * Gets bucket versions from bucket store and sends to the supplied address
291 * @param remoteActorSystemAddress remote gossiper to send to
293 void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
295 //Get local status from bucket store and send to remote
296 Future<Object> futureReply =
297 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
299 //Find gossiper on remote system
300 ActorSelection remoteRef = getContext().system().actorSelection(
301 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
303 if(log.isDebugEnabled()) {
304 log.debug("Sending bucket versions to [{}]", remoteRef);
307 futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
312 * Helper to send bucket versions received from local store
313 * @param remote remote gossiper to send versions to
314 * @param localVersions bucket versions received from local store
316 void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
318 GossipStatus status = new GossipStatus(selfAddress, localVersions);
319 remote.tell(status, getSelf());
322 void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
324 GossipStatus status = new GossipStatus(selfAddress, localVersions);
325 remote.tell(status, getSelf());
329 /// Private factories to create mappers
332 private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
334 return new Mapper<Object, Void>() {
336 public Void apply(Object replyMessage) {
337 if (replyMessage instanceof GetBucketVersionsReply) {
338 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
339 Map<Address, Long> localVersions = reply.getVersions();
341 sendGossipStatusTo(remote, localVersions);
350 * Process bucket versions received from
351 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
352 * Then this method compares remote bucket versions with local bucket versions.
354 * <li>The buckets that are newer locally, send
355 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
357 * <li>The buckets that are older locally, send
358 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
359 * to remote so that remote sends GossipEnvelop.
362 * @param sender the remote member
363 * @param status bucket versions from a remote member
364 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
367 private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
369 final Map<Address, Long> remoteVersions = status.getVersions();
371 return new Mapper<Object, Void>() {
373 public Void apply(Object replyMessage) {
374 if (replyMessage instanceof GetBucketVersionsReply) {
375 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
376 Map<Address, Long> localVersions = reply.getVersions();
378 //diff between remote list and local
379 Set<Address> localIsOlder = new HashSet<>();
380 localIsOlder.addAll(remoteVersions.keySet());
381 localIsOlder.removeAll(localVersions.keySet());
383 //diff between local list and remote
384 Set<Address> localIsNewer = new HashSet<>();
385 localIsNewer.addAll(localVersions.keySet());
386 localIsNewer.removeAll(remoteVersions.keySet());
389 for (Address address : remoteVersions.keySet()){
391 if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
392 continue; //this condition is taken care of by above diffs
394 if (localVersions.get(address) < remoteVersions.get(address)) {
395 localIsOlder.add(address);
396 } else if (localVersions.get(address) > remoteVersions.get(address)) {
397 localIsNewer.add(address);
401 if (!localIsOlder.isEmpty()) {
402 sendGossipStatusTo(sender, localVersions );
405 if (!localIsNewer.isEmpty()) {
406 sendGossipTo(sender, localIsNewer);//send newer buckets to remote
416 * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
417 * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
418 * These buckets are sent to a remote member encapsulated in
419 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
421 * @param sender the remote member that sent
422 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
423 * in reply to which bucket is being sent back
424 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
427 private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
429 return new Mapper<Object, Void>() {
431 public Void apply(Object msg) {
432 if (msg instanceof GetBucketsByMembersReply) {
433 Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
434 if(log.isDebugEnabled()) {
435 log.debug("Buckets to send from {}: {}", selfAddress, buckets);
437 GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
438 sender.tell(envelope, getSelf());
448 List<Address> getClusterMembers() {
449 return clusterMembers;
452 void setClusterMembers(List<Address> clusterMembers) {
453 this.clusterMembers = clusterMembers;
456 Address getSelfAddress() {
460 void setSelfAddress(Address selfAddress) {
461 this.selfAddress = selfAddress;