Fix CS warnings in sal-remoterpc-connector and enable enforcement
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.cluster.Cluster;
16 import akka.cluster.ClusterActorRefProvider;
17 import akka.cluster.ClusterEvent;
18 import akka.cluster.Member;
19 import akka.dispatch.Mapper;
20 import akka.pattern.Patterns;
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Preconditions;
23 import java.util.ArrayList;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.ThreadLocalRandom;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
31 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
33 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
35 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
36 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
37 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipTick;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import scala.concurrent.Future;
43 import scala.concurrent.duration.FiniteDuration;
44
45 /**
46  * Gossiper that syncs bucket store across nodes in the cluster.
47  * <p/>
48  * It keeps a local scheduler that periodically sends Gossip ticks to
49  * itself to send bucket store's bucket versions to a randomly selected remote
50  * gossiper.
51  * <p/>
52  * When bucket versions are received from a remote gossiper, it is compared
53  * with bucket store's bucket versions. Which ever buckets are newer
54  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
55  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
56  * <p/>
57  * When a bucket is received from a remote gossiper, its sent to the bucket store
58  * for update.
59  *
60  */
61 public class Gossiper extends AbstractUntypedActorWithMetering {
62
63     private final Logger log = LoggerFactory.getLogger(getClass());
64
65     private Cluster cluster;
66
67     /**
68      * ActorSystem's address for the current cluster node.
69      */
70     private Address selfAddress;
71
72     /**
73      * All known cluster members.
74      */
75     private List<Address> clusterMembers = new ArrayList<>();
76
77     private Cancellable gossipTask;
78
79     private Boolean autoStartGossipTicks = true;
80
81     private final RemoteRpcProviderConfig config;
82
83     public Gossiper(RemoteRpcProviderConfig config) {
84         this.config = Preconditions.checkNotNull(config);
85     }
86
87     /**
88      * Constructor for testing.
89      *
90      * @param autoStartGossipTicks used for turning off gossip ticks during testing.
91      *                             Gossip tick can be manually sent.
92      */
93     @VisibleForTesting
94     public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
95         this(config);
96         this.autoStartGossipTicks = autoStartGossipTicks;
97     }
98
99     @Override
100     public void preStart() {
101         ActorRefProvider provider = getContext().provider();
102         selfAddress = provider.getDefaultAddress();
103
104         if ( provider instanceof ClusterActorRefProvider ) {
105             cluster = Cluster.get(getContext().system());
106             cluster.subscribe(getSelf(),
107                     ClusterEvent.initialStateAsEvents(),
108                     ClusterEvent.MemberEvent.class,
109                     ClusterEvent.UnreachableMember.class);
110         }
111
112         if (autoStartGossipTicks) {
113             gossipTask = getContext().system().scheduler().schedule(
114                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
115                     config.getGossipTickInterval(),                 //interval
116                     getSelf(),                                       //target
117                     new Messages.GossiperMessages.GossipTick(),      //message
118                     getContext().dispatcher(),                       //execution context
119                     getSelf()                                        //sender
120             );
121         }
122     }
123
124     @Override
125     public void postStop() {
126         if (cluster != null) {
127             cluster.unsubscribe(getSelf());
128         }
129         if (gossipTask != null) {
130             gossipTask.cancel();
131         }
132     }
133
134     @SuppressWarnings({ "rawtypes", "unchecked" })
135     @Override
136     protected void handleReceive(Object message) throws Exception {
137         //Usually sent by self via gossip task defined above. But its not enforced.
138         //These ticks can be sent by another actor as well which is esp. useful while testing
139         if (message instanceof GossipTick) {
140             receiveGossipTick();
141         } else if (message instanceof GossipStatus) {
142             // Message from remote gossiper with its bucket versions
143             receiveGossipStatus((GossipStatus) message);
144         } else if (message instanceof GossipEnvelope) {
145             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
146             // message. The contained buckets are newer as determined by the remote gossiper by
147             // comparing the GossipStatus message with its local versions.
148             receiveGossip((GossipEnvelope) message);
149         } else if (message instanceof ClusterEvent.MemberUp) {
150             receiveMemberUp(((ClusterEvent.MemberUp) message).member());
151
152         } else if (message instanceof ClusterEvent.MemberRemoved) {
153             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
154
155         } else if ( message instanceof ClusterEvent.UnreachableMember) {
156             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
157
158         } else {
159             unhandled(message);
160         }
161     }
162
163     /**
164      * Remove member from local copy of member list. If member down is self, then stop the actor
165      *
166      * @param member who went down
167      */
168     void receiveMemberRemoveOrUnreachable(Member member) {
169         //if its self, then stop itself
170         if (selfAddress.equals(member.address())) {
171             getContext().stop(getSelf());
172             return;
173         }
174
175         clusterMembers.remove(member.address());
176         log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
177     }
178
179     /**
180      * Add member to the local copy of member list if it doesn't already.
181      *
182      * @param member the member to add
183      */
184     void receiveMemberUp(Member member) {
185
186         if (selfAddress.equals(member.address())) {
187             return; //ignore up notification for self
188         }
189
190         if (!clusterMembers.contains(member.address())) {
191             clusterMembers.add(member.address());
192         }
193
194         log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
195     }
196
197     /**
198      * Sends Gossip status to other members in the cluster.
199      * <br>
200      * 1. If there are no member, ignore the tick. <br>
201      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
202      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
203      */
204     void receiveGossipTick() {
205         if (clusterMembers.size() == 0) {
206             return; //no members to send gossip status to
207         }
208
209         Address remoteMemberToGossipTo;
210
211         if (clusterMembers.size() == 1) {
212             remoteMemberToGossipTo = clusterMembers.get(0);
213         } else {
214             Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
215             remoteMemberToGossipTo = clusterMembers.get(randomIndex);
216         }
217
218         log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
219         getLocalStatusAndSendTo(remoteMemberToGossipTo);
220     }
221
222     /**
223      * Process gossip status received from a remote gossiper. Remote versions are compared with
224      * the local copy.
225      * <p/>
226      * For each bucket
227      * <ul>
228      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
229      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
230      *  <li>If both are same, noop</li>
231      * </ul>
232      *
233      * @param status bucket versions from a remote member
234      */
235     void receiveGossipStatus(GossipStatus status) {
236         //Don't accept messages from non-members
237         if (!clusterMembers.contains(status.from())) {
238             return;
239         }
240
241         final ActorRef sender = getSender();
242         Future<Object> futureReply =
243                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
244
245         futureReply.map(getMapperToProcessRemoteStatus(sender, status), getContext().dispatcher());
246
247     }
248
249     /**
250      * Sends the received buckets in the envelope to the parent Bucket store.
251      *
252      * @param envelope contains buckets from a remote gossiper
253      */
254     <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
255         //TODO: Add more validations
256         if (!selfAddress.equals(envelope.to())) {
257             if (log.isTraceEnabled()) {
258                 log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
259                         envelope.to());
260             }
261             return;
262         }
263
264         updateRemoteBuckets(envelope.getBuckets());
265
266     }
267
268     /**
269      * Helper to send received buckets to bucket store.
270      *
271      * @param buckets map of Buckets to update
272      */
273     <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
274         UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
275         getContext().parent().tell(updateRemoteBuckets, getSelf());
276     }
277
278     /**
279      * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
280      *
281      * @param remote     remote node to send Buckets to
282      * @param addresses  node addresses whose buckets needs to be sent
283      */
284     void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
285
286         Future<Object> futureReply =
287                 Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
288         futureReply.map(getMapperToSendGossip(remote), getContext().dispatcher());
289     }
290
291     /**
292      * Gets bucket versions from bucket store and sends to the supplied address.
293      *
294      * @param remoteActorSystemAddress remote gossiper to send to
295      */
296     void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
297
298         //Get local status from bucket store and send to remote
299         Future<Object> futureReply =
300                 Patterns.ask(getContext().parent(), new GetBucketVersions(), config.getAskDuration());
301
302         //Find gossiper on remote system
303         ActorSelection remoteRef = getContext().system().actorSelection(
304                 remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
305
306         log.trace("Sending bucket versions to [{}]", remoteRef);
307
308         futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
309
310     }
311
312     /**
313      * Helper to send bucket versions received from local store.
314      *
315      * @param remote        remote gossiper to send versions to
316      * @param localVersions bucket versions received from local store
317      */
318     void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
319
320         GossipStatus status = new GossipStatus(selfAddress, localVersions);
321         remote.tell(status, getSelf());
322     }
323
324     void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
325
326         GossipStatus status = new GossipStatus(selfAddress, localVersions);
327         remote.tell(status, getSelf());
328     }
329
330     ///
331     /// Private factories to create mappers
332     ///
333
334     private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
335
336         return new Mapper<Object, Void>() {
337             @Override
338             public Void apply(Object replyMessage) {
339                 if (replyMessage instanceof GetBucketVersionsReply) {
340                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
341                     Map<Address, Long> localVersions = reply.getVersions();
342
343                     sendGossipStatusTo(remote, localVersions);
344
345                 }
346                 return null;
347             }
348         };
349     }
350
351     /**
352      * Process bucket versions received from
353      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}.
354      * Then this method compares remote bucket versions with local bucket versions.
355      * <ul>
356      *     <li>The buckets that are newer locally, send
357      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
358      *     to remote
359      *     <li>The buckets that are older locally, send
360      *     {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
361      *     to remote so that remote sends GossipEnvelop.
362      * </ul>
363      *
364      * @param sender the remote member
365      * @param status bucket versions from a remote member
366      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
367      *
368      */
369     private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
370
371         final Map<Address, Long> remoteVersions = status.getVersions();
372
373         return new Mapper<Object, Void>() {
374             @Override
375             public Void apply(Object replyMessage) {
376                 if (replyMessage instanceof GetBucketVersionsReply) {
377                     GetBucketVersionsReply reply = (GetBucketVersionsReply) replyMessage;
378                     Map<Address, Long> localVersions = reply.getVersions();
379
380                     //diff between remote list and local
381                     Set<Address> localIsOlder = new HashSet<>();
382                     localIsOlder.addAll(remoteVersions.keySet());
383                     localIsOlder.removeAll(localVersions.keySet());
384
385                     //diff between local list and remote
386                     Set<Address> localIsNewer = new HashSet<>();
387                     localIsNewer.addAll(localVersions.keySet());
388                     localIsNewer.removeAll(remoteVersions.keySet());
389
390
391                     for (Address address : remoteVersions.keySet()) {
392
393                         if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
394                             continue; //this condition is taken care of by above diffs
395                         }
396                         if (localVersions.get(address) <  remoteVersions.get(address)) {
397                             localIsOlder.add(address);
398                         } else if (localVersions.get(address) > remoteVersions.get(address)) {
399                             localIsNewer.add(address);
400                         }
401                     }
402
403                     if (!localIsOlder.isEmpty()) {
404                         sendGossipStatusTo(sender, localVersions );
405                     }
406
407                     if (!localIsNewer.isEmpty()) {
408                         sendGossipTo(sender, localIsNewer);//send newer buckets to remote
409                     }
410
411                 }
412                 return null;
413             }
414         };
415     }
416
417     /**
418      * Processes the message from {@link org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore}
419      * that contains {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket}.
420      * These buckets are sent to a remote member encapsulated in
421      * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
422      *
423      * @param sender the remote member that sent
424      *           {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
425      *           in reply to which bucket is being sent back
426      * @return a {@link akka.dispatch.Mapper} that gets evaluated in future
427      *
428      */
429     private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
430
431         return new Mapper<Object, Void>() {
432             @SuppressWarnings({ "rawtypes", "unchecked" })
433             @Override
434             public Void apply(Object msg) {
435                 if (msg instanceof GetBucketsByMembersReply) {
436                     Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
437                     log.trace("Buckets to send from {}: {}", selfAddress, buckets);
438                     GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
439                     sender.tell(envelope, getSelf());
440                 }
441                 return null;
442             }
443         };
444     }
445
446     ///
447     ///Getter Setters
448     ///
449     List<Address> getClusterMembers() {
450         return clusterMembers;
451     }
452
453     void setClusterMembers(List<Address> clusterMembers) {
454         this.clusterMembers = clusterMembers;
455     }
456
457     Address getSelfAddress() {
458         return selfAddress;
459     }
460
461     void setSelfAddress(Address selfAddress) {
462         this.selfAddress = selfAddress;
463     }
464 }