BUG-6937: Add ReachableMember case to Gossiper
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterActorRefProvider;
17 import akka.cluster.ClusterEvent;
18 import akka.cluster.Member;
19 import akka.dispatch.Mapper;
20 import akka.pattern.Patterns;
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Preconditions;
23 import java.util.ArrayList;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ThreadLocalRandom;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
31 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
33 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.FiniteDuration;
44
45 /**
46  * Gossiper that syncs bucket store across nodes in the cluster.
47  *
48  * <p>
49  * It keeps a local scheduler that periodically sends Gossip ticks to
50  * itself to send bucket store's bucket versions to a randomly selected remote
51  * gossiper.
52  *
53  * <p>
54  * When bucket versions are received from a remote gossiper, it is compared
55  * with bucket store's bucket versions. Which ever buckets are newer
56  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
57  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
58  *
59  * <p>
60  * When a bucket is received from a remote gossiper, its sent to the bucket store
61  * for update.
62  */
63 public class Gossiper extends AbstractUntypedActorWithMetering {
64
65     private final Logger log = LoggerFactory.getLogger(getClass());
66
67     private Cluster cluster;
68
69     /**
70      * ActorSystem's address for the current cluster node.
71      */
72     private Address selfAddress;
73
74     /**
75      * All known cluster members.
76      */
77     private List<Address> clusterMembers = new ArrayList<>();
78
79     private Cancellable gossipTask;
80
81     private Boolean autoStartGossipTicks = true;
82
83     private final RemoteRpcProviderConfig config;
84
85     public Gossiper(RemoteRpcProviderConfig config) {
86         this.config = Preconditions.checkNotNull(config);
87     }
88
89     /**
90      * Constructor for testing.
91      *
92      * @param autoStartGossipTicks used for turning off gossip ticks during testing.
93      *                             Gossip tick can be manually sent.
94      */
95     @VisibleForTesting
96     public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
97         this(config);
98         this.autoStartGossipTicks = autoStartGossipTicks;
99     }
100
101     @Override
102     public void preStart() {
103         ActorRefProvider provider = getContext().provider();
104         selfAddress = provider.getDefaultAddress();
105
106         if ( provider instanceof ClusterActorRefProvider ) {
107             cluster = Cluster.get(getContext().system());
108             cluster.subscribe(getSelf(),
109                     ClusterEvent.initialStateAsEvents(),
110                     ClusterEvent.MemberEvent.class,
111                     ClusterEvent.ReachableMember.class,
112                     ClusterEvent.UnreachableMember.class);
113         }
114
115         if (autoStartGossipTicks) {
116             gossipTask = getContext().system().scheduler().schedule(
117                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
118                     config.getGossipTickInterval(),                 //interval
119                     getSelf(),                                       //target
120                     new Messages.GossiperMessages.GossipTick(),      //message
121                     getContext().dispatcher(),                       //execution context
122                     getSelf()                                        //sender
123             );
124         }
125     }
126
127     @Override
128     public void postStop() {
129         if (cluster != null) {
130             cluster.unsubscribe(getSelf());
131         }
132         if (gossipTask != null) {
133             gossipTask.cancel();
134         }
135     }
136
137     @SuppressWarnings({ "rawtypes", "unchecked" })
138     @Override
139     protected void handleReceive(Object message) throws Exception {
140         //Usually sent by self via gossip task defined above. But its not enforced.
141         //These ticks can be sent by another actor as well which is esp. useful while testing
142         if (message instanceof GossipTick) {
143             receiveGossipTick();
144         } else if (message instanceof GossipStatus) {
145             // Message from remote gossiper with its bucket versions
146             receiveGossipStatus((GossipStatus) message);
147         } else if (message instanceof GossipEnvelope) {
148             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
149             // message. The contained buckets are newer as determined by the remote gossiper by
150             // comparing the GossipStatus message with its local versions.
151             receiveGossip((GossipEnvelope) message);
152         } else if (message instanceof ClusterEvent.MemberUp) {
153             receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
154
155         } else if (message instanceof ClusterEvent.ReachableMember) {
156             receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
157
158         } else if (message instanceof ClusterEvent.MemberRemoved) {
159             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
160
161         } else if ( message instanceof ClusterEvent.UnreachableMember) {
162             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
163
164         } else {
165             unhandled(message);
166         }
167     }
168
169     /**
170      * Remove member from local copy of member list. If member down is self, then stop the actor
171      *
172      * @param member who went down
173      */
174     void receiveMemberRemoveOrUnreachable(Member member) {
175         //if its self, then stop itself
176         if (selfAddress.equals(member.address())) {
177             getContext().stop(getSelf());
178             return;
179         }
180
181         clusterMembers.remove(member.address());
182         log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
183     }
184
185     /**
186      * Add member to the local copy of member list if it doesn't already.
187      *
188      * @param member the member to add
189      */
190     void receiveMemberUpOrReachable(final Member member) {
191
192         if (selfAddress.equals(member.address())) {
193             //ignore up notification for self
194             return;
195         }
196
197         if (!clusterMembers.contains(member.address())) {
198             clusterMembers.add(member.address());
199         }
200
201         log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
202     }
203
204     /**
205      * Sends Gossip status to other members in the cluster.
206      * <br>
207      * 1. If there are no member, ignore the tick. <br>
208      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
209      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
210      */
211     void receiveGossipTick() {
212         if (clusterMembers.size() == 0) {
213             return; //no members to send gossip status to
214         }
215
216         Address remoteMemberToGossipTo;
217
218         if (clusterMembers.size() == 1) {
219             remoteMemberToGossipTo = clusterMembers.get(0);
220         } else {
221             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
222             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
223         }
224
225         log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
226         getLocalStatusAndSendTo(remoteMemberToGossipTo);
227     }
228
229     /**
230      * Process gossip status received from a remote gossiper. Remote versions are compared with
231      * the local copy.
232      * <p/>
233      * For each bucket
234      * <ul>
235      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
236      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
237      *  <li>If both are same, noop</li>
238      * </ul>
239      *
240      * @param status bucket versions from a remote member
241      */
242     void receiveGossipStatus(GossipStatus status) {
243         //Don't accept messages from non-members
244         if (!clusterMembers.contains(status.from())) {
245             return;
246         }
247
248         final ActorRef sender = getSender();
249         Future<Object> futureReply =
250                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
251
252         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
253
254     }
255
256     /**
257      * Sends the received buckets in the envelope to the parent Bucket store.
258      *
259      * @param envelope contains buckets from a remote gossiper
260      */
261     <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
262         //TODO: Add more validations
263         if (!selfAddress.equals(envelope.to())) {
264             if (log.isTraceEnabled()) {
265                 log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
266                         envelope.to());
267             }
268             return;
269         }
270
271         updateRemoteBuckets(envelope.getBuckets());
272
273     }
274
275     /**
276      * Helper to send received buckets to bucket store.
277      *
278      * @param buckets map of Buckets to update
279      */
280     <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
281         UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
282         getContext().parent().tell(updateRemoteBuckets, getSelf());
283     }
284
285     /**
286      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
287      *
288      * @param remote     remote node to send Buckets to
289      * @param addresses  node addresses whose buckets needs to be sent
290      */
291     void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
292
293         Future<Object> futureReply =
294                 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
295         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
296     }
297
298     /**
299      * Gets bucket versions from bucket store and sends to the supplied address.
300      *
301      * @param remoteActorSystemAddress remote gossiper to send to
302      */
303     void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
304
305         //Get local status from bucket store and send to remote
306         Future<Object> futureReply =
307                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
308
309         //Find gossiper on remote system
310         ActorSelection remoteRef = getContext().system().actorSelection(
311                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
312
313         log.trace("Sending bucket versions to [{}]", remoteRef);
314
315         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
316
317     }
318
319     /**
320      * Helper to send bucket versions received from local store.
321      *
322      * @param remote        remote gossiper to send versions to
323      * @param localVersions bucket versions received from local store
324      */
325     void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
326
327         GossipStatus status = new GossipStatus(selfAddress, localVersions);
328         remote.tell(status, getSelf());
329     }
330
331     void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
332
333         GossipStatus status = new GossipStatus(selfAddress, localVersions);
334         remote.tell(status, getSelf());
335     }
336
337     ///
338     /// Private factories to create mappers
339     ///
340
341     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
342
343         return new Mapper<Object, Void>() {
344             @Override
345             public Void apply(Object replyMessage) {
346                 if (replyMessage instanceof GetBucketVersionsReply) {
347                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
348                     Map<Address, Long> localVersions = reply.getVersions();
349
350                     sendGossipStatusTo(remote, localVersions);
351
352                 }
353                 return null;
354             }
355         };
356     }
357
358     /**
359      * Process bucket versions received from
360      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
361      * Then this method compares remote bucket versions with local bucket versions.
362      * <ul>
363      *     <li>The buckets that are newer locally, send
364      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
365      *     to remote
366      *     <li>The buckets that are older locally, send
367      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
368      *     to remote so that remote sends GossipEnvelop.
369      * </ul>
370      *
371      * @param sender the remote member
372      * @param status bucket versions from a remote member
373      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
374      *
375      */
376     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
377
378         final Map<Address, Long> remoteVersions = status.getVersions();
379
380         return new Mapper<Object, Void>() {
381             @Override
382             public Void apply(Object replyMessage) {
383                 if (replyMessage instanceof GetBucketVersionsReply) {
384                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
385                     Map<Address, Long> localVersions = reply.getVersions();
386
387                     //diff between remote list and local
388                     Set<Address> localIsOlder = new HashSet<>();
389                     localIsOlder.addAll(remoteVersions.keySet());
390                     localIsOlder.removeAll(localVersions.keySet());
391
392                     //diff between local list and remote
393                     Set<Address> localIsNewer = new HashSet<>();
394                     localIsNewer.addAll(localVersions.keySet());
395                     localIsNewer.removeAll(remoteVersions.keySet());
396
397
398                     for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
399                         Address address = entry.getKey();
400                         Long remoteVersion = entry.getValue();
401                         Long localVersion = localVersions.get(address);
402                         if (localVersion == null || remoteVersion == null) {
403                             continue; //this condition is taken care of by above diffs
404                         }
405
406                         if (localVersion < remoteVersion) {
407                             localIsOlder.add(address);
408                         } else if (localVersion > remoteVersion) {
409                             localIsNewer.add(address);
410                         }
411                     }
412
413                     if (!localIsOlder.isEmpty()) {
414                         sendGossipStatusTo(sender, localVersions );
415                     }
416
417                     if (!localIsNewer.isEmpty()) {
418                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
419                     }
420
421                 }
422                 return null;
423             }
424         };
425     }
426
427     /**
428      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
429      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
430      * These buckets are sent to a remote member encapsulated in
431      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
432      *
433      * @param sender the remote member that sent
434      *           {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
435      *           in reply to which bucket is being sent back
436      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
437      *
438      */
439     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
440
441         return new Mapper<Object, Void>() {
442             @SuppressWarnings({ "rawtypes", "unchecked" })
443             @Override
444             public Void apply(Object msg) {
445                 if (msg instanceof GetBucketsByMembersReply) {
446                     Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
447                     log.trace("Buckets to send from {}: {}", selfAddress, buckets);
448                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
449                     sender.tell(envelope, getSelf());
450                 }
451                 return null;
452             }
453         };
454     }
455
456     ///
457     ///Getter Setters
458     ///
459     List<Address> getClusterMembers() {
460         return clusterMembers;
461     }
462
463     void setClusterMembers(List<Address> clusterMembers) {
464         this.clusterMembers = clusterMembers;
465     }
466
467     Address getSelfAddress() {
468         return selfAddress;
469     }
470
471     void setSelfAddress(Address selfAddress) {
472         this.selfAddress = selfAddress;
473     }
474 }