2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import akka.dispatch.Mapper;
21 import akka.pattern.Patterns;
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashSet;
27 import java.util.List;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.TimeUnit;
32 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
33 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
40 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
41 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
42 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
43 import scala.concurrent.Future;
44 import scala.concurrent.duration.FiniteDuration;
47 * Gossiper that syncs bucket store across nodes in the cluster.
50 * It keeps a local scheduler that periodically sends Gossip ticks to
51 * itself to send bucket store's bucket versions to a randomly selected remote
55 * When bucket versions are received from a remote gossiper, it is compared
56 * with bucket store's bucket versions. Which ever buckets are newer
57 * locally, are sent to remote gossiper. If any bucket is older in bucket store,
58 * a gossip status is sent to remote gossiper so that it can send the newer buckets.
61 * When a bucket is received from a remote gossiper, its sent to the bucket store
64 public class Gossiper extends AbstractUntypedActorWithMetering {
65 private final boolean autoStartGossipTicks;
66 private final RemoteRpcProviderConfig config;
69 * All known cluster members.
71 private final List<Address> clusterMembers = new ArrayList<>();
74 * ActorSystem's address for the current cluster node.
76 private Address selfAddress;
78 private Cluster cluster;
80 private Cancellable gossipTask;
82 Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
83 this.config = Preconditions.checkNotNull(config);
84 this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
87 Gossiper(final RemoteRpcProviderConfig config) {
88 this(config, Boolean.TRUE);
91 public static Props props(final RemoteRpcProviderConfig config) {
92 return Props.create(Gossiper.class, config);
95 static Props testProps(final RemoteRpcProviderConfig config) {
96 return Props.create(Gossiper.class, config, Boolean.FALSE);
100 public void preStart() {
101 ActorRefProvider provider = getContext().provider();
102 selfAddress = provider.getDefaultAddress();
104 if (provider instanceof ClusterActorRefProvider ) {
105 cluster = Cluster.get(getContext().system());
106 cluster.subscribe(getSelf(),
107 ClusterEvent.initialStateAsEvents(),
108 ClusterEvent.MemberEvent.class,
109 ClusterEvent.ReachableMember.class,
110 ClusterEvent.UnreachableMember.class);
113 if (autoStartGossipTicks) {
114 gossipTask = getContext().system().scheduler().schedule(
115 new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
116 config.getGossipTickInterval(), //interval
118 new Messages.GossiperMessages.GossipTick(), //message
119 getContext().dispatcher(), //execution context
126 public void postStop() {
127 if (cluster != null) {
128 cluster.unsubscribe(getSelf());
130 if (gossipTask != null) {
135 @SuppressWarnings({ "rawtypes", "unchecked" })
137 protected void handleReceive(final Object message) throws Exception {
138 //Usually sent by self via gossip task defined above. But its not enforced.
139 //These ticks can be sent by another actor as well which is esp. useful while testing
140 if (message instanceof GossipTick) {
142 } else if (message instanceof GossipStatus) {
143 // Message from remote gossiper with its bucket versions
144 receiveGossipStatus((GossipStatus) message);
145 } else if (message instanceof GossipEnvelope) {
146 // Message from remote gossiper with buckets. This is usually in response to GossipStatus
147 // message. The contained buckets are newer as determined by the remote gossiper by
148 // comparing the GossipStatus message with its local versions.
149 receiveGossip((GossipEnvelope) message);
150 } else if (message instanceof ClusterEvent.MemberUp) {
151 receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
153 } else if (message instanceof ClusterEvent.ReachableMember) {
154 receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
156 } else if (message instanceof ClusterEvent.MemberRemoved) {
157 receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
159 } else if (message instanceof ClusterEvent.UnreachableMember) {
160 receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
168 * Remove member from local copy of member list. If member down is self, then stop the actor
170 * @param member who went down
172 private void receiveMemberRemoveOrUnreachable(final Member member) {
173 //if its self, then stop itself
174 if (selfAddress.equals(member.address())) {
175 getContext().stop(getSelf());
179 clusterMembers.remove(member.address());
180 LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
182 getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
186 * Add member to the local copy of member list if it doesn't already.
188 * @param member the member to add
190 private void receiveMemberUpOrReachable(final Member member) {
191 //ignore up notification for self
192 if (selfAddress.equals(member.address())) {
196 if (!clusterMembers.contains(member.address())) {
197 clusterMembers.add(member.address());
200 LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
204 * Sends Gossip status to other members in the cluster.
206 * 1. If there are no member, ignore the tick. <br>
207 * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
208 * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
211 void receiveGossipTick() {
212 final Address remoteMemberToGossipTo;
213 switch (clusterMembers.size()) {
215 //no members to send gossip status to
218 remoteMemberToGossipTo = clusterMembers.get(0);
221 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
222 remoteMemberToGossipTo = clusterMembers.get(randomIndex);
226 LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
227 getLocalStatusAndSendTo(remoteMemberToGossipTo);
231 * Process gossip status received from a remote gossiper. Remote versions are compared with
236 * <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
237 * <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
238 * <li>If both are same, noop</li>
241 * @param status bucket versions from a remote member
244 void receiveGossipStatus(final GossipStatus status) {
245 //Don't accept messages from non-members
246 if (!clusterMembers.contains(status.from())) {
250 final ActorRef sender = getSender();
251 Future<Object> futureReply =
252 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
254 futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
258 * Sends the received buckets in the envelope to the parent Bucket store.
260 * @param envelope contains buckets from a remote gossiper
263 <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
264 //TODO: Add more validations
265 if (!selfAddress.equals(envelope.to())) {
266 LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
270 updateRemoteBuckets(envelope.getBuckets());
274 * Helper to send received buckets to bucket store.
276 * @param buckets map of Buckets to update
279 <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
280 getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
284 * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
286 * @param remote remote node to send Buckets to
287 * @param addresses node addresses whose buckets needs to be sent
289 void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
291 Future<Object> futureReply =
292 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
293 futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
297 * Gets bucket versions from bucket store and sends to the supplied address.
299 * @param remoteActorSystemAddress remote gossiper to send to
302 void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
304 //Get local status from bucket store and send to remote
305 Future<Object> futureReply =
306 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
308 //Find gossiper on remote system
309 ActorSelection remoteRef = getContext().system().actorSelection(
310 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
312 LOG.trace("Sending bucket versions to [{}]", remoteRef);
314 futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
318 * Helper to send bucket versions received from local store.
320 * @param remote remote gossiper to send versions to
321 * @param localVersions bucket versions received from local store
323 void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
325 GossipStatus status = new GossipStatus(selfAddress, localVersions);
326 remote.tell(status, getSelf());
329 void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
331 GossipStatus status = new GossipStatus(selfAddress, localVersions);
332 remote.tell(status, getSelf());
336 /// Private factories to create mappers
339 private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
341 return new Mapper<Object, Void>() {
343 public Void apply(final Object replyMessage) {
344 if (replyMessage instanceof GetBucketVersionsReply) {
345 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
346 Map<Address, Long> localVersions = reply.getVersions();
348 sendGossipStatusTo(remote, localVersions);
357 * Process bucket versions received from
358 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
359 * Then this method compares remote bucket versions with local bucket versions.
361 * <li>The buckets that are newer locally, send
362 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
364 * <li>The buckets that are older locally, send
365 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
366 * to remote so that remote sends GossipEnvelop.
369 * @param sender the remote member
370 * @param status bucket versions from a remote member
371 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
374 private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
376 final Map<Address, Long> remoteVersions = status.getVersions();
378 return new Mapper<Object, Void>() {
380 public Void apply(final Object replyMessage) {
381 if (replyMessage instanceof GetBucketVersionsReply) {
382 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
383 Map<Address, Long> localVersions = reply.getVersions();
385 //diff between remote list and local
386 Set<Address> localIsOlder = new HashSet<>();
387 localIsOlder.addAll(remoteVersions.keySet());
388 localIsOlder.removeAll(localVersions.keySet());
390 //diff between local list and remote
391 Set<Address> localIsNewer = new HashSet<>();
392 localIsNewer.addAll(localVersions.keySet());
393 localIsNewer.removeAll(remoteVersions.keySet());
396 for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
397 Address address = entry.getKey();
398 Long remoteVersion = entry.getValue();
399 Long localVersion = localVersions.get(address);
400 if (localVersion == null || remoteVersion == null) {
401 //this condition is taken care of by above diffs
405 if (localVersion < remoteVersion) {
406 localIsOlder.add(address);
407 } else if (localVersion > remoteVersion) {
408 localIsNewer.add(address);
412 if (!localIsOlder.isEmpty()) {
413 sendGossipStatusTo(sender, localVersions);
416 if (!localIsNewer.isEmpty()) {
417 //send newer buckets to remote
418 sendGossipTo(sender, localIsNewer);
427 * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
428 * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
429 * These buckets are sent to a remote member encapsulated in
430 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
432 * @param sender the remote member that sent
433 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
434 * in reply to which bucket is being sent back
435 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
438 private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
440 return new Mapper<Object, Void>() {
441 @SuppressWarnings({ "rawtypes", "unchecked" })
443 public Void apply(final Object msg) {
444 if (msg instanceof GetBucketsByMembersReply) {
445 Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
446 LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
447 GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
448 sender.tell(envelope, getSelf());
460 void setClusterMembers(final Address... members) {
461 clusterMembers.clear();
462 clusterMembers.addAll(Arrays.asList(members));