3b078aa062fdf1fc3feaca7fa10147de8c3503cc
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.cluster.ClusterActorRefProvider;
17 import akka.event.Logging;
18 import akka.event.LoggingAdapter;
19 import org.opendaylight.controller.utils.ConditionalProbe;
20
21 import java.util.HashMap;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26
27 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
28 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
29 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
30 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
31 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
32 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
33 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
37
38 /**
39  * A store that syncs its data across nodes in the cluster.
40  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
41  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
42  * <p>
43  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
44  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
45  *
46  */
47 public class BucketStore extends UntypedActor {
48
49     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
50
51     /**
52      * Bucket owned by the node
53      */
54     private BucketImpl localBucket = new BucketImpl();;
55
56     /**
57      * Buckets ownded by other known nodes in the cluster
58      */
59     private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
60
61     /**
62      * Bucket version for every known node in the cluster including this node
63      */
64     private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
65
66     /**
67      * Cluster address for this node
68      */
69     private Address selfAddress;
70
71     private ConditionalProbe probe;
72
73     @Override
74     public void preStart(){
75         ActorRefProvider provider = getContext().provider();
76         selfAddress = provider.getDefaultAddress();
77
78         if ( provider instanceof ClusterActorRefProvider)
79             getContext().actorOf(Props.create(Gossiper.class), "gossiper");
80     }
81
82     @Override
83     public void onReceive(Object message) throws Exception {
84
85         log.debug("Received message: node[{}], message[{}]", selfAddress, message);
86
87         if (probe != null) {
88             probe.tell(message, getSelf());
89         }
90
91         if (message instanceof ConditionalProbe) {
92             log.info("Received probe {} {}", getSelf(), message);
93             probe = (ConditionalProbe) message;
94         } else if (message instanceof UpdateBucket) {
95             receiveUpdateBucket(((UpdateBucket) message).getBucket());
96         } else if (message instanceof GetAllBuckets) {
97             receiveGetAllBucket();
98         } else if (message instanceof GetLocalBucket) {
99             receiveGetLocalBucket();
100         } else if (message instanceof GetBucketsByMembers) {
101             receiveGetBucketsByMembers(
102                 ((GetBucketsByMembers) message).getMembers());
103         } else if (message instanceof GetBucketVersions) {
104             receiveGetBucketVersions();
105         } else if (message instanceof UpdateRemoteBuckets) {
106             receiveUpdateRemoteBuckets(
107                 ((UpdateRemoteBuckets) message).getBuckets());
108         } else {
109             log.debug("Unhandled message [{}]", message);
110             unhandled(message);
111         }
112
113     }
114
115     /**
116      * Returns a copy of bucket owned by this node
117      */
118     private void receiveGetLocalBucket() {
119         final ActorRef sender = getSender();
120         GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
121         sender.tell(reply, getSelf());
122     }
123
124     /**
125      * Updates the bucket owned by this node
126      *
127      * @param updatedBucket
128      */
129     void receiveUpdateBucket(Bucket updatedBucket){
130
131         localBucket = (BucketImpl) updatedBucket;
132         versions.put(selfAddress, localBucket.getVersion());
133     }
134
135     /**
136      * Returns all the buckets the this node knows about, self owned + remote
137      */
138     void receiveGetAllBucket(){
139         final ActorRef sender = getSender();
140         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
141     }
142
143     /**
144      * Helper to collect all known buckets
145      *
146      * @return self owned + remote buckets
147      */
148     Map<Address, Bucket> getAllBuckets(){
149         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
150
151         //first add the local bucket
152         all.put(selfAddress, localBucket);
153
154         //then get all remote buckets
155         all.putAll(remoteBuckets);
156
157         return all;
158     }
159
160     /**
161      * Returns buckets for requested members that this node knows about
162      *
163      * @param members requested members
164      */
165     void receiveGetBucketsByMembers(Set<Address> members){
166         final ActorRef sender = getSender();
167         Map<Address, Bucket> buckets = getBucketsByMembers(members);
168         sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
169     }
170
171     /**
172      * Helper to collect buckets for requested memebers
173      *
174      * @param members requested members
175      * @return buckets for requested memebers
176      */
177     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
178         Map<Address, Bucket> buckets = new HashMap<>();
179
180         //first add the local bucket if asked
181         if (members.contains(selfAddress))
182             buckets.put(selfAddress, localBucket);
183
184         //then get buckets for requested remote nodes
185         for (Address address : members){
186             if (remoteBuckets.containsKey(address))
187                 buckets.put(address, remoteBuckets.get(address));
188         }
189
190         return buckets;
191     }
192
193     /**
194      * Returns versions for all buckets known
195      */
196     void receiveGetBucketVersions(){
197         final ActorRef sender = getSender();
198         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
199         sender.tell(reply, getSelf());
200     }
201
202     /**
203      * Update local copy of remote buckets where local copy's version is older
204      *
205      * @param receivedBuckets buckets sent by remote
206      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
207      */
208     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
209
210         if (receivedBuckets == null || receivedBuckets.isEmpty())
211             return; //nothing to do
212
213         //Remote cant update self's bucket
214         receivedBuckets.remove(selfAddress);
215
216         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
217
218             Long localVersion = versions.get(entry.getKey());
219             if (localVersion == null) localVersion = -1L;
220
221             Bucket receivedBucket = entry.getValue();
222
223             if (receivedBucket == null)
224                 continue;
225
226             Long remoteVersion = receivedBucket.getVersion();
227             if (remoteVersion == null) remoteVersion = -1L;
228
229             //update only if remote version is newer
230             if ( remoteVersion.longValue() > localVersion.longValue() ) {
231                 remoteBuckets.put(entry.getKey(), receivedBucket);
232                 versions.put(entry.getKey(), remoteVersion);
233             }
234         }
235
236         log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
237     }
238
239     ///
240     ///Getter Setters
241     ///
242
243     BucketImpl getLocalBucket() {
244         return localBucket;
245     }
246
247     void setLocalBucket(BucketImpl localBucket) {
248         this.localBucket = localBucket;
249     }
250
251     ConcurrentMap<Address, Bucket> getRemoteBuckets() {
252         return remoteBuckets;
253     }
254
255     void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
256         this.remoteBuckets = remoteBuckets;
257     }
258
259     ConcurrentMap<Address, Long> getVersions() {
260         return versions;
261     }
262
263     void setVersions(ConcurrentMap<Address, Long> versions) {
264         this.versions = versions;
265     }
266
267     Address getSelfAddress() {
268         return selfAddress;
269     }
270 }