b50dfb1ba3e196f66e33d74438d43a2aca2d81ee
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.cluster.ClusterActorRefProvider;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
18 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
19 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
20 import org.opendaylight.controller.utils.ConditionalProbe;
21
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
27
28 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
29 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
30 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
31 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
32 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
33 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
38
39 /**
40  * A store that syncs its data across nodes in the cluster.
41  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
42  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
43  * <p>
44  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
45  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
46  *
47  */
48 public class BucketStore extends AbstractUntypedActorWithMetering {
49
50     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
51
52     /**
53      * Bucket owned by the node
54      */
55     private BucketImpl localBucket = new BucketImpl();
56
57     /**
58      * Buckets ownded by other known nodes in the cluster
59      */
60     private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
61
62     /**
63      * Bucket version for every known node in the cluster including this node
64      */
65     private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
66
67     /**
68      * Cluster address for this node
69      */
70     private Address selfAddress;
71
72     private ConditionalProbe probe;
73
74     private final RemoteRpcProviderConfig config;
75
76     public BucketStore(){
77         config = new RemoteRpcProviderConfig(getContext().system().settings().config());
78     }
79
80     @Override
81     public void preStart(){
82         ActorRefProvider provider = getContext().provider();
83         selfAddress = provider.getDefaultAddress();
84
85         if ( provider instanceof ClusterActorRefProvider)
86             getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
87     }
88
89
90     @Override
91     protected void handleReceive(Object message) throws Exception {
92         if (probe != null) {
93             probe.tell(message, getSelf());
94         }
95
96         if (message instanceof ConditionalProbe) {
97             log.info("Received probe {} {}", getSelf(), message);
98             probe = (ConditionalProbe) message;
99         } else if (message instanceof UpdateBucket) {
100             receiveUpdateBucket(((UpdateBucket) message).getBucket());
101         } else if (message instanceof GetAllBuckets) {
102             receiveGetAllBucket();
103         } else if (message instanceof GetLocalBucket) {
104             receiveGetLocalBucket();
105         } else if (message instanceof GetBucketsByMembers) {
106             receiveGetBucketsByMembers(
107                     ((GetBucketsByMembers) message).getMembers());
108         } else if (message instanceof GetBucketVersions) {
109             receiveGetBucketVersions();
110         } else if (message instanceof UpdateRemoteBuckets) {
111             receiveUpdateRemoteBuckets(
112                     ((UpdateRemoteBuckets) message).getBuckets());
113         } else {
114             if(log.isDebugEnabled()) {
115                 log.debug("Unhandled message [{}]", message);
116             }
117             unhandled(message);
118         }
119     }
120
121     /**
122      * Returns a copy of bucket owned by this node
123      */
124     private void receiveGetLocalBucket() {
125         final ActorRef sender = getSender();
126         GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
127         sender.tell(reply, getSelf());
128     }
129
130     /**
131      * Updates the bucket owned by this node
132      *
133      * @param updatedBucket
134      */
135     void receiveUpdateBucket(Bucket updatedBucket){
136
137         localBucket = (BucketImpl) updatedBucket;
138         versions.put(selfAddress, localBucket.getVersion());
139     }
140
141     /**
142      * Returns all the buckets the this node knows about, self owned + remote
143      */
144     void receiveGetAllBucket(){
145         final ActorRef sender = getSender();
146         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
147     }
148
149     /**
150      * Helper to collect all known buckets
151      *
152      * @return self owned + remote buckets
153      */
154     Map<Address, Bucket> getAllBuckets(){
155         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
156
157         //first add the local bucket
158         all.put(selfAddress, localBucket);
159
160         //then get all remote buckets
161         all.putAll(remoteBuckets);
162
163         return all;
164     }
165
166     /**
167      * Returns buckets for requested members that this node knows about
168      *
169      * @param members requested members
170      */
171     void receiveGetBucketsByMembers(Set<Address> members){
172         final ActorRef sender = getSender();
173         Map<Address, Bucket> buckets = getBucketsByMembers(members);
174         sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
175     }
176
177     /**
178      * Helper to collect buckets for requested memebers
179      *
180      * @param members requested members
181      * @return buckets for requested memebers
182      */
183     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
184         Map<Address, Bucket> buckets = new HashMap<>();
185
186         //first add the local bucket if asked
187         if (members.contains(selfAddress))
188             buckets.put(selfAddress, localBucket);
189
190         //then get buckets for requested remote nodes
191         for (Address address : members){
192             if (remoteBuckets.containsKey(address))
193                 buckets.put(address, remoteBuckets.get(address));
194         }
195
196         return buckets;
197     }
198
199     /**
200      * Returns versions for all buckets known
201      */
202     void receiveGetBucketVersions(){
203         final ActorRef sender = getSender();
204         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
205         sender.tell(reply, getSelf());
206     }
207
208     /**
209      * Update local copy of remote buckets where local copy's version is older
210      *
211      * @param receivedBuckets buckets sent by remote
212      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
213      */
214     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
215
216         if (receivedBuckets == null || receivedBuckets.isEmpty())
217             return; //nothing to do
218
219         //Remote cant update self's bucket
220         receivedBuckets.remove(selfAddress);
221
222         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
223
224             Long localVersion = versions.get(entry.getKey());
225             if (localVersion == null) localVersion = -1L;
226
227             Bucket receivedBucket = entry.getValue();
228
229             if (receivedBucket == null)
230                 continue;
231
232             Long remoteVersion = receivedBucket.getVersion();
233             if (remoteVersion == null) remoteVersion = -1L;
234
235             //update only if remote version is newer
236             if ( remoteVersion.longValue() > localVersion.longValue() ) {
237                 remoteBuckets.put(entry.getKey(), receivedBucket);
238                 versions.put(entry.getKey(), remoteVersion);
239             }
240         }
241         if(log.isDebugEnabled()) {
242             log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
243         }
244     }
245
246     ///
247     ///Getter Setters
248     ///
249
250     BucketImpl getLocalBucket() {
251         return localBucket;
252     }
253
254     void setLocalBucket(BucketImpl localBucket) {
255         this.localBucket = localBucket;
256     }
257
258     ConcurrentMap<Address, Bucket> getRemoteBuckets() {
259         return remoteBuckets;
260     }
261
262     void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
263         this.remoteBuckets = remoteBuckets;
264     }
265
266     ConcurrentMap<Address, Long> getVersions() {
267         return versions;
268     }
269
270     void setVersions(ConcurrentMap<Address, Long> versions) {
271         this.versions = versions;
272     }
273
274     Address getSelfAddress() {
275         return selfAddress;
276     }
277 }