BUG-7594: Rework sal-remoterpc-connector messages
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / Gossiper.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
9
10 import akka.actor.ActorRef;
11 import akka.actor.ActorRefProvider;
12 import akka.actor.ActorSelection;
13 import akka.actor.Address;
14 import akka.actor.Cancellable;
15 import akka.actor.Props;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterActorRefProvider;
18 import akka.cluster.ClusterEvent;
19 import akka.cluster.Member;
20 import com.google.common.annotations.VisibleForTesting;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Verify;
23 import java.util.ArrayList;
24 import java.util.HashMap;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Map.Entry;
29 import java.util.Set;
30 import java.util.concurrent.ThreadLocalRandom;
31 import java.util.concurrent.TimeUnit;
32 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
33 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
34 import scala.concurrent.duration.FiniteDuration;
35
36 /**
37  * Gossiper that syncs bucket store across nodes in the cluster.
38  *
39  * <p>
40  * It keeps a local scheduler that periodically sends Gossip ticks to
41  * itself to send bucket store's bucket versions to a randomly selected remote
42  * gossiper.
43  *
44  * <p>
45  * When bucket versions are received from a remote gossiper, it is compared
46  * with bucket store's bucket versions. Which ever buckets are newer
47  * locally, are sent to remote gossiper. If any bucket is older in bucket store,
48  * a gossip status is sent to remote gossiper so that it can send the newer buckets.
49  *
50  * <p>
51  * When a bucket is received from a remote gossiper, its sent to the bucket store
52  * for update.
53  */
54 public class Gossiper extends AbstractUntypedActorWithMetering {
55     private static final Object GOSSIP_TICK = new Object() {
56         @Override
57         public String toString() {
58             return "gossip tick";
59         }
60     };
61
62     private final boolean autoStartGossipTicks;
63     private final RemoteRpcProviderConfig config;
64
65     /**
66      * All known cluster members.
67      */
68     private final List<Address> clusterMembers = new ArrayList<>();
69
70     /**
71      * Cached ActorSelections for remote peers.
72      */
73     private final Map<Address, ActorSelection> peers = new HashMap<>();
74
75     /**
76      * ActorSystem's address for the current cluster node.
77      */
78     private Address selfAddress;
79
80     private Cluster cluster;
81
82     private Cancellable gossipTask;
83
84     private BucketStoreAccess bucketStore;
85
86     Gossiper(final RemoteRpcProviderConfig config, final Boolean autoStartGossipTicks) {
87         this.config = Preconditions.checkNotNull(config);
88         this.autoStartGossipTicks = autoStartGossipTicks.booleanValue();
89     }
90
91     Gossiper(final RemoteRpcProviderConfig config) {
92         this(config, Boolean.TRUE);
93     }
94
95     public static Props props(final RemoteRpcProviderConfig config) {
96         return Props.create(Gossiper.class, config);
97     }
98
99     static Props testProps(final RemoteRpcProviderConfig config) {
100         return Props.create(Gossiper.class, config, Boolean.FALSE);
101     }
102
103     @Override
104     public void preStart() {
105         ActorRefProvider provider = getContext().provider();
106         selfAddress = provider.getDefaultAddress();
107
108         bucketStore = new BucketStoreAccess(getContext(), config.getAskDuration());
109
110         if (provider instanceof ClusterActorRefProvider ) {
111             cluster = Cluster.get(getContext().system());
112             cluster.subscribe(getSelf(),
113                     ClusterEvent.initialStateAsEvents(),
114                     ClusterEvent.MemberEvent.class,
115                     ClusterEvent.ReachableMember.class,
116                     ClusterEvent.UnreachableMember.class);
117         }
118
119         if (autoStartGossipTicks) {
120             gossipTask = getContext().system().scheduler().schedule(
121                     new FiniteDuration(1, TimeUnit.SECONDS),        //initial delay
122                     config.getGossipTickInterval(),                 //interval
123                     getSelf(),                                      //target
124                     GOSSIP_TICK,                                    //message
125                     getContext().dispatcher(),                      //execution context
126                     getSelf()                                       //sender
127             );
128         }
129     }
130
131     @Override
132     public void postStop() {
133         if (cluster != null) {
134             cluster.unsubscribe(getSelf());
135         }
136         if (gossipTask != null) {
137             gossipTask.cancel();
138         }
139     }
140
141     @Override
142     protected void handleReceive(final Object message) throws Exception {
143         //Usually sent by self via gossip task defined above. But its not enforced.
144         //These ticks can be sent by another actor as well which is esp. useful while testing
145         if (GOSSIP_TICK.equals(message)) {
146             receiveGossipTick();
147         } else if (message instanceof GossipStatus) {
148             // Message from remote gossiper with its bucket versions
149             receiveGossipStatus((GossipStatus) message);
150         } else if (message instanceof GossipEnvelope) {
151             // Message from remote gossiper with buckets. This is usually in response to GossipStatus
152             // message. The contained buckets are newer as determined by the remote gossiper by
153             // comparing the GossipStatus message with its local versions.
154             receiveGossip((GossipEnvelope) message);
155         } else if (message instanceof ClusterEvent.MemberUp) {
156             receiveMemberUpOrReachable(((ClusterEvent.MemberUp) message).member());
157
158         } else if (message instanceof ClusterEvent.ReachableMember) {
159             receiveMemberUpOrReachable(((ClusterEvent.ReachableMember) message).member());
160
161         } else if (message instanceof ClusterEvent.MemberRemoved) {
162             receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
163
164         } else if (message instanceof ClusterEvent.UnreachableMember) {
165             receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
166
167         } else {
168             unhandled(message);
169         }
170     }
171
172     /**
173      * Remove member from local copy of member list. If member down is self, then stop the actor
174      *
175      * @param member who went down
176      */
177     private void receiveMemberRemoveOrUnreachable(final Member member) {
178         //if its self, then stop itself
179         if (selfAddress.equals(member.address())) {
180             getContext().stop(getSelf());
181             return;
182         }
183
184         removePeer(member.address());
185         LOG.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
186     }
187
188     private void addPeer(final Address address) {
189         if (!clusterMembers.contains(address)) {
190             clusterMembers.add(address);
191         }
192         peers.computeIfAbsent(address, input -> getContext().system()
193             .actorSelection(input.toString() + getSelf().path().toStringWithoutAddress()));
194     }
195
196     private void removePeer(final Address address) {
197         clusterMembers.remove(address);
198         peers.remove(address);
199         bucketStore.removeRemoteBucket(address);
200     }
201
202     /**
203      * Add member to the local copy of member list if it doesn't already.
204      *
205      * @param member the member to add
206      */
207     private void receiveMemberUpOrReachable(final Member member) {
208         //ignore up notification for self
209         if (selfAddress.equals(member.address())) {
210             return;
211         }
212
213         addPeer(member.address());
214         LOG.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
215     }
216
217     /**
218      * Sends Gossip status to other members in the cluster.
219      * <br>
220      * 1. If there are no member, ignore the tick. <br>
221      * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
222      * 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
223      */
224     @VisibleForTesting
225     void receiveGossipTick() {
226         final Address address;
227         switch (clusterMembers.size()) {
228             case 0:
229                 //no members to send gossip status to
230                 return;
231             case 1:
232                 address = clusterMembers.get(0);
233                 break;
234             default:
235                 final int randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
236                 address = clusterMembers.get(randomIndex);
237                 break;
238         }
239
240         LOG.trace("Gossiping to [{}]", address);
241         getLocalStatusAndSendTo(Verify.verifyNotNull(peers.get(address)));
242     }
243
244     /**
245      * Process gossip status received from a remote gossiper. Remote versions are compared with
246      * the local copy.
247      * <p/>
248      * For each bucket
249      * <ul>
250      *  <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
251      *  <li>If local is older, GossipStatus is sent to remote so that it can reply with GossipEnvelope</li>
252      *  <li>If both are same, noop</li>
253      * </ul>
254      *
255      * @param status bucket versions from a remote member
256      */
257     @VisibleForTesting
258     void receiveGossipStatus(final GossipStatus status) {
259         // Don't accept messages from non-members
260         if (peers.containsKey(status.from())) {
261             // FIXME: sender should be part of GossipStatus
262             final ActorRef sender = getSender();
263             bucketStore.getBucketVersions(versions ->  processRemoteStatus(sender, status, versions));
264         }
265     }
266
267     private void processRemoteStatus(final ActorRef remote, final GossipStatus status,
268             final Map<Address, Long> localVersions) {
269         final Map<Address, Long> remoteVersions = status.versions();
270
271         //diff between remote list and local
272         final Set<Address> localIsOlder = new HashSet<>(remoteVersions.keySet());
273         localIsOlder.removeAll(localVersions.keySet());
274
275         //diff between local list and remote
276         final Set<Address> localIsNewer = new HashSet<>(localVersions.keySet());
277         localIsNewer.removeAll(remoteVersions.keySet());
278
279
280         for (Entry<Address, Long> entry : remoteVersions.entrySet()) {
281             Address address = entry.getKey();
282             Long remoteVersion = entry.getValue();
283             Long localVersion = localVersions.get(address);
284             if (localVersion == null || remoteVersion == null) {
285                 //this condition is taken care of by above diffs
286                 continue;
287             }
288
289             if (localVersion < remoteVersion) {
290                 localIsOlder.add(address);
291             } else if (localVersion > remoteVersion) {
292                 localIsNewer.add(address);
293             }
294         }
295
296         if (!localIsOlder.isEmpty()) {
297             remote.tell(new GossipStatus(selfAddress, localVersions), getSelf());
298         }
299
300         if (!localIsNewer.isEmpty()) {
301             //send newer buckets to remote
302             bucketStore.getBucketsByMembers(localIsNewer, buckets -> {
303                 LOG.trace("Buckets to send from {}: {}", selfAddress, buckets);
304                 remote.tell(new GossipEnvelope(selfAddress, remote.path().address(), buckets), getSelf());
305             });
306         }
307     }
308
309     /**
310      * Sends the received buckets in the envelope to the parent Bucket store.
311      *
312      * @param envelope contains buckets from a remote gossiper
313      */
314     @VisibleForTesting
315     void receiveGossip(final GossipEnvelope envelope) {
316         //TODO: Add more validations
317         if (!selfAddress.equals(envelope.to())) {
318             LOG.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
319             return;
320         }
321
322         updateRemoteBuckets(envelope.buckets());
323     }
324
325     /**
326      * Helper to send received buckets to bucket store.
327      *
328      * @param buckets map of Buckets to update
329      */
330     @VisibleForTesting
331     void updateRemoteBuckets(final Map<Address, ? extends Bucket<?>> buckets) {
332         bucketStore.updateRemoteBuckets(buckets);
333     }
334
335     /**
336      * Gets bucket versions from bucket store and sends to the supplied address.
337      *
338      * @param remoteActorSystemAddress remote gossiper to send to
339      */
340     @VisibleForTesting
341     void getLocalStatusAndSendTo(final ActorSelection remoteGossiper) {
342         bucketStore.getBucketVersions(versions -> {
343             LOG.trace("Sending bucket versions to [{}]", remoteGossiper);
344             /*
345              * XXX: we are leaking our reference here. That may be useful for establishing buckets monitoring,
346              *      but can we identify which bucket is the local one?
347              */
348             remoteGossiper.tell(new GossipStatus(selfAddress, versions), getSelf());
349         });
350     }
351
352     ///
353     ///Getter Setters
354     ///
355
356     @VisibleForTesting
357     void setClusterMembers(final Address... members) {
358         clusterMembers.clear();
359         peers.clear();
360
361         for (Address addr : members) {
362             addPeer(addr);
363         }
364     }
365 }