2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.cluster.ClusterActorRefProvider;
15 import com.google.common.base.Preconditions;
16 import java.util.HashMap;
18 import java.util.Map.Entry;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
21 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
30 import org.opendaylight.controller.utils.ConditionalProbe;
33 * A store that syncs its data across nodes in the cluster.
34 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
35 * A node can write ONLY to its bucket. This way, write conflicts are avoided.
38 * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
39 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
42 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
44 * Bucket owned by the node.
46 private final BucketImpl<T> localBucket;
49 * Buckets owned by other known nodes in the cluster.
51 private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
54 * Bucket version for every known node in the cluster including this node.
56 private final Map<Address, Long> versions = new HashMap<>();
59 * Cluster address for this node.
61 private Address selfAddress;
63 // FIXME: should be part of test-specific subclass
64 private ConditionalProbe probe;
66 private final RemoteRpcProviderConfig config;
68 public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
69 this.config = Preconditions.checkNotNull(config);
70 this.localBucket = new BucketImpl<>(initialData);
74 public void preStart() {
75 ActorRefProvider provider = getContext().provider();
76 selfAddress = provider.getDefaultAddress();
78 if (provider instanceof ClusterActorRefProvider) {
79 getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
83 @SuppressWarnings("unchecked")
85 protected void handleReceive(final Object message) throws Exception {
87 probe.tell(message, getSelf());
90 if (message instanceof GetBucketsByMembers) {
91 receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
92 } else if (message instanceof GetBucketVersions) {
93 receiveGetBucketVersions();
94 } else if (message instanceof UpdateRemoteBuckets) {
95 receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
96 } else if (message instanceof RemoveRemoteBucket) {
97 removeBucket(((RemoveRemoteBucket) message).getAddress());
98 } else if (message instanceof GetAllBuckets) {
99 // GetAllBuckets is used only for unit tests.
100 receiveGetAllBuckets();
101 } else if (message instanceof ConditionalProbe) {
102 // The ConditionalProbe is only used for unit tests.
103 LOG.info("Received probe {} {}", getSelf(), message);
104 probe = (ConditionalProbe) message;
105 // Send back any message to tell the caller we got the probe.
106 getSender().tell("Got it", getSelf());
108 LOG.debug("Unhandled message [{}]", message);
113 protected RemoteRpcProviderConfig getConfig() {
118 * Returns all the buckets the this node knows about, self owned + remote.
120 void receiveGetAllBuckets() {
121 final ActorRef sender = getSender();
122 sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
126 * Helper to collect all known buckets.
128 * @return self owned + remote buckets
130 Map<Address, Bucket<T>> getAllBuckets() {
131 Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
133 //first add the local bucket
134 all.put(selfAddress, new BucketImpl<>(localBucket));
136 //then get all remote buckets
137 all.putAll(remoteBuckets);
143 * Returns buckets for requested members that this node knows about.
145 * @param members requested members
147 void receiveGetBucketsByMembers(final Set<Address> members) {
148 final ActorRef sender = getSender();
149 Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
150 sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
154 * Helper to collect buckets for requested members.
156 * @param members requested members
157 * @return buckets for requested members
159 Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
160 Map<Address, Bucket<T>> buckets = new HashMap<>();
162 //first add the local bucket if asked
163 if (members.contains(selfAddress)) {
164 buckets.put(selfAddress, new BucketImpl<>(localBucket));
167 //then get buckets for requested remote nodes
168 for (Address address : members) {
169 if (remoteBuckets.containsKey(address)) {
170 buckets.put(address, remoteBuckets.get(address));
178 * Returns versions for all buckets known.
180 void receiveGetBucketVersions() {
181 final ActorRef sender = getSender();
182 GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
183 sender.tell(reply, getSelf());
187 * Update local copy of remote buckets where local copy's version is older.
189 * @param receivedBuckets buckets sent by remote
190 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
192 void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
193 LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
194 if (receivedBuckets == null || receivedBuckets.isEmpty()) {
199 final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
200 for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
201 if (selfAddress.equals(entry.getKey())) {
202 // Remote cannot update our bucket
206 final Bucket<T> receivedBucket = entry.getValue();
207 if (receivedBucket == null) {
208 LOG.debug("Ignoring null bucket from {}", entry.getKey());
212 // update only if remote version is newer
213 final long remoteVersion = receivedBucket.getVersion();
214 final Long localVersion = versions.get(entry.getKey());
215 if (localVersion != null && remoteVersion <= localVersion.longValue()) {
216 LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion,
221 newBuckets.put(entry.getKey(), receivedBucket);
222 remoteBuckets.put(entry.getKey(), receivedBucket);
223 versions.put(entry.getKey(), remoteVersion);
224 LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
227 LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
229 onBucketsUpdated(newBuckets);
232 private void removeBucket(final Address address) {
233 final Bucket<T> bucket = remoteBuckets.remove(address);
234 if (bucket != null) {
235 onBucketRemoved(address, bucket);
240 * Callback to subclasses invoked when a bucket is removed.
242 * @param address Remote address
243 * @param bucket Bucket removed
245 protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
250 * Callback to subclasses invoked when the set of remote buckets is updated.
252 * @param newBuckets Map of address to new bucket. Never null, but can be empty.
254 protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
258 public BucketImpl<T> getLocalBucket() {
262 protected void updateLocalBucket(final T data) {
263 localBucket.setData(data);
264 versions.put(selfAddress, localBucket.getVersion());
267 public Map<Address, Bucket<T>> getRemoteBuckets() {
268 return remoteBuckets;
271 public Map<Address, Long> getVersions() {