2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.cluster.ClusterActorRefProvider;
16 import akka.event.Logging;
17 import akka.event.LoggingAdapter;
18 import java.util.HashMap;
21 import java.util.concurrent.ConcurrentHashMap;
22 import java.util.concurrent.ConcurrentMap;
23 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
24 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucket;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetLocalBucketReply;
33 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateBucket;
34 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
35 import org.opendaylight.controller.utils.ConditionalProbe;
38 * A store that syncs its data across nodes in the cluster.
39 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
40 * A node can write ONLY to its bucket. This way, write conflicts are avoided.
42 * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
43 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
46 public class BucketStore extends AbstractUntypedActorWithMetering {
48 final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
51 * Bucket owned by the node
53 private BucketImpl localBucket = new BucketImpl();
56 * Buckets ownded by other known nodes in the cluster
58 private ConcurrentMap<Address, Bucket> remoteBuckets = new ConcurrentHashMap<>();
61 * Bucket version for every known node in the cluster including this node
63 private ConcurrentMap<Address, Long> versions = new ConcurrentHashMap<>();
66 * Cluster address for this node
68 private Address selfAddress;
70 private ConditionalProbe probe;
72 private final RemoteRpcProviderConfig config;
75 config = new RemoteRpcProviderConfig(getContext().system().settings().config());
79 public void preStart(){
80 ActorRefProvider provider = getContext().provider();
81 selfAddress = provider.getDefaultAddress();
83 if ( provider instanceof ClusterActorRefProvider) {
84 getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
90 protected void handleReceive(Object message) throws Exception {
92 probe.tell(message, getSelf());
95 if (message instanceof ConditionalProbe) {
96 // The ConditionalProbe is only used for unit tests.
97 log.info("Received probe {} {}", getSelf(), message);
98 probe = (ConditionalProbe) message;
99 // Send back any message to tell the caller we got the probe.
100 getSender().tell("Got it", getSelf());
101 } else if (message instanceof UpdateBucket) {
102 receiveUpdateBucket(((UpdateBucket) message).getBucket());
103 } else if (message instanceof GetAllBuckets) {
104 receiveGetAllBucket();
105 } else if (message instanceof GetLocalBucket) {
106 receiveGetLocalBucket();
107 } else if (message instanceof GetBucketsByMembers) {
108 receiveGetBucketsByMembers(
109 ((GetBucketsByMembers) message).getMembers());
110 } else if (message instanceof GetBucketVersions) {
111 receiveGetBucketVersions();
112 } else if (message instanceof UpdateRemoteBuckets) {
113 receiveUpdateRemoteBuckets(
114 ((UpdateRemoteBuckets) message).getBuckets());
116 if(log.isDebugEnabled()) {
117 log.debug("Unhandled message [{}]", message);
124 * Returns a copy of bucket owned by this node
126 private void receiveGetLocalBucket() {
127 final ActorRef sender = getSender();
128 GetLocalBucketReply reply = new GetLocalBucketReply(localBucket);
129 sender.tell(reply, getSelf());
133 * Updates the bucket owned by this node
135 * @param updatedBucket
137 void receiveUpdateBucket(Bucket updatedBucket){
139 localBucket = (BucketImpl) updatedBucket;
140 versions.put(selfAddress, localBucket.getVersion());
144 * Returns all the buckets the this node knows about, self owned + remote
146 void receiveGetAllBucket(){
147 final ActorRef sender = getSender();
148 sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
152 * Helper to collect all known buckets
154 * @return self owned + remote buckets
156 Map<Address, Bucket> getAllBuckets(){
157 Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
159 //first add the local bucket
160 all.put(selfAddress, localBucket);
162 //then get all remote buckets
163 all.putAll(remoteBuckets);
169 * Returns buckets for requested members that this node knows about
171 * @param members requested members
173 void receiveGetBucketsByMembers(Set<Address> members){
174 final ActorRef sender = getSender();
175 Map<Address, Bucket> buckets = getBucketsByMembers(members);
176 sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
180 * Helper to collect buckets for requested memebers
182 * @param members requested members
183 * @return buckets for requested memebers
185 Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
186 Map<Address, Bucket> buckets = new HashMap<>();
188 //first add the local bucket if asked
189 if (members.contains(selfAddress)) {
190 buckets.put(selfAddress, localBucket);
193 //then get buckets for requested remote nodes
194 for (Address address : members){
195 if (remoteBuckets.containsKey(address)) {
196 buckets.put(address, remoteBuckets.get(address));
204 * Returns versions for all buckets known
206 void receiveGetBucketVersions(){
207 final ActorRef sender = getSender();
208 GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
209 sender.tell(reply, getSelf());
213 * Update local copy of remote buckets where local copy's version is older
215 * @param receivedBuckets buckets sent by remote
216 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
218 void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
220 if (receivedBuckets == null || receivedBuckets.isEmpty())
222 return; //nothing to do
225 //Remote cant update self's bucket
226 receivedBuckets.remove(selfAddress);
228 for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
230 Long localVersion = versions.get(entry.getKey());
231 if (localVersion == null) {
235 Bucket receivedBucket = entry.getValue();
237 if (receivedBucket == null) {
241 Long remoteVersion = receivedBucket.getVersion();
242 if (remoteVersion == null) {
246 //update only if remote version is newer
247 if ( remoteVersion.longValue() > localVersion.longValue() ) {
248 remoteBuckets.put(entry.getKey(), receivedBucket);
249 versions.put(entry.getKey(), remoteVersion);
252 if(log.isDebugEnabled()) {
253 log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
261 BucketImpl getLocalBucket() {
265 void setLocalBucket(BucketImpl localBucket) {
266 this.localBucket = localBucket;
269 ConcurrentMap<Address, Bucket> getRemoteBuckets() {
270 return remoteBuckets;
273 void setRemoteBuckets(ConcurrentMap<Address, Bucket> remoteBuckets) {
274 this.remoteBuckets = remoteBuckets;
277 ConcurrentMap<Address, Long> getVersions() {
281 void setVersions(ConcurrentMap<Address, Long> versions) {
282 this.versions = versions;
285 Address getSelfAddress() {