Merge "Fix test case mis-spelling."
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.cluster.ClusterActorRefProvider;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
18 import com.google.common.annotations.VisibleForTesting;
19 import java.util.HashMap;
20 import java.util.Map;
21 import java.util.Set;
22 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
23 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
31 import org.opendaylight.controller.utils.ConditionalProbe;
32
33 /**
34  * A store that syncs its data across nodes in the cluster.
35  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
36  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
37  * <p>
38  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
39  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
40  *
41  */
42 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
43
44     private static final Long NO_VERSION = -1L;
45
46     final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
47
48     /**
49      * Bucket owned by the node
50      */
51     private final BucketImpl<T> localBucket = new BucketImpl<>();
52
53     /**
54      * Buckets ownded by other known nodes in the cluster
55      */
56     private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
57
58     /**
59      * Bucket version for every known node in the cluster including this node
60      */
61     private final Map<Address, Long> versions = new HashMap<>();
62
63     /**
64      * Cluster address for this node
65      */
66     private Address selfAddress;
67
68     private ConditionalProbe probe;
69
70     private final RemoteRpcProviderConfig config;
71
72     public BucketStore(){
73         config = new RemoteRpcProviderConfig(getContext().system().settings().config());
74     }
75
76     @Override
77     public void preStart(){
78         ActorRefProvider provider = getContext().provider();
79         selfAddress = provider.getDefaultAddress();
80
81         if ( provider instanceof ClusterActorRefProvider) {
82             getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
83         }
84     }
85
86     @Override
87     protected void handleReceive(Object message) throws Exception {
88         if (probe != null) {
89             probe.tell(message, getSelf());
90         }
91
92         if (message instanceof ConditionalProbe) {
93             // The ConditionalProbe is only used for unit tests.
94             log.info("Received probe {} {}", getSelf(), message);
95             probe = (ConditionalProbe) message;
96             // Send back any message to tell the caller we got the probe.
97             getSender().tell("Got it", getSelf());
98         } else if (message instanceof GetAllBuckets) {
99             receiveGetAllBuckets();
100         } else if (message instanceof GetBucketsByMembers) {
101             receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
102         } else if (message instanceof GetBucketVersions) {
103             receiveGetBucketVersions();
104         } else if (message instanceof UpdateRemoteBuckets) {
105             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
106         } else {
107             if(log.isDebugEnabled()) {
108                 log.debug("Unhandled message [{}]", message);
109             }
110             unhandled(message);
111         }
112     }
113
114     /**
115      * Returns all the buckets the this node knows about, self owned + remote
116      */
117     void receiveGetAllBuckets(){
118         final ActorRef sender = getSender();
119         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
120     }
121
122     /**
123      * Helper to collect all known buckets
124      *
125      * @return self owned + remote buckets
126      */
127     @SuppressWarnings("rawtypes")
128     Map<Address, Bucket> getAllBuckets(){
129         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
130
131         //first add the local bucket
132         all.put(selfAddress, new BucketImpl<>(localBucket));
133
134         //then get all remote buckets
135         all.putAll(remoteBuckets);
136
137         return all;
138     }
139
140     /**
141      * Returns buckets for requested members that this node knows about
142      *
143      * @param members requested members
144      */
145     @SuppressWarnings("rawtypes")
146     void receiveGetBucketsByMembers(Set<Address> members){
147         final ActorRef sender = getSender();
148         Map<Address, Bucket> buckets = getBucketsByMembers(members);
149         sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
150     }
151
152     /**
153      * Helper to collect buckets for requested memebers
154      *
155      * @param members requested members
156      * @return buckets for requested memebers
157      */
158     @SuppressWarnings("rawtypes")
159     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
160         Map<Address, Bucket> buckets = new HashMap<>();
161
162         //first add the local bucket if asked
163         if (members.contains(selfAddress)) {
164             buckets.put(selfAddress, new BucketImpl<>(localBucket));
165         }
166
167         //then get buckets for requested remote nodes
168         for (Address address : members){
169             if (remoteBuckets.containsKey(address)) {
170                 buckets.put(address, remoteBuckets.get(address));
171             }
172         }
173
174         return buckets;
175     }
176
177     /**
178      * Returns versions for all buckets known
179      */
180     void receiveGetBucketVersions(){
181         final ActorRef sender = getSender();
182         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
183         sender.tell(reply, getSelf());
184     }
185
186     /**
187      * Update local copy of remote buckets where local copy's version is older
188      *
189      * @param receivedBuckets buckets sent by remote
190      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
191      */
192     @SuppressWarnings({ "rawtypes", "unchecked" })
193     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
194         log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
195         if (receivedBuckets == null || receivedBuckets.isEmpty())
196          {
197             return; //nothing to do
198         }
199
200         //Remote cant update self's bucket
201         receivedBuckets.remove(selfAddress);
202
203         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
204
205             Long localVersion = versions.get(entry.getKey());
206             if (localVersion == null) {
207                 localVersion = NO_VERSION;
208             }
209
210             Bucket<T> receivedBucket = entry.getValue();
211
212             if (receivedBucket == null) {
213                 continue;
214             }
215
216             Long remoteVersion = receivedBucket.getVersion();
217             if (remoteVersion == null) {
218                 remoteVersion = NO_VERSION;
219             }
220
221             //update only if remote version is newer
222             if ( remoteVersion.longValue() > localVersion.longValue() ) {
223                 remoteBuckets.put(entry.getKey(), receivedBucket);
224                 versions.put(entry.getKey(), remoteVersion);
225             }
226         }
227
228         if(log.isDebugEnabled()) {
229             log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
230         }
231     }
232
233     protected BucketImpl<T> getLocalBucket() {
234         return localBucket;
235     }
236
237     protected void updateLocalBucket(T data) {
238         localBucket.setData(data);
239         versions.put(selfAddress, localBucket.getVersion());
240     }
241
242     protected Map<Address, Bucket<T>> getRemoteBuckets() {
243         return remoteBuckets;
244     }
245
246     @VisibleForTesting
247     Map<Address, Long> getVersions() {
248         return versions;
249     }
250 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.