BUG-3128: cache ActorSelections
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import akka.dispatch.Mapper;
21 import akka.pattern.Patterns;
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import com.google.common.base.Verify;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.ThreadLocalRandom;
32 import java.util.concurrent.TimeUnit;
33 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
34 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
40 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
41 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
42 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
43 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
44 import scala.concurrent.Future;
45 import scala.concurrent.duration.FiniteDuration;
46
47 /**
48  * Gossiper that syncs bucket store across nodes in the cluster.
49  *
50  * <p>
51  * It keeps a local scheduler that periodically sends Gossip ticks to
52  * itself to send bucket store's bucket versions to a randomly selected remote
53  * gossiper.
54  *
55  * <p>
56  * When bucket versions are received from a remote gossiper, it is compared
57  * with bucket store's bucket versions. Which ever buckets are newer
58  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
59  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
60  *
61  * <p>
62  * When a bucket is received from a remote gossiper, its sent to the bucket store
63  * for update.
64  */
65 public class Gossiper extends AbstractUntypedActorWithMetering {
66     private final boolean autoStartGossipTicks;
67     private final RemoteRpcProviderConfig config;
68
69     /**
70      * All known cluster members.
71      */
72     private final List<Address> clusterMembers = new ArrayList<>();
73
74     /**
75      * Cached ActorSelections for remote peers.
76      */
77     private final Map<Address, ActorSelection> peers = new HashMap<>();
78
79     /**
80      * ActorSystem's address for the current cluster node.
81      */
82     private Address selfAddress;
83
84     private Cluster cluster;
85
86     private Cancellable gossipTask;
87
88     Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
89         this.config = Preconditions.checkNotNull(config);
90         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
91     }
92
93     Gossiper(final RemoteRpcProviderConfig config) {
94         this(config, Boolean.TRUE);
95     }
96
97     public static Props props(final RemoteRpcProviderConfig config) {
98         return Props.create(Gossiper.class, config);
99     }
100
101     static Props testProps(final RemoteRpcProviderConfig config) {
102         return Props.create(Gossiper.class, config, Boolean.FALSE);
103     }
104
105     @Override
106     public void preStart() {
107         ActorRefProvider provider = getContext().provider();
108         selfAddress = provider.getDefaultAddress();
109
110         if (provider instanceof ClusterActorRefProvider ) {
111             cluster = Cluster.get(getContext().system());
112             cluster.subscribe(getSelf(),
113                     ClusterEvent.initialStateAsEvents(),
114                     ClusterEvent.MemberEvent.class,
115                     ClusterEvent.ReachableMember.class,
116                     ClusterEvent.UnreachableMember.class);
117         }
118
119         if (autoStartGossipTicks) {
120             gossipTask = getContext().system().scheduler().schedule(
121                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
122                     config.getGossipTickInterval(),                 //interval
123                     getSelf(),                                      //target
124                     new Messages.GossiperMessages.GossipTick(),     //message
125                     getContext().dispatcher(),                      //execution context
126                     getSelf()                                       //sender
127             );
128         }
129     }
130
131     @Override
132     public void postStop() {
133         if (cluster != null) {
134             cluster.unsubscribe(getSelf());
135         }
136         if (gossipTask != null) {
137             gossipTask.cancel();
138         }
139     }
140
141     @SuppressWarnings({ "rawtypes", "unchecked" })
142     @Override
143     protected void handleReceive(final Object message) throws Exception {
144         //Usually sent by self via gossip task defined above. But its not enforced.
145         //These ticks can be sent by another actor as well which is esp. useful while testing
146         if (message instanceof GossipTick) {
147             receiveGossipTick();
148         } else if (message instanceof GossipStatus) {
149             // Message from remote gossiper with its bucket versions
150             receiveGossipStatus((GossipStatus) message);
151         } else if (message instanceof GossipEnvelope) {
152             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
153             // message. The contained buckets are newer as determined by the remote gossiper by
154             // comparing the GossipStatus message with its local versions.
155             receiveGossip((GossipEnvelope) message);
156         } else if (message instanceof ClusterEvent.MemberUp) {
157             receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
158
159         } else if (message instanceof ClusterEvent.ReachableMember) {
160             receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
161
162         } else if (message instanceof ClusterEvent.MemberRemoved) {
163             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
164
165         } else if (message instanceof ClusterEvent.UnreachableMember) {
166             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
167
168         } else {
169             unhandled(message);
170         }
171     }
172
173     /**
174      * Remove member from local copy of member list. If member down is self, then stop the actor
175      *
176      * @param member who went down
177      */
178     private void receiveMemberRemoveOrUnreachable(final Member member) {
179         //if its self, then stop itself
180         if (selfAddress.equals(member.address())) {
181             getContext().stop(getSelf());
182             return;
183         }
184
185         removePeer(member.address());
186         LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
187     }
188
189     private void addPeer(final Address address) {
190         if (!clusterMembers.contains(address)) {
191             clusterMembers.add(address);
192         }
193         peers.computeIfAbsent(address, input -> getContext().system()
194             .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
195     }
196
197     private void removePeer(final Address address) {
198         clusterMembers.remove(address);
199         peers.remove(address);
200         getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
201     }
202
203     /**
204      * Add member to the local copy of member list if it doesn't already.
205      *
206      * @param member the member to add
207      */
208     private void receiveMemberUpOrReachable(final Member member) {
209         //ignore up notification for self
210         if (selfAddress.equals(member.address())) {
211             return;
212         }
213
214         addPeer(member.address());
215         LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
216     }
217
218     /**
219      * Sends Gossip status to other members in the cluster.
220      * <br>
221      * 1. If there are no member, ignore the tick. <br>
222      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
223      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
224      */
225     @VisibleForTesting
226     void receiveGossipTick() {
227         final Address address;
228         switch (clusterMembers.size()) {
229             case 0:
230                 //no members to send gossip status to
231                 return;
232             case 1:
233                 address = clusterMembers.get(0);
234                 break;
235             default:
236                 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
237                 address = clusterMembers.get(randomIndex);
238                 break;
239         }
240
241         LOG.trace("Gossiping to [{}]", address);
242         getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
243     }
244
245     /**
246      * Process gossip status received from a remote gossiper. Remote versions are compared with
247      * the local copy.
248      * <p/>
249      * For each bucket
250      * <ul>
251      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
252      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
253      *  <li>If both are same, noop</li>
254      * </ul>
255      *
256      * @param status bucket versions from a remote member
257      */
258     @VisibleForTesting
259     void receiveGossipStatus(final GossipStatus status) {
260         // Don't accept messages from non-members
261         if (!peers.containsKey(status.from())) {
262             return;
263         }
264
265         final ActorRef sender = getSender();
266         Future<Object> futureReply =
267                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
268
269         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
270     }
271
272     /**
273      * Sends the received buckets in the envelope to the parent Bucket store.
274      *
275      * @param envelope contains buckets from a remote gossiper
276      */
277     @VisibleForTesting
278     <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
279         //TODO: Add more validations
280         if (!selfAddress.equals(envelope.to())) {
281             LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
282             return;
283         }
284
285         updateRemoteBuckets(envelope.getBuckets());
286     }
287
288     /**
289      * Helper to send received buckets to bucket store.
290      *
291      * @param buckets map of Buckets to update
292      */
293     @VisibleForTesting
294     <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
295         getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
296     }
297
298     /**
299      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
300      *
301      * @param remote     remote node to send Buckets to
302      * @param addresses  node addresses whose buckets needs to be sent
303      */
304     void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
305
306         Future<Object> futureReply =
307                 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
308         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
309     }
310
311     /**
312      * Gets bucket versions from bucket store and sends to the supplied address.
313      *
314      * @param remoteActorSystemAddress remote gossiper to send to
315      */
316     @VisibleForTesting
317     void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
318
319         //Get local status from bucket store and send to remote
320         Future<Object> futureReply =
321                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
322
323         LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
324
325         futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
326     }
327
328     ///
329     /// Private factories to create mappers
330     ///
331
332     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
333
334         return new Mapper<Object, Void>() {
335             @Override
336             public Void apply(final Object replyMessage) {
337                 if (replyMessage instanceof GetBucketVersionsReply) {
338                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
339                     Map<Address, Long> localVersions = reply.getVersions();
340
341                     remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
342                 }
343                 return null;
344             }
345         };
346     }
347
348     /**
349      * Process bucket versions received from
350      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
351      * Then this method compares remote bucket versions with local bucket versions.
352      * <ul>
353      *     <li>The buckets that are newer locally, send
354      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
355      *     to remote
356      *     <li>The buckets that are older locally, send
357      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
358      *     to remote so that remote sends GossipEnvelop.
359      * </ul>
360      *
361      * @param sender the remote member
362      * @param status bucket versions from a remote member
363      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
364      *
365      */
366     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
367
368         final Map<Address, Long> remoteVersions = status.getVersions();
369
370         return new Mapper<Object, Void>() {
371             @Override
372             public Void apply(final Object replyMessage) {
373                 if (replyMessage instanceof GetBucketVersionsReply) {
374                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
375                     Map<Address, Long> localVersions = reply.getVersions();
376
377                     //diff between remote list and local
378                     Set<Address> localIsOlder = new HashSet<>();
379                     localIsOlder.addAll(remoteVersions.keySet());
380                     localIsOlder.removeAll(localVersions.keySet());
381
382                     //diff between local list and remote
383                     Set<Address> localIsNewer = new HashSet<>();
384                     localIsNewer.addAll(localVersions.keySet());
385                     localIsNewer.removeAll(remoteVersions.keySet());
386
387
388                     for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
389                         Address address = entry.getKey();
390                         Long remoteVersion = entry.getValue();
391                         Long localVersion = localVersions.get(address);
392                         if (localVersion == null || remoteVersion == null) {
393                             //this condition is taken care of by above diffs
394                             continue;
395                         }
396
397                         if (localVersion < remoteVersion) {
398                             localIsOlder.add(address);
399                         } else if (localVersion > remoteVersion) {
400                             localIsNewer.add(address);
401                         }
402                     }
403
404                     if (!localIsOlder.isEmpty()) {
405                         sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
406                     }
407
408                     if (!localIsNewer.isEmpty()) {
409                         //send newer buckets to remote
410                         sendGossipTo(sender, localIsNewer);
411                     }
412                 }
413                 return null;
414             }
415         };
416     }
417
418     /**
419      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
420      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
421      * These buckets are sent to a remote member encapsulated in
422      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
423      *
424      * @param sender the remote member that sent
425      *           {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
426      *           in reply to which bucket is being sent back
427      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
428      *
429      */
430     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
431
432         return new Mapper<Object, Void>() {
433             @SuppressWarnings({ "rawtypes", "unchecked" })
434             @Override
435             public Void apply(final Object msg) {
436                 if (msg instanceof GetBucketsByMembersReply) {
437                     Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
438                     LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
439                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
440                     sender.tell(envelope, getSelf());
441                 }
442                 return null;
443             }
444         };
445     }
446
447     ///
448     ///Getter Setters
449     ///
450
451     @VisibleForTesting
452     void setClusterMembers(final Address... members) {
453         clusterMembers.clear();
454         peers.clear();
455
456         for (Address addr : members) {
457             addPeer(addr);
458         }
459     }
460 }