Merge "Add MD-SAL artifacts"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.cluster.ClusterActorRefProvider;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
18 import java.util.HashMap;
19 import java.util.Map;
20 import java.util.Set;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
24 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
33 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
35 import org.opendaylight.controller.utils.ConditionalProbe;
36
37 /**
38  * A store that syncs its data across nodes in the cluster.
39  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
40  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
41  * <p>
42  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
43  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
44  *
45  */
46 public class BucketStore extends AbstractUntypedActorWithMetering {
47
48     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
49
50     /**
51      * Bucket owned by the node
52      */
53     private BucketImpl localBucket = new BucketImpl();
54
55     /**
56      * Buckets ownded by other known nodes in the cluster
57      */
58     private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
59
60     /**
61      * Bucket version for every known node in the cluster including this node
62      */
63     private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
64
65     /**
66      * Cluster address for this node
67      */
68     private Address selfAddress;
69
70     private ConditionalProbe probe;
71
72     private final RemoteRpcProviderConfig config;
73
74     public BucketStore(){
75         config = new RemoteRpcProviderConfig(getContext().system().settings().config());
76     }
77
78     @Override
79     public void preStart(){
80         ActorRefProvider provider = getContext().provider();
81         selfAddress = provider.getDefaultAddress();
82
83         if ( provider instanceof ClusterActorRefProvider) {
84             getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
85         }
86     }
87
88
89     @Override
90     protected void handleReceive(Object message) throws Exception {
91         if (probe != null) {
92             probe.tell(message, getSelf());
93         }
94
95         if (message instanceof ConditionalProbe) {
96             // The ConditionalProbe is only used for unit tests.
97             log.info("Received probe {} {}", getSelf(), message);
98             probe = (ConditionalProbe) message;
99             // Send back any message to tell the caller we got the probe.
100             getSender().tell("Got it", getSelf());
101         } else if (message instanceof UpdateBucket) {
102             receiveUpdateBucket(((UpdateBucket) message).getBucket());
103         } else if (message instanceof GetAllBuckets) {
104             receiveGetAllBucket();
105         } else if (message instanceof GetLocalBucket) {
106             receiveGetLocalBucket();
107         } else if (message instanceof GetBucketsByMembers) {
108             receiveGetBucketsByMembers(
109                     ((GetBucketsByMembers) message).getMembers());
110         } else if (message instanceof GetBucketVersions) {
111             receiveGetBucketVersions();
112         } else if (message instanceof UpdateRemoteBuckets) {
113             receiveUpdateRemoteBuckets(
114                     ((UpdateRemoteBuckets) message).getBuckets());
115         } else {
116             if(log.isDebugEnabled()) {
117                 log.debug("Unhandled message [{}]", message);
118             }
119             unhandled(message);
120         }
121     }
122
123     /**
124      * Returns a copy of bucket owned by this node
125      */
126     private void receiveGetLocalBucket() {
127         final ActorRef sender = getSender();
128         GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
129         sender.tell(reply, getSelf());
130     }
131
132     /**
133      * Updates the bucket owned by this node
134      *
135      * @param updatedBucket
136      */
137     void receiveUpdateBucket(Bucket updatedBucket){
138
139         localBucket = (BucketImpl) updatedBucket;
140         versions.put(selfAddress, localBucket.getVersion());
141     }
142
143     /**
144      * Returns all the buckets the this node knows about, self owned + remote
145      */
146     void receiveGetAllBucket(){
147         final ActorRef sender = getSender();
148         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
149     }
150
151     /**
152      * Helper to collect all known buckets
153      *
154      * @return self owned + remote buckets
155      */
156     Map<Address, Bucket> getAllBuckets(){
157         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
158
159         //first add the local bucket
160         all.put(selfAddress, localBucket);
161
162         //then get all remote buckets
163         all.putAll(remoteBuckets);
164
165         return all;
166     }
167
168     /**
169      * Returns buckets for requested members that this node knows about
170      *
171      * @param members requested members
172      */
173     void receiveGetBucketsByMembers(Set<Address> members){
174         final ActorRef sender = getSender();
175         Map<Address, Bucket> buckets = getBucketsByMembers(members);
176         sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
177     }
178
179     /**
180      * Helper to collect buckets for requested memebers
181      *
182      * @param members requested members
183      * @return buckets for requested memebers
184      */
185     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
186         Map<Address, Bucket> buckets = new HashMap<>();
187
188         //first add the local bucket if asked
189         if (members.contains(selfAddress)) {
190             buckets.put(selfAddress, localBucket);
191         }
192
193         //then get buckets for requested remote nodes
194         for (Address address : members){
195             if (remoteBuckets.containsKey(address)) {
196                 buckets.put(address, remoteBuckets.get(address));
197             }
198         }
199
200         return buckets;
201     }
202
203     /**
204      * Returns versions for all buckets known
205      */
206     void receiveGetBucketVersions(){
207         final ActorRef sender = getSender();
208         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
209         sender.tell(reply, getSelf());
210     }
211
212     /**
213      * Update local copy of remote buckets where local copy's version is older
214      *
215      * @param receivedBuckets buckets sent by remote
216      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
217      */
218     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
219
220         if (receivedBuckets == null || receivedBuckets.isEmpty())
221          {
222             return; //nothing to do
223         }
224
225         //Remote cant update self's bucket
226         receivedBuckets.remove(selfAddress);
227
228         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
229
230             Long localVersion = versions.get(entry.getKey());
231             if (localVersion == null) {
232                 localVersion = -1L;
233             }
234
235             Bucket receivedBucket = entry.getValue();
236
237             if (receivedBucket == null) {
238                 continue;
239             }
240
241             Long remoteVersion = receivedBucket.getVersion();
242             if (remoteVersion == null) {
243                 remoteVersion = -1L;
244             }
245
246             //update only if remote version is newer
247             if ( remoteVersion.longValue() > localVersion.longValue() ) {
248                 remoteBuckets.put(entry.getKey(), receivedBucket);
249                 versions.put(entry.getKey(), remoteVersion);
250             }
251         }
252         if(log.isDebugEnabled()) {
253             log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
254         }
255     }
256
257     ///
258     ///Getter Setters
259     ///
260
261     BucketImpl getLocalBucket() {
262         return localBucket;
263     }
264
265     void setLocalBucket(BucketImpl localBucket) {
266         this.localBucket = localBucket;
267     }
268
269     ConcurrentMap<Address, Bucket> getRemoteBuckets() {
270         return remoteBuckets;
271     }
272
273     void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
274         this.remoteBuckets = remoteBuckets;
275     }
276
277     ConcurrentMap<Address, Long> getVersions() {
278         return versions;
279     }
280
281     void setVersions(ConcurrentMap<Address, Long> versions) {
282         this.versions = versions;
283     }
284
285     Address getSelfAddress() {
286         return selfAddress;
287     }
288 }