d26bda1c25fa481f2cb13a37efc819ada1d550ac
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterActorRefProvider;
17 import akka.cluster.ClusterEvent;
18 import akka.cluster.Member;
19 import akka.dispatch.Mapper;
20 import akka.pattern.Patterns;
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Preconditions;
23 import java.util.ArrayList;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ThreadLocalRandom;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
31 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
33 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.FiniteDuration;
44
45 /**
46  * Gossiper that syncs bucket store across nodes in the cluster.
47  *
48  * <p>
49  * It keeps a local scheduler that periodically sends Gossip ticks to
50  * itself to send bucket store's bucket versions to a randomly selected remote
51  * gossiper.
52  *
53  * <p>
54  * When bucket versions are received from a remote gossiper, it is compared
55  * with bucket store's bucket versions. Which ever buckets are newer
56  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
57  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
58  *
59  * <p>
60  * When a bucket is received from a remote gossiper, its sent to the bucket store
61  * for update.
62  */
63 public class Gossiper extends AbstractUntypedActorWithMetering {
64
65     private final Logger log = LoggerFactory.getLogger(getClass());
66
67     private Cluster cluster;
68
69     /**
70      * ActorSystem's address for the current cluster node.
71      */
72     private Address selfAddress;
73
74     /**
75      * All known cluster members.
76      */
77     private List<Address> clusterMembers = new ArrayList<>();
78
79     private Cancellable gossipTask;
80
81     private Boolean autoStartGossipTicks = true;
82
83     private final RemoteRpcProviderConfig config;
84
85     public Gossiper(RemoteRpcProviderConfig config) {
86         this.config = Preconditions.checkNotNull(config);
87     }
88
89     /**
90      * Constructor for testing.
91      *
92      * @param autoStartGossipTicks used for turning off gossip ticks during testing.
93      *                             Gossip tick can be manually sent.
94      */
95     @VisibleForTesting
96     public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
97         this(config);
98         this.autoStartGossipTicks = autoStartGossipTicks;
99     }
100
101     @Override
102     public void preStart() {
103         ActorRefProvider provider = getContext().provider();
104         selfAddress = provider.getDefaultAddress();
105
106         if ( provider instanceof ClusterActorRefProvider ) {
107             cluster = Cluster.get(getContext().system());
108             cluster.subscribe(getSelf(),
109                     ClusterEvent.initialStateAsEvents(),
110                     ClusterEvent.MemberEvent.class,
111                     ClusterEvent.UnreachableMember.class);
112         }
113
114         if (autoStartGossipTicks) {
115             gossipTask = getContext().system().scheduler().schedule(
116                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
117                     config.getGossipTickInterval(),                 //interval
118                     getSelf(),                                       //target
119                     new Messages.GossiperMessages.GossipTick(),      //message
120                     getContext().dispatcher(),                       //execution context
121                     getSelf()                                        //sender
122             );
123         }
124     }
125
126     @Override
127     public void postStop() {
128         if (cluster != null) {
129             cluster.unsubscribe(getSelf());
130         }
131         if (gossipTask != null) {
132             gossipTask.cancel();
133         }
134     }
135
136     @SuppressWarnings({ "rawtypes", "unchecked" })
137     @Override
138     protected void handleReceive(Object message) throws Exception {
139         //Usually sent by self via gossip task defined above. But its not enforced.
140         //These ticks can be sent by another actor as well which is esp. useful while testing
141         if (message instanceof GossipTick) {
142             receiveGossipTick();
143         } else if (message instanceof GossipStatus) {
144             // Message from remote gossiper with its bucket versions
145             receiveGossipStatus((GossipStatus) message);
146         } else if (message instanceof GossipEnvelope) {
147             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
148             // message. The contained buckets are newer as determined by the remote gossiper by
149             // comparing the GossipStatus message with its local versions.
150             receiveGossip((GossipEnvelope) message);
151         } else if (message instanceof ClusterEvent.MemberUp) {
152             receiveMemberUp(((ClusterEvent.MemberUp) message).member());
153
154         } else if (message instanceof ClusterEvent.MemberRemoved) {
155             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
156
157         } else if ( message instanceof ClusterEvent.UnreachableMember) {
158             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
159
160         } else {
161             unhandled(message);
162         }
163     }
164
165     /**
166      * Remove member from local copy of member list. If member down is self, then stop the actor
167      *
168      * @param member who went down
169      */
170     void receiveMemberRemoveOrUnreachable(Member member) {
171         //if its self, then stop itself
172         if (selfAddress.equals(member.address())) {
173             getContext().stop(getSelf());
174             return;
175         }
176
177         clusterMembers.remove(member.address());
178         log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
179     }
180
181     /**
182      * Add member to the local copy of member list if it doesn't already.
183      *
184      * @param member the member to add
185      */
186     void receiveMemberUp(Member member) {
187
188         if (selfAddress.equals(member.address())) {
189             return; //ignore up notification for self
190         }
191
192         if (!clusterMembers.contains(member.address())) {
193             clusterMembers.add(member.address());
194         }
195
196         log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
197     }
198
199     /**
200      * Sends Gossip status to other members in the cluster.
201      * <br>
202      * 1. If there are no member, ignore the tick. <br>
203      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
204      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
205      */
206     void receiveGossipTick() {
207         if (clusterMembers.size() == 0) {
208             return; //no members to send gossip status to
209         }
210
211         Address remoteMemberToGossipTo;
212
213         if (clusterMembers.size() == 1) {
214             remoteMemberToGossipTo = clusterMembers.get(0);
215         } else {
216             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
217             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
218         }
219
220         log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
221         getLocalStatusAndSendTo(remoteMemberToGossipTo);
222     }
223
224     /**
225      * Process gossip status received from a remote gossiper. Remote versions are compared with
226      * the local copy.
227      * <p/>
228      * For each bucket
229      * <ul>
230      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
231      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
232      *  <li>If both are same, noop</li>
233      * </ul>
234      *
235      * @param status bucket versions from a remote member
236      */
237     void receiveGossipStatus(GossipStatus status) {
238         //Don't accept messages from non-members
239         if (!clusterMembers.contains(status.from())) {
240             return;
241         }
242
243         final ActorRef sender = getSender();
244         Future<Object> futureReply =
245                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
246
247         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
248
249     }
250
251     /**
252      * Sends the received buckets in the envelope to the parent Bucket store.
253      *
254      * @param envelope contains buckets from a remote gossiper
255      */
256     <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
257         //TODO: Add more validations
258         if (!selfAddress.equals(envelope.to())) {
259             if (log.isTraceEnabled()) {
260                 log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
261                         envelope.to());
262             }
263             return;
264         }
265
266         updateRemoteBuckets(envelope.getBuckets());
267
268     }
269
270     /**
271      * Helper to send received buckets to bucket store.
272      *
273      * @param buckets map of Buckets to update
274      */
275     <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
276         UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
277         getContext().parent().tell(updateRemoteBuckets, getSelf());
278     }
279
280     /**
281      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
282      *
283      * @param remote     remote node to send Buckets to
284      * @param addresses  node addresses whose buckets needs to be sent
285      */
286     void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
287
288         Future<Object> futureReply =
289                 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
290         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
291     }
292
293     /**
294      * Gets bucket versions from bucket store and sends to the supplied address.
295      *
296      * @param remoteActorSystemAddress remote gossiper to send to
297      */
298     void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
299
300         //Get local status from bucket store and send to remote
301         Future<Object> futureReply =
302                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
303
304         //Find gossiper on remote system
305         ActorSelection remoteRef = getContext().system().actorSelection(
306                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
307
308         log.trace("Sending bucket versions to [{}]", remoteRef);
309
310         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
311
312     }
313
314     /**
315      * Helper to send bucket versions received from local store.
316      *
317      * @param remote        remote gossiper to send versions to
318      * @param localVersions bucket versions received from local store
319      */
320     void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
321
322         GossipStatus status = new GossipStatus(selfAddress, localVersions);
323         remote.tell(status, getSelf());
324     }
325
326     void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
327
328         GossipStatus status = new GossipStatus(selfAddress, localVersions);
329         remote.tell(status, getSelf());
330     }
331
332     ///
333     /// Private factories to create mappers
334     ///
335
336     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
337
338         return new Mapper<Object, Void>() {
339             @Override
340             public Void apply(Object replyMessage) {
341                 if (replyMessage instanceof GetBucketVersionsReply) {
342                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
343                     Map<Address, Long> localVersions = reply.getVersions();
344
345                     sendGossipStatusTo(remote, localVersions);
346
347                 }
348                 return null;
349             }
350         };
351     }
352
353     /**
354      * Process bucket versions received from
355      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
356      * Then this method compares remote bucket versions with local bucket versions.
357      * <ul>
358      *     <li>The buckets that are newer locally, send
359      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
360      *     to remote
361      *     <li>The buckets that are older locally, send
362      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
363      *     to remote so that remote sends GossipEnvelop.
364      * </ul>
365      *
366      * @param sender the remote member
367      * @param status bucket versions from a remote member
368      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
369      *
370      */
371     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
372
373         final Map<Address, Long> remoteVersions = status.getVersions();
374
375         return new Mapper<Object, Void>() {
376             @Override
377             public Void apply(Object replyMessage) {
378                 if (replyMessage instanceof GetBucketVersionsReply) {
379                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
380                     Map<Address, Long> localVersions = reply.getVersions();
381
382                     //diff between remote list and local
383                     Set<Address> localIsOlder = new HashSet<>();
384                     localIsOlder.addAll(remoteVersions.keySet());
385                     localIsOlder.removeAll(localVersions.keySet());
386
387                     //diff between local list and remote
388                     Set<Address> localIsNewer = new HashSet<>();
389                     localIsNewer.addAll(localVersions.keySet());
390                     localIsNewer.removeAll(remoteVersions.keySet());
391
392
393                     for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
394                         Address address = entry.getKey();
395                         Long remoteVersion = entry.getValue();
396                         Long localVersion = localVersions.get(address);
397                         if (localVersion == null || remoteVersion == null) {
398                             continue; //this condition is taken care of by above diffs
399                         }
400
401                         if (localVersion < remoteVersion) {
402                             localIsOlder.add(address);
403                         } else if (localVersion > remoteVersion) {
404                             localIsNewer.add(address);
405                         }
406                     }
407
408                     if (!localIsOlder.isEmpty()) {
409                         sendGossipStatusTo(sender, localVersions );
410                     }
411
412                     if (!localIsNewer.isEmpty()) {
413                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
414                     }
415
416                 }
417                 return null;
418             }
419         };
420     }
421
422     /**
423      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
424      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
425      * These buckets are sent to a remote member encapsulated in
426      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
427      *
428      * @param sender the remote member that sent
429      *           {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
430      *           in reply to which bucket is being sent back
431      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
432      *
433      */
434     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
435
436         return new Mapper<Object, Void>() {
437             @SuppressWarnings({ "rawtypes", "unchecked" })
438             @Override
439             public Void apply(Object msg) {
440                 if (msg instanceof GetBucketsByMembersReply) {
441                     Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
442                     log.trace("Buckets to send from {}: {}", selfAddress, buckets);
443                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
444                     sender.tell(envelope, getSelf());
445                 }
446                 return null;
447             }
448         };
449     }
450
451     ///
452     ///Getter Setters
453     ///
454     List<Address> getClusterMembers() {
455         return clusterMembers;
456     }
457
458     void setClusterMembers(List<Address> clusterMembers) {
459         this.clusterMembers = clusterMembers;
460     }
461
462     Address getSelfAddress() {
463         return selfAddress;
464     }
465
466     void setSelfAddress(Address selfAddress) {
467         this.selfAddress = selfAddress;
468     }
469 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.