Fixed possible NPE in TopologyManager FlowCapableTopologyExporter
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorSelection;
12 import akka.actor.Address;
13 import akka.actor.Cancellable;
14 import akka.actor.UntypedActor;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterEvent;
17 import akka.cluster.Member;
18 import akka.dispatch.Mapper;
19 import akka.event.Logging;
20 import akka.event.LoggingAdapter;
21 import akka.pattern.Patterns;
22 import scala.concurrent.Future;
23 import scala.concurrent.duration.FiniteDuration;
24
25 import java.util.ArrayList;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.TimeUnit;
32
33 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
40 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
41
42 /**
43  * Gossiper that syncs bucket store across nodes in the cluster.
44  * <p/>
45  * It keeps a local scheduler that periodically sends Gossip ticks to
46  * itself to send bucket store's bucket versions to a randomly selected remote
47  * gossiper.
48  * <p/>
49  * When bucket versions are received from a remote gossiper, it is compared
50  * with bucket store's bucket versions. Which ever buckets are newer
51  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
52  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
53  * <p/>
54  * When a bucket is received from a remote gossiper, its sent to the bucket store
55  * for update.
56  *
57  */
58
59 public class Gossiper extends UntypedActor {
60
61     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
62
63     Cluster cluster = Cluster.get(getContext().system());
64
65     /**
66      * ActorSystem's address for the current cluster node.
67      */
68     private Address selfAddress = cluster.selfAddress();
69
70     /**
71      * All known cluster members
72      */
73     private List<Address> clusterMembers = new ArrayList<>();
74
75     private Cancellable gossipTask;
76
77     private Boolean autoStartGossipTicks = true;
78
79     public Gossiper(){}
80
81     /**
82      * Helpful for testing
83      * @param autoStartGossipTicks used for turning off gossip ticks during testing.
84      *                             Gossip tick can be manually sent.
85      */
86     public Gossiper(Boolean autoStartGossipTicks){
87         this.autoStartGossipTicks = autoStartGossipTicks;
88     }
89
90     @Override
91     public void preStart(){
92
93         cluster.subscribe(getSelf(),
94                           ClusterEvent.initialStateAsEvents(),
95                           ClusterEvent.MemberEvent.class,
96                           ClusterEvent.UnreachableMember.class);
97
98         if (autoStartGossipTicks) {
99             gossipTask = getContext().system().scheduler().schedule(
100                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
101                     new FiniteDuration(500, TimeUnit.MILLISECONDS), //interval
102                     getSelf(),                                       //target
103                     new Messages.GossiperMessages.GossipTick(),      //message
104                     getContext().dispatcher(),                       //execution context
105                     getSelf()                                        //sender
106             );
107         }
108     }
109
110     @Override
111     public void postStop(){
112         if (cluster != null)
113             cluster.unsubscribe(getSelf());
114         if (gossipTask != null)
115             gossipTask.cancel();
116     }
117
118     @Override
119     public void onReceive(Object message) throws Exception {
120
121         log.debug("Received message: node[{}], message[{}]", selfAddress, message);
122
123         //Usually sent by self via gossip task defined above. But its not enforced.
124         //These ticks can be sent by another actor as well which is esp. useful while testing
125         if (message instanceof GossipTick)
126             receiveGossipTick();
127
128         //Message from remote gossiper with its bucket versions
129         else if (message instanceof GossipStatus)
130             receiveGossipStatus((GossipStatus) message);
131
132         //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
133         //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
134         //message with its local versions
135         else if (message instanceof GossipEnvelope)
136             receiveGossip((GossipEnvelope) message);
137
138         else if (message instanceof ClusterEvent.MemberUp) {
139             receiveMemberUp(((ClusterEvent.MemberUp) message).member());
140
141         } else if (message instanceof ClusterEvent.MemberRemoved) {
142             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
143
144         } else if ( message instanceof ClusterEvent.UnreachableMember){
145             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
146
147         } else
148             unhandled(message);
149     }
150
151     /**
152      * Remove member from local copy of member list. If member down is self, then stop the actor
153      *
154      * @param member who went down
155      */
156     void receiveMemberRemoveOrUnreachable(Member member) {
157         //if its self, then stop itself
158         if (selfAddress.equals(member.address())){
159             getContext().stop(getSelf());
160             return;
161         }
162
163         clusterMembers.remove(member.address());
164         log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
165     }
166
167     /**
168      * Add member to the local copy of member list if it doesnt already
169      * @param member
170      */
171     void receiveMemberUp(Member member) {
172
173         if (selfAddress.equals(member.address()))
174             return; //ignore up notification for self
175
176         if (!clusterMembers.contains(member.address()))
177             clusterMembers.add(member.address());
178
179         log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
180     }
181
182     /**
183      * Sends Gossip status to other members in the cluster. <br/>
184      * 1. If there are no member, ignore the tick. </br>
185      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
186      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
187      */
188     void receiveGossipTick(){
189         if (clusterMembers.size() == 0) return; //no members to send gossip status to
190
191         Address remoteMemberToGossipTo = null;
192
193         if (clusterMembers.size() == 1)
194             remoteMemberToGossipTo = clusterMembers.get(0);
195         else {
196             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
197             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
198         }
199
200         log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
201         getLocalStatusAndSendTo(remoteMemberToGossipTo);
202     }
203
204     /**
205      * Process gossip status received from a remote gossiper. Remote versions are compared with
206      * the local copy. <p>
207      *
208      * For each bucket
209      * <ul>
210      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
211      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
212      *  <li>If both are same, noop</li>
213      * </ul>
214      *
215      * @param status bucket versions from a remote member
216      */
217     void receiveGossipStatus(GossipStatus status){
218         //Don't accept messages from non-members
219         if (!clusterMembers.contains(status.from()))
220             return;
221
222         final ActorRef sender = getSender();
223         Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
224         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
225
226     }
227
228     /**
229      * Sends the received buckets in the envelope to the parent Bucket store.
230      *
231      * @param envelope contains buckets from a remote gossiper
232      */
233     void receiveGossip(GossipEnvelope envelope){
234         //TODO: Add more validations
235         if (!selfAddress.equals(envelope.to())) {
236             log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
237             return;
238         }
239
240         updateRemoteBuckets(envelope.getBuckets());
241
242     }
243
244     /**
245      * Helper to send received buckets to bucket store
246      *
247      * @param buckets
248      */
249     void updateRemoteBuckets(Map<Address, Bucket> buckets) {
250
251         UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
252         getContext().parent().tell(updateRemoteBuckets, getSelf());
253     }
254
255     /**
256      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
257      *
258      * @param remote     remote node to send Buckets to
259      * @param addresses  node addresses whose buckets needs to be sent
260      */
261     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
262
263         Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), 1000);
264         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
265     }
266
267     /**
268      * Gets bucket versions from bucket store and sends to the supplied address
269      *
270      * @param remoteActorSystemAddress remote gossiper to send to
271      */
272     void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
273
274         //Get local status from bucket store and send to remote
275         Future<Object> futureReply = Patterns.ask(getContext().parent(), new GetBucketVersions(), 1000);
276         ActorSelection remoteRef = getContext().system().actorSelection(
277                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
278
279         log.debug("Sending bucket versions to [{}]", remoteRef);
280
281         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
282
283     }
284
285     /**
286      * Helper to send bucket versions received from local store
287      * @param remote        remote gossiper to send versions to
288      * @param localVersions bucket versions received from local store
289      */
290     void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
291
292         GossipStatus status = new GossipStatus(selfAddress, localVersions);
293         remote.tell(status, getSelf());
294     }
295
296     void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
297
298         GossipStatus status = new GossipStatus(selfAddress, localVersions);
299         remote.tell(status, getSelf());
300     }
301
302     ///
303     /// Private factories to create mappers
304     ///
305
306     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
307
308         return new Mapper<Object, Void>() {
309             @Override
310             public Void apply(Object replyMessage) {
311                 if (replyMessage instanceof GetBucketVersionsReply) {
312                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
313                     Map<Address, Long> localVersions = reply.getVersions();
314
315                     sendGossipStatusTo(remote, localVersions);
316
317                 }
318                 return null;
319             }
320         };
321     }
322
323     /**
324      * Process bucket versions received from
325      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
326      * Then this method compares remote bucket versions with local bucket versions.
327      * <ul>
328      *     <li>The buckets that are newer locally, send
329      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
330      *     to remote
331      *     <li>The buckets that are older locally, send
332      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
333      *     to remote so that remote sends GossipEnvelop.
334      * </ul>
335      *
336      * @param sender the remote member
337      * @param status bucket versions from a remote member
338      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
339      *
340      */
341     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
342
343         final Map<Address, Long> remoteVersions = status.getVersions();
344
345         return new Mapper<Object, Void>() {
346             @Override
347             public Void apply(Object replyMessage) {
348                 if (replyMessage instanceof GetBucketVersionsReply) {
349                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
350                     Map<Address, Long> localVersions = reply.getVersions();
351
352                     //diff between remote list and local
353                     Set<Address> localIsOlder = new HashSet<>();
354                     localIsOlder.addAll(remoteVersions.keySet());
355                     localIsOlder.removeAll(localVersions.keySet());
356
357                     //diff between local list and remote
358                     Set<Address> localIsNewer = new HashSet<>();
359                     localIsNewer.addAll(localVersions.keySet());
360                     localIsNewer.removeAll(remoteVersions.keySet());
361
362
363                     for (Address address : remoteVersions.keySet()){
364
365                         if (localVersions.get(address) == null || remoteVersions.get(address) == null)
366                             continue; //this condition is taken care of by above diffs
367                         if (localVersions.get(address) <  remoteVersions.get(address))
368                             localIsOlder.add(address);
369                         else if (localVersions.get(address) > remoteVersions.get(address))
370                             localIsNewer.add(address);
371                         else
372                             continue;
373                     }
374
375                     if (!localIsOlder.isEmpty())
376                         sendGossipStatusTo(sender, localVersions );
377
378                     if (!localIsNewer.isEmpty())
379                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
380
381                 }
382                 return null;
383             }
384         };
385     }
386
387     /**
388      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
389      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
390      * These buckets are sent to a remote member encapsulated in
391      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
392      *
393      * @param sender the remote member that sent
394      *               {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
395      *               in reply to which bucket is being sent back
396      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
397      *
398      */
399     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
400
401         return new Mapper<Object, Void>() {
402             @Override
403             public Void apply(Object msg) {
404                 if (msg instanceof GetBucketsByMembersReply) {
405                     Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
406                     log.debug("Buckets to send from {}: {}", selfAddress, buckets);
407                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
408                     sender.tell(envelope, getSelf());
409                 }
410                 return null;
411             }
412         };
413     }
414
415     ///
416     ///Getter Setters
417     ///
418     List<Address> getClusterMembers() {
419         return clusterMembers;
420     }
421
422     void setClusterMembers(List<Address> clusterMembers) {
423         this.clusterMembers = clusterMembers;
424     }
425
426     Address getSelfAddress() {
427         return selfAddress;
428     }
429
430     void setSelfAddress(Address selfAddress) {
431         this.selfAddress = selfAddress;
432     }
433 }