Merge "Fix for possible NPE if Bundle is stopped."
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.UntypedActor;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import akka.dispatch.Mapper;
21 import akka.event.Logging;
22 import akka.event.LoggingAdapter;
23 import akka.pattern.Patterns;
24 import scala.concurrent.Future;
25 import scala.concurrent.duration.FiniteDuration;
26
27 import java.util.ArrayList;
28 import java.util.HashSet;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.Set;
32 import java.util.concurrent.ThreadLocalRandom;
33 import java.util.concurrent.TimeUnit;
34
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
40 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
41 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
42 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
43
44 /**
45  * Gossiper that syncs bucket store across nodes in the cluster.
46  * <p/>
47  * It keeps a local scheduler that periodically sends Gossip ticks to
48  * itself to send bucket store's bucket versions to a randomly selected remote
49  * gossiper.
50  * <p/>
51  * When bucket versions are received from a remote gossiper, it is compared
52  * with bucket store's bucket versions. Which ever buckets are newer
53  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
54  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
55  * <p/>
56  * When a bucket is received from a remote gossiper, its sent to the bucket store
57  * for update.
58  *
59  */
60
61 public class Gossiper extends UntypedActor {
62
63     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
64
65     private Cluster cluster;
66
67     /**
68      * ActorSystem's address for the current cluster node.
69      */
70     private Address selfAddress;
71
72     /**
73      * All known cluster members
74      */
75     private List<Address> clusterMembers = new ArrayList<>();
76
77     private Cancellable gossipTask;
78
79     private Boolean autoStartGossipTicks = true;
80
81     public Gossiper(){}
82
83     /**
84      * Helpful for testing
85      * @param autoStartGossipTicks used for turning off gossip ticks during testing.
86      *                             Gossip tick can be manually sent.
87      */
88     public Gossiper(Boolean autoStartGossipTicks){
89         this.autoStartGossipTicks = autoStartGossipTicks;
90     }
91
92     @Override
93     public void preStart(){
94         ActorRefProvider provider = getContext().provider();
95         selfAddress = provider.getDefaultAddress();
96
97         if ( provider instanceof ClusterActorRefProvider ) {
98             cluster = Cluster.get(getContext().system());
99             cluster.subscribe(getSelf(),
100                     ClusterEvent.initialStateAsEvents(),
101                     ClusterEvent.MemberEvent.class,
102                     ClusterEvent.UnreachableMember.class);
103         }
104
105         if (autoStartGossipTicks) {
106             gossipTask = getContext().system().scheduler().schedule(
107                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
108                     new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
109                     getSelf(),                                       //target
110                     new Messages.GossiperMessages.GossipTick(),      //message
111                     getContext().dispatcher(),                       //execution context
112                     getSelf()                                        //sender
113             );
114         }
115     }
116
117     @Override
118     public void postStop(){
119         if (cluster != null)
120             cluster.unsubscribe(getSelf());
121         if (gossipTask != null)
122             gossipTask.cancel();
123     }
124
125     @Override
126     public void onReceive(Object message) throws Exception {
127
128         log.debug("Received message: node[{}], message[{}]", selfAddress, message);
129
130         //Usually sent by self via gossip task defined above. But its not enforced.
131         //These ticks can be sent by another actor as well which is esp. useful while testing
132         if (message instanceof GossipTick)
133             receiveGossipTick();
134
135         //Message from remote gossiper with its bucket versions
136         else if (message instanceof GossipStatus)
137             receiveGossipStatus((GossipStatus) message);
138
139         //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
140         //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
141         //message with its local versions
142         else if (message instanceof GossipEnvelope)
143             receiveGossip((GossipEnvelope) message);
144
145         else if (message instanceof ClusterEvent.MemberUp) {
146             receiveMemberUp(((ClusterEvent.MemberUp) message).member());
147
148         } else if (message instanceof ClusterEvent.MemberRemoved) {
149             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
150
151         } else if ( message instanceof ClusterEvent.UnreachableMember){
152             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
153
154         } else
155             unhandled(message);
156     }
157
158     /**
159      * Remove member from local copy of member list. If member down is self, then stop the actor
160      *
161      * @param member who went down
162      */
163     void receiveMemberRemoveOrUnreachable(Member member) {
164         //if its self, then stop itself
165         if (selfAddress.equals(member.address())){
166             getContext().stop(getSelf());
167             return;
168         }
169
170         clusterMembers.remove(member.address());
171         log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
172     }
173
174     /**
175      * Add member to the local copy of member list if it doesnt already
176      * @param member
177      */
178     void receiveMemberUp(Member member) {
179
180         if (selfAddress.equals(member.address()))
181             return; //ignore up notification for self
182
183         if (!clusterMembers.contains(member.address()))
184             clusterMembers.add(member.address());
185
186         log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
187     }
188
189     /**
190      * Sends Gossip status to other members in the cluster. <br/>
191      * 1. If there are no member, ignore the tick. </br>
192      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
193      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
194      */
195     void receiveGossipTick(){
196         if (clusterMembers.size() == 0) return; //no members to send gossip status to
197
198         Address remoteMemberToGossipTo = null;
199
200         if (clusterMembers.size() == 1)
201             remoteMemberToGossipTo = clusterMembers.get(0);
202         else {
203             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
204             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
205         }
206
207         log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
208         getLocalStatusAndSendTo(remoteMemberToGossipTo);
209     }
210
211     /**
212      * Process gossip status received from a remote gossiper. Remote versions are compared with
213      * the local copy. <p>
214      *
215      * For each bucket
216      * <ul>
217      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
218      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
219      *  <li>If both are same, noop</li>
220      * </ul>
221      *
222      * @param status bucket versions from a remote member
223      */
224     void receiveGossipStatus(GossipStatus status){
225         //Don't accept messages from non-members
226         if (!clusterMembers.contains(status.from()))
227             return;
228
229         final ActorRef sender = getSender();
230         Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
231         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
232
233     }
234
235     /**
236      * Sends the received buckets in the envelope to the parent Bucket store.
237      *
238      * @param envelope contains buckets from a remote gossiper
239      */
240     void receiveGossip(GossipEnvelope envelope){
241         //TODO: Add more validations
242         if (!selfAddress.equals(envelope.to())) {
243             log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
244             return;
245         }
246
247         updateRemoteBuckets(envelope.getBuckets());
248
249     }
250
251     /**
252      * Helper to send received buckets to bucket store
253      *
254      * @param buckets
255      */
256     void updateRemoteBuckets(Map<Address, Bucket> buckets) {
257
258         UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
259         getContext().parent().tell(updateRemoteBuckets, getSelf());
260     }
261
262     /**
263      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
264      *
265      * @param remote     remote node to send Buckets to
266      * @param addresses  node addresses whose buckets needs to be sent
267      */
268     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
269
270         Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
271         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
272     }
273
274     /**
275      * Gets bucket versions from bucket store and sends to the supplied address
276      *
277      * @param remoteActorSystemAddress remote gossiper to send to
278      */
279     void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
280
281         //Get local status from bucket store and send to remote
282         Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
283         ActorSelection remoteRef = getContext().system().actorSelection(
284                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
285
286         log.debug("Sending bucket versions to [{}]", remoteRef);
287
288         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
289
290     }
291
292     /**
293      * Helper to send bucket versions received from local store
294      * @param remote        remote gossiper to send versions to
295      * @param localVersions bucket versions received from local store
296      */
297     void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
298
299         GossipStatus status = new GossipStatus(selfAddress, localVersions);
300         remote.tell(status, getSelf());
301     }
302
303     void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
304
305         GossipStatus status = new GossipStatus(selfAddress, localVersions);
306         remote.tell(status, getSelf());
307     }
308
309     ///
310     /// Private factories to create mappers
311     ///
312
313     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
314
315         return new Mapper<Object, Void>() {
316             @Override
317             public Void apply(Object replyMessage) {
318                 if (replyMessage instanceof GetBucketVersionsReply) {
319                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
320                     Map<Address, Long> localVersions = reply.getVersions();
321
322                     sendGossipStatusTo(remote, localVersions);
323
324                 }
325                 return null;
326             }
327         };
328     }
329
330     /**
331      * Process bucket versions received from
332      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
333      * Then this method compares remote bucket versions with local bucket versions.
334      * <ul>
335      *     <li>The buckets that are newer locally, send
336      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
337      *     to remote
338      *     <li>The buckets that are older locally, send
339      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
340      *     to remote so that remote sends GossipEnvelop.
341      * </ul>
342      *
343      * @param sender the remote member
344      * @param status bucket versions from a remote member
345      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
346      *
347      */
348     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
349
350         final Map<Address, Long> remoteVersions = status.getVersions();
351
352         return new Mapper<Object, Void>() {
353             @Override
354             public Void apply(Object replyMessage) {
355                 if (replyMessage instanceof GetBucketVersionsReply) {
356                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
357                     Map<Address, Long> localVersions = reply.getVersions();
358
359                     //diff between remote list and local
360                     Set<Address> localIsOlder = new HashSet<>();
361                     localIsOlder.addAll(remoteVersions.keySet());
362                     localIsOlder.removeAll(localVersions.keySet());
363
364                     //diff between local list and remote
365                     Set<Address> localIsNewer = new HashSet<>();
366                     localIsNewer.addAll(localVersions.keySet());
367                     localIsNewer.removeAll(remoteVersions.keySet());
368
369
370                     for (Address address : remoteVersions.keySet()){
371
372                         if (localVersions.get(address) == null || remoteVersions.get(address) == null)
373                             continue; //this condition is taken care of by above diffs
374                         if (localVersions.get(address) <  remoteVersions.get(address))
375                             localIsOlder.add(address);
376                         else if (localVersions.get(address) > remoteVersions.get(address))
377                             localIsNewer.add(address);
378                         else
379                             continue;
380                     }
381
382                     if (!localIsOlder.isEmpty())
383                         sendGossipStatusTo(sender, localVersions );
384
385                     if (!localIsNewer.isEmpty())
386                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
387
388                 }
389                 return null;
390             }
391         };
392     }
393
394     /**
395      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
396      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
397      * These buckets are sent to a remote member encapsulated in
398      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
399      *
400      * @param sender the remote member that sent
401      *               {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
402      *               in reply to which bucket is being sent back
403      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
404      *
405      */
406     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
407
408         return new Mapper<Object, Void>() {
409             @Override
410             public Void apply(Object msg) {
411                 if (msg instanceof GetBucketsByMembersReply) {
412                     Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
413                     log.debug("Buckets to send from {}: {}", selfAddress, buckets);
414                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
415                     sender.tell(envelope, getSelf());
416                 }
417                 return null;
418             }
419         };
420     }
421
422     ///
423     ///Getter Setters
424     ///
425     List<Address> getClusterMembers() {
426         return clusterMembers;
427     }
428
429     void setClusterMembers(List<Address> clusterMembers) {
430         this.clusterMembers = clusterMembers;
431     }
432
433     Address getSelfAddress() {
434         return selfAddress;
435     }
436
437     void setSelfAddress(Address selfAddress) {
438         this.selfAddress = selfAddress;
439     }
440 }