Merge "BUG-2627: do not duplicate descriptions"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterActorRefProvider;
17 import akka.cluster.ClusterEvent;
18 import akka.cluster.Member;
19 import akka.dispatch.Mapper;
20 import akka.pattern.Patterns;
21 import java.util.ArrayList;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.ThreadLocalRandom;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
29 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
33 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.FiniteDuration;
42
43 /**
44  * Gossiper that syncs bucket store across nodes in the cluster.
45  * <p/>
46  * It keeps a local scheduler that periodically sends Gossip ticks to
47  * itself to send bucket store's bucket versions to a randomly selected remote
48  * gossiper.
49  * <p/>
50  * When bucket versions are received from a remote gossiper, it is compared
51  * with bucket store's bucket versions. Which ever buckets are newer
52  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
53  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
54  * <p/>
55  * When a bucket is received from a remote gossiper, its sent to the bucket store
56  * for update.
57  *
58  */
59
60 public class Gossiper extends AbstractUntypedActorWithMetering {
61
62     private final Logger log = LoggerFactory.getLogger(getClass());
63
64     private Cluster cluster;
65
66     /**
67      * ActorSystem's address for the current cluster node.
68      */
69     private Address selfAddress;
70
71     /**
72      * All known cluster members
73      */
74     private List<Address> clusterMembers = new ArrayList<>();
75
76     private Cancellable gossipTask;
77
78     private Boolean autoStartGossipTicks = true;
79
80     private RemoteRpcProviderConfig config;
81
82     public Gossiper(){
83         config = new RemoteRpcProviderConfig(getContext().system().settings().config());
84     }
85
86     /**
87      * Helpful for testing
88      * @param autoStartGossipTicks used for turning off gossip ticks during testing.
89      *                             Gossip tick can be manually sent.
90      */
91     public Gossiper(Boolean autoStartGossipTicks){
92         this.autoStartGossipTicks = autoStartGossipTicks;
93     }
94
95     @Override
96     public void preStart(){
97         ActorRefProvider provider = getContext().provider();
98         selfAddress = provider.getDefaultAddress();
99
100         if ( provider instanceof ClusterActorRefProvider ) {
101             cluster = Cluster.get(getContext().system());
102             cluster.subscribe(getSelf(),
103                     ClusterEvent.initialStateAsEvents(),
104                     ClusterEvent.MemberEvent.class,
105                     ClusterEvent.UnreachableMember.class);
106         }
107
108         if (autoStartGossipTicks) {
109             gossipTask = getContext().system().scheduler().schedule(
110                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
111                     config.getGossipTickInterval(),                 //interval
112                     getSelf(),                                       //target
113                     new Messages.GossiperMessages.GossipTick(),      //message
114                     getContext().dispatcher(),                       //execution context
115                     getSelf()                                        //sender
116             );
117         }
118     }
119
120     @Override
121     public void postStop(){
122         if (cluster != null) {
123             cluster.unsubscribe(getSelf());
124         }
125         if (gossipTask != null) {
126             gossipTask.cancel();
127         }
128     }
129
130     @Override
131     protected void handleReceive(Object message) throws Exception {
132         //Usually sent by self via gossip task defined above. But its not enforced.
133         //These ticks can be sent by another actor as well which is esp. useful while testing
134         if (message instanceof GossipTick) {
135             receiveGossipTick();
136         } else if (message instanceof GossipStatus) {
137             // Message from remote gossiper with its bucket versions
138             receiveGossipStatus((GossipStatus) message);
139         } else if (message instanceof GossipEnvelope) {
140             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
141             // message. The contained buckets are newer as determined by the remote gossiper by
142             // comparing the GossipStatus message with its local versions.
143             receiveGossip((GossipEnvelope) message);
144         } else if (message instanceof ClusterEvent.MemberUp) {
145             receiveMemberUp(((ClusterEvent.MemberUp) message).member());
146
147         } else if (message instanceof ClusterEvent.MemberRemoved) {
148             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
149
150         } else if ( message instanceof ClusterEvent.UnreachableMember){
151             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
152
153         } else {
154             unhandled(message);
155         }
156     }
157
158     /**
159      * Remove member from local copy of member list. If member down is self, then stop the actor
160      *
161      * @param member who went down
162      */
163     void receiveMemberRemoveOrUnreachable(Member member) {
164         //if its self, then stop itself
165         if (selfAddress.equals(member.address())){
166             getContext().stop(getSelf());
167             return;
168         }
169
170         clusterMembers.remove(member.address());
171         if(log.isDebugEnabled()) {
172             log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
173         }
174     }
175
176     /**
177      * Add member to the local copy of member list if it doesnt already
178      * @param member
179      */
180     void receiveMemberUp(Member member) {
181
182         if (selfAddress.equals(member.address())) {
183             return; //ignore up notification for self
184         }
185
186         if (!clusterMembers.contains(member.address())) {
187             clusterMembers.add(member.address());
188         }
189         if(log.isDebugEnabled()) {
190             log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
191         }
192     }
193
194     /**
195      * Sends Gossip status to other members in the cluster. <br/>
196      * 1. If there are no member, ignore the tick. </br>
197      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
198      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
199      */
200     void receiveGossipTick(){
201         if (clusterMembers.size() == 0) {
202             return; //no members to send gossip status to
203         }
204
205         Address remoteMemberToGossipTo;
206
207         if (clusterMembers.size() == 1) {
208             remoteMemberToGossipTo = clusterMembers.get(0);
209         } else {
210             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
211             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
212         }
213         if(log.isDebugEnabled()) {
214             log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
215         }
216         getLocalStatusAndSendTo(remoteMemberToGossipTo);
217     }
218
219     /**
220      * Process gossip status received from a remote gossiper. Remote versions are compared with
221      * the local copy. <p>
222      *
223      * For each bucket
224      * <ul>
225      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
226      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
227      *  <li>If both are same, noop</li>
228      * </ul>
229      *
230      * @param status bucket versions from a remote member
231      */
232     void receiveGossipStatus(GossipStatus status){
233         //Don't accept messages from non-members
234         if (!clusterMembers.contains(status.from())) {
235             return;
236         }
237
238         final ActorRef sender = getSender();
239         Future<Object> futureReply =
240                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
241
242         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
243
244     }
245
246     /**
247      * Sends the received buckets in the envelope to the parent Bucket store.
248      *
249      * @param envelope contains buckets from a remote gossiper
250      */
251     void receiveGossip(GossipEnvelope envelope){
252         //TODO: Add more validations
253         if (!selfAddress.equals(envelope.to())) {
254             if(log.isDebugEnabled()) {
255                 log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
256             }
257             return;
258         }
259
260         updateRemoteBuckets(envelope.getBuckets());
261
262     }
263
264     /**
265      * Helper to send received buckets to bucket store
266      *
267      * @param buckets
268      */
269     void updateRemoteBuckets(Map<Address, Bucket> buckets) {
270
271         UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
272         getContext().parent().tell(updateRemoteBuckets, getSelf());
273     }
274
275     /**
276      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
277      *
278      * @param remote     remote node to send Buckets to
279      * @param addresses  node addresses whose buckets needs to be sent
280      */
281     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
282
283         Future<Object> futureReply =
284                 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
285         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
286     }
287
288     /**
289      * Gets bucket versions from bucket store and sends to the supplied address
290      *
291      * @param remoteActorSystemAddress remote gossiper to send to
292      */
293     void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
294
295         //Get local status from bucket store and send to remote
296         Future<Object> futureReply =
297                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
298
299         //Find gossiper on remote system
300         ActorSelection remoteRef = getContext().system().actorSelection(
301                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
302
303         if(log.isDebugEnabled()) {
304             log.debug("Sending bucket versions to [{}]", remoteRef);
305         }
306
307         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
308
309     }
310
311     /**
312      * Helper to send bucket versions received from local store
313      * @param remote        remote gossiper to send versions to
314      * @param localVersions bucket versions received from local store
315      */
316     void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
317
318         GossipStatus status = new GossipStatus(selfAddress, localVersions);
319         remote.tell(status, getSelf());
320     }
321
322     void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
323
324         GossipStatus status = new GossipStatus(selfAddress, localVersions);
325         remote.tell(status, getSelf());
326     }
327
328     ///
329     /// Private factories to create mappers
330     ///
331
332     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
333
334         return new Mapper<Object, Void>() {
335             @Override
336             public Void apply(Object replyMessage) {
337                 if (replyMessage instanceof GetBucketVersionsReply) {
338                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
339                     Map<Address, Long> localVersions = reply.getVersions();
340
341                     sendGossipStatusTo(remote, localVersions);
342
343                 }
344                 return null;
345             }
346         };
347     }
348
349     /**
350      * Process bucket versions received from
351      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
352      * Then this method compares remote bucket versions with local bucket versions.
353      * <ul>
354      *     <li>The buckets that are newer locally, send
355      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
356      *     to remote
357      *     <li>The buckets that are older locally, send
358      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
359      *     to remote so that remote sends GossipEnvelop.
360      * </ul>
361      *
362      * @param sender the remote member
363      * @param status bucket versions from a remote member
364      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
365      *
366      */
367     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
368
369         final Map<Address, Long> remoteVersions = status.getVersions();
370
371         return new Mapper<Object, Void>() {
372             @Override
373             public Void apply(Object replyMessage) {
374                 if (replyMessage instanceof GetBucketVersionsReply) {
375                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
376                     Map<Address, Long> localVersions = reply.getVersions();
377
378                     //diff between remote list and local
379                     Set<Address> localIsOlder = new HashSet<>();
380                     localIsOlder.addAll(remoteVersions.keySet());
381                     localIsOlder.removeAll(localVersions.keySet());
382
383                     //diff between local list and remote
384                     Set<Address> localIsNewer = new HashSet<>();
385                     localIsNewer.addAll(localVersions.keySet());
386                     localIsNewer.removeAll(remoteVersions.keySet());
387
388
389                     for (Address address : remoteVersions.keySet()){
390
391                         if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
392                             continue; //this condition is taken care of by above diffs
393                         }
394                         if (localVersions.get(address) <  remoteVersions.get(address)) {
395                             localIsOlder.add(address);
396                         } else if (localVersions.get(address) > remoteVersions.get(address)) {
397                             localIsNewer.add(address);
398                         }
399                     }
400
401                     if (!localIsOlder.isEmpty()) {
402                         sendGossipStatusTo(sender, localVersions );
403                     }
404
405                     if (!localIsNewer.isEmpty()) {
406                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
407                     }
408
409                 }
410                 return null;
411             }
412         };
413     }
414
415     /**
416      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
417      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
418      * These buckets are sent to a remote member encapsulated in
419      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
420      *
421      * @param sender the remote member that sent
422      *               {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
423      *               in reply to which bucket is being sent back
424      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
425      *
426      */
427     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
428
429         return new Mapper<Object, Void>() {
430             @Override
431             public Void apply(Object msg) {
432                 if (msg instanceof GetBucketsByMembersReply) {
433                     Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
434                     if(log.isDebugEnabled()) {
435                         log.debug("Buckets to send from {}: {}", selfAddress, buckets);
436                     }
437                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
438                     sender.tell(envelope, getSelf());
439                 }
440                 return null;
441             }
442         };
443     }
444
445     ///
446     ///Getter Setters
447     ///
448     List<Address> getClusterMembers() {
449         return clusterMembers;
450     }
451
452     void setClusterMembers(List<Address> clusterMembers) {
453         this.clusterMembers = clusterMembers;
454     }
455
456     Address getSelfAddress() {
457         return selfAddress;
458     }
459
460     void setSelfAddress(Address selfAddress) {
461         this.selfAddress = selfAddress;
462     }
463 }