Merge "Small fix to xsql dependencies"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.UntypedActor;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import akka.dispatch.Mapper;
21 import akka.event.Logging;
22 import akka.event.LoggingAdapter;
23 import akka.pattern.Patterns;
24 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
25 import scala.concurrent.Future;
26 import scala.concurrent.duration.FiniteDuration;
27
28 import java.util.ArrayList;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ThreadLocalRandom;
34 import java.util.concurrent.TimeUnit;
35
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
40 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
41 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
42 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
43 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
44
45 /**
46  * Gossiper that syncs bucket store across nodes in the cluster.
47  * <p/>
48  * It keeps a local scheduler that periodically sends Gossip ticks to
49  * itself to send bucket store's bucket versions to a randomly selected remote
50  * gossiper.
51  * <p/>
52  * When bucket versions are received from a remote gossiper, it is compared
53  * with bucket store's bucket versions. Which ever buckets are newer
54  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
55  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
56  * <p/>
57  * When a bucket is received from a remote gossiper, its sent to the bucket store
58  * for update.
59  *
60  */
61
62 public class Gossiper extends UntypedActor {
63
64     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
65
66     private Cluster cluster;
67
68     /**
69      * ActorSystem's address for the current cluster node.
70      */
71     private Address selfAddress;
72
73     /**
74      * All known cluster members
75      */
76     private List<Address> clusterMembers = new ArrayList<>();
77
78     private Cancellable gossipTask;
79
80     private Boolean autoStartGossipTicks = true;
81
82     public Gossiper(){}
83
84     /**
85      * Helpful for testing
86      * @param autoStartGossipTicks used for turning off gossip ticks during testing.
87      *                             Gossip tick can be manually sent.
88      */
89     public Gossiper(Boolean autoStartGossipTicks){
90         this.autoStartGossipTicks = autoStartGossipTicks;
91     }
92
93     @Override
94     public void preStart(){
95         ActorRefProvider provider = getContext().provider();
96         selfAddress = provider.getDefaultAddress();
97
98         if ( provider instanceof ClusterActorRefProvider ) {
99             cluster = Cluster.get(getContext().system());
100             cluster.subscribe(getSelf(),
101                     ClusterEvent.initialStateAsEvents(),
102                     ClusterEvent.MemberEvent.class,
103                     ClusterEvent.UnreachableMember.class);
104         }
105
106         if (autoStartGossipTicks) {
107             gossipTask = getContext().system().scheduler().schedule(
108                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
109                     ActorUtil.GOSSIP_TICK_INTERVAL,                 //interval
110                     getSelf(),                                       //target
111                     new Messages.GossiperMessages.GossipTick(),      //message
112                     getContext().dispatcher(),                       //execution context
113                     getSelf()                                        //sender
114             );
115         }
116     }
117
118     @Override
119     public void postStop(){
120         if (cluster != null)
121             cluster.unsubscribe(getSelf());
122         if (gossipTask != null)
123             gossipTask.cancel();
124     }
125
126     @Override
127     public void onReceive(Object message) throws Exception {
128
129         log.debug("Received message: node[{}], message[{}]", selfAddress, message);
130
131         //Usually sent by self via gossip task defined above. But its not enforced.
132         //These ticks can be sent by another actor as well which is esp. useful while testing
133         if (message instanceof GossipTick)
134             receiveGossipTick();
135
136         //Message from remote gossiper with its bucket versions
137         else if (message instanceof GossipStatus)
138             receiveGossipStatus((GossipStatus) message);
139
140         //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
141         //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
142         //message with its local versions
143         else if (message instanceof GossipEnvelope)
144             receiveGossip((GossipEnvelope) message);
145
146         else if (message instanceof ClusterEvent.MemberUp) {
147             receiveMemberUp(((ClusterEvent.MemberUp) message).member());
148
149         } else if (message instanceof ClusterEvent.MemberRemoved) {
150             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
151
152         } else if ( message instanceof ClusterEvent.UnreachableMember){
153             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
154
155         } else
156             unhandled(message);
157     }
158
159     /**
160      * Remove member from local copy of member list. If member down is self, then stop the actor
161      *
162      * @param member who went down
163      */
164     void receiveMemberRemoveOrUnreachable(Member member) {
165         //if its self, then stop itself
166         if (selfAddress.equals(member.address())){
167             getContext().stop(getSelf());
168             return;
169         }
170
171         clusterMembers.remove(member.address());
172         log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
173     }
174
175     /**
176      * Add member to the local copy of member list if it doesnt already
177      * @param member
178      */
179     void receiveMemberUp(Member member) {
180
181         if (selfAddress.equals(member.address()))
182             return; //ignore up notification for self
183
184         if (!clusterMembers.contains(member.address()))
185             clusterMembers.add(member.address());
186
187         log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
188     }
189
190     /**
191      * Sends Gossip status to other members in the cluster. <br/>
192      * 1. If there are no member, ignore the tick. </br>
193      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
194      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
195      */
196     void receiveGossipTick(){
197         if (clusterMembers.size() == 0) return; //no members to send gossip status to
198
199         Address remoteMemberToGossipTo = null;
200
201         if (clusterMembers.size() == 1)
202             remoteMemberToGossipTo = clusterMembers.get(0);
203         else {
204             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
205             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
206         }
207
208         log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
209         getLocalStatusAndSendTo(remoteMemberToGossipTo);
210     }
211
212     /**
213      * Process gossip status received from a remote gossiper. Remote versions are compared with
214      * the local copy. <p>
215      *
216      * For each bucket
217      * <ul>
218      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
219      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
220      *  <li>If both are same, noop</li>
221      * </ul>
222      *
223      * @param status bucket versions from a remote member
224      */
225     void receiveGossipStatus(GossipStatus status){
226         //Don't accept messages from non-members
227         if (!clusterMembers.contains(status.from()))
228             return;
229
230         final ActorRef sender = getSender();
231         Future<Object> futureReply =
232                 Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
233
234         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
235
236     }
237
238     /**
239      * Sends the received buckets in the envelope to the parent Bucket store.
240      *
241      * @param envelope contains buckets from a remote gossiper
242      */
243     void receiveGossip(GossipEnvelope envelope){
244         //TODO: Add more validations
245         if (!selfAddress.equals(envelope.to())) {
246             log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
247             return;
248         }
249
250         updateRemoteBuckets(envelope.getBuckets());
251
252     }
253
254     /**
255      * Helper to send received buckets to bucket store
256      *
257      * @param buckets
258      */
259     void updateRemoteBuckets(Map<Address, Bucket> buckets) {
260
261         UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
262         getContext().parent().tell(updateRemoteBuckets, getSelf());
263     }
264
265     /**
266      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
267      *
268      * @param remote     remote node to send Buckets to
269      * @param addresses  node addresses whose buckets needs to be sent
270      */
271     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
272
273         Future<Object> futureReply =
274                 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), ActorUtil.ASK_DURATION.toMillis());
275         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
276     }
277
278     /**
279      * Gets bucket versions from bucket store and sends to the supplied address
280      *
281      * @param remoteActorSystemAddress remote gossiper to send to
282      */
283     void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
284
285         //Get local status from bucket store and send to remote
286         Future<Object> futureReply =
287                 Patterns.ask(getContext().parent(), new GetBucketVersions(), ActorUtil.ASK_DURATION.toMillis());
288
289         //Find gossiper on remote system
290         ActorSelection remoteRef = getContext().system().actorSelection(
291                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
292
293         log.debug("Sending bucket versions to [{}]", remoteRef);
294
295         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
296
297     }
298
299     /**
300      * Helper to send bucket versions received from local store
301      * @param remote        remote gossiper to send versions to
302      * @param localVersions bucket versions received from local store
303      */
304     void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
305
306         GossipStatus status = new GossipStatus(selfAddress, localVersions);
307         remote.tell(status, getSelf());
308     }
309
310     void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
311
312         GossipStatus status = new GossipStatus(selfAddress, localVersions);
313         remote.tell(status, getSelf());
314     }
315
316     ///
317     /// Private factories to create mappers
318     ///
319
320     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
321
322         return new Mapper<Object, Void>() {
323             @Override
324             public Void apply(Object replyMessage) {
325                 if (replyMessage instanceof GetBucketVersionsReply) {
326                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
327                     Map<Address, Long> localVersions = reply.getVersions();
328
329                     sendGossipStatusTo(remote, localVersions);
330
331                 }
332                 return null;
333             }
334         };
335     }
336
337     /**
338      * Process bucket versions received from
339      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
340      * Then this method compares remote bucket versions with local bucket versions.
341      * <ul>
342      *     <li>The buckets that are newer locally, send
343      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
344      *     to remote
345      *     <li>The buckets that are older locally, send
346      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
347      *     to remote so that remote sends GossipEnvelop.
348      * </ul>
349      *
350      * @param sender the remote member
351      * @param status bucket versions from a remote member
352      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
353      *
354      */
355     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
356
357         final Map<Address, Long> remoteVersions = status.getVersions();
358
359         return new Mapper<Object, Void>() {
360             @Override
361             public Void apply(Object replyMessage) {
362                 if (replyMessage instanceof GetBucketVersionsReply) {
363                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
364                     Map<Address, Long> localVersions = reply.getVersions();
365
366                     //diff between remote list and local
367                     Set<Address> localIsOlder = new HashSet<>();
368                     localIsOlder.addAll(remoteVersions.keySet());
369                     localIsOlder.removeAll(localVersions.keySet());
370
371                     //diff between local list and remote
372                     Set<Address> localIsNewer = new HashSet<>();
373                     localIsNewer.addAll(localVersions.keySet());
374                     localIsNewer.removeAll(remoteVersions.keySet());
375
376
377                     for (Address address : remoteVersions.keySet()){
378
379                         if (localVersions.get(address) == null || remoteVersions.get(address) == null)
380                             continue; //this condition is taken care of by above diffs
381                         if (localVersions.get(address) <  remoteVersions.get(address))
382                             localIsOlder.add(address);
383                         else if (localVersions.get(address) > remoteVersions.get(address))
384                             localIsNewer.add(address);
385                         else
386                             continue;
387                     }
388
389                     if (!localIsOlder.isEmpty())
390                         sendGossipStatusTo(sender, localVersions );
391
392                     if (!localIsNewer.isEmpty())
393                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
394
395                 }
396                 return null;
397             }
398         };
399     }
400
401     /**
402      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
403      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
404      * These buckets are sent to a remote member encapsulated in
405      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
406      *
407      * @param sender the remote member that sent
408      *               {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
409      *               in reply to which bucket is being sent back
410      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
411      *
412      */
413     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
414
415         return new Mapper<Object, Void>() {
416             @Override
417             public Void apply(Object msg) {
418                 if (msg instanceof GetBucketsByMembersReply) {
419                     Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
420                     log.debug("Buckets to send from {}: {}", selfAddress, buckets);
421                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
422                     sender.tell(envelope, getSelf());
423                 }
424                 return null;
425             }
426         };
427     }
428
429     ///
430     ///Getter Setters
431     ///
432     List<Address> getClusterMembers() {
433         return clusterMembers;
434     }
435
436     void setClusterMembers(List<Address> clusterMembers) {
437         this.clusterMembers = clusterMembers;
438     }
439
440     Address getSelfAddress() {
441         return selfAddress;
442     }
443
444     void setSelfAddress(Address selfAddress) {
445         this.selfAddress = selfAddress;
446     }
447 }