261da12ae13652889c280b6ef5deddb6d8b25944
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import akka.dispatch.Mapper;
21 import akka.pattern.Patterns;
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.TimeUnit;
32 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
33 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
40 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
41 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
42 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
43 import scala.concurrent.Future;
44 import scala.concurrent.duration.FiniteDuration;
45
/**
 * Gossiper that syncs the bucket store across nodes in the cluster.
 *
 * <p>It keeps a local scheduler that periodically sends Gossip ticks to
 * itself, so that it sends the bucket store's bucket versions to a randomly selected
 * remote gossiper.
 *
 * <p>When bucket versions are received from a remote gossiper, they are compared
 * with the bucket store's bucket versions. Whichever buckets are newer
 * locally are sent to the remote gossiper. If any bucket is older in the bucket store,
 * a gossip status is sent to the remote gossiper so that it can send the newer buckets.
 *
 * <p>When a bucket is received from a remote gossiper, it is sent to the bucket store
 * for update.
 */
62
63 public class Gossiper extends AbstractUntypedActorWithMetering {
64     private final boolean autoStartGossipTicks;
65     private final RemoteRpcProviderConfig config;
66
67     /**
68      * All known cluster members.
69      */
70     private final List<Address> clusterMembers = new ArrayList<>();
71
72     /**
73      * ActorSystem's address for the current cluster node.
74      */
75     private Address selfAddress;
76
77     private Cluster cluster;
78
79     private Cancellable gossipTask;
80
81     Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
82         this.config = Preconditions.checkNotNull(config);
83         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
84     }
85
86     Gossiper(final RemoteRpcProviderConfig config) {
87         this(config, Boolean.TRUE);
88     }
89
90     public static Props props(final RemoteRpcProviderConfig config) {
91         return Props.create(Gossiper.class, config);
92     }
93
94     static Props testProps(final RemoteRpcProviderConfig config) {
95         return Props.create(Gossiper.class, config, Boolean.FALSE);
96     }
97
98     @Override
99     public void preStart(){
100         ActorRefProvider provider = getContext().provider();
101         selfAddress = provider.getDefaultAddress();
102
103         if (provider instanceof ClusterActorRefProvider ) {
104             cluster = Cluster.get(getContext().system());
105             cluster.subscribe(getSelf(),
106                     ClusterEvent.initialStateAsEvents(),
107                     ClusterEvent.MemberEvent.class,
108                     ClusterEvent.ReachableMember.class,
109                     ClusterEvent.UnreachableMember.class);
110         }
111
112         if (autoStartGossipTicks) {
113             gossipTask = getContext().system().scheduler().schedule(
114                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
115                     config.getGossipTickInterval(),                 //interval
116                     getSelf(),                                      //target
117                     new Messages.GossiperMessages.GossipTick(),     //message
118                     getContext().dispatcher(),                      //execution context
119                     getSelf()                                       //sender
120             );
121         }
122     }
123
124     @Override
125     public void postStop(){
126         if (cluster != null) {
127             cluster.unsubscribe(getSelf());
128         }
129         if (gossipTask != null) {
130             gossipTask.cancel();
131         }
132     }
133
134     @Override
135     protected void handleReceive(final Object message) throws Exception {
136         //Usually sent by self via gossip task defined above. But its not enforced.
137         //These ticks can be sent by another actor as well which is esp. useful while testing
138         if (message instanceof GossipTick) {
139             receiveGossipTick();
140         } else if (message instanceof GossipStatus) {
141             // Message from remote gossiper with its bucket versions
142             receiveGossipStatus((GossipStatus) message);
143         } else if (message instanceof GossipEnvelope) {
144             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
145             // message. The contained buckets are newer as determined by the remote gossiper by
146             // comparing the GossipStatus message with its local versions.
147             receiveGossip((GossipEnvelope) message);
148         } else if (message instanceof ClusterEvent.MemberUp) {
149             receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
150
151         } else if (message instanceof ClusterEvent.ReachableMember) {
152             receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
153
154         } else if (message instanceof ClusterEvent.MemberRemoved) {
155             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
156
157         } else if (message instanceof ClusterEvent.UnreachableMember) {
158             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
159
160         } else {
161             unhandled(message);
162         }
163     }
164
165     /**
166      * Remove member from local copy of member list. If member down is self, then stop the actor
167      *
168      * @param member who went down
169      */
170     private void receiveMemberRemoveOrUnreachable(final Member member) {
171         //if its self, then stop itself
172         if (selfAddress.equals(member.address())){
173             getContext().stop(getSelf());
174             return;
175         }
176
177         clusterMembers.remove(member.address());
178         LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
179
180         getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
181     }
182
183     /**
184      * Add member to the local copy of member list if it doesnt already
185      * @param member
186      */
187     private void receiveMemberUpOrReachable(final Member member) {
188         //ignore up notification for self
189         if (selfAddress.equals(member.address())) {
190             return;
191         }
192
193         if (!clusterMembers.contains(member.address())) {
194             clusterMembers.add(member.address());
195         }
196
197         LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
198     }
199
200     /**
201      * Sends Gossip status to other members in the cluster. <br/>
202      * 1. If there are no member, ignore the tick. </br>
203      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
204      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
205      */
206     @VisibleForTesting
207     void receiveGossipTick() {
208         final Address remoteMemberToGossipTo;
209         switch (clusterMembers.size()) {
210             case 0:
211                 //no members to send gossip status to
212                 return;
213             case 1:
214                 remoteMemberToGossipTo = clusterMembers.get(0);
215                 break;
216             default:
217                 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
218                 remoteMemberToGossipTo = clusterMembers.get(randomIndex);
219                 break;
220         }
221
222         LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
223         getLocalStatusAndSendTo(remoteMemberToGossipTo);
224     }
225
226     /**
227      * Process gossip status received from a remote gossiper. Remote versions are compared with
228      * the local copy. <p>
229      *
230      * For each bucket
231      * <ul>
232      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
233      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
234      *  <li>If both are same, noop</li>
235      * </ul>
236      *
237      * @param status bucket versions from a remote member
238      */
239     @VisibleForTesting
240     void receiveGossipStatus(final GossipStatus status) {
241         //Don't accept messages from non-members
242         if (!clusterMembers.contains(status.from())) {
243             return;
244         }
245
246         final ActorRef sender = getSender();
247         Future<Object> futureReply =
248                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
249
250         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
251     }
252
253     /**
254      * Sends the received buckets in the envelope to the parent Bucket store.
255      *
256      * @param envelope contains buckets from a remote gossiper
257      */
258     @VisibleForTesting
259     <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
260         //TODO: Add more validations
261         if (!selfAddress.equals(envelope.to())) {
262             LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
263             return;
264         }
265
266         updateRemoteBuckets(envelope.getBuckets());
267     }
268
269     /**
270      * Helper to send received buckets to bucket store
271      *
272      * @param buckets
273      */
274     @VisibleForTesting
275     <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
276         getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
277     }
278
279     /**
280      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
281      *
282      * @param remote     remote node to send Buckets to
283      * @param addresses  node addresses whose buckets needs to be sent
284      */
285     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
286
287         Future<Object> futureReply =
288                 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
289         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
290     }
291
292     /**
293      * Gets bucket versions from bucket store and sends to the supplied address
294      *
295      * @param remoteActorSystemAddress remote gossiper to send to
296      */
297     @VisibleForTesting
298     void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
299
300         //Get local status from bucket store and send to remote
301         Future<Object> futureReply =
302                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
303
304         //Find gossiper on remote system
305         ActorSelection remoteRef = getContext().system().actorSelection(
306                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
307
308         LOG.trace("Sending bucket versions to [{}]", remoteRef);
309
310         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
311     }
312
313     /**
314      * Helper to send bucket versions received from local store
315      * @param remote        remote gossiper to send versions to
316      * @param localVersions bucket versions received from local store
317      */
318     void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
319
320         GossipStatus status = new GossipStatus(selfAddress, localVersions);
321         remote.tell(status, getSelf());
322     }
323
324     void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
325
326         GossipStatus status = new GossipStatus(selfAddress, localVersions);
327         remote.tell(status, getSelf());
328     }
329
330     ///
331     /// Private factories to create mappers
332     ///
333
334     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
335
336         return new Mapper<Object, Void>() {
337             @Override
338             public Void apply(final Object replyMessage) {
339                 if (replyMessage instanceof GetBucketVersionsReply) {
340                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
341                     Map<Address, Long> localVersions = reply.getVersions();
342
343                     sendGossipStatusTo(remote, localVersions);
344
345                 }
346                 return null;
347             }
348         };
349     }
350
351     /**
352      * Process bucket versions received from
353      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
354      * Then this method compares remote bucket versions with local bucket versions.
355      * <ul>
356      *     <li>The buckets that are newer locally, send
357      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
358      *     to remote
359      *     <li>The buckets that are older locally, send
360      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
361      *     to remote so that remote sends GossipEnvelop.
362      * </ul>
363      *
364      * @param sender the remote member
365      * @param status bucket versions from a remote member
366      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
367      *
368      */
369     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
370
371         final Map<Address, Long> remoteVersions = status.getVersions();
372
373         return new Mapper<Object, Void>() {
374             @Override
375             public Void apply(final Object replyMessage) {
376                 if (replyMessage instanceof GetBucketVersionsReply) {
377                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
378                     Map<Address, Long> localVersions = reply.getVersions();
379
380                     //diff between remote list and local
381                     Set<Address> localIsOlder = new HashSet<>();
382                     localIsOlder.addAll(remoteVersions.keySet());
383                     localIsOlder.removeAll(localVersions.keySet());
384
385                     //diff between local list and remote
386                     Set<Address> localIsNewer = new HashSet<>();
387                     localIsNewer.addAll(localVersions.keySet());
388                     localIsNewer.removeAll(remoteVersions.keySet());
389
390
391                     for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
392                         Address address = entry.getKey();
393                         Long remoteVersion = entry.getValue();
394                         Long localVersion = localVersions.get(address);
395                         if (localVersion == null || remoteVersion == null) {
396                             //this condition is taken care of by above diffs
397                             continue;
398                         }
399                         if (localVersions.get(address) <  remoteVersions.get(address)) {
400                             localIsOlder.add(address);
401                         } else if (localVersions.get(address) > remoteVersions.get(address)) {
402                             localIsNewer.add(address);
403                         }
404                     }
405
406                     if (!localIsOlder.isEmpty()) {
407                         sendGossipStatusTo(sender, localVersions);
408                     }
409
410                     if (!localIsNewer.isEmpty()) {
411                         //send newer buckets to remote
412                         sendGossipTo(sender, localIsNewer);
413                     }
414                 }
415                 return null;
416             }
417         };
418     }
419
420     /**
421      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
422      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
423      * These buckets are sent to a remote member encapsulated in
424      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
425      *
426      * @param sender the remote member that sent
427      *               {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
428      *               in reply to which bucket is being sent back
429      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
430      *
431      */
432     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
433
434         return new Mapper<Object, Void>() {
435             @Override
436             public Void apply(final Object msg) {
437                 if (msg instanceof GetBucketsByMembersReply) {
438                     Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
439                     LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
440                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
441                     sender.tell(envelope, getSelf());
442                 }
443                 return null;
444             }
445         };
446     }
447
448     ///
449     ///Getter Setters
450     ///
451
452     @VisibleForTesting
453     void setClusterMembers(final Address... members) {
454         clusterMembers.clear();
455         clusterMembers.addAll(Arrays.asList(members));
456     }
457 }