Merge "Small fix to xsql dependencies"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.cluster.ClusterActorRefProvider;
17 import akka.event.Logging;
18 import akka.event.LoggingAdapter;
19 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
20 import org.opendaylight.controller.utils.ConditionalProbe;
21
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27
28 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
29 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
30 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
31 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
32 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
33 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
38
39 /**
40  * A store that syncs its data across nodes in the cluster.
41  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
42  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
43  * <p>
44  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
45  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
46  *
47  */
48 public class BucketStore extends UntypedActor {
49
50     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
51
52     /**
53      * Bucket owned by the node
54      */
55     private BucketImpl localBucket = new BucketImpl();;
56
57     /**
58      * Buckets ownded by other known nodes in the cluster
59      */
60     private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
61
62     /**
63      * Bucket version for every known node in the cluster including this node
64      */
65     private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
66
67     /**
68      * Cluster address for this node
69      */
70     private Address selfAddress;
71
72     private ConditionalProbe probe;
73
74     @Override
75     public void preStart(){
76         ActorRefProvider provider = getContext().provider();
77         selfAddress = provider.getDefaultAddress();
78
79         if ( provider instanceof ClusterActorRefProvider)
80             getContext().actorOf(Props.create(Gossiper.class).withMailbox(ActorUtil.MAILBOX), "gossiper");
81     }
82
83     @Override
84     public void onReceive(Object message) throws Exception {
85
86         log.debug("Received message: node[{}], message[{}]", selfAddress, message);
87
88         if (probe != null) {
89             probe.tell(message, getSelf());
90         }
91
92         if (message instanceof ConditionalProbe) {
93             log.info("Received probe {} {}", getSelf(), message);
94             probe = (ConditionalProbe) message;
95         } else if (message instanceof UpdateBucket) {
96             receiveUpdateBucket(((UpdateBucket) message).getBucket());
97         } else if (message instanceof GetAllBuckets) {
98             receiveGetAllBucket();
99         } else if (message instanceof GetLocalBucket) {
100             receiveGetLocalBucket();
101         } else if (message instanceof GetBucketsByMembers) {
102             receiveGetBucketsByMembers(
103                 ((GetBucketsByMembers) message).getMembers());
104         } else if (message instanceof GetBucketVersions) {
105             receiveGetBucketVersions();
106         } else if (message instanceof UpdateRemoteBuckets) {
107             receiveUpdateRemoteBuckets(
108                 ((UpdateRemoteBuckets) message).getBuckets());
109         } else {
110             log.debug("Unhandled message [{}]", message);
111             unhandled(message);
112         }
113
114     }
115
116     /**
117      * Returns a copy of bucket owned by this node
118      */
119     private void receiveGetLocalBucket() {
120         final ActorRef sender = getSender();
121         GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
122         sender.tell(reply, getSelf());
123     }
124
125     /**
126      * Updates the bucket owned by this node
127      *
128      * @param updatedBucket
129      */
130     void receiveUpdateBucket(Bucket updatedBucket){
131
132         localBucket = (BucketImpl) updatedBucket;
133         versions.put(selfAddress, localBucket.getVersion());
134     }
135
136     /**
137      * Returns all the buckets the this node knows about, self owned + remote
138      */
139     void receiveGetAllBucket(){
140         final ActorRef sender = getSender();
141         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
142     }
143
144     /**
145      * Helper to collect all known buckets
146      *
147      * @return self owned + remote buckets
148      */
149     Map<Address, Bucket> getAllBuckets(){
150         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
151
152         //first add the local bucket
153         all.put(selfAddress, localBucket);
154
155         //then get all remote buckets
156         all.putAll(remoteBuckets);
157
158         return all;
159     }
160
161     /**
162      * Returns buckets for requested members that this node knows about
163      *
164      * @param members requested members
165      */
166     void receiveGetBucketsByMembers(Set<Address> members){
167         final ActorRef sender = getSender();
168         Map<Address, Bucket> buckets = getBucketsByMembers(members);
169         sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
170     }
171
172     /**
173      * Helper to collect buckets for requested memebers
174      *
175      * @param members requested members
176      * @return buckets for requested memebers
177      */
178     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
179         Map<Address, Bucket> buckets = new HashMap<>();
180
181         //first add the local bucket if asked
182         if (members.contains(selfAddress))
183             buckets.put(selfAddress, localBucket);
184
185         //then get buckets for requested remote nodes
186         for (Address address : members){
187             if (remoteBuckets.containsKey(address))
188                 buckets.put(address, remoteBuckets.get(address));
189         }
190
191         return buckets;
192     }
193
194     /**
195      * Returns versions for all buckets known
196      */
197     void receiveGetBucketVersions(){
198         final ActorRef sender = getSender();
199         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
200         sender.tell(reply, getSelf());
201     }
202
203     /**
204      * Update local copy of remote buckets where local copy's version is older
205      *
206      * @param receivedBuckets buckets sent by remote
207      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
208      */
209     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
210
211         if (receivedBuckets == null || receivedBuckets.isEmpty())
212             return; //nothing to do
213
214         //Remote cant update self's bucket
215         receivedBuckets.remove(selfAddress);
216
217         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
218
219             Long localVersion = versions.get(entry.getKey());
220             if (localVersion == null) localVersion = -1L;
221
222             Bucket receivedBucket = entry.getValue();
223
224             if (receivedBucket == null)
225                 continue;
226
227             Long remoteVersion = receivedBucket.getVersion();
228             if (remoteVersion == null) remoteVersion = -1L;
229
230             //update only if remote version is newer
231             if ( remoteVersion.longValue() > localVersion.longValue() ) {
232                 remoteBuckets.put(entry.getKey(), receivedBucket);
233                 versions.put(entry.getKey(), remoteVersion);
234             }
235         }
236
237         log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
238     }
239
240     ///
241     ///Getter Setters
242     ///
243
244     BucketImpl getLocalBucket() {
245         return localBucket;
246     }
247
248     void setLocalBucket(BucketImpl localBucket) {
249         this.localBucket = localBucket;
250     }
251
252     ConcurrentMap<Address, Bucket> getRemoteBuckets() {
253         return remoteBuckets;
254     }
255
256     void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
257         this.remoteBuckets = remoteBuckets;
258     }
259
260     ConcurrentMap<Address, Long> getVersions() {
261         return versions;
262     }
263
264     void setVersions(ConcurrentMap<Address, Long> versions) {
265         this.versions = versions;
266     }
267
268     Address getSelfAddress() {
269         return selfAddress;
270     }
271 }