/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
 * Gossiper that syncs bucket store across nodes in the cluster.
 *
 * <p>
 * It keeps a local scheduler that periodically sends Gossip ticks to
 * itself to send bucket store's bucket versions to a randomly selected remote
 * gossiper.
 *
 * <p>
 * When bucket versions are received from a remote gossiper, they are compared
 * with bucket store's bucket versions. Whichever buckets are newer
 * locally are sent to the remote gossiper. If any bucket is older in bucket store,
 * a gossip status is sent to the remote gossiper so that it can send the newer buckets.
 *
 * <p>
 * When a bucket is received from a remote gossiper, it is sent to the bucket store
 * for update.
 */
65 public class Gossiper extends AbstractUntypedActorWithMetering {
/** Whether {@code preStart()} schedules the periodic gossip tick. */
private final boolean autoStartGossipTicks;

/** Provider configuration; read by this actor for the gossip tick interval and the ask timeout. */
private final RemoteRpcProviderConfig config;

/**
 * All known cluster members.
 */
private final List<Address> clusterMembers = new ArrayList<>();

/**
 * Cached ActorSelections for remote peers.
 */
private final Map<Address, ActorSelection> peers = new HashMap<>();

/**
 * ActorSystem's address for the current cluster node. Assigned in {@code preStart()}.
 */
private Address selfAddress;

/** Cluster extension handle; stays null unless the actor system uses a cluster-aware provider. */
private Cluster cluster;

/** Handle returned by the scheduler for the periodic gossip tick; null when ticks are not auto-started. */
private Cancellable gossipTask;
/**
 * Creates a gossiper with explicit control over the periodic gossip tick.
 *
 * @param config remote RPC provider configuration, must not be null
 * @param autoStartGossipTicks whether the gossip tick is scheduled when the actor starts, must not be
 *                             null (presumably boxed so the reflective {@code Props.create} lookup can
 *                             match this constructor)
 */
Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
    this.config = Preconditions.checkNotNull(config);
    // Auto-unboxing: a null flag fails here with a NullPointerException.
    this.autoStartGossipTicks = autoStartGossipTicks;
}
/**
 * Creates a gossiper that schedules its periodic gossip tick on start.
 *
 * @param config remote RPC provider configuration, must not be null
 */
Gossiper(final RemoteRpcProviderConfig config) {
    this(config, Boolean.TRUE);
}
97 public static Props props(final RemoteRpcProviderConfig config) {
98 return Props.create(Gossiper.class, config);
101 static Props testProps(final RemoteRpcProviderConfig config) {
102 return Props.create(Gossiper.class, config, Boolean.FALSE);
106 public void preStart() {
107 ActorRefProvider provider = getContext().provider();
108 selfAddress = provider.getDefaultAddress();
110 if (provider instanceof ClusterActorRefProvider ) {
111 cluster = Cluster.get(getContext().system());
112 cluster.subscribe(getSelf(),
113 ClusterEvent.initialStateAsEvents(),
114 ClusterEvent.MemberEvent.class,
115 ClusterEvent.ReachableMember.class,
116 ClusterEvent.UnreachableMember.class);
119 if (autoStartGossipTicks) {
120 gossipTask = getContext().system().scheduler().schedule(
121 new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
122 config.getGossipTickInterval(), //interval
124 new Messages.GossiperMessages.GossipTick(), //message
125 getContext().dispatcher(), //execution context
132 public void postStop() {
133 if (cluster != null) {
134 cluster.unsubscribe(getSelf());
136 if (gossipTask != null) {
141 @SuppressWarnings({ "rawtypes", "unchecked" })
143 protected void handleReceive(final Object message) throws Exception {
144 //Usually sent by self via gossip task defined above. But its not enforced.
145 //These ticks can be sent by another actor as well which is esp. useful while testing
146 if (message instanceof GossipTick) {
148 } else if (message instanceof GossipStatus) {
149 // Message from remote gossiper with its bucket versions
150 receiveGossipStatus((GossipStatus) message);
151 } else if (message instanceof GossipEnvelope) {
152 // Message from remote gossiper with buckets. This is usually in response to GossipStatus
153 // message. The contained buckets are newer as determined by the remote gossiper by
154 // comparing the GossipStatus message with its local versions.
155 receiveGossip((GossipEnvelope) message);
156 } else if (message instanceof ClusterEvent.MemberUp) {
157 receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
159 } else if (message instanceof ClusterEvent.ReachableMember) {
160 receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
162 } else if (message instanceof ClusterEvent.MemberRemoved) {
163 receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
165 } else if (message instanceof ClusterEvent.UnreachableMember) {
166 receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
174 * Remove member from local copy of member list. If member down is self, then stop the actor
176 * @param member who went down
178 private void receiveMemberRemoveOrUnreachable(final Member member) {
179 //if its self, then stop itself
180 if (selfAddress.equals(member.address())) {
181 getContext().stop(getSelf());
185 removePeer(member.address());
186 LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
189 private void addPeer(final Address address) {
190 if (!clusterMembers.contains(address)) {
191 clusterMembers.add(address);
193 peers.computeIfAbsent(address, input -> getContext().system()
194 .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
197 private void removePeer(final Address address) {
198 clusterMembers.remove(address);
199 peers.remove(address);
200 getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
204 * Add member to the local copy of member list if it doesn't already.
206 * @param member the member to add
208 private void receiveMemberUpOrReachable(final Member member) {
209 //ignore up notification for self
210 if (selfAddress.equals(member.address())) {
214 addPeer(member.address());
215 LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
219 * Sends Gossip status to other members in the cluster.
221 * 1. If there are no member, ignore the tick. <br>
222 * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
223 * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
226 void receiveGossipTick() {
227 final Address address;
228 switch (clusterMembers.size()) {
230 //no members to send gossip status to
233 address = clusterMembers.get(0);
236 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
237 address = clusterMembers.get(randomIndex);
241 LOG.trace("Gossiping to [{}]", address);
242 getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
246 * Process gossip status received from a remote gossiper. Remote versions are compared with
251 * <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
252 * <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
253 * <li>If both are same, noop</li>
256 * @param status bucket versions from a remote member
259 void receiveGossipStatus(final GossipStatus status) {
260 // Don't accept messages from non-members
261 if (!peers.containsKey(status.from())) {
265 final ActorRef sender = getSender();
266 Future<Object> futureReply =
267 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
269 futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
273 * Sends the received buckets in the envelope to the parent Bucket store.
275 * @param envelope contains buckets from a remote gossiper
278 <T extends BucketData<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
279 //TODO: Add more validations
280 if (!selfAddress.equals(envelope.to())) {
281 LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
285 updateRemoteBuckets(envelope.getBuckets());
289 * Helper to send received buckets to bucket store.
291 * @param buckets map of Buckets to update
294 <T extends BucketData<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
295 getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
299 * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
301 * @param remote remote node to send Buckets to
302 * @param addresses node addresses whose buckets needs to be sent
304 void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
306 Future<Object> futureReply =
307 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
308 futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
312 * Gets bucket versions from bucket store and sends to the supplied address.
314 * @param remoteActorSystemAddress remote gossiper to send to
317 void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
319 //Get local status from bucket store and send to remote
320 Future<Object> futureReply =
321 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
323 LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
325 futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
///
/// Private factories to create mappers
///
332 private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
334 return new Mapper<Object, Void>() {
336 public Void apply(final Object replyMessage) {
337 if (replyMessage instanceof GetBucketVersionsReply) {
338 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
339 Map<Address, Long> localVersions = reply.getVersions();
341 remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
349 * Process bucket versions received from
350 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
351 * Then this method compares remote bucket versions with local bucket versions.
353 * <li>The buckets that are newer locally, send
354 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
356 * <li>The buckets that are older locally, send
357 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
358 * to remote so that remote sends GossipEnvelop.
361 * @param sender the remote member
362 * @param status bucket versions from a remote member
363 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
366 private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
368 final Map<Address, Long> remoteVersions = status.getVersions();
370 return new Mapper<Object, Void>() {
372 public Void apply(final Object replyMessage) {
373 if (replyMessage instanceof GetBucketVersionsReply) {
374 GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
375 Map<Address, Long> localVersions = reply.getVersions();
377 //diff between remote list and local
378 Set<Address> localIsOlder = new HashSet<>();
379 localIsOlder.addAll(remoteVersions.keySet());
380 localIsOlder.removeAll(localVersions.keySet());
382 //diff between local list and remote
383 Set<Address> localIsNewer = new HashSet<>();
384 localIsNewer.addAll(localVersions.keySet());
385 localIsNewer.removeAll(remoteVersions.keySet());
388 for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
389 Address address = entry.getKey();
390 Long remoteVersion = entry.getValue();
391 Long localVersion = localVersions.get(address);
392 if (localVersion == null || remoteVersion == null) {
393 //this condition is taken care of by above diffs
397 if (localVersion < remoteVersion) {
398 localIsOlder.add(address);
399 } else if (localVersion > remoteVersion) {
400 localIsNewer.add(address);
404 if (!localIsOlder.isEmpty()) {
405 sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
408 if (!localIsNewer.isEmpty()) {
409 //send newer buckets to remote
410 sendGossipTo(sender, localIsNewer);
419 * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
420 * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
421 * These buckets are sent to a remote member encapsulated in
422 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
424 * @param sender the remote member that sent
425 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
426 * in reply to which bucket is being sent back
427 * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
430 private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
432 return new Mapper<Object, Void>() {
433 @SuppressWarnings({ "rawtypes", "unchecked" })
435 public Void apply(final Object msg) {
436 if (msg instanceof GetBucketsByMembersReply) {
437 Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
438 LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
439 GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
440 sender.tell(envelope, getSelf());
452 void setClusterMembers(final Address... members) {
453 clusterMembers.clear();
456 for (Address addr : members) {