/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
import akka.actor.ActorRef;
import akka.actor.ActorRefProvider;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterActorRefProvider;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
import scala.concurrent.duration.FiniteDuration;
/**
 * Gossiper that syncs bucket store across nodes in the cluster.
 *
 * <p>
 * It keeps a local scheduler that periodically sends Gossip ticks to
 * itself to send bucket store's bucket versions to a randomly selected remote
 * gossiper.
 *
 * <p>
 * When bucket versions are received from a remote gossiper, they are compared
 * with the bucket store's bucket versions. Whichever buckets are newer
 * locally are sent to the remote gossiper. If any bucket is older in the bucket store,
 * a gossip status is sent to the remote gossiper so that it can send the newer buckets.
 *
 * <p>
 * When a bucket is received from a remote gossiper, it is sent to the bucket store.
 */
55 public class Gossiper extends AbstractUntypedActorWithMetering {
56 private static final Object GOSSIP_TICK = new Object() {
58 public String toString() {
63 private final boolean autoStartGossipTicks;
64 private final RemoteOpsProviderConfig config;
67 * All known cluster members.
69 private final List<Address> clusterMembers = new ArrayList<>();
72 * Cached ActorSelections for remote peers.
74 private final Map<Address, ActorSelection> peers = new HashMap<>();
77 * ActorSystem's address for the current cluster node.
79 private Address selfAddress;
81 private Cluster cluster;
83 private Cancellable gossipTask;
85 private BucketStoreAccess bucketStore;
/**
 * Creates a gossiper with explicit control over the periodic gossip tick.
 *
 * @param config configuration to use, must not be null
 * @param autoStartGossipTicks whether the gossip tick is scheduled when the actor starts, must not be null;
 *                             boxed because instances are created via {@code Props.create(...)}
 * @throws NullPointerException if either argument is null
 */
Gossiper(final RemoteOpsProviderConfig config, final Boolean autoStartGossipTicks) {
    this.config = Preconditions.checkNotNull(config);
    this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
}
/**
 * Creates a gossiper that schedules its gossip tick automatically on start.
 *
 * @param config configuration to use, must not be null
 */
Gossiper(final RemoteOpsProviderConfig config) {
    this(config, Boolean.TRUE);
}
96 public static Props props(final RemoteOpsProviderConfig config) {
97 return Props.create(Gossiper.class, config);
100 static Props testProps(final RemoteOpsProviderConfig config) {
101 return Props.create(Gossiper.class, config, Boolean.FALSE);
105 public void preStart() {
106 ActorRefProvider provider = getContext().provider();
107 selfAddress = provider.getDefaultAddress();
109 bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration());
111 if (provider instanceof ClusterActorRefProvider) {
112 cluster = Cluster.get(getContext().system());
113 cluster.subscribe(getSelf(),
114 ClusterEvent.initialStateAsEvents(),
115 ClusterEvent.MemberEvent.class,
116 ClusterEvent.ReachableMember.class,
117 ClusterEvent.UnreachableMember.class);
120 if (autoStartGossipTicks) {
121 gossipTask = getContext().system().scheduler().schedule(
122 new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
123 config.getGossipTickInterval(), //interval
125 GOSSIP_TICK, //message
126 getContext().dispatcher(), //execution context
133 public void postStop() {
134 if (cluster != null) {
135 cluster.unsubscribe(getSelf());
137 if (gossipTask != null) {
143 protected void handleReceive(final Object message) {
144 //Usually sent by self via gossip task defined above. But its not enforced.
145 //These ticks can be sent by another actor as well which is esp. useful while testing
146 if (GOSSIP_TICK.equals(message)) {
148 } else if (message instanceof GossipStatus) {
149 // Message from remote gossiper with its bucket versions
150 receiveGossipStatus((GossipStatus) message);
151 } else if (message instanceof GossipEnvelope) {
152 // Message from remote gossiper with buckets. This is usually in response to GossipStatus
153 // message. The contained buckets are newer as determined by the remote gossiper by
154 // comparing the GossipStatus message with its local versions.
155 receiveGossip((GossipEnvelope) message);
156 } else if (message instanceof ClusterEvent.MemberUp) {
157 receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
159 } else if (message instanceof ClusterEvent.ReachableMember) {
160 receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
162 } else if (message instanceof ClusterEvent.MemberRemoved) {
163 receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
165 } else if (message instanceof ClusterEvent.UnreachableMember) {
166 receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
174 * Remove member from local copy of member list. If member down is self, then stop the actor
176 * @param member who went down
178 private void receiveMemberRemoveOrUnreachable(final Member member) {
179 LOG.debug("Received memberDown or Unreachable: {}", member);
181 //if its self, then stop itself
182 if (selfAddress.equals(member.address())) {
183 getContext().stop(getSelf());
187 removePeer(member.address());
188 LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
191 private void addPeer(final Address address) {
192 if (!clusterMembers.contains(address)) {
193 clusterMembers.add(address);
195 peers.computeIfAbsent(address, input -> getContext().system()
196 .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
199 private void removePeer(final Address address) {
200 clusterMembers.remove(address);
201 peers.remove(address);
202 bucketStore.removeRemoteBucket(address);
206 * Add member to the local copy of member list if it doesn't already.
208 * @param member the member to add
210 private void receiveMemberUpOrReachable(final Member member) {
211 LOG.debug("Received memberUp or reachable: {}", member);
213 //ignore up notification for self
214 if (selfAddress.equals(member.address())) {
218 addPeer(member.address());
219 LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
223 * Sends Gossip status to other members in the cluster.
225 * 1. If there are no member, ignore the tick. <br>
226 * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
227 * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
230 void receiveGossipTick() {
231 final Address address;
232 switch (clusterMembers.size()) {
234 //no members to send gossip status to
237 address = clusterMembers.get(0);
240 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
241 address = clusterMembers.get(randomIndex);
245 LOG.trace("Gossiping to [{}]", address);
246 getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
250 * Process gossip status received from a remote gossiper. Remote versions are compared with
255 * <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
256 * <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
257 * <li>If both are same, noop</li>
260 * @param status bucket versions from a remote member
263 void receiveGossipStatus(final GossipStatus status) {
264 // Don't accept messages from non-members
265 if (peers.containsKey(status.from())) {
266 // FIXME: sender should be part of GossipStatus
267 final ActorRef sender = getSender();
268 bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions));
272 private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
273 final Map<Address, Long> localVersions) {
274 final Map<Address, Long> remoteVersions = status.versions();
276 //diff between remote list and local
277 final Set<Address> localIsOlder = new HashSet<>(remoteVersions.keySet());
278 localIsOlder.removeAll(localVersions.keySet());
280 //diff between local list and remote
281 final Set<Address> localIsNewer = new HashSet<>(localVersions.keySet());
282 localIsNewer.removeAll(remoteVersions.keySet());
285 for (Entry<Address, Long> entry : remoteVersions.entrySet()) {
286 Address address = entry.getKey();
287 Long remoteVersion = entry.getValue();
288 Long localVersion = localVersions.get(address);
289 if (localVersion == null || remoteVersion == null) {
290 //this condition is taken care of by above diffs
294 if (localVersion < remoteVersion) {
295 localIsOlder.add(address);
296 } else if (localVersion > remoteVersion) {
297 localIsNewer.add(address);
301 if (!localIsOlder.isEmpty()) {
302 remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
305 if (!localIsNewer.isEmpty()) {
306 //send newer buckets to remote
307 bucketStore.getBucketsByMembers(localIsNewer, buckets -> {
308 LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
309 remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf());
315 * Sends the received buckets in the envelope to the parent Bucket store.
317 * @param envelope contains buckets from a remote gossiper
320 void receiveGossip(final GossipEnvelope envelope) {
321 //TODO: Add more validations
322 if (!selfAddress.equals(envelope.to())) {
323 LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
327 updateRemoteBuckets(envelope.buckets());
331 * Helper to send received buckets to bucket store.
333 * @param buckets map of Buckets to update
336 void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
337 // filter this so we only handle buckets for known peers
338 bucketStore.updateRemoteBuckets(Maps.filterKeys(buckets, peers::containsKey));
342 * Gets bucket versions from bucket store and sends to the supplied address.
344 * @param remoteActorSystemAddress remote gossiper to send to
347 void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
348 bucketStore.getBucketVersions(versions -> {
349 LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
351 * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring,
352 * but can we identify which bucket is the local one?
354 remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf());
363 void setClusterMembers(final Address... members) {
364 clusterMembers.clear();
367 for (Address addr : members) {