Merge "Bug 1894: Add LISP configuration options to etc/custom.properties in Karaf"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterActorRefProvider;
17 import akka.cluster.ClusterEvent;
18 import akka.cluster.Member;
19 import akka.dispatch.Mapper;
20 import akka.event.Logging;
21 import akka.event.LoggingAdapter;
22 import akka.pattern.Patterns;
23 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
24 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
25 import scala.concurrent.Future;
26 import scala.concurrent.duration.FiniteDuration;
27
28 import java.util.ArrayList;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ThreadLocalRandom;
34 import java.util.concurrent.TimeUnit;
35
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
38 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
39 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
40 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
41 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
42 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
43 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
44
45 /**
46  * Gossiper that syncs bucket store across nodes in the cluster.
47  * <p/>
48  * It keeps a local scheduler that periodically sends Gossip ticks to
49  * itself to send bucket store's bucket versions to a randomly selected remote
50  * gossiper.
51  * <p/>
52  * When bucket versions are received from a remote gossiper, it is compared
53  * with bucket store's bucket versions. Which ever buckets are newer
54  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
55  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
56  * <p/>
57  * When a bucket is received from a remote gossiper, its sent to the bucket store
58  * for update.
59  *
60  */
61
62 public class Gossiper extends AbstractUntypedActorWithMetering {
63
64     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
65
66     private Cluster cluster;
67
68     /**
69      * ActorSystem's address for the current cluster node.
70      */
71     private Address selfAddress;
72
73     /**
74      * All known cluster members
75      */
76     private List<Address> clusterMembers = new ArrayList<>();
77
78     private Cancellable gossipTask;
79
80     private Boolean autoStartGossipTicks = true;
81
82     private RemoteRpcProviderConfig config;
83
84     public Gossiper(){
85         config = new RemoteRpcProviderConfig(getContext().system().settings().config());
86     }
87
88     /**
89      * Helpful for testing
90      * @param autoStartGossipTicks used for turning off gossip ticks during testing.
91      *                             Gossip tick can be manually sent.
92      */
93     public Gossiper(Boolean autoStartGossipTicks){
94         this.autoStartGossipTicks = autoStartGossipTicks;
95     }
96
97     @Override
98     public void preStart(){
99         ActorRefProvider provider = getContext().provider();
100         selfAddress = provider.getDefaultAddress();
101
102         if ( provider instanceof ClusterActorRefProvider ) {
103             cluster = Cluster.get(getContext().system());
104             cluster.subscribe(getSelf(),
105                     ClusterEvent.initialStateAsEvents(),
106                     ClusterEvent.MemberEvent.class,
107                     ClusterEvent.UnreachableMember.class);
108         }
109
110         if (autoStartGossipTicks) {
111             gossipTask = getContext().system().scheduler().schedule(
112                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
113                     config.getGossipTickInterval(),                 //interval
114                     getSelf(),                                       //target
115                     new Messages.GossiperMessages.GossipTick(),      //message
116                     getContext().dispatcher(),                       //execution context
117                     getSelf()                                        //sender
118             );
119         }
120     }
121
122     @Override
123     public void postStop(){
124         if (cluster != null)
125             cluster.unsubscribe(getSelf());
126         if (gossipTask != null)
127             gossipTask.cancel();
128     }
129
130     @Override
131     protected void handleReceive(Object message) throws Exception {
132         //Usually sent by self via gossip task defined above. But its not enforced.
133         //These ticks can be sent by another actor as well which is esp. useful while testing
134         if (message instanceof GossipTick)
135             receiveGossipTick();
136
137             //Message from remote gossiper with its bucket versions
138         else if (message instanceof GossipStatus)
139             receiveGossipStatus((GossipStatus) message);
140
141             //Message from remote gossiper with buckets. This is usually in response to GossipStatus message
142             //The contained buckets are newer as determined by the remote gossiper by comparing the GossipStatus
143             //message with its local versions
144         else if (message instanceof GossipEnvelope)
145             receiveGossip((GossipEnvelope) message);
146
147         else if (message instanceof ClusterEvent.MemberUp) {
148             receiveMemberUp(((ClusterEvent.MemberUp) message).member());
149
150         } else if (message instanceof ClusterEvent.MemberRemoved) {
151             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
152
153         } else if ( message instanceof ClusterEvent.UnreachableMember){
154             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
155
156         } else
157             unhandled(message);
158     }
159
160     /**
161      * Remove member from local copy of member list. If member down is self, then stop the actor
162      *
163      * @param member who went down
164      */
165     void receiveMemberRemoveOrUnreachable(Member member) {
166         //if its self, then stop itself
167         if (selfAddress.equals(member.address())){
168             getContext().stop(getSelf());
169             return;
170         }
171
172         clusterMembers.remove(member.address());
173         if(log.isDebugEnabled()) {
174             log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
175         }
176     }
177
178     /**
179      * Add member to the local copy of member list if it doesnt already
180      * @param member
181      */
182     void receiveMemberUp(Member member) {
183
184         if (selfAddress.equals(member.address()))
185             return; //ignore up notification for self
186
187         if (!clusterMembers.contains(member.address()))
188             clusterMembers.add(member.address());
189         if(log.isDebugEnabled()) {
190             log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
191         }
192     }
193
194     /**
195      * Sends Gossip status to other members in the cluster. <br/>
196      * 1. If there are no member, ignore the tick. </br>
197      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
198      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
199      */
200     void receiveGossipTick(){
201         if (clusterMembers.size() == 0) return; //no members to send gossip status to
202
203         Address remoteMemberToGossipTo;
204
205         if (clusterMembers.size() == 1)
206             remoteMemberToGossipTo = clusterMembers.get(0);
207         else {
208             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
209             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
210         }
211         if(log.isDebugEnabled()) {
212             log.debug("Gossiping to [{}]", remoteMemberToGossipTo);
213         }
214         getLocalStatusAndSendTo(remoteMemberToGossipTo);
215     }
216
217     /**
218      * Process gossip status received from a remote gossiper. Remote versions are compared with
219      * the local copy. <p>
220      *
221      * For each bucket
222      * <ul>
223      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
224      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
225      *  <li>If both are same, noop</li>
226      * </ul>
227      *
228      * @param status bucket versions from a remote member
229      */
230     void receiveGossipStatus(GossipStatus status){
231         //Don't accept messages from non-members
232         if (!clusterMembers.contains(status.from()))
233             return;
234
235         final ActorRef sender = getSender();
236         Future<Object> futureReply =
237                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
238
239         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
240
241     }
242
243     /**
244      * Sends the received buckets in the envelope to the parent Bucket store.
245      *
246      * @param envelope contains buckets from a remote gossiper
247      */
248     void receiveGossip(GossipEnvelope envelope){
249         //TODO: Add more validations
250         if (!selfAddress.equals(envelope.to())) {
251             if(log.isDebugEnabled()) {
252                 log.debug("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
253             }
254             return;
255         }
256
257         updateRemoteBuckets(envelope.getBuckets());
258
259     }
260
261     /**
262      * Helper to send received buckets to bucket store
263      *
264      * @param buckets
265      */
266     void updateRemoteBuckets(Map<Address, Bucket> buckets) {
267
268         UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
269         getContext().parent().tell(updateRemoteBuckets, getSelf());
270     }
271
272     /**
273      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
274      *
275      * @param remote     remote node to send Buckets to
276      * @param addresses  node addresses whose buckets needs to be sent
277      */
278     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
279
280         Future<Object> futureReply =
281                 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
282         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
283     }
284
285     /**
286      * Gets bucket versions from bucket store and sends to the supplied address
287      *
288      * @param remoteActorSystemAddress remote gossiper to send to
289      */
290     void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
291
292         //Get local status from bucket store and send to remote
293         Future<Object> futureReply =
294                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
295
296         //Find gossiper on remote system
297         ActorSelection remoteRef = getContext().system().actorSelection(
298                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
299
300         if(log.isDebugEnabled()) {
301             log.debug("Sending bucket versions to [{}]", remoteRef);
302         }
303
304         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
305
306     }
307
308     /**
309      * Helper to send bucket versions received from local store
310      * @param remote        remote gossiper to send versions to
311      * @param localVersions bucket versions received from local store
312      */
313     void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
314
315         GossipStatus status = new GossipStatus(selfAddress, localVersions);
316         remote.tell(status, getSelf());
317     }
318
319     void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
320
321         GossipStatus status = new GossipStatus(selfAddress, localVersions);
322         remote.tell(status, getSelf());
323     }
324
325     ///
326     /// Private factories to create mappers
327     ///
328
329     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
330
331         return new Mapper<Object, Void>() {
332             @Override
333             public Void apply(Object replyMessage) {
334                 if (replyMessage instanceof GetBucketVersionsReply) {
335                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
336                     Map<Address, Long> localVersions = reply.getVersions();
337
338                     sendGossipStatusTo(remote, localVersions);
339
340                 }
341                 return null;
342             }
343         };
344     }
345
346     /**
347      * Process bucket versions received from
348      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
349      * Then this method compares remote bucket versions with local bucket versions.
350      * <ul>
351      *     <li>The buckets that are newer locally, send
352      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
353      *     to remote
354      *     <li>The buckets that are older locally, send
355      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
356      *     to remote so that remote sends GossipEnvelop.
357      * </ul>
358      *
359      * @param sender the remote member
360      * @param status bucket versions from a remote member
361      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
362      *
363      */
364     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
365
366         final Map<Address, Long> remoteVersions = status.getVersions();
367
368         return new Mapper<Object, Void>() {
369             @Override
370             public Void apply(Object replyMessage) {
371                 if (replyMessage instanceof GetBucketVersionsReply) {
372                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
373                     Map<Address, Long> localVersions = reply.getVersions();
374
375                     //diff between remote list and local
376                     Set<Address> localIsOlder = new HashSet<>();
377                     localIsOlder.addAll(remoteVersions.keySet());
378                     localIsOlder.removeAll(localVersions.keySet());
379
380                     //diff between local list and remote
381                     Set<Address> localIsNewer = new HashSet<>();
382                     localIsNewer.addAll(localVersions.keySet());
383                     localIsNewer.removeAll(remoteVersions.keySet());
384
385
386                     for (Address address : remoteVersions.keySet()){
387
388                         if (localVersions.get(address) == null || remoteVersions.get(address) == null)
389                             continue; //this condition is taken care of by above diffs
390                         if (localVersions.get(address) <  remoteVersions.get(address))
391                             localIsOlder.add(address);
392                         else if (localVersions.get(address) > remoteVersions.get(address))
393                             localIsNewer.add(address);
394                     }
395
396                     if (!localIsOlder.isEmpty())
397                         sendGossipStatusTo(sender, localVersions );
398
399                     if (!localIsNewer.isEmpty())
400                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
401
402                 }
403                 return null;
404             }
405         };
406     }
407
408     /**
409      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
410      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
411      * These buckets are sent to a remote member encapsulated in
412      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
413      *
414      * @param sender the remote member that sent
415      *               {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
416      *               in reply to which bucket is being sent back
417      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
418      *
419      */
420     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
421
422         return new Mapper<Object, Void>() {
423             @Override
424             public Void apply(Object msg) {
425                 if (msg instanceof GetBucketsByMembersReply) {
426                     Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
427                     if(log.isDebugEnabled()) {
428                         log.debug("Buckets to send from {}: {}", selfAddress, buckets);
429                     }
430                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
431                     sender.tell(envelope, getSelf());
432                 }
433                 return null;
434             }
435         };
436     }
437
438     ///
439     ///Getter Setters
440     ///
441     List<Address> getClusterMembers() {
442         return clusterMembers;
443     }
444
445     void setClusterMembers(List<Address> clusterMembers) {
446         this.clusterMembers = clusterMembers;
447     }
448
449     Address getSelfAddress() {
450         return selfAddress;
451     }
452
453     void setSelfAddress(Address selfAddress) {
454         this.selfAddress = selfAddress;
455     }
456 }