2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.cluster.ClusterActorRefProvider;
16 import com.google.common.base.Preconditions;
17 import java.util.HashMap;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
21 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
29 import org.opendaylight.controller.utils.ConditionalProbe;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * A store that syncs its data across nodes in the cluster.
35 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
36 * A node can write ONLY to its bucket. This way, write conflicts are avoided.
39 * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
40 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
43 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
/**
 * Placeholder version substituted when no version is recorded for a bucket.
 */
private static final Long NO_VERSION = -1L;

/**
 * Logger named after the runtime class, so subclasses log under their own name.
 */
protected final Logger log = LoggerFactory.getLogger(getClass());

/**
 * Bucket owned by the node.
 */
private final BucketImpl<T> localBucket = new BucketImpl<>();

/**
 * Buckets owned by other known nodes in the cluster.
 */
private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();

/**
 * Bucket version for every known node in the cluster including this node.
 */
private final Map<Address, Long> versions = new HashMap<>();

/**
 * Cluster address for this node.
 */
private Address selfAddress;

/**
 * Test probe; stays unset until a {@link ConditionalProbe} message is received.
 */
private ConditionalProbe probe;

/**
 * Provider configuration; also handed to the gossiper child and used to pick its mailbox.
 */
private final RemoteRpcProviderConfig config;
/**
 * Creates a store bound to the supplied provider configuration.
 *
 * @param config remote RPC provider configuration; must not be null
 * @throws NullPointerException if {@code config} is null
 */
public BucketStore(RemoteRpcProviderConfig config) {
    Preconditions.checkNotNull(config);
    this.config = config;
}
78 public void preStart() {
79 ActorRefProvider provider = getContext().provider();
80 selfAddress = provider.getDefaultAddress();
82 if ( provider instanceof ClusterActorRefProvider) {
83 getContext().actorOf(Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()), "gossiper");
87 @SuppressWarnings("unchecked")
89 protected void handleReceive(Object message) throws Exception {
91 probe.tell(message, getSelf());
94 if (message instanceof ConditionalProbe) {
95 // The ConditionalProbe is only used for unit tests.
96 log.info("Received probe {} {}", getSelf(), message);
97 probe = (ConditionalProbe) message;
98 // Send back any message to tell the caller we got the probe.
99 getSender().tell("Got it", getSelf());
100 } else if (message instanceof GetAllBuckets) {
101 receiveGetAllBuckets();
102 } else if (message instanceof GetBucketsByMembers) {
103 receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
104 } else if (message instanceof GetBucketVersions) {
105 receiveGetBucketVersions();
106 } else if (message instanceof UpdateRemoteBuckets) {
107 receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
109 log.debug("Unhandled message [{}]", message);
114 protected RemoteRpcProviderConfig getConfig() {
119 * Returns all the buckets that this node knows about, self owned + remote.
121 void receiveGetAllBuckets() {
122 final ActorRef sender = getSender();
123 sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
127 * Helper to collect all known buckets.
129 * @return self owned + remote buckets
131 Map<Address, Bucket<T>> getAllBuckets() {
132 Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
134 //first add the local bucket
135 all.put(selfAddress, new BucketImpl<>(localBucket));
137 //then get all remote buckets
138 all.putAll(remoteBuckets);
144 * Returns buckets for requested members that this node knows about.
146 * @param members requested members
148 void receiveGetBucketsByMembers(Set<Address> members) {
149 final ActorRef sender = getSender();
150 Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
151 sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
155 * Helper to collect buckets for requested members.
157 * @param members requested members
158 * @return buckets for requested members
160 Map<Address, Bucket<T>> getBucketsByMembers(Set<Address> members) {
161 Map<Address, Bucket<T>> buckets = new HashMap<>();
163 //first add the local bucket if asked
164 if (members.contains(selfAddress)) {
165 buckets.put(selfAddress, new BucketImpl<>(localBucket));
168 //then get buckets for requested remote nodes
169 for (Address address : members) {
170 if (remoteBuckets.containsKey(address)) {
171 buckets.put(address, remoteBuckets.get(address));
179 * Returns versions for all buckets known.
181 void receiveGetBucketVersions() {
182 final ActorRef sender = getSender();
183 GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
184 sender.tell(reply, getSelf());
188 * Update local copy of remote buckets where local copy's version is older.
190 * @param receivedBuckets buckets sent by remote
191 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
193 void receiveUpdateRemoteBuckets(Map<Address, Bucket<T>> receivedBuckets) {
194 log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
195 if (receivedBuckets == null || receivedBuckets.isEmpty()) {
196 return; //nothing to do
199 //Remote cant update self's bucket
200 receivedBuckets.remove(selfAddress);
202 for (Map.Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
204 Long localVersion = versions.get(entry.getKey());
205 if (localVersion == null) {
206 localVersion = NO_VERSION;
209 Bucket<T> receivedBucket = entry.getValue();
211 if (receivedBucket == null) {
215 Long remoteVersion = receivedBucket.getVersion();
216 if (remoteVersion == null) {
217 remoteVersion = NO_VERSION;
220 //update only if remote version is newer
221 if ( remoteVersion.longValue() > localVersion.longValue() ) {
222 remoteBuckets.put(entry.getKey(), receivedBucket);
223 versions.put(entry.getKey(), remoteVersion);
227 log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
232 protected void onBucketsUpdated() {
235 public BucketImpl<T> getLocalBucket() {
239 protected void updateLocalBucket(T data) {
240 localBucket.setData(data);
241 versions.put(selfAddress, localBucket.getVersion());
244 public Map<Address, Bucket<T>> getRemoteBuckets() {
245 return remoteBuckets;
248 public Map<Address, Long> getVersions() {