BUG-3128: rework sal-remoterpc-connector
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import akka.dispatch.Mapper;
21 import akka.pattern.Patterns;
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashSet;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.TimeUnit;
32 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
33 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
40 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
41 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
42 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
43 import scala.concurrent.Future;
44 import scala.concurrent.duration.FiniteDuration;
45
46 /**
47  * Gossiper that syncs bucket store across nodes in the cluster.
48  *
49  * <p>
50  * It keeps a local scheduler that periodically sends Gossip ticks to
51  * itself to send bucket store's bucket versions to a randomly selected remote
52  * gossiper.
53  *
54  * <p>
55  * When bucket versions are received from a remote gossiper, it is compared
56  * with bucket store's bucket versions. Which ever buckets are newer
57  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
58  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
59  *
60  * <p>
61  * When a bucket is received from a remote gossiper, its sent to the bucket store
62  * for update.
63  */
64 public class Gossiper extends AbstractUntypedActorWithMetering {
65     private final boolean autoStartGossipTicks;
66     private final RemoteRpcProviderConfig config;
67
68     /**
69      * All known cluster members.
70      */
71     private final List<Address> clusterMembers = new ArrayList<>();
72
73     /**
74      * ActorSystem's address for the current cluster node.
75      */
76     private Address selfAddress;
77
78     private Cluster cluster;
79
80     private Cancellable gossipTask;
81
82     Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
83         this.config = Preconditions.checkNotNull(config);
84         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
85     }
86
87     Gossiper(final RemoteRpcProviderConfig config) {
88         this(config, Boolean.TRUE);
89     }
90
91     public static Props props(final RemoteRpcProviderConfig config) {
92         return Props.create(Gossiper.class, config);
93     }
94
95     static Props testProps(final RemoteRpcProviderConfig config) {
96         return Props.create(Gossiper.class, config, Boolean.FALSE);
97     }
98
99     @Override
100     public void preStart() {
101         ActorRefProvider provider = getContext().provider();
102         selfAddress = provider.getDefaultAddress();
103
104         if (provider instanceof ClusterActorRefProvider ) {
105             cluster = Cluster.get(getContext().system());
106             cluster.subscribe(getSelf(),
107                     ClusterEvent.initialStateAsEvents(),
108                     ClusterEvent.MemberEvent.class,
109                     ClusterEvent.ReachableMember.class,
110                     ClusterEvent.UnreachableMember.class);
111         }
112
113         if (autoStartGossipTicks) {
114             gossipTask = getContext().system().scheduler().schedule(
115                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
116                     config.getGossipTickInterval(),                 //interval
117                     getSelf(),                                      //target
118                     new Messages.GossiperMessages.GossipTick(),     //message
119                     getContext().dispatcher(),                      //execution context
120                     getSelf()                                       //sender
121             );
122         }
123     }
124
125     @Override
126     public void postStop() {
127         if (cluster != null) {
128             cluster.unsubscribe(getSelf());
129         }
130         if (gossipTask != null) {
131             gossipTask.cancel();
132         }
133     }
134
135     @SuppressWarnings({ "rawtypes", "unchecked" })
136     @Override
137     protected void handleReceive(final Object message) throws Exception {
138         //Usually sent by self via gossip task defined above. But its not enforced.
139         //These ticks can be sent by another actor as well which is esp. useful while testing
140         if (message instanceof GossipTick) {
141             receiveGossipTick();
142         } else if (message instanceof GossipStatus) {
143             // Message from remote gossiper with its bucket versions
144             receiveGossipStatus((GossipStatus) message);
145         } else if (message instanceof GossipEnvelope) {
146             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
147             // message. The contained buckets are newer as determined by the remote gossiper by
148             // comparing the GossipStatus message with its local versions.
149             receiveGossip((GossipEnvelope) message);
150         } else if (message instanceof ClusterEvent.MemberUp) {
151             receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
152
153         } else if (message instanceof ClusterEvent.ReachableMember) {
154             receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
155
156         } else if (message instanceof ClusterEvent.MemberRemoved) {
157             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
158
159         } else if (message instanceof ClusterEvent.UnreachableMember) {
160             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
161
162         } else {
163             unhandled(message);
164         }
165     }
166
167     /**
168      * Remove member from local copy of member list. If member down is self, then stop the actor
169      *
170      * @param member who went down
171      */
172     private void receiveMemberRemoveOrUnreachable(final Member member) {
173         //if its self, then stop itself
174         if (selfAddress.equals(member.address())) {
175             getContext().stop(getSelf());
176             return;
177         }
178
179         clusterMembers.remove(member.address());
180         LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
181
182         getContext().parent().tell(new RemoveRemoteBucket(member.address()), ActorRef.noSender());
183     }
184
185     /**
186      * Add member to the local copy of member list if it doesn't already.
187      *
188      * @param member the member to add
189      */
190     private void receiveMemberUpOrReachable(final Member member) {
191         //ignore up notification for self
192         if (selfAddress.equals(member.address())) {
193             return;
194         }
195
196         if (!clusterMembers.contains(member.address())) {
197             clusterMembers.add(member.address());
198         }
199
200         LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
201     }
202
203     /**
204      * Sends Gossip status to other members in the cluster.
205      * <br>
206      * 1. If there are no member, ignore the tick. <br>
207      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
208      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
209      */
210     @VisibleForTesting
211     void receiveGossipTick() {
212         final Address remoteMemberToGossipTo;
213         switch (clusterMembers.size()) {
214             case 0:
215                 //no members to send gossip status to
216                 return;
217             case 1:
218                 remoteMemberToGossipTo = clusterMembers.get(0);
219                 break;
220             default:
221                 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
222                 remoteMemberToGossipTo = clusterMembers.get(randomIndex);
223                 break;
224         }
225
226         LOG.trace("Gossiping to [{}]", remoteMemberToGossipTo);
227         getLocalStatusAndSendTo(remoteMemberToGossipTo);
228     }
229
230     /**
231      * Process gossip status received from a remote gossiper. Remote versions are compared with
232      * the local copy.
233      * <p/>
234      * For each bucket
235      * <ul>
236      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
237      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
238      *  <li>If both are same, noop</li>
239      * </ul>
240      *
241      * @param status bucket versions from a remote member
242      */
243     @VisibleForTesting
244     void receiveGossipStatus(final GossipStatus status) {
245         //Don't accept messages from non-members
246         if (!clusterMembers.contains(status.from())) {
247             return;
248         }
249
250         final ActorRef sender = getSender();
251         Future<Object> futureReply =
252                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
253
254         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
255     }
256
257     /**
258      * Sends the received buckets in the envelope to the parent Bucket store.
259      *
260      * @param envelope contains buckets from a remote gossiper
261      */
262     @VisibleForTesting
263     <T extends Copier<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
264         //TODO: Add more validations
265         if (!selfAddress.equals(envelope.to())) {
266             LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
267             return;
268         }
269
270         updateRemoteBuckets(envelope.getBuckets());
271     }
272
273     /**
274      * Helper to send received buckets to bucket store.
275      *
276      * @param buckets map of Buckets to update
277      */
278     @VisibleForTesting
279     <T extends Copier<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
280         getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
281     }
282
283     /**
284      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
285      *
286      * @param remote     remote node to send Buckets to
287      * @param addresses  node addresses whose buckets needs to be sent
288      */
289     void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
290
291         Future<Object> futureReply =
292                 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
293         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
294     }
295
296     /**
297      * Gets bucket versions from bucket store and sends to the supplied address.
298      *
299      * @param remoteActorSystemAddress remote gossiper to send to
300      */
301     @VisibleForTesting
302     void getLocalStatusAndSendTo(final Address remoteActorSystemAddress) {
303
304         //Get local status from bucket store and send to remote
305         Future<Object> futureReply =
306                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
307
308         //Find gossiper on remote system
309         ActorSelection remoteRef = getContext().system().actorSelection(
310                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
311
312         LOG.trace("Sending bucket versions to [{}]", remoteRef);
313
314         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
315     }
316
317     /**
318      * Helper to send bucket versions received from local store.
319      *
320      * @param remote        remote gossiper to send versions to
321      * @param localVersions bucket versions received from local store
322      */
323     void sendGossipStatusTo(final ActorRef remote, final Map<Address, Long> localVersions) {
324
325         GossipStatus status = new GossipStatus(selfAddress, localVersions);
326         remote.tell(status, getSelf());
327     }
328
329     void sendGossipStatusTo(final ActorSelection remote, final Map<Address, Long> localVersions) {
330
331         GossipStatus status = new GossipStatus(selfAddress, localVersions);
332         remote.tell(status, getSelf());
333     }
334
335     ///
336     /// Private factories to create mappers
337     ///
338
339     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
340
341         return new Mapper<Object, Void>() {
342             @Override
343             public Void apply(final Object replyMessage) {
344                 if (replyMessage instanceof GetBucketVersionsReply) {
345                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
346                     Map<Address, Long> localVersions = reply.getVersions();
347
348                     sendGossipStatusTo(remote, localVersions);
349
350                 }
351                 return null;
352             }
353         };
354     }
355
356     /**
357      * Process bucket versions received from
358      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
359      * Then this method compares remote bucket versions with local bucket versions.
360      * <ul>
361      *     <li>The buckets that are newer locally, send
362      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
363      *     to remote
364      *     <li>The buckets that are older locally, send
365      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
366      *     to remote so that remote sends GossipEnvelop.
367      * </ul>
368      *
369      * @param sender the remote member
370      * @param status bucket versions from a remote member
371      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
372      *
373      */
374     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
375
376         final Map<Address, Long> remoteVersions = status.getVersions();
377
378         return new Mapper<Object, Void>() {
379             @Override
380             public Void apply(final Object replyMessage) {
381                 if (replyMessage instanceof GetBucketVersionsReply) {
382                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
383                     Map<Address, Long> localVersions = reply.getVersions();
384
385                     //diff between remote list and local
386                     Set<Address> localIsOlder = new HashSet<>();
387                     localIsOlder.addAll(remoteVersions.keySet());
388                     localIsOlder.removeAll(localVersions.keySet());
389
390                     //diff between local list and remote
391                     Set<Address> localIsNewer = new HashSet<>();
392                     localIsNewer.addAll(localVersions.keySet());
393                     localIsNewer.removeAll(remoteVersions.keySet());
394
395
396                     for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
397                         Address address = entry.getKey();
398                         Long remoteVersion = entry.getValue();
399                         Long localVersion = localVersions.get(address);
400                         if (localVersion == null || remoteVersion == null) {
401                             //this condition is taken care of by above diffs
402                             continue;
403                         }
404
405                         if (localVersion < remoteVersion) {
406                             localIsOlder.add(address);
407                         } else if (localVersion > remoteVersion) {
408                             localIsNewer.add(address);
409                         }
410                     }
411
412                     if (!localIsOlder.isEmpty()) {
413                         sendGossipStatusTo(sender, localVersions);
414                     }
415
416                     if (!localIsNewer.isEmpty()) {
417                         //send newer buckets to remote
418                         sendGossipTo(sender, localIsNewer);
419                     }
420                 }
421                 return null;
422             }
423         };
424     }
425
426     /**
427      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
428      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
429      * These buckets are sent to a remote member encapsulated in
430      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
431      *
432      * @param sender the remote member that sent
433      *           {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
434      *           in reply to which bucket is being sent back
435      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
436      *
437      */
438     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
439
440         return new Mapper<Object, Void>() {
441             @SuppressWarnings({ "rawtypes", "unchecked" })
442             @Override
443             public Void apply(final Object msg) {
444                 if (msg instanceof GetBucketsByMembersReply) {
445                     Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
446                     LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
447                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
448                     sender.tell(envelope, getSelf());
449                 }
450                 return null;
451             }
452         };
453     }
454
455     ///
456     ///Getter Setters
457     ///
458
459     @VisibleForTesting
460     void setClusterMembers(final Address... members) {
461         clusterMembers.clear();
462         clusterMembers.addAll(Arrays.asList(members));
463     }
464 }