0b64136c49f9bda7bb331c9f5111d2146b91da9d
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Address;
13 import akka.actor.Cancellable;
14 import akka.actor.UntypedActor;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterEvent;
17 import akka.cluster.Member;
18 import akka.dispatch.Mapper;
19 import akka.event.Logging;
20 import akka.event.LoggingAdapter;
21 import akka.pattern.Patterns;
22 import scala.concurrent.Future;
23 import scala.concurrent.duration.FiniteDuration;
24
25 import java.util.ArrayList;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.TimeUnit;
32
33 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
40 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
41
42 /**
43  * Gossiper that syncs bucket store across nodes in the cluster.
44  * <p>
45  * It keeps a local scheduler that periodically sends Gossip ticks to itself to send bucket store's bucket versions
46  * to a randomly selected remote gossiper.
47  * <p>
48  * When bucket versions are received from a remote gossiper, it is compared with bucket store's bucket versions.
49  * Which ever buckets are newer locally, are sent to remote gossiper. If any bucket is older in bucket store, a
50  * gossip status is sent to remote gossiper so that it can send the newer buckets.
51  * <p>
52  * When a bucket is received from a remote gossiper, its sent to the bucket store for update.
53  *
54  */
55
56 public class Gossiper extends UntypedActor {
57
58     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
59
60     Cluster cluster = Cluster.get(getContext().system());
61
62     /**
63      * ActorSystem's address for the current cluster node.
64      */
65     private Address selfAddress = cluster.selfAddress();
66
67     /**
68      * All known cluster members
69      */
70     private List<Address> clusterMembers = new ArrayList<>();
71
72     private Cancellable gossipTask;
73
74     private Boolean autoStartGossipTicks = true;
75
76     public Gossiper(){}
77
78     /**
79      * Helpful for testing
80      * @param autoStartGossipTicks used for turning off gossip ticks during testing. Gossip tick can be manually sent.
81      */
82     public Gossiper(Boolean autoStartGossipTicks){
83         this.autoStartGossipTicks = autoStartGossipTicks;
84     }
85
86     @Override
87     public void preStart(){
88
89         cluster.subscribe(getSelf(),
90                           ClusterEvent.initialStateAsEvents(),
91                           ClusterEvent.MemberEvent.class,
92                           ClusterEvent.UnreachableMember.class);
93
94         if (autoStartGossipTicks) {
95             gossipTask = getContext().system().scheduler().schedule(
96                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
97                     new FiniteDuration(500, TimeUnit.MILLISECONDS),         //interval
98                     getSelf(),                                       //target
99                     new Messages.GossiperMessages.GossipTick(),      //message
100                     getContext().dispatcher(),                       //execution context
101                     getSelf()                                        //sender
102             );
103         }
104     }
105
106     @Override
107     public void postStop(){
108         if (cluster != null)
109             cluster.unsubscribe(getSelf());
110         if (gossipTask != null)
111             gossipTask.cancel();
112     }
113
114     @Override
115     public void onReceive(Object message) throws Exception {
116
117         log.debug("Received message: node[{}], message[{}]", selfAddress, message);
118
119         //Usually sent by self via gossip task defined above. But its not enforced.
120         //These ticks can be sent by another actor as well which is esp. useful while testing
121         if (message instanceof GossipTick)
122             receiveGossipTick();
123
124         //Message from remote gossiper with its bucket versions
125         else if (message instanceof GossipStatus)
126             receiveGossipStatus((GossipStatus) message);
127
128         //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
129         //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
130         //message with its local versions
131         else if (message instanceof GossipEnvelope)
132             receiveGossip((GossipEnvelope) message);
133
134         else if (message instanceof ClusterEvent.MemberUp) {
135             receiveMemberUp(((ClusterEvent.MemberUp) message).member());
136
137         } else if (message instanceof ClusterEvent.MemberRemoved) {
138             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
139
140         } else if ( message instanceof ClusterEvent.UnreachableMember){
141             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
142
143         } else
144             unhandled(message);
145     }
146
147     /**
148      * Remove member from local copy of member list. If member down is self, then stop the actor
149      *
150      * @param member who went down
151      */
152     void receiveMemberRemoveOrUnreachable(Member member) {
153         //if its self, then stop itself
154         if (selfAddress.equals(member.address())){
155             getContext().stop(getSelf());
156             return;
157         }
158
159         clusterMembers.remove(member.address());
160         log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
161     }
162
163     /**
164      * Add member to the local copy of member list if it doesnt already
165      * @param member
166      */
167     void receiveMemberUp(Member member) {
168
169         if (selfAddress.equals(member.address()))
170             return; //ignore up notification for self
171
172         if (!clusterMembers.contains(member.address()))
173             clusterMembers.add(member.address());
174
175         log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
176     }
177
178     /**
179      * Sends Gossip status to other members in the cluster. <br/>
180      * 1. If there are no member, ignore the tick. </br>
181      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
182      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
183      */
184     void receiveGossipTick(){
185         if (clusterMembers.size() == 0) return; //no members to send gossip status to
186
187         Address remoteMemberToGossipTo = null;
188
189         if (clusterMembers.size() == 1)
190             remoteMemberToGossipTo = clusterMembers.get(0);
191         else {
192             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
193             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
194         }
195
196         log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
197         getLocalStatusAndSendTo(remoteMemberToGossipTo);
198     }
199
200     /**
201      * Process gossip status received from a remote gossiper. Remote versions are compared with
202      * the local copy. <p>
203      *
204      * For each bucket
205      * <ul>
206      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
207      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
208      *  <li>If both are same, noop</li>
209      * </ul>
210      *
211      * @param status bucket versions from a remote member
212      */
213     void receiveGossipStatus(GossipStatus status){
214         //Dont want to accept messages from non-members
215         if (!clusterMembers.contains(status.from()))
216             return;
217
218         final ActorRef sender = getSender();
219
220         Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
221
222         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
223
224     }
225
226     /**
227      * Sends the received buckets in the envelope to the parent Bucket store.
228      *
229      * @param envelope contains buckets from a remote gossiper
230      */
231     void receiveGossip(GossipEnvelope envelope){
232         //TODO: Add more validations
233         if (!selfAddress.equals(envelope.to())) {
234             log.info("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
235             return;
236         }
237         if (envelope.getBuckets() == null)
238             return; //nothing to do
239
240         updateRemoteBuckets(envelope.getBuckets());
241
242     }
243
244     /**
245      * Helper to send received buckets to bucket store
246      *
247      * @param buckets
248      */
249     void updateRemoteBuckets(Map<Address, Bucket> buckets) {
250
251         if (buckets == null || buckets.isEmpty())
252             return; //nothing to merge
253
254         UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
255
256         getContext().parent().tell(updateRemoteBuckets, getSelf());
257     }
258
259     /**
260      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
261      *
262      * @param remote     remote node to send Buckets to
263      * @param addresses  node addresses whose buckets needs to be sent
264      */
265     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
266
267         Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
268
269         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
270
271     }
272
273     /**
274      * Gets bucket versions from bucket store and sends to the supplied address
275      *
276      * @param remoteActorSystemAddress remote gossiper to send to
277      */
278     void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
279
280         //Get local status from bucket store and send to remote
281         Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
282
283         ActorSelection remoteRef = getContext().system().actorSelection(
284                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
285
286         log.debug("Sending bucket versions to [{}]", remoteRef);
287
288         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
289
290     }
291
292     /**
293      * Helper to send bucket versions received from local store
294      * @param remote        remote gossiper to send versions to
295      * @param localVersions bucket versions received from local store
296      */
297     void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
298
299         GossipStatus status = new GossipStatus(selfAddress, localVersions);
300         remote.tell(status, getSelf());
301     }
302
303     void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
304
305         GossipStatus status = new GossipStatus(selfAddress, localVersions);
306         remote.tell(status, getSelf());
307     }
308
309     ///
310     /// Private factories to create mappers
311     ///
312
313     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
314
315         return new Mapper<Object, Void>() {
316             @Override
317             public Void apply(Object replyMessage) {
318                 if (replyMessage instanceof GetBucketVersionsReply) {
319                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
320                     Map<Address, Long> localVersions = reply.getVersions();
321
322                     sendGossipStatusTo(remote, localVersions);
323
324                 }
325                 return null;
326             }
327         };
328     }
329
330     /**
331      * Process bucket versions received from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
332      * Then this method compares remote bucket versions with local bucket versions.
333      * <ul>
334      *     <li>The buckets that are newer locally, send
335      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope} to remote
336      *     <li>The buckets that are older locally, send
337      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus} to remote so that
338      *     remote sends GossipEnvelop.
339      * </ul>
340      *
341      * @param sender the remote member
342      * @param status bucket versions from a remote member
343      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
344      *
345      */
346     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
347
348         final Map<Address, Long> remoteVersions = status.getVersions();
349
350         return new Mapper<Object, Void>() {
351             @Override
352             public Void apply(Object replyMessage) {
353                 if (replyMessage instanceof GetBucketVersionsReply) {
354                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
355                     Map<Address, Long> localVersions = reply.getVersions();
356
357                     //diff between remote list and local
358                     Set<Address> localIsOlder = new HashSet<>();
359                     localIsOlder.addAll(remoteVersions.keySet());
360                     localIsOlder.removeAll(localVersions.keySet());
361
362                     //diff between local list and remote
363                     Set<Address> localIsNewer = new HashSet<>();
364                     localIsNewer.addAll(localVersions.keySet());
365                     localIsNewer.removeAll(remoteVersions.keySet());
366
367
368                     for (Address address : remoteVersions.keySet()){
369
370                         if (localVersions.get(address) == null || remoteVersions.get(address) == null)
371                             continue; //this condition is taken care of by above diffs
372                         if (localVersions.get(address) <  remoteVersions.get(address))
373                             localIsOlder.add(address);
374                         else if (localVersions.get(address) > remoteVersions.get(address))
375                             localIsNewer.add(address);
376                         else
377                             continue;
378                     }
379
380                     if (!localIsOlder.isEmpty())
381                         sendGossipStatusTo(sender, localVersions );
382
383                     if (!localIsNewer.isEmpty())
384                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
385
386                 }
387                 return null;
388             }
389         };
390     }
391
392     /**
393      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore} that contains
394      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}. These buckets are sent to a remote member encapsulated
395      * in {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
396      *
397      * @param sender the remote member that sent
398      *               {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
399      *               in reply to which bucket is being sent back
400      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
401      *
402      */
403     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
404
405         return new Mapper<Object, Void>() {
406             @Override
407             public Void apply(Object msg) {
408                 if (msg instanceof GetBucketsByMembersReply) {
409                     Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
410                     log.info("Buckets to send from {}: {}", selfAddress, buckets);
411                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
412                     sender.tell(envelope, getSelf());
413                 }
414                 return null;
415             }
416         };
417     }
418
419     ///
420     ///Getter Setters
421     ///
422     List<Address> getClusterMembers() {
423         return clusterMembers;
424     }
425
426     void setClusterMembers(List<Address> clusterMembers) {
427         this.clusterMembers = clusterMembers;
428     }
429
430     Address getSelfAddress() {
431         return selfAddress;
432     }
433
434     void setSelfAddress(Address selfAddress) {
435         this.selfAddress = selfAddress;
436     }
437 }