275200f8d5328d73930538ab9fc9d650837d18ca
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import com.google.common.annotations.VisibleForTesting;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Verify;
23 import com.google.common.collect.Maps;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Map.Entry;
30 import java.util.Set;
31 import java.util.concurrent.ThreadLocalRandom;
32 import java.util.concurrent.TimeUnit;
33 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
34 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
35 import scala.concurrent.duration.FiniteDuration;
36
37 /**
38  * Gossiper that syncs bucket store across nodes in the cluster.
39  *
40  * <p>
41  * It keeps a local scheduler that periodically sends Gossip ticks to
42  * itself to send bucket store's bucket versions to a randomly selected remote
43  * gossiper.
44  *
45  * <p>
46  * When bucket versions are received from a remote gossiper, it is compared
47  * with bucket store's bucket versions. Which ever buckets are newer
48  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
49  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
50  *
51  * <p>
52  * When a bucket is received from a remote gossiper, its sent to the bucket store
53  * for update.
54  */
55 public class Gossiper extends AbstractUntypedActorWithMetering {
56     private static final Object GOSSIP_TICK = new Object() {
57         @Override
58         public String toString() {
59             return "gossip tick";
60         }
61     };
62
63     private final boolean autoStartGossipTicks;
64     private final RemoteRpcProviderConfig config;
65
66     /**
67      * All known cluster members.
68      */
69     private final List<Address> clusterMembers = new ArrayList<>();
70
71     /**
72      * Cached ActorSelections for remote peers.
73      */
74     private final Map<Address, ActorSelection> peers = new HashMap<>();
75
76     /**
77      * ActorSystem's address for the current cluster node.
78      */
79     private Address selfAddress;
80
81     private Cluster cluster;
82
83     private Cancellable gossipTask;
84
85     private BucketStoreAccess bucketStore;
86
87     Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
88         this.config = Preconditions.checkNotNull(config);
89         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
90     }
91
92     Gossiper(final RemoteRpcProviderConfig config) {
93         this(config, Boolean.TRUE);
94     }
95
96     public static Props props(final RemoteRpcProviderConfig config) {
97         return Props.create(Gossiper.class, config);
98     }
99
100     static Props testProps(final RemoteRpcProviderConfig config) {
101         return Props.create(Gossiper.class, config, Boolean.FALSE);
102     }
103
104     @Override
105     public void preStart() {
106         ActorRefProvider provider = getContext().provider();
107         selfAddress = provider.getDefaultAddress();
108
109         bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
110
111         if (provider instanceof ClusterActorRefProvider) {
112             cluster = Cluster.get(getContext().system());
113             cluster.subscribe(getSelf(),
114                     ClusterEvent.initialStateAsEvents(),
115                     ClusterEvent.MemberEvent.class,
116                     ClusterEvent.ReachableMember.class,
117                     ClusterEvent.UnreachableMember.class);
118         }
119
120         if (autoStartGossipTicks) {
121             gossipTask = getContext().system().scheduler().schedule(
122                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
123                     config.getGossipTickInterval(),                 //interval
124                     getSelf(),                                      //target
125                     GOSSIP_TICK,                                    //message
126                     getContext().dispatcher(),                      //execution context
127                     getSelf()                                       //sender
128             );
129         }
130     }
131
132     @Override
133     public void postStop() {
134         if (cluster != null) {
135             cluster.unsubscribe(getSelf());
136         }
137         if (gossipTask != null) {
138             gossipTask.cancel();
139         }
140     }
141
142     @Override
143     protected void handleReceive(final Object message) throws Exception {
144         //Usually sent by self via gossip task defined above. But its not enforced.
145         //These ticks can be sent by another actor as well which is esp. useful while testing
146         if (GOSSIP_TICK.equals(message)) {
147             receiveGossipTick();
148         } else if (message instanceof GossipStatus) {
149             // Message from remote gossiper with its bucket versions
150             receiveGossipStatus((GossipStatus) message);
151         } else if (message instanceof GossipEnvelope) {
152             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
153             // message. The contained buckets are newer as determined by the remote gossiper by
154             // comparing the GossipStatus message with its local versions.
155             receiveGossip((GossipEnvelope) message);
156         } else if (message instanceof ClusterEvent.MemberUp) {
157             receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
158
159         } else if (message instanceof ClusterEvent.ReachableMember) {
160             receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
161
162         } else if (message instanceof ClusterEvent.MemberRemoved) {
163             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
164
165         } else if (message instanceof ClusterEvent.UnreachableMember) {
166             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
167
168         } else {
169             unhandled(message);
170         }
171     }
172
173     /**
174      * Remove member from local copy of member list. If member down is self, then stop the actor
175      *
176      * @param member who went down
177      */
178     private void receiveMemberRemoveOrUnreachable(final Member member) {
179         LOG.debug("Received memberDown or Unreachable: {}", member);
180
181         //if its self, then stop itself
182         if (selfAddress.equals(member.address())) {
183             getContext().stop(getSelf());
184             return;
185         }
186
187         removePeer(member.address());
188         LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
189     }
190
191     private void addPeer(final Address address) {
192         if (!clusterMembers.contains(address)) {
193             clusterMembers.add(address);
194         }
195         peers.computeIfAbsent(address, input -> getContext().system()
196             .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
197     }
198
199     private void removePeer(final Address address) {
200         clusterMembers.remove(address);
201         peers.remove(address);
202         bucketStore.removeRemoteBucket(address);
203     }
204
205     /**
206      * Add member to the local copy of member list if it doesn't already.
207      *
208      * @param member the member to add
209      */
210     private void receiveMemberUpOrReachable(final Member member) {
211         LOG.debug("Received memberUp or reachable: {}", member);
212
213         //ignore up notification for self
214         if (selfAddress.equals(member.address())) {
215             return;
216         }
217
218         addPeer(member.address());
219         LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
220     }
221
222     /**
223      * Sends Gossip status to other members in the cluster.
224      * <br>
225      * 1. If there are no member, ignore the tick. <br>
226      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
227      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
228      */
229     @VisibleForTesting
230     void receiveGossipTick() {
231         final Address address;
232         switch (clusterMembers.size()) {
233             case 0:
234                 //no members to send gossip status to
235                 return;
236             case 1:
237                 address = clusterMembers.get(0);
238                 break;
239             default:
240                 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
241                 address = clusterMembers.get(randomIndex);
242                 break;
243         }
244
245         LOG.trace("Gossiping to [{}]", address);
246         getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
247     }
248
249     /**
250      * Process gossip status received from a remote gossiper. Remote versions are compared with
251      * the local copy.
252      * <p/>
253      * For each bucket
254      * <ul>
255      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
256      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
257      *  <li>If both are same, noop</li>
258      * </ul>
259      *
260      * @param status bucket versions from a remote member
261      */
262     @VisibleForTesting
263     void receiveGossipStatus(final GossipStatus status) {
264         // Don't accept messages from non-members
265         if (peers.containsKey(status.from())) {
266             // FIXME: sender should be part of GossipStatus
267             final ActorRef sender = getSender();
268             bucketStore.getBucketVersions(versions ->  processRemoteStatus(sender, status, versions));
269         }
270     }
271
272     private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
273             final Map<Address, Long> localVersions) {
274         final Map<Address, Long> remoteVersions = status.versions();
275
276         //diff between remote list and local
277         final Set<Address> localIsOlder = new HashSet<>(remoteVersions.keySet());
278         localIsOlder.removeAll(localVersions.keySet());
279
280         //diff between local list and remote
281         final Set<Address> localIsNewer = new HashSet<>(localVersions.keySet());
282         localIsNewer.removeAll(remoteVersions.keySet());
283
284
285         for (Entry<Address, Long> entry : remoteVersions.entrySet()) {
286             Address address = entry.getKey();
287             Long remoteVersion = entry.getValue();
288             Long localVersion = localVersions.get(address);
289             if (localVersion == null || remoteVersion == null) {
290                 //this condition is taken care of by above diffs
291                 continue;
292             }
293
294             if (localVersion < remoteVersion) {
295                 localIsOlder.add(address);
296             } else if (localVersion > remoteVersion) {
297                 localIsNewer.add(address);
298             }
299         }
300
301         if (!localIsOlder.isEmpty()) {
302             remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
303         }
304
305         if (!localIsNewer.isEmpty()) {
306             //send newer buckets to remote
307             bucketStore.getBucketsByMembers(localIsNewer, buckets -> {
308                 LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
309                 remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf());
310             });
311         }
312     }
313
314     /**
315      * Sends the received buckets in the envelope to the parent Bucket store.
316      *
317      * @param envelope contains buckets from a remote gossiper
318      */
319     @VisibleForTesting
320     void receiveGossip(final GossipEnvelope envelope) {
321         //TODO: Add more validations
322         if (!selfAddress.equals(envelope.to())) {
323             LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
324             return;
325         }
326
327         updateRemoteBuckets(envelope.buckets());
328     }
329
330     /**
331      * Helper to send received buckets to bucket store.
332      *
333      * @param buckets map of Buckets to update
334      */
335     @VisibleForTesting
336     void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
337         // filter this so we only handle buckets for known peers
338         bucketStore.updateRemoteBuckets(Maps.filterKeys(buckets, peers::containsKey));
339     }
340
341     /**
342      * Gets bucket versions from bucket store and sends to the supplied address.
343      *
344      * @param remoteActorSystemAddress remote gossiper to send to
345      */
346     @VisibleForTesting
347     void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
348         bucketStore.getBucketVersions(versions -> {
349             LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
350             /*
351              * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring,
352              *      but can we identify which bucket is the local one?
353              */
354             remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf());
355         });
356     }
357
358     ///
359     ///Getter Setters
360     ///
361
362     @VisibleForTesting
363     void setClusterMembers(final Address... members) {
364         clusterMembers.clear();
365         peers.clear();
366
367         for (Address addr : members) {
368             addPeer(addr);
369         }
370     }
371 }