BUG-7573: add BucketStore source monitoring
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import akka.dispatch.Mapper;
21 import akka.pattern.Patterns;
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.base.Preconditions;
24 import com.google.common.base.Verify;
25 import java.util.ArrayList;
26 import java.util.HashMap;
27 import java.util.HashSet;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31 import java.util.concurrent.ThreadLocalRandom;
32 import java.util.concurrent.TimeUnit;
33 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
34 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
40 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
41 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
42 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
43 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
44 import scala.concurrent.Future;
45 import scala.concurrent.duration.FiniteDuration;
46
47 /**
48  * Gossiper that syncs bucket store across nodes in the cluster.
49  * <p/>
50  * It keeps a local scheduler that periodically sends Gossip ticks to
51  * itself to send bucket store's bucket versions to a randomly selected remote
52  * gossiper.
53  * <p/>
54  * When bucket versions are received from a remote gossiper, it is compared
55  * with bucket store's bucket versions. Which ever buckets are newer
56  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
57  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
58  * <p/>
59  * When a bucket is received from a remote gossiper, its sent to the bucket store
60  * for update.
61  *
62  */
63
64 public class Gossiper extends AbstractUntypedActorWithMetering {
65     private final boolean autoStartGossipTicks;
66     private final RemoteRpcProviderConfig config;
67
68     /**
69      * All known cluster members.
70      */
71     private final List<Address> clusterMembers = new ArrayList<>();
72
73     /**
74      * Cached ActorSelections for remote peers.
75      */
76     private final Map<Address, ActorSelection> peers = new HashMap<>();
77
78     /**
79      * ActorSystem's address for the current cluster node.
80      */
81     private Address selfAddress;
82
83     private Cluster cluster;
84
85     private Cancellable gossipTask;
86
87     Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
88         this.config = Preconditions.checkNotNull(config);
89         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
90     }
91
92     Gossiper(final RemoteRpcProviderConfig config) {
93         this(config, Boolean.TRUE);
94     }
95
96     public static Props props(final RemoteRpcProviderConfig config) {
97         return Props.create(Gossiper.class, config);
98     }
99
100     static Props testProps(final RemoteRpcProviderConfig config) {
101         return Props.create(Gossiper.class, config, Boolean.FALSE);
102     }
103
104     @Override
105     public void preStart(){
106         ActorRefProvider provider = getContext().provider();
107         selfAddress = provider.getDefaultAddress();
108
109         if (provider instanceof ClusterActorRefProvider ) {
110             cluster = Cluster.get(getContext().system());
111             cluster.subscribe(getSelf(),
112                     ClusterEvent.initialStateAsEvents(),
113                     ClusterEvent.MemberEvent.class,
114                     ClusterEvent.ReachableMember.class,
115                     ClusterEvent.UnreachableMember.class);
116         }
117
118         if (autoStartGossipTicks) {
119             gossipTask = getContext().system().scheduler().schedule(
120                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
121                     config.getGossipTickInterval(),                 //interval
122                     getSelf(),                                      //target
123                     new Messages.GossiperMessages.GossipTick(),     //message
124                     getContext().dispatcher(),                      //execution context
125                     getSelf()                                       //sender
126             );
127         }
128     }
129
130     @Override
131     public void postStop(){
132         if (cluster != null) {
133             cluster.unsubscribe(getSelf());
134         }
135         if (gossipTask != null) {
136             gossipTask.cancel();
137         }
138     }
139
140     @Override
141     protected void handleReceive(final Object message) throws Exception {
142         //Usually sent by self via gossip task defined above. But its not enforced.
143         //These ticks can be sent by another actor as well which is esp. useful while testing
144         if (message instanceof GossipTick) {
145             receiveGossipTick();
146         } else if (message instanceof GossipStatus) {
147             // Message from remote gossiper with its bucket versions
148             receiveGossipStatus((GossipStatus) message);
149         } else if (message instanceof GossipEnvelope) {
150             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
151             // message. The contained buckets are newer as determined by the remote gossiper by
152             // comparing the GossipStatus message with its local versions.
153             receiveGossip((GossipEnvelope) message);
154         } else if (message instanceof ClusterEvent.MemberUp) {
155             receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
156
157         } else if (message instanceof ClusterEvent.ReachableMember) {
158             receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
159
160         } else if (message instanceof ClusterEvent.MemberRemoved) {
161             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
162
163         } else if (message instanceof ClusterEvent.UnreachableMember) {
164             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
165
166         } else {
167             unhandled(message);
168         }
169     }
170
171     /**
172      * Remove member from local copy of member list. If member down is self, then stop the actor
173      *
174      * @param member who went down
175      */
176     private void receiveMemberRemoveOrUnreachable(final Member member) {
177         //if its self, then stop itself
178         if (selfAddress.equals(member.address())){
179             getContext().stop(getSelf());
180             return;
181         }
182
183         removePeer(member.address());
184         LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
185     }
186
187     private void addPeer(final Address address) {
188         if (!clusterMembers.contains(address)) {
189             clusterMembers.add(address);
190         }
191         peers.computeIfAbsent(address, input -> getContext().system()
192             .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
193     }
194
195     private void removePeer(final Address address) {
196         clusterMembers.remove(address);
197         peers.remove(address);
198         getContext().parent().tell(new RemoveRemoteBucket(address), ActorRef.noSender());
199     }
200
201     /**
202      * Add member to the local copy of member list if it doesnt already
203      * @param member
204      */
205     private void receiveMemberUpOrReachable(final Member member) {
206         //ignore up notification for self
207         if (selfAddress.equals(member.address())) {
208             return;
209         }
210
211         addPeer(member.address());
212         LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
213     }
214
215     /**
216      * Sends Gossip status to other members in the cluster. <br/>
217      * 1. If there are no member, ignore the tick. </br>
218      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
219      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
220      */
221     @VisibleForTesting
222     void receiveGossipTick() {
223         final Address address;
224         switch (clusterMembers.size()) {
225             case 0:
226                 //no members to send gossip status to
227                 return;
228             case 1:
229                 address = clusterMembers.get(0);
230                 break;
231             default:
232                 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
233                 address = clusterMembers.get(randomIndex);
234                 break;
235         }
236
237         LOG.trace("Gossiping to [{}]", address);
238         getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
239     }
240
241     /**
242      * Process gossip status received from a remote gossiper. Remote versions are compared with
243      * the local copy. <p>
244      *
245      * For each bucket
246      * <ul>
247      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
248      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
249      *  <li>If both are same, noop</li>
250      * </ul>
251      *
252      * @param status bucket versions from a remote member
253      */
254     @VisibleForTesting
255     void receiveGossipStatus(final GossipStatus status) {
256         // Don't accept messages from non-members
257         if (!peers.containsKey(status.from())) {
258             return;
259         }
260
261         final ActorRef sender = getSender();
262         Future<Object> futureReply =
263                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
264
265         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
266     }
267
268     /**
269      * Sends the received buckets in the envelope to the parent Bucket store.
270      *
271      * @param envelope contains buckets from a remote gossiper
272      */
273     @VisibleForTesting
274     <T extends BucketData<T>> void receiveGossip(final GossipEnvelope<T> envelope) {
275         //TODO: Add more validations
276         if (!selfAddress.equals(envelope.to())) {
277             LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
278             return;
279         }
280
281         updateRemoteBuckets(envelope.getBuckets());
282     }
283
284     /**
285      * Helper to send received buckets to bucket store
286      *
287      * @param buckets
288      */
289     @VisibleForTesting
290     <T extends BucketData<T>> void updateRemoteBuckets(final Map<Address, Bucket<T>> buckets) {
291         getContext().parent().tell(new UpdateRemoteBuckets<>(buckets), getSelf());
292     }
293
294     /**
295      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
296      *
297      * @param remote     remote node to send Buckets to
298      * @param addresses  node addresses whose buckets needs to be sent
299      */
300     void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
301
302         Future<Object> futureReply =
303                 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
304         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
305     }
306
307     /**
308      * Gets bucket versions from bucket store and sends to the supplied address
309      *
310      * @param remoteActorSystemAddress remote gossiper to send to
311      */
312     @VisibleForTesting
313     void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
314
315         //Get local status from bucket store and send to remote
316         Future<Object> futureReply =
317                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
318
319         LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
320
321         futureReply.map(getMapperToSendLocalStatus(remoteGossiper), getContext().dispatcher());
322     }
323
324     ///
325     /// Private factories to create mappers
326     ///
327
328     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
329
330         return new Mapper<Object, Void>() {
331             @Override
332             public Void apply(final Object replyMessage) {
333                 if (replyMessage instanceof GetBucketVersionsReply) {
334                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
335                     Map<Address, Long> localVersions = reply.getVersions();
336
337                     remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
338                 }
339                 return null;
340             }
341         };
342     }
343
344     /**
345      * Process bucket versions received from
346      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
347      * Then this method compares remote bucket versions with local bucket versions.
348      * <ul>
349      *     <li>The buckets that are newer locally, send
350      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
351      *     to remote
352      *     <li>The buckets that are older locally, send
353      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
354      *     to remote so that remote sends GossipEnvelop.
355      * </ul>
356      *
357      * @param sender the remote member
358      * @param status bucket versions from a remote member
359      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
360      *
361      */
362     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
363
364         final Map<Address, Long> remoteVersions = status.getVersions();
365
366         return new Mapper<Object, Void>() {
367             @Override
368             public Void apply(final Object replyMessage) {
369                 if (replyMessage instanceof GetBucketVersionsReply) {
370                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
371                     Map<Address, Long> localVersions = reply.getVersions();
372
373                     //diff between remote list and local
374                     Set<Address> localIsOlder = new HashSet<>();
375                     localIsOlder.addAll(remoteVersions.keySet());
376                     localIsOlder.removeAll(localVersions.keySet());
377
378                     //diff between local list and remote
379                     Set<Address> localIsNewer = new HashSet<>();
380                     localIsNewer.addAll(localVersions.keySet());
381                     localIsNewer.removeAll(remoteVersions.keySet());
382
383
384                     for (Map.Entry<Address, Long> entry : remoteVersions.entrySet()) {
385                         Address address = entry.getKey();
386                         Long remoteVersion = entry.getValue();
387                         Long localVersion = localVersions.get(address);
388                         if (localVersion == null || remoteVersion == null) {
389                             //this condition is taken care of by above diffs
390                             continue;
391                         }
392                         if (localVersions.get(address) <  remoteVersions.get(address)) {
393                             localIsOlder.add(address);
394                         } else if (localVersions.get(address) > remoteVersions.get(address)) {
395                             localIsNewer.add(address);
396                         }
397                     }
398
399                     if (!localIsOlder.isEmpty()) {
400                         sender.tell(new GossipStatus(selfAddress, localVersions), getSelf());
401                     }
402
403                     if (!localIsNewer.isEmpty()) {
404                         //send newer buckets to remote
405                         sendGossipTo(sender, localIsNewer);
406                     }
407                 }
408                 return null;
409             }
410         };
411     }
412
413     /**
414      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
415      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
416      * These buckets are sent to a remote member encapsulated in
417      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
418      *
419      * @param sender the remote member that sent
420      *               {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
421      *               in reply to which bucket is being sent back
422      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
423      *
424      */
425     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
426
427         return new Mapper<Object, Void>() {
428             @Override
429             public Void apply(final Object msg) {
430                 if (msg instanceof GetBucketsByMembersReply) {
431                     Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
432                     LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
433                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
434                     sender.tell(envelope, getSelf());
435                 }
436                 return null;
437             }
438         };
439     }
440
441     ///
442     ///Getter Setters
443     ///
444
445     @VisibleForTesting
446     void setClusterMembers(final Address... members) {
447         clusterMembers.clear();
448         peers.clear();
449
450         for (Address addr : members) {
451             addPeer(addr);
452         }
453     }
454 }