2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import com.google.common.annotations.VisibleForTesting;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Verify;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
28 import java.util.Map.Entry;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.TimeUnit;
32 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
33 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
34 import scala.concurrent.duration.FiniteDuration;
37 * Gossiper that syncs bucket store across nodes in the cluster.
40 * It keeps a local scheduler that periodically sends Gossip ticks to
41 * itself to send bucket store's bucket versions to a randomly selected remote
45 * When bucket versions are received from a remote gossiper, they are compared
46 * with bucket store's bucket versions. Whichever buckets are newer
47 * locally are sent to the remote gossiper. If any bucket is older in the bucket store,
48 * a gossip status is sent to remote gossiper so that it can send the newer buckets.
51 * When a bucket is received from a remote gossiper, it is sent to the bucket store
54 public class Gossiper extends AbstractUntypedActorWithMetering {
// Sentinel message representing a gossip tick: handleReceive() matches incoming messages
// against it with equals(), and preStart() passes it to the scheduler as the message to send.
// It is an anonymous Object subclass that overrides toString().
// NOTE(review): the body of toString() and the closing braces of this anonymous class are not
// visible in this listing, so the returned text is not documented here -- confirm against the
// complete file.
55 private static final Object GOSSIP_TICK = new Object() {
57 public String toString() {
62 private final boolean autoStartGossipTicks;
63 private final RemoteRpcProviderConfig config;
66 * All known cluster members.
68 private final List<Address> clusterMembers = new ArrayList<>();
71 * Cached ActorSelections for remote peers.
73 private final Map<Address, ActorSelection> peers = new HashMap<>();
76 * ActorSystem's address for the current cluster node.
78 private Address selfAddress;
80 private Cluster cluster;
82 private Cancellable gossipTask;
84 private BucketStoreAccess bucketStore;
86 Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
87 this.config = Preconditions.checkNotNull(config);
88 this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
/**
 * Creates a gossiper that schedules its own gossip ticks.
 *
 * @param config remote RPC provider configuration; must not be null
 */
Gossiper(final RemoteRpcProviderConfig config) {
    this(config, Boolean.valueOf(true));
}
95 public static Props props(final RemoteRpcProviderConfig config) {
96 return Props.create(Gossiper.class, config);
99 static Props testProps(final RemoteRpcProviderConfig config) {
100 return Props.create(Gossiper.class, config, Boolean.FALSE);
// Actor start-up hook: records this node's address, creates the bucket store accessor,
// subscribes to cluster events when running clustered, and optionally schedules the gossip tick.
// NOTE(review): this listing omits several short lines of the method (closing braces and two
// arguments of the schedule(...) call), so the code is left untouched -- confirm against the
// complete file before editing it.
104 public void preStart() {
105 ActorRefProvider provider = getContext().provider();
// The provider's default address is used as this node's own address.
106 selfAddress = provider.getDefaultAddress();
// Bucket store accessor, configured with the ask duration from the config.
108 bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
// Cluster events are subscribed to only when the provider is a ClusterActorRefProvider;
// otherwise the cluster field keeps its default null value.
110 if (provider instanceof ClusterActorRefProvider ) {
111 cluster = Cluster.get(getContext().system());
// This actor is the subscriber for member and reachability events.
112 cluster.subscribe(getSelf(),
113 ClusterEvent.initialStateAsEvents(),
114 ClusterEvent.MemberEvent.class,
115 ClusterEvent.ReachableMember.class,
116 ClusterEvent.UnreachableMember.class);
// The tick is scheduled only when the constructor flag asked for it; gossipTask stays unset otherwise.
119 if (autoStartGossipTicks) {
120 gossipTask = getContext().system().scheduler().schedule(
121 new FiniteDuration(1, TimeUnit.SECONDS), //initial delay
122 config.getGossipTickInterval(), //interval
// NOTE(review): the argument between the interval and the message is not visible here --
// presumably the receiver of the tick; verify.
124 GOSSIP_TICK, //message
125 getContext().dispatcher(), //execution context
// NOTE(review): the remaining argument(s) and the end of the call are not visible here.
132 public void postStop() {
133 if (cluster != null) {
134 cluster.unsubscribe(getSelf());
136 if (gossipTask != null) {
142 protected void handleReceive(final Object message) throws Exception {
143 //Usually sent by self via gossip task defined above. But its not enforced.
144 //These ticks can be sent by another actor as well which is esp. useful while testing
145 if (GOSSIP_TICK.equals(message)) {
147 } else if (message instanceof GossipStatus) {
148 // Message from remote gossiper with its bucket versions
149 receiveGossipStatus((GossipStatus) message);
150 } else if (message instanceof GossipEnvelope) {
151 // Message from remote gossiper with buckets. This is usually in response to GossipStatus
152 // message. The contained buckets are newer as determined by the remote gossiper by
153 // comparing the GossipStatus message with its local versions.
154 receiveGossip((GossipEnvelope) message);
155 } else if (message instanceof ClusterEvent.MemberUp) {
156 receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
158 } else if (message instanceof ClusterEvent.ReachableMember) {
159 receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
161 } else if (message instanceof ClusterEvent.MemberRemoved) {
162 receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
164 } else if (message instanceof ClusterEvent.UnreachableMember) {
165 receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
173 * Remove member from local copy of member list. If member down is self, then stop the actor
175 * @param member who went down
177 private void receiveMemberRemoveOrUnreachable(final Member member) {
178 //if its self, then stop itself
179 if (selfAddress.equals(member.address())) {
180 getContext().stop(getSelf());
184 removePeer(member.address());
185 LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
188 private void addPeer(final Address address) {
189 if (!clusterMembers.contains(address)) {
190 clusterMembers.add(address);
192 peers.computeIfAbsent(address, input -> getContext().system()
193 .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
196 private void removePeer(final Address address) {
197 clusterMembers.remove(address);
198 peers.remove(address);
199 bucketStore.removeRemoteBucket(address);
203 * Add member to the local copy of member list if it doesn't already.
205 * @param member the member to add
207 private void receiveMemberUpOrReachable(final Member member) {
208 //ignore up notification for self
209 if (selfAddress.equals(member.address())) {
213 addPeer(member.address());
214 LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
218 * Sends Gossip status to other members in the cluster.
220 * 1. If there are no member, ignore the tick. <br>
221 * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
222 * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
225 void receiveGossipTick() {
226 final Address address;
227 switch (clusterMembers.size()) {
229 //no members to send gossip status to
232 address = clusterMembers.get(0);
235 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
236 address = clusterMembers.get(randomIndex);
240 LOG.trace("Gossiping to [{}]", address);
241 getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
245 * Process gossip status received from a remote gossiper. Remote versions are compared with
250 * <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
251 * <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
252 * <li>If both are same, noop</li>
255 * @param status bucket versions from a remote member
258 void receiveGossipStatus(final GossipStatus status) {
259 // Don't accept messages from non-members
260 if (peers.containsKey(status.from())) {
261 // FIXME: sender should be part of GossipStatus
262 final ActorRef sender = getSender();
263 bucketStore.getBucketVersions(versions -> processRemoteStatus(sender, status, versions));
267 private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
268 final Map<Address, Long> localVersions) {
269 final Map<Address, Long> remoteVersions = status.versions();
271 //diff between remote list and local
272 final Set<Address> localIsOlder = new HashSet<>(remoteVersions.keySet());
273 localIsOlder.removeAll(localVersions.keySet());
275 //diff between local list and remote
276 final Set<Address> localIsNewer = new HashSet<>(localVersions.keySet());
277 localIsNewer.removeAll(remoteVersions.keySet());
280 for (Entry<Address, Long> entry : remoteVersions.entrySet()) {
281 Address address = entry.getKey();
282 Long remoteVersion = entry.getValue();
283 Long localVersion = localVersions.get(address);
284 if (localVersion == null || remoteVersion == null) {
285 //this condition is taken care of by above diffs
289 if (localVersion < remoteVersion) {
290 localIsOlder.add(address);
291 } else if (localVersion > remoteVersion) {
292 localIsNewer.add(address);
296 if (!localIsOlder.isEmpty()) {
297 remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
300 if (!localIsNewer.isEmpty()) {
301 //send newer buckets to remote
302 bucketStore.getBucketsByMembers(localIsNewer, buckets -> {
303 LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
304 remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf());
310 * Sends the received buckets in the envelope to the parent Bucket store.
312 * @param envelope contains buckets from a remote gossiper
315 void receiveGossip(final GossipEnvelope envelope) {
316 //TODO: Add more validations
317 if (!selfAddress.equals(envelope.to())) {
318 LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
322 updateRemoteBuckets(envelope.buckets());
326 * Helper to send received buckets to bucket store.
328 * @param buckets map of Buckets to update
331 void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
332 bucketStore.updateRemoteBuckets(buckets);
336 * Gets bucket versions from bucket store and sends to the supplied address.
338 * @param remoteActorSystemAddress remote gossiper to send to
341 void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
342 bucketStore.getBucketVersions(versions -> {
343 LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
345 * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring,
346 * but can we identify which bucket is the local one?
348 remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf());
357 void setClusterMembers(final Address... members) {
358 clusterMembers.clear();
361 for (Address addr : members) {