Patch to eliminate sleeps in RemoteRpc tests
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Address;
13 import akka.actor.Props;
14 import akka.actor.UntypedActor;
15 import akka.cluster.Cluster;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
18 import org.opendaylight.controller.utils.ConditionalProbe;
19
20 import java.util.HashMap;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25
26 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
27 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
28 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
29 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
30 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
31 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
32 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
33 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
36
37 /**
38  * A store that syncs its data across nodes in the cluster.
39  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
40  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
41  * <p>
42  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
43  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
44  *
45  */
46 public class BucketStore extends UntypedActor {
47
48     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
49
50     /**
51      * Bucket owned by the node
52      */
53     private BucketImpl localBucket = new BucketImpl();;
54
55     /**
56      * Buckets ownded by other known nodes in the cluster
57      */
58     private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
59
60     /**
61      * Bucket version for every known node in the cluster including this node
62      */
63     private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
64
65     /**
66      * Cluster address for this node
67      */
68     private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
69
70     /**
71      * Our private gossiper
72      */
73     private ActorRef gossiper;
74
75     private ConditionalProbe probe;
76
77     public BucketStore(){
78         gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
79     }
80
81     /**
82      * This constructor is useful for testing.
83      * TODO: Pass Props instead of ActorRef
84      *
85      * @param gossiper
86      */
87     public BucketStore(ActorRef gossiper){
88         this.gossiper = gossiper;
89     }
90
91     @Override
92     public void onReceive(Object message) throws Exception {
93
94         log.debug("Received message: node[{}], message[{}]", selfAddress,
95             message);
96
97         if (probe != null) {
98
99             probe.tell(message, getSelf());
100         }
101
102         if (message instanceof ConditionalProbe) {
103             log.info("Received probe {} {}", getSelf(), message);
104             probe = (ConditionalProbe) message;
105         } else if (message instanceof UpdateBucket) {
106             receiveUpdateBucket(((UpdateBucket) message).getBucket());
107         } else if (message instanceof GetAllBuckets) {
108             receiveGetAllBucket();
109         } else if (message instanceof GetLocalBucket) {
110             receiveGetLocalBucket();
111         } else if (message instanceof GetBucketsByMembers) {
112             receiveGetBucketsByMembers(
113                 ((GetBucketsByMembers) message).getMembers());
114         } else if (message instanceof GetBucketVersions) {
115             receiveGetBucketVersions();
116         } else if (message instanceof UpdateRemoteBuckets) {
117             receiveUpdateRemoteBuckets(
118                 ((UpdateRemoteBuckets) message).getBuckets());
119         } else {
120             log.debug("Unhandled message [{}]", message);
121             unhandled(message);
122         }
123
124     }
125
126     /**
127      * Returns a copy of bucket owned by this node
128      */
129     private void receiveGetLocalBucket() {
130         final ActorRef sender = getSender();
131         GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
132         sender.tell(reply, getSelf());
133     }
134
135     /**
136      * Updates the bucket owned by this node
137      *
138      * @param updatedBucket
139      */
140     void receiveUpdateBucket(Bucket updatedBucket){
141
142         localBucket = (BucketImpl) updatedBucket;
143         versions.put(selfAddress, localBucket.getVersion());
144     }
145
146     /**
147      * Returns all the buckets the this node knows about, self owned + remote
148      */
149     void receiveGetAllBucket(){
150         final ActorRef sender = getSender();
151         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
152     }
153
154     /**
155      * Helper to collect all known buckets
156      *
157      * @return self owned + remote buckets
158      */
159     Map<Address, Bucket> getAllBuckets(){
160         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
161
162         //first add the local bucket
163         all.put(selfAddress, localBucket);
164
165         //then get all remote buckets
166         all.putAll(remoteBuckets);
167
168         return all;
169     }
170
171     /**
172      * Returns buckets for requested members that this node knows about
173      *
174      * @param members requested members
175      */
176     void receiveGetBucketsByMembers(Set<Address> members){
177         final ActorRef sender = getSender();
178         Map<Address, Bucket> buckets = getBucketsByMembers(members);
179         sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
180     }
181
182     /**
183      * Helper to collect buckets for requested memebers
184      *
185      * @param members requested members
186      * @return buckets for requested memebers
187      */
188     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
189         Map<Address, Bucket> buckets = new HashMap<>();
190
191         //first add the local bucket if asked
192         if (members.contains(selfAddress))
193             buckets.put(selfAddress, localBucket);
194
195         //then get buckets for requested remote nodes
196         for (Address address : members){
197             if (remoteBuckets.containsKey(address))
198                 buckets.put(address, remoteBuckets.get(address));
199         }
200
201         return buckets;
202     }
203
204     /**
205      * Returns versions for all buckets known
206      */
207     void receiveGetBucketVersions(){
208         final ActorRef sender = getSender();
209         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
210         sender.tell(reply, getSelf());
211     }
212
213     /**
214      * Update local copy of remote buckets where local copy's version is older
215      *
216      * @param receivedBuckets buckets sent by remote
217      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
218      */
219     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
220
221         if (receivedBuckets == null || receivedBuckets.isEmpty())
222             return; //nothing to do
223
224         //Remote cant update self's bucket
225         receivedBuckets.remove(selfAddress);
226
227         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
228
229             Long localVersion = versions.get(entry.getKey());
230             if (localVersion == null) localVersion = -1L;
231
232             Bucket receivedBucket = entry.getValue();
233
234             if (receivedBucket == null)
235                 continue;
236
237             Long remoteVersion = receivedBucket.getVersion();
238             if (remoteVersion == null) remoteVersion = -1L;
239
240             //update only if remote version is newer
241             if ( remoteVersion > localVersion ) {
242                 remoteBuckets.put(entry.getKey(), receivedBucket);
243                 versions.put(entry.getKey(), remoteVersion);
244             }
245         }
246
247         log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
248     }
249
250     ///
251     ///Getter Setters
252     ///
253
254     BucketImpl getLocalBucket() {
255         return localBucket;
256     }
257
258     void setLocalBucket(BucketImpl localBucket) {
259         this.localBucket = localBucket;
260     }
261
262     ConcurrentMap<Address, Bucket> getRemoteBuckets() {
263         return remoteBuckets;
264     }
265
266     void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
267         this.remoteBuckets = remoteBuckets;
268     }
269
270     ConcurrentMap<Address, Long> getVersions() {
271         return versions;
272     }
273
274     void setVersions(ConcurrentMap<Address, Long> versions) {
275         this.versions = versions;
276     }
277
278     Address getSelfAddress() {
279         return selfAddress;
280     }
281
282 }