/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
 * Gossiper that syncs bucket store across nodes in the cluster.
 *
 * <p>
 * It keeps a local scheduler that periodically sends Gossip ticks to
 * itself to send bucket store's bucket versions to a randomly selected remote
 * gossiper.
 *
 * <p>
 * When bucket versions are received from a remote gossiper, they are compared
 * with bucket store's bucket versions. Whichever buckets are newer
 * locally are sent to the remote gossiper. If any bucket is older in bucket store,
 * a gossip status is sent to the remote gossiper so that it can send the newer buckets.
 *
 * <p>
 * When a bucket is received from a remote gossiper, it is sent to the bucket store
 * for update.
 */
63 public class Gossiper extends AbstractUntypedActorWithMetering {
65 private final Logger log = LoggerFactory.getLogger(getClass());
67 private Cluster cluster;
70 * ActorSystem's address for the current cluster node.
72 private Address selfAddress;
75 * All known cluster members.
77 private List<Address> clusterMembers = new ArrayList<>();
79 private Cancellable gossipTask;
81 private Boolean autoStartGossipTicks = true;
83 private final RemoteRpcProviderConfig config;
/**
 * Creates a gossiper that starts its own gossip ticks.
 *
 * @param config configuration to use, must not be {@code null}
 */
public Gossiper(RemoteRpcProviderConfig config) {
    Preconditions.checkNotNull(config);
    this.config = config;
}
/**
 * Constructor for testing.
 *
 * @param autoStartGossipTicks pass {@code false} to turn off the scheduled gossip ticks during testing;
 *                             a gossip tick can then be sent to the actor manually.
 * @param config configuration to use, must not be {@code null}
 */
@VisibleForTesting
public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
    this(config);
    this.autoStartGossipTicks = autoStartGossipTicks;
}
102 public void preStart() {
103 ActorRefProvider provider = getContext().provider();
104 selfAddress = provider.getDefaultAddress();
106 if ( provider instanceof ClusterActorRefProvider ) {
107 cluster = Cluster.get(getContext().system());
108 cluster.subscribe(getSelf(),
109 ClusterEvent.initialStateAsEvents(),
110 ClusterEvent.MemberEvent.class,
111 ClusterEvent.ReachableMember.class,
112 ClusterEvent.UnreachableMember.class);
115 if (autoStartGossipTicks) {
116 gossipTask = getContext().system().scheduler().schedule(
117 new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
118 config.getGossipTickInterval(), //interval
120 new Messages.GossiperMessages.GossipTick(), //message
121 getContext().dispatcher(), //execution context
128 public void postStop() {
129 if (cluster != null) {
130 cluster.unsubscribe(getSelf());
132 if (gossipTask != null) {
137 @SuppressWarnings({ "rawtypes", "unchecked" })
139 protected void handleReceive(Object message) throws Exception {
140 //Usually sent by self via gossip task defined above. But its not enforced.
141 //These ticks can be sent by another actor as well which is esp. useful while testing
142 if (message instanceof GossipTick) {
144 } else if (message instanceof GossipStatus) {
145 // Message from remote gossiper with its bucket versions
146 receiveGossipStatus((GossipStatus) message);
147 } else if (message instanceof GossipEnvelope) {
148 // Message from remote gossiper with buckets. This is usually in response to GossipStatus
149 // message. The contained buckets are newer as determined by the remote gossiper by
150 // comparing the GossipStatus message with its local versions.
151 receiveGossip((GossipEnvelope) message);
152 } else if (message instanceof ClusterEvent.MemberUp) {
153 receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
155 } else if (message instanceof ClusterEvent.ReachableMember) {
156 receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
158 } else if (message instanceof ClusterEvent.MemberRemoved) {
159 receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
161 } else if ( message instanceof ClusterEvent.UnreachableMember) {
162 receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
170 * Remove member from local copy of member list. If member down is self, then stop the actor
172 * @param member who went down
174 void receiveMemberRemoveOrUnreachable(Member member) {
175 //if its self, then stop itself
176 if (selfAddress.equals(member.address())) {
177 getContext().stop(getSelf());
181 clusterMembers.remove(member.address());
182 log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
186 * Add member to the local copy of member list if it doesn't already.
188 * @param member the member to add
190 void receiveMemberUpOrReachable(final Member member) {
192 if (selfAddress.equals(member.address())) {
193 //ignore up notification for self
197 if (!clusterMembers.contains(member.address())) {
198 clusterMembers.add(member.address());
201 log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
205 * Sends Gossip status to other members in the cluster.
207 * 1. If there are no member, ignore the tick. <br>
208 * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
209 * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
211 void receiveGossipTick() {
212 if (clusterMembers.size() == 0) {
213 return; //no members to send gossip status to
216 Address remoteMemberToGossipTo;
218 if (clusterMembers.size() == 1) {
219 remoteMemberToGossipTo = clusterMembers.get(0);
221 Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
222 remoteMemberToGossipTo = clusterMembers.get(randomIndex);
225 log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
226 getLocalStatusAndSendTo(remoteMemberToGossipTo);
230 * Process gossip status received from a remote gossiper. Remote versions are compared with
235 * <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
236 * <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
237 * <li>If both are same, noop</li>
240 * @param status bucket versions from a remote member
242 void receiveGossipStatus(GossipStatus status) {
243 //Don't accept messages from non-members
244 if (!clusterMembers.contains(status.from())) {
248 final ActorRef sender = getSender();
249 Future<Object> futureReply =
250 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
252 futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
257 * Sends the received buckets in the envelope to the parent Bucket store.
259 * @param envelope contains buckets from a remote gossiper
261 <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
262 //TODO: Add more validations
263 if (!selfAddress.equals(envelope.to())) {
264 if (log.isTraceEnabled()) {
265 log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
271 updateRemoteBuckets(envelope.getBuckets());
276 * Helper to send received buckets to bucket store.
278 * @param buckets map of Buckets to update
280 <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
281 UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
282 getContext().parent().tell(updateRemoteBuckets, getSelf());
286 * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
288 * @param remote remote node to send Buckets to
289 * @param addresses node addresses whose buckets needs to be sent
291 void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
293 Future<Object> futureReply =
294 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
295 futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
299 * Gets bucket versions from bucket store and sends to the supplied address.
301 * @param remoteActorSystemAddress remote gossiper to send to
303 void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
305 //Get local status from bucket store and send to remote
306 Future<Object> futureReply =
307 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
309 //Find gossiper on remote system
310 ActorSelection remoteRef = getContext().system().actorSelection(
311 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
313 log.trace("Sending bucket versions to [{}]", remoteRef);
315 futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
320 * Helper to send bucket versions received from local store.
322 * @param remote remote gossiper to send versions to
323 * @param localVersions bucket versions received from local store
325 void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
327 GossipStatus status = new GossipStatus(selfAddress, localVersions);
328 remote.tell(status, getSelf());
331 void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
333 GossipStatus status = new GossipStatus(selfAddress, localVersions);
334 remote.tell(status, getSelf());
338 /// Private factories to create mappers
341 private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
343 return new Mapper<Object, Void>() {
345 public Void apply(Object replyMessage) {
346 if (replyMessage instanceof GetBucketVersionsReply) {
347 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
348 Map<Address, Long> localVersions = reply.getVersions();
350 sendGossipStatusTo(remote, localVersions);
359 * Process bucket versions received from
360 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
361 * Then this method compares remote bucket versions with local bucket versions.
363 * <li>The buckets that are newer locally, send
364 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
366 * <li>The buckets that are older locally, send
367 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
368 * to remote so that remote sends GossipEnvelop.
371 * @param sender the remote member
372 * @param status bucket versions from a remote member
373 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
376 private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
378 final Map<Address, Long> remoteVersions = status.getVersions();
380 return new Mapper<Object, Void>() {
382 public Void apply(Object replyMessage) {
383 if (replyMessage instanceof GetBucketVersionsReply) {
384 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
385 Map<Address, Long> localVersions = reply.getVersions();
387 //diff between remote list and local
388 Set<Address> localIsOlder = new HashSet<>();
389 localIsOlder.addAll(remoteVersions.keySet());
390 localIsOlder.removeAll(localVersions.keySet());
392 //diff between local list and remote
393 Set<Address> localIsNewer = new HashSet<>();
394 localIsNewer.addAll(localVersions.keySet());
395 localIsNewer.removeAll(remoteVersions.keySet());
398 for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
399 Address address = entry.getKey();
400 Long remoteVersion = entry.getValue();
401 Long localVersion = localVersions.get(address);
402 if (localVersion == null || remoteVersion == null) {
403 continue; //this condition is taken care of by above diffs
406 if (localVersion < remoteVersion) {
407 localIsOlder.add(address);
408 } else if (localVersion > remoteVersion) {
409 localIsNewer.add(address);
413 if (!localIsOlder.isEmpty()) {
414 sendGossipStatusTo(sender, localVersions );
417 if (!localIsNewer.isEmpty()) {
418 sendGossipTo(sender, localIsNewer);//send newer buckets to remote
428 * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
429 * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
430 * These buckets are sent to a remote member encapsulated in
431 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
433 * @param sender the remote member that sent
434 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
435 * in reply to which bucket is being sent back
436 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
439 private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
441 return new Mapper<Object, Void>() {
442 @SuppressWarnings({ "rawtypes", "unchecked" })
444 public Void apply(Object msg) {
445 if (msg instanceof GetBucketsByMembersReply) {
446 Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
447 log.trace("Buckets to send from {}: {}", selfAddress, buckets);
448 GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
449 sender.tell(envelope, getSelf());
459 List<Address> getClusterMembers() {
460 return clusterMembers;
463 void setClusterMembers(List<Address> clusterMembers) {
464 this.clusterMembers = clusterMembers;
467 Address getSelfAddress() {
471 void setSelfAddress(Address selfAddress) {
472 this.selfAddress = selfAddress;