2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
11 import akka.actor.ActorRef;
12 import akka.actor.Address;
13 import akka.actor.Props;
14 import akka.actor.UntypedActor;
15 import akka.cluster.Cluster;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
19 import java.util.HashMap;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
25 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
26 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
27 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
28 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
29 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
30 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
31 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
32 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
33 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
37 * A store that syncs its data across nodes in the cluster.
38 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
39 * A node can write ONLY to its bucket. This way, write conflicts are avoided.
41 * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
42 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
45 public class BucketStore extends UntypedActor {
47 final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
50 * Bucket owned by the node
52 private BucketImpl localBucket = new BucketImpl();;
55 * Buckets ownded by other known nodes in the cluster
57 private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
60 * Bucket version for every known node in the cluster including this node
62 private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
65 * Cluster address for this node
67 private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
70 * Our private gossiper
72 private ActorRef gossiper;
75 gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
/**
 * Creates a store that uses the supplied gossiper.
 * This constructor is useful for testing.
 * TODO: Pass Props instead of ActorRef
 *
 * @param gossiper actor reference to keep as this store's gossiper
 */
public BucketStore(final ActorRef gossiper) {
    this.gossiper = gossiper;
}
89 public void onReceive(Object message) throws Exception {
91 log.debug("Received message: node[{}], message[{}]", selfAddress, message);
93 if (message instanceof UpdateBucket)
94 receiveUpdateBucket(((UpdateBucket) message).getBucket());
96 else if (message instanceof GetAllBuckets)
97 receiveGetAllBucket();
99 else if (message instanceof GetLocalBucket)
100 receiveGetLocalBucket();
102 else if (message instanceof GetBucketsByMembers)
103 receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
105 else if (message instanceof GetBucketVersions)
106 receiveGetBucketVersions();
108 else if (message instanceof UpdateRemoteBuckets)
109 receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
112 log.debug("Unhandled message [{}]", message);
119 * Returns a copy of bucket owned by this node
121 private void receiveGetLocalBucket() {
122 final ActorRef sender = getSender();
123 GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
124 sender.tell(reply, getSelf());
128 * Updates the bucket owned by this node
130 * @param updatedBucket
132 void receiveUpdateBucket(Bucket updatedBucket){
134 localBucket = (BucketImpl) updatedBucket;
135 versions.put(selfAddress, localBucket.getVersion());
139 * Returns all the buckets the this node knows about, self owned + remote
141 void receiveGetAllBucket(){
142 final ActorRef sender = getSender();
143 sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
147 * Helper to collect all known buckets
149 * @return self owned + remote buckets
151 Map<Address, Bucket> getAllBuckets(){
152 Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
154 //first add the local bucket
155 all.put(selfAddress, localBucket);
157 //then get all remote buckets
158 all.putAll(remoteBuckets);
164 * Returns buckets for requested members that this node knows about
166 * @param members requested members
168 void receiveGetBucketsByMembers(Set<Address> members){
169 final ActorRef sender = getSender();
170 Map<Address, Bucket> buckets = getBucketsByMembers(members);
171 sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
175 * Helper to collect buckets for requested memebers
177 * @param members requested members
178 * @return buckets for requested memebers
180 Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
181 Map<Address, Bucket> buckets = new HashMap<>();
183 //first add the local bucket if asked
184 if (members.contains(selfAddress))
185 buckets.put(selfAddress, localBucket);
187 //then get buckets for requested remote nodes
188 for (Address address : members){
189 if (remoteBuckets.containsKey(address))
190 buckets.put(address, remoteBuckets.get(address));
197 * Returns versions for all buckets known
199 void receiveGetBucketVersions(){
200 final ActorRef sender = getSender();
201 GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
202 sender.tell(reply, getSelf());
206 * Update local copy of remote buckets where local copy's version is older
208 * @param receivedBuckets buckets sent by remote
209 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
211 void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
213 if (receivedBuckets == null || receivedBuckets.isEmpty())
214 return; //nothing to do
216 //Remote cant update self's bucket
217 receivedBuckets.remove(selfAddress);
219 for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
221 Long localVersion = versions.get(entry.getKey());
222 if (localVersion == null) localVersion = -1L;
224 Bucket receivedBucket = entry.getValue();
226 if (receivedBucket == null)
229 Long remoteVersion = receivedBucket.getVersion();
230 if (remoteVersion == null) remoteVersion = -1L;
232 //update only if remote version is newer
233 if ( remoteVersion > localVersion ) {
234 remoteBuckets.put(entry.getKey(), receivedBucket);
235 versions.put(entry.getKey(), remoteVersion);
239 log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
246 BucketImpl getLocalBucket() {
250 void setLocalBucket(BucketImpl localBucket) {
251 this.localBucket = localBucket;
254 ConcurrentMap<Address, Bucket> getRemoteBuckets() {
255 return remoteBuckets;
258 void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
259 this.remoteBuckets = remoteBuckets;
262 ConcurrentMap<Address, Long> getVersions() {
266 void setVersions(ConcurrentMap<Address, Long> versions) {
267 this.versions = versions;
270 Address getSelfAddress() {