2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.cluster.ClusterActorRefProvider;
17 import akka.event.Logging;
18 import akka.event.LoggingAdapter;
19 import org.opendaylight.controller.remote.rpc.utils.ActorUtil;
20 import org.opendaylight.controller.utils.ConditionalProbe;
22 import java.util.HashMap;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ConcurrentMap;
28 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
29 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
30 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
31 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
32 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
33 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
34 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
35 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
36 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
37 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
/**
 * A store that syncs its data across nodes in the cluster.
 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
 * A node can write ONLY to its own bucket. This way, write conflicts are avoided.
 * <p>
 * Buckets are synced across nodes using the Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
 * <p>
 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
 */
48 public class BucketStore extends UntypedActor {
50 final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
53 * Bucket owned by the node
55 private BucketImpl localBucket = new BucketImpl();;
58 * Buckets ownded by other known nodes in the cluster
60 private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
63 * Bucket version for every known node in the cluster including this node
65 private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
68 * Cluster address for this node
70 private Address selfAddress;
72 private ConditionalProbe probe;
75 public void preStart(){
76 ActorRefProvider provider = getContext().provider();
77 selfAddress = provider.getDefaultAddress();
79 if ( provider instanceof ClusterActorRefProvider)
80 getContext().actorOf(Props.create(Gossiper.class).withMailbox(ActorUtil.MAILBOX), "gossiper");
84 public void onReceive(Object message) throws Exception {
86 log.debug("Received message: node[{}], message[{}]", selfAddress, message);
89 probe.tell(message, getSelf());
92 if (message instanceof ConditionalProbe) {
93 log.info("Received probe {} {}", getSelf(), message);
94 probe = (ConditionalProbe) message;
95 } else if (message instanceof UpdateBucket) {
96 receiveUpdateBucket(((UpdateBucket) message).getBucket());
97 } else if (message instanceof GetAllBuckets) {
98 receiveGetAllBucket();
99 } else if (message instanceof GetLocalBucket) {
100 receiveGetLocalBucket();
101 } else if (message instanceof GetBucketsByMembers) {
102 receiveGetBucketsByMembers(
103 ((GetBucketsByMembers) message).getMembers());
104 } else if (message instanceof GetBucketVersions) {
105 receiveGetBucketVersions();
106 } else if (message instanceof UpdateRemoteBuckets) {
107 receiveUpdateRemoteBuckets(
108 ((UpdateRemoteBuckets) message).getBuckets());
110 log.debug("Unhandled message [{}]", message);
117 * Returns a copy of bucket owned by this node
119 private void receiveGetLocalBucket() {
120 final ActorRef sender = getSender();
121 GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
122 sender.tell(reply, getSelf());
126 * Updates the bucket owned by this node
128 * @param updatedBucket
130 void receiveUpdateBucket(Bucket updatedBucket){
132 localBucket = (BucketImpl) updatedBucket;
133 versions.put(selfAddress, localBucket.getVersion());
137 * Returns all the buckets the this node knows about, self owned + remote
139 void receiveGetAllBucket(){
140 final ActorRef sender = getSender();
141 sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
145 * Helper to collect all known buckets
147 * @return self owned + remote buckets
149 Map<Address, Bucket> getAllBuckets(){
150 Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
152 //first add the local bucket
153 all.put(selfAddress, localBucket);
155 //then get all remote buckets
156 all.putAll(remoteBuckets);
162 * Returns buckets for requested members that this node knows about
164 * @param members requested members
166 void receiveGetBucketsByMembers(Set<Address> members){
167 final ActorRef sender = getSender();
168 Map<Address, Bucket> buckets = getBucketsByMembers(members);
169 sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
173 * Helper to collect buckets for requested memebers
175 * @param members requested members
176 * @return buckets for requested memebers
178 Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
179 Map<Address, Bucket> buckets = new HashMap<>();
181 //first add the local bucket if asked
182 if (members.contains(selfAddress))
183 buckets.put(selfAddress, localBucket);
185 //then get buckets for requested remote nodes
186 for (Address address : members){
187 if (remoteBuckets.containsKey(address))
188 buckets.put(address, remoteBuckets.get(address));
195 * Returns versions for all buckets known
197 void receiveGetBucketVersions(){
198 final ActorRef sender = getSender();
199 GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
200 sender.tell(reply, getSelf());
204 * Update local copy of remote buckets where local copy's version is older
206 * @param receivedBuckets buckets sent by remote
207 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
209 void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
211 if (receivedBuckets == null || receivedBuckets.isEmpty())
212 return; //nothing to do
214 //Remote cant update self's bucket
215 receivedBuckets.remove(selfAddress);
217 for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
219 Long localVersion = versions.get(entry.getKey());
220 if (localVersion == null) localVersion = -1L;
222 Bucket receivedBucket = entry.getValue();
224 if (receivedBucket == null)
227 Long remoteVersion = receivedBucket.getVersion();
228 if (remoteVersion == null) remoteVersion = -1L;
230 //update only if remote version is newer
231 if ( remoteVersion.longValue() > localVersion.longValue() ) {
232 remoteBuckets.put(entry.getKey(), receivedBucket);
233 versions.put(entry.getKey(), remoteVersion);
237 log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
244 BucketImpl getLocalBucket() {
248 void setLocalBucket(BucketImpl localBucket) {
249 this.localBucket = localBucket;
252 ConcurrentMap<Address, Bucket> getRemoteBuckets() {
253 return remoteBuckets;
256 void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
257 this.remoteBuckets = remoteBuckets;
260 ConcurrentMap<Address, Long> getVersions() {
264 void setVersions(ConcurrentMap<Address, Long> versions) {
265 this.versions = versions;
268 Address getSelfAddress() {