2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
11 import akka.actor.ActorRef;
12 import akka.actor.Address;
13 import akka.actor.Props;
14 import akka.actor.UntypedActor;
15 import akka.cluster.Cluster;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
18 import org.opendaylight.controller.utils.ConditionalProbe;
20 import java.util.HashMap;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
26 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
27 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
28 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
29 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
30 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
31 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
32 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
33 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
/**
 * A store that syncs its data across nodes in the cluster.
 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
 * A node can write ONLY to its own bucket. This way, write conflicts are avoided.
 * <p>
 * Buckets are synced across nodes using the Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
 * <p>
 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
 */
46 public class BucketStore extends UntypedActor {
48 final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
51 * Bucket owned by the node
53 private BucketImpl localBucket = new BucketImpl();;
56 * Buckets ownded by other known nodes in the cluster
58 private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
61 * Bucket version for every known node in the cluster including this node
63 private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
66 * Cluster address for this node
68 private final Address selfAddress = Cluster.get(getContext().system()).selfAddress();
71 * Our private gossiper
73 private ActorRef gossiper;
75 private ConditionalProbe probe;
/**
 * Creates a store that starts its own {@link Gossiper} through this actor's
 * context, under the name "gossiper".
 */
public BucketStore() {
    this.gossiper = getContext().actorOf(Props.create(Gossiper.class), "gossiper");
}

/**
 * Creates a store that uses the supplied gossiper instead of starting one.
 * This constructor is useful for testing.
 * TODO: Pass Props instead of ActorRef
 *
 * @param gossiper actor reference to use as this store's gossiper
 */
public BucketStore(ActorRef gossiper) {
    this.gossiper = gossiper;
}
92 public void onReceive(Object message) throws Exception {
94 log.debug("Received message: node[{}], message[{}]", selfAddress,
99 probe.tell(message, getSelf());
102 if (message instanceof ConditionalProbe) {
103 log.info("Received probe {} {}", getSelf(), message);
104 probe = (ConditionalProbe) message;
105 } else if (message instanceof UpdateBucket) {
106 receiveUpdateBucket(((UpdateBucket) message).getBucket());
107 } else if (message instanceof GetAllBuckets) {
108 receiveGetAllBucket();
109 } else if (message instanceof GetLocalBucket) {
110 receiveGetLocalBucket();
111 } else if (message instanceof GetBucketsByMembers) {
112 receiveGetBucketsByMembers(
113 ((GetBucketsByMembers) message).getMembers());
114 } else if (message instanceof GetBucketVersions) {
115 receiveGetBucketVersions();
116 } else if (message instanceof UpdateRemoteBuckets) {
117 receiveUpdateRemoteBuckets(
118 ((UpdateRemoteBuckets) message).getBuckets());
120 log.debug("Unhandled message [{}]", message);
127 * Returns a copy of bucket owned by this node
129 private void receiveGetLocalBucket() {
130 final ActorRef sender = getSender();
131 GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
132 sender.tell(reply, getSelf());
136 * Updates the bucket owned by this node
138 * @param updatedBucket
140 void receiveUpdateBucket(Bucket updatedBucket){
142 localBucket = (BucketImpl) updatedBucket;
143 versions.put(selfAddress, localBucket.getVersion());
147 * Returns all the buckets the this node knows about, self owned + remote
149 void receiveGetAllBucket(){
150 final ActorRef sender = getSender();
151 sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
155 * Helper to collect all known buckets
157 * @return self owned + remote buckets
159 Map<Address, Bucket> getAllBuckets(){
160 Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
162 //first add the local bucket
163 all.put(selfAddress, localBucket);
165 //then get all remote buckets
166 all.putAll(remoteBuckets);
172 * Returns buckets for requested members that this node knows about
174 * @param members requested members
176 void receiveGetBucketsByMembers(Set<Address> members){
177 final ActorRef sender = getSender();
178 Map<Address, Bucket> buckets = getBucketsByMembers(members);
179 sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
183 * Helper to collect buckets for requested memebers
185 * @param members requested members
186 * @return buckets for requested memebers
188 Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
189 Map<Address, Bucket> buckets = new HashMap<>();
191 //first add the local bucket if asked
192 if (members.contains(selfAddress))
193 buckets.put(selfAddress, localBucket);
195 //then get buckets for requested remote nodes
196 for (Address address : members){
197 if (remoteBuckets.containsKey(address))
198 buckets.put(address, remoteBuckets.get(address));
205 * Returns versions for all buckets known
207 void receiveGetBucketVersions(){
208 final ActorRef sender = getSender();
209 GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
210 sender.tell(reply, getSelf());
214 * Update local copy of remote buckets where local copy's version is older
216 * @param receivedBuckets buckets sent by remote
217 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
219 void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
221 if (receivedBuckets == null || receivedBuckets.isEmpty())
222 return; //nothing to do
224 //Remote cant update self's bucket
225 receivedBuckets.remove(selfAddress);
227 for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
229 Long localVersion = versions.get(entry.getKey());
230 if (localVersion == null) localVersion = -1L;
232 Bucket receivedBucket = entry.getValue();
234 if (receivedBucket == null)
237 Long remoteVersion = receivedBucket.getVersion();
238 if (remoteVersion == null) remoteVersion = -1L;
240 //update only if remote version is newer
241 if ( remoteVersion > localVersion ) {
242 remoteBuckets.put(entry.getKey(), receivedBucket);
243 versions.put(entry.getKey(), remoteVersion);
247 log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
254 BucketImpl getLocalBucket() {
258 void setLocalBucket(BucketImpl localBucket) {
259 this.localBucket = localBucket;
262 ConcurrentMap<Address, Bucket> getRemoteBuckets() {
263 return remoteBuckets;
266 void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
267 this.remoteBuckets = remoteBuckets;
270 ConcurrentMap<Address, Long> getVersions() {
274 void setVersions(ConcurrentMap<Address, Long> versions) {
275 this.versions = versions;
278 Address getSelfAddress() {