Fix modernization issues
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import static com.google.common.base.Verify.verifyNotNull;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorRefProvider;
15 import akka.actor.ActorSelection;
16 import akka.actor.Address;
17 import akka.actor.Cancellable;
18 import akka.actor.Props;
19 import akka.cluster.Cluster;
20 import akka.cluster.ClusterActorRefProvider;
21 import akka.cluster.ClusterEvent;
22 import akka.cluster.Member;
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.collect.Maps;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.Set;
32 import java.util.concurrent.ThreadLocalRandom;
33 import java.util.concurrent.TimeUnit;
34 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
35 import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
36 import scala.concurrent.duration.FiniteDuration;
37
38 /**
39  * Gossiper that syncs bucket store across nodes in the cluster.
40  *
41  * <p>
42  * It keeps a local scheduler that periodically sends Gossip ticks to
43  * itself to send bucket store's bucket versions to a randomly selected remote
44  * gossiper.
45  *
46  * <p>
47  * When bucket versions are received from a remote gossiper, it is compared
48  * with bucket store's bucket versions. Which ever buckets are newer
49  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
50  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
51  *
52  * <p>
53  * When a bucket is received from a remote gossiper, its sent to the bucket store
54  * for update.
55  */
56 public class Gossiper extends AbstractUntypedActorWithMetering {
57     private static final Object GOSSIP_TICK = new Object() {
58         @Override
59         public String toString() {
60             return "gossip tick";
61         }
62     };
63
64     private final boolean autoStartGossipTicks;
65     private final RemoteOpsProviderConfig config;
66
67     /**
68      * All known cluster members.
69      */
70     private final List<Address> clusterMembers = new ArrayList<>();
71
72     /**
73      * Cached ActorSelections for remote peers.
74      */
75     private final Map<Address, ActorSelection> peers = new HashMap<>();
76
77     /**
78      * ActorSystem's address for the current cluster node.
79      */
80     private Address selfAddress;
81
82     private Cluster cluster;
83
84     private Cancellable gossipTask;
85
86     private BucketStoreAccess bucketStore;
87
88     Gossiper(final RemoteOpsProviderConfig config, final Boolean autoStartGossipTicks) {
89         this.config = requireNonNull(config);
90         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
91     }
92
93     Gossiper(final RemoteOpsProviderConfig config) {
94         this(config, Boolean.TRUE);
95     }
96
97     public static Props props(final RemoteOpsProviderConfig config) {
98         return Props.create(Gossiper.class, config);
99     }
100
101     static Props testProps(final RemoteOpsProviderConfig config) {
102         return Props.create(Gossiper.class, config, Boolean.FALSE);
103     }
104
105     @Override
106     public void preStart() {
107         ActorRefProvider provider = getContext().provider();
108         selfAddress = provider.getDefaultAddress();
109
110         bucketStore = new BucketStoreAccess(getContext().parent(), getContext().dispatcher(), config.getAskDuration());
111
112         if (provider instanceof ClusterActorRefProvider) {
113             cluster = Cluster.get(getContext().system());
114             cluster.subscribe(getSelf(),
115                     ClusterEvent.initialStateAsEvents(),
116                     ClusterEvent.MemberEvent.class,
117                     ClusterEvent.ReachableMember.class,
118                     ClusterEvent.UnreachableMember.class);
119         }
120
121         if (autoStartGossipTicks) {
122             gossipTask = getContext().system().scheduler().schedule(
123                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
124                     config.getGossipTickInterval(),                 //interval
125                     getSelf(),                                      //target
126                     GOSSIP_TICK,                                    //message
127                     getContext().dispatcher(),                      //execution context
128                     getSelf()                                       //sender
129             );
130         }
131     }
132
133     @Override
134     public void postStop() {
135         if (cluster != null) {
136             cluster.unsubscribe(getSelf());
137         }
138         if (gossipTask != null) {
139             gossipTask.cancel();
140         }
141     }
142
143     @Override
144     protected void handleReceive(final Object message) {
145         //Usually sent by self via gossip task defined above. But its not enforced.
146         //These ticks can be sent by another actor as well which is esp. useful while testing
147         if (GOSSIP_TICK.equals(message)) {
148             receiveGossipTick();
149         } else if (message instanceof GossipStatus) {
150             // Message from remote gossiper with its bucket versions
151             receiveGossipStatus((GossipStatus) message);
152         } else if (message instanceof GossipEnvelope) {
153             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
154             // message. The contained buckets are newer as determined by the remote gossiper by
155             // comparing the GossipStatus message with its local versions.
156             receiveGossip((GossipEnvelope) message);
157         } else if (message instanceof ClusterEvent.MemberUp) {
158             receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
159
160         } else if (message instanceof ClusterEvent.ReachableMember) {
161             receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
162
163         } else if (message instanceof ClusterEvent.MemberRemoved) {
164             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
165
166         } else if (message instanceof ClusterEvent.UnreachableMember) {
167             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
168
169         } else {
170             unhandled(message);
171         }
172     }
173
174     /**
175      * Remove member from local copy of member list. If member down is self, then stop the actor
176      *
177      * @param member who went down
178      */
179     private void receiveMemberRemoveOrUnreachable(final Member member) {
180         LOG.debug("Received memberDown or Unreachable: {}", member);
181
182         //if its self, then stop itself
183         if (selfAddress.equals(member.address())) {
184             getContext().stop(getSelf());
185             return;
186         }
187
188         removePeer(member.address());
189         LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
190     }
191
192     private void addPeer(final Address address) {
193         if (!clusterMembers.contains(address)) {
194             clusterMembers.add(address);
195         }
196         peers.computeIfAbsent(address, input -> getContext().system()
197             .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
198     }
199
200     private void removePeer(final Address address) {
201         clusterMembers.remove(address);
202         peers.remove(address);
203         bucketStore.removeRemoteBucket(address);
204     }
205
206     /**
207      * Add member to the local copy of member list if it doesn't already.
208      *
209      * @param member the member to add
210      */
211     private void receiveMemberUpOrReachable(final Member member) {
212         LOG.debug("Received memberUp or reachable: {}", member);
213
214         //ignore up notification for self
215         if (selfAddress.equals(member.address())) {
216             return;
217         }
218
219         addPeer(member.address());
220         LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
221     }
222
223     /**
224      * Sends Gossip status to other members in the cluster.
225      * <br>
226      * 1. If there are no member, ignore the tick. <br>
227      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
228      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
229      */
230     @VisibleForTesting
231     void receiveGossipTick() {
232         final Address address;
233         switch (clusterMembers.size()) {
234             case 0:
235                 //no members to send gossip status to
236                 return;
237             case 1:
238                 address = clusterMembers.get(0);
239                 break;
240             default:
241                 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
242                 address = clusterMembers.get(randomIndex);
243                 break;
244         }
245
246         LOG.trace("Gossiping to [{}]", address);
247         getLocalStatusAndSendTo(verifyNotNull(peers.get(address)));
248     }
249
250     /**
251      * Process gossip status received from a remote gossiper. Remote versions are compared with
252      * the local copy.
253      * <p/>
254      * For each bucket
255      * <ul>
256      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
257      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
258      *  <li>If both are same, noop</li>
259      * </ul>
260      *
261      * @param status bucket versions from a remote member
262      */
263     @VisibleForTesting
264     void receiveGossipStatus(final GossipStatus status) {
265         // Don't accept messages from non-members
266         if (peers.containsKey(status.from())) {
267             // FIXME: sender should be part of GossipStatus
268             final ActorRef sender = getSender();
269             bucketStore.getBucketVersions(versions ->  processRemoteStatus(sender, status, versions));
270         }
271     }
272
273     private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
274             final Map<Address, Long> localVersions) {
275         final Map<Address, Long> remoteVersions = status.versions();
276
277         //diff between remote list and local
278         final Set<Address> localIsOlder = new HashSet<>(remoteVersions.keySet());
279         localIsOlder.removeAll(localVersions.keySet());
280
281         //diff between local list and remote
282         final Set<Address> localIsNewer = new HashSet<>(localVersions.keySet());
283         localIsNewer.removeAll(remoteVersions.keySet());
284
285
286         for (Entry<Address, Long> entry : remoteVersions.entrySet()) {
287             Address address = entry.getKey();
288             Long remoteVersion = entry.getValue();
289             Long localVersion = localVersions.get(address);
290             if (localVersion == null || remoteVersion == null) {
291                 //this condition is taken care of by above diffs
292                 continue;
293             }
294
295             if (localVersion < remoteVersion) {
296                 localIsOlder.add(address);
297             } else if (localVersion > remoteVersion) {
298                 localIsNewer.add(address);
299             }
300         }
301
302         if (!localIsOlder.isEmpty()) {
303             remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
304         }
305
306         if (!localIsNewer.isEmpty()) {
307             //send newer buckets to remote
308             bucketStore.getBucketsByMembers(localIsNewer, buckets -> {
309                 LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
310                 remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf());
311             });
312         }
313     }
314
315     /**
316      * Sends the received buckets in the envelope to the parent Bucket store.
317      *
318      * @param envelope contains buckets from a remote gossiper
319      */
320     @VisibleForTesting
321     void receiveGossip(final GossipEnvelope envelope) {
322         //TODO: Add more validations
323         if (!selfAddress.equals(envelope.to())) {
324             LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
325             return;
326         }
327
328         updateRemoteBuckets(envelope.buckets());
329     }
330
331     /**
332      * Helper to send received buckets to bucket store.
333      *
334      * @param buckets map of Buckets to update
335      */
336     @VisibleForTesting
337     void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
338         // filter this so we only handle buckets for known peers
339         bucketStore.updateRemoteBuckets(Maps.filterKeys(buckets, peers::containsKey));
340     }
341
342     /**
343      * Gets bucket versions from bucket store and sends to the supplied address.
344      *
345      * @param remoteActorSystemAddress remote gossiper to send to
346      */
347     @VisibleForTesting
348     void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
349         bucketStore.getBucketVersions(versions -> {
350             LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
351             /*
352              * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring,
353              *      but can we identify which bucket is the local one?
354              */
355             remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf());
356         });
357     }
358
359     ///
360     ///Getter Setters
361     ///
362
363     @VisibleForTesting
364     void setClusterMembers(final Address... members) {
365         clusterMembers.clear();
366         peers.clear();
367
368         for (Address addr : members) {
369             addPeer(addr);
370         }
371     }
372 }