Gossip-based, eventually consistent RPC Registry.
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.Address;
13 import akka.actor.Props;
14 import akka.actor.UntypedActor;
15 import akka.cluster.Cluster;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
18
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Set;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24
25 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
26 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
27 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
28 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
29 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
30 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
31 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
32 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
33 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
35
36 /**
37  * A store that syncs its data across nodes in the cluster.
38  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
39  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
40  * <p>
41  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
42  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
43  *
44  */
45 public class BucketStore extends UntypedActor {
46
47     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
48
49     /**
50      * Bucket owned by the node
51      */
52     private BucketImpl localBucket = new BucketImpl();;
53
54     /**
55      * Buckets ownded by other known nodes in the cluster
56      */
57     private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
58
59     /**
60      * Bucket version for every known node in the cluster including this node
61      */
62     private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
63
64     /**
65      * Cluster address for this node
66      */
67     private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
68
69     /**
70      * Our private gossiper
71      */
72     private ActorRef gossiper;
73
74     public BucketStore(){
75         gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
76     }
77
78     /**
79      * This constructor is useful for testing.
80      * TODO: Pass Props instead of ActorRef
81      *
82      * @param gossiper
83      */
84     public BucketStore(ActorRef gossiper){
85         this.gossiper = gossiper;
86     }
87
88     @Override
89     public void onReceive(Object message) throws Exception {
90
91         log.debug("Received message: node[{}], message[{}]", selfAddress, message);
92
93         if (message instanceof UpdateBucket)
94             receiveUpdateBucket(((UpdateBucket) message).getBucket());
95
96         else if (message instanceof GetAllBuckets)
97             receiveGetAllBucket();
98
99         else if (message instanceof GetLocalBucket)
100             receiveGetLocalBucket();
101
102         else if (message instanceof GetBucketsByMembers)
103             receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
104
105         else if (message instanceof GetBucketVersions)
106             receiveGetBucketVersions();
107
108         else if (message instanceof UpdateRemoteBuckets)
109             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
110
111         else {
112             log.debug("Unhandled message [{}]", message);
113             unhandled(message);
114         }
115
116     }
117
118     /**
119      * Returns a copy of bucket owned by this node
120      */
121     private void receiveGetLocalBucket() {
122         final ActorRef sender = getSender();
123         GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
124         sender.tell(reply, getSelf());
125     }
126
127     /**
128      * Updates the bucket owned by this node
129      *
130      * @param updatedBucket
131      */
132     void receiveUpdateBucket(Bucket updatedBucket){
133
134         localBucket = (BucketImpl) updatedBucket;
135         versions.put(selfAddress, localBucket.getVersion());
136     }
137
138     /**
139      * Returns all the buckets the this node knows about, self owned + remote
140      */
141     void receiveGetAllBucket(){
142         final ActorRef sender = getSender();
143         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
144     }
145
146     /**
147      * Helper to collect all known buckets
148      *
149      * @return self owned + remote buckets
150      */
151     Map<Address, Bucket> getAllBuckets(){
152         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
153
154         //first add the local bucket
155         all.put(selfAddress, localBucket);
156
157         //then get all remote buckets
158         all.putAll(remoteBuckets);
159
160         return all;
161     }
162
163     /**
164      * Returns buckets for requested members that this node knows about
165      *
166      * @param members requested members
167      */
168     void receiveGetBucketsByMembers(Set<Address> members){
169         final ActorRef sender = getSender();
170         Map<Address, Bucket> buckets = getBucketsByMembers(members);
171         sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
172     }
173
174     /**
175      * Helper to collect buckets for requested memebers
176      *
177      * @param members requested members
178      * @return buckets for requested memebers
179      */
180     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
181         Map<Address, Bucket> buckets = new HashMap<>();
182
183         //first add the local bucket if asked
184         if (members.contains(selfAddress))
185             buckets.put(selfAddress, localBucket);
186
187         //then get buckets for requested remote nodes
188         for (Address address : members){
189             if (remoteBuckets.containsKey(address))
190                 buckets.put(address, remoteBuckets.get(address));
191         }
192
193         return buckets;
194     }
195
196     /**
197      * Returns versions for all buckets known
198      */
199     void receiveGetBucketVersions(){
200         final ActorRef sender = getSender();
201         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
202         sender.tell(reply, getSelf());
203     }
204
205     /**
206      * Update local copy of remote buckets where local copy's version is older
207      *
208      * @param receivedBuckets buckets sent by remote
209      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
210      */
211     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
212
213         if (receivedBuckets == null || receivedBuckets.isEmpty())
214             return; //nothing to do
215
216         //Remote cant update self's bucket
217         receivedBuckets.remove(selfAddress);
218
219         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
220
221             Long localVersion = versions.get(entry.getKey());
222             if (localVersion == null) localVersion = -1L;
223
224             Bucket receivedBucket = entry.getValue();
225
226             if (receivedBucket == null)
227                 continue;
228
229             Long remoteVersion = receivedBucket.getVersion();
230             if (remoteVersion == null) remoteVersion = -1L;
231
232             //update only if remote version is newer
233             if ( remoteVersion > localVersion ) {
234                 remoteBuckets.put(entry.getKey(), receivedBucket);
235                 versions.put(entry.getKey(), remoteVersion);
236             }
237         }
238
239         log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
240     }
241
242     ///
243     ///Getter Setters
244     ///
245
246     BucketImpl getLocalBucket() {
247         return localBucket;
248     }
249
250     void setLocalBucket(BucketImpl localBucket) {
251         this.localBucket = localBucket;
252     }
253
254     ConcurrentMap<Address, Bucket> getRemoteBuckets() {
255         return remoteBuckets;
256     }
257
258     void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
259         this.remoteBuckets = remoteBuckets;
260     }
261
262     ConcurrentMap<Address, Long> getVersions() {
263         return versions;
264     }
265
266     void setVersions(ConcurrentMap<Address, Long> versions) {
267         this.versions = versions;
268     }
269
270     Address getSelfAddress() {
271         return selfAddress;
272     }
273 }