b4af0adfe5f4160868752aab7e0210a8607c3910
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Terminated;
15 import akka.cluster.ClusterActorRefProvider;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.HashMultimap;
18 import com.google.common.collect.SetMultimap;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Map.Entry;
22 import java.util.Optional;
23 import java.util.Set;
24 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
25 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
33 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
34 import org.opendaylight.controller.utils.ConditionalProbe;
35
36 /**
37  * A store that syncs its data across nodes in the cluster.
38  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
39  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
40  *
41  * <p>
42  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
43  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
44  *
45  */
46 public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
47     /**
48      * Bucket owned by the node.
49      */
50     private final BucketImpl<T> localBucket;
51
52     /**
53      * Buckets owned by other known nodes in the cluster.
54      */
55     private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
56
57     /**
58      * Bucket version for every known node in the cluster including this node.
59      */
60     private final Map<Address, Long> versions = new HashMap<>();
61
62     /**
63      * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
64      * once, possibly being tied to multiple addresses (and by extension, buckets).
65      */
66     private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
67
68     /**
69      * Cluster address for this node.
70      */
71     private Address selfAddress;
72
73     // FIXME: should be part of test-specific subclass
74     private ConditionalProbe probe;
75
76     private final RemoteRpcProviderConfig config;
77
78     public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
79         this.config = Preconditions.checkNotNull(config);
80         this.localBucket = new BucketImpl<>(initialData);
81     }
82
83     @Override
84     public void preStart() {
85         ActorRefProvider provider = getContext().provider();
86         selfAddress = provider.getDefaultAddress();
87
88         if (provider instanceof ClusterActorRefProvider) {
89             getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
90         }
91     }
92
93     @SuppressWarnings("unchecked")
94     @Override
95     protected void handleReceive(final Object message) throws Exception {
96         if (probe != null) {
97             probe.tell(message, getSelf());
98         }
99
100         if (message instanceof GetBucketsByMembers) {
101             receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
102         } else if (message instanceof GetBucketVersions) {
103             receiveGetBucketVersions();
104         } else if (message instanceof UpdateRemoteBuckets) {
105             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
106         } else if (message instanceof RemoveRemoteBucket) {
107             removeBucket(((RemoveRemoteBucket) message).getAddress());
108         } else if (message instanceof Terminated) {
109             actorTerminated((Terminated) message);
110         } else if (message instanceof GetAllBuckets) {
111             // GetAllBuckets is used only for unit tests.
112             receiveGetAllBuckets();
113         } else if (message instanceof ConditionalProbe) {
114             // The ConditionalProbe is only used for unit tests.
115             LOG.info("Received probe {} {}", getSelf(), message);
116             probe = (ConditionalProbe) message;
117             // Send back any message to tell the caller we got the probe.
118             getSender().tell("Got it", getSelf());
119         } else {
120             LOG.debug("Unhandled message [{}]", message);
121             unhandled(message);
122         }
123     }
124
125     protected RemoteRpcProviderConfig getConfig() {
126         return config;
127     }
128
129     /**
130      * Returns all the buckets the this node knows about, self owned + remote.
131      */
132     void receiveGetAllBuckets() {
133         final ActorRef sender = getSender();
134         sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
135     }
136
137     /**
138      * Helper to collect all known buckets.
139      *
140      * @return self owned + remote buckets
141      */
142     Map<Address, Bucket<T>> getAllBuckets() {
143         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
144
145         //first add the local bucket
146         all.put(selfAddress, new BucketImpl<>(localBucket));
147
148         //then get all remote buckets
149         all.putAll(remoteBuckets);
150
151         return all;
152     }
153
154     /**
155      * Returns buckets for requested members that this node knows about.
156      *
157      * @param members requested members
158      */
159     void receiveGetBucketsByMembers(final Set<Address> members) {
160         final ActorRef sender = getSender();
161         Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
162         sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
163     }
164
165     /**
166      * Helper to collect buckets for requested members.
167      *
168      * @param members requested members
169      * @return buckets for requested members
170      */
171     Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
172         Map<Address, Bucket<T>> buckets = new HashMap<>();
173
174         //first add the local bucket if asked
175         if (members.contains(selfAddress)) {
176             buckets.put(selfAddress, new BucketImpl<>(localBucket));
177         }
178
179         //then get buckets for requested remote nodes
180         for (Address address : members) {
181             if (remoteBuckets.containsKey(address)) {
182                 buckets.put(address, remoteBuckets.get(address));
183             }
184         }
185
186         return buckets;
187     }
188
189     /**
190      * Returns versions for all buckets known.
191      */
192     void receiveGetBucketVersions() {
193         final ActorRef sender = getSender();
194         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
195         sender.tell(reply, getSelf());
196     }
197
198     /**
199      * Update local copy of remote buckets where local copy's version is older.
200      *
201      * @param receivedBuckets buckets sent by remote
202      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
203      */
204     void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
205         LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
206         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
207             //nothing to do
208             return;
209         }
210
211         final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
212         for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
213             final Address addr = entry.getKey();
214
215             if (selfAddress.equals(addr)) {
216                 // Remote cannot update our bucket
217                 continue;
218             }
219
220             final Bucket<T> receivedBucket = entry.getValue();
221             if (receivedBucket == null) {
222                 LOG.debug("Ignoring null bucket from {}", addr);
223                 continue;
224             }
225
226             // update only if remote version is newer
227             final long remoteVersion = receivedBucket.getVersion();
228             final Long localVersion = versions.get(addr);
229             if (localVersion != null && remoteVersion <= localVersion.longValue()) {
230                 LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
231                     remoteVersion);
232                 continue;
233             }
234             newBuckets.put(addr, receivedBucket);
235             versions.put(addr, remoteVersion);
236             final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
237
238             // Deal with DeathWatch subscriptions
239             final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
240             final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
241             if (!curRef.equals(prevRef)) {
242                 prevRef.ifPresent(ref -> removeWatch(addr, ref));
243                 curRef.ifPresent(ref -> addWatch(addr, ref));
244             }
245
246             LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
247         }
248
249         LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
250
251         onBucketsUpdated(newBuckets);
252     }
253
254     private void addWatch(final Address addr, final ActorRef ref) {
255         if (!watchedActors.containsKey(ref)) {
256             getContext().watch(ref);
257             LOG.debug("Watching {}", ref);
258         }
259         watchedActors.put(ref, addr);
260     }
261
262     private void removeWatch(final Address addr, final ActorRef ref) {
263         watchedActors.remove(ref, addr);
264         if (!watchedActors.containsKey(ref)) {
265             getContext().unwatch(ref);
266             LOG.debug("No longer watching {}", ref);
267         }
268     }
269
270     private void removeBucket(final Address addr) {
271         final Bucket<T> bucket = remoteBuckets.remove(addr);
272         if (bucket != null) {
273             bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
274             onBucketRemoved(addr, bucket);
275         }
276     }
277
278     private void actorTerminated(final Terminated message) {
279         LOG.info("Actor termination {} received", message);
280
281         for (Address addr : watchedActors.removeAll(message.getActor())) {
282             versions.remove(addr);
283             final Bucket<T> bucket = remoteBuckets.remove(addr);
284             if (bucket != null) {
285                 LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
286                 onBucketRemoved(addr, bucket);
287             }
288         }
289     }
290
291     /**
292      * Callback to subclasses invoked when a bucket is removed.
293      *
294      * @param address Remote address
295      * @param bucket Bucket removed
296      */
297     protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
298         // Default noop
299     }
300
301     /**
302      * Callback to subclasses invoked when the set of remote buckets is updated.
303      *
304      * @param newBuckets Map of address to new bucket. Never null, but can be empty.
305      */
306     protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
307         // Default noop
308     }
309
310     public BucketImpl<T> getLocalBucket() {
311         return localBucket;
312     }
313
314     protected void updateLocalBucket(final T data) {
315         localBucket.setData(data);
316         versions.put(selfAddress, localBucket.getVersion());
317     }
318
319     public Map<Address, Bucket<T>> getRemoteBuckets() {
320         return remoteBuckets;
321     }
322
323     public Map<Address, Long> getVersions() {
324         return versions;
325     }
326 }