2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.cluster.ClusterActorRefProvider;
15 import com.google.common.base.Preconditions;
16 import java.util.HashMap;
18 import java.util.Map.Entry;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
21 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
30 import org.opendaylight.controller.utils.ConditionalProbe;
33 * A store that syncs its data across nodes in the cluster.
34 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
35 * A node can write ONLY to its bucket. This way, write conflicts are avoided.
37 * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
38 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
41 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
43 * Bucket owned by the node
45 private final BucketImpl<T> localBucket;
48 * Buckets owned by other known nodes in the cluster.
50 private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
53 * Bucket version for every known node in the cluster including this node
55 private final Map<Address, Long> versions = new HashMap<>();
58 * Cluster address for this node
60 private Address selfAddress;
62 // FIXME: should be part of test-specific subclass
63 private ConditionalProbe probe;
65 private final RemoteRpcProviderConfig config;
67 public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
68 this.config = Preconditions.checkNotNull(config);
69 this.localBucket = new BucketImpl<>(initialData);
73 public void preStart(){
74 ActorRefProvider provider = getContext().provider();
75 selfAddress = provider.getDefaultAddress();
77 if (provider instanceof ClusterActorRefProvider) {
78 getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
83 protected void handleReceive(final Object message) throws Exception {
85 probe.tell(message, getSelf());
88 if (message instanceof GetBucketsByMembers) {
89 receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
90 } else if (message instanceof GetBucketVersions) {
91 receiveGetBucketVersions();
92 } else if (message instanceof UpdateRemoteBuckets) {
93 receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
94 } else if (message instanceof RemoveRemoteBucket) {
95 removeBucket(((RemoveRemoteBucket) message).getAddress());
96 } else if (message instanceof GetAllBuckets) {
97 // GetAllBuckets is used only for unit tests.
98 receiveGetAllBuckets();
99 } else if (message instanceof ConditionalProbe) {
100 // The ConditionalProbe is only used for unit tests.
101 LOG.info("Received probe {} {}", getSelf(), message);
102 probe = (ConditionalProbe) message;
103 // Send back any message to tell the caller we got the probe.
104 getSender().tell("Got it", getSelf());
106 LOG.debug("Unhandled message [{}]", message);
111 protected RemoteRpcProviderConfig getConfig() {
116 * Returns all the buckets the this node knows about, self owned + remote
118 void receiveGetAllBuckets(){
119 final ActorRef sender = getSender();
120 sender.tell(new GetAllBucketsReply<T>(getAllBuckets()), getSelf());
124 * Helper to collect all known buckets
126 * @return self owned + remote buckets
128 Map<Address, Bucket<T>> getAllBuckets(){
129 Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
131 //first add the local bucket
132 all.put(selfAddress, new BucketImpl<>(localBucket));
134 //then get all remote buckets
135 all.putAll(remoteBuckets);
141 * Returns buckets for requested members that this node knows about
143 * @param members requested members
145 void receiveGetBucketsByMembers(final Set<Address> members) {
146 final ActorRef sender = getSender();
147 Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
148 sender.tell(new GetBucketsByMembersReply<T>(buckets), getSelf());
152 * Helper to collect buckets for requested memebers
154 * @param members requested members
155 * @return buckets for requested memebers
157 Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
158 Map<Address, Bucket<T>> buckets = new HashMap<>();
160 //first add the local bucket if asked
161 if (members.contains(selfAddress)) {
162 buckets.put(selfAddress, new BucketImpl<>(localBucket));
165 //then get buckets for requested remote nodes
166 for (Address address : members){
167 if (remoteBuckets.containsKey(address)) {
168 buckets.put(address, remoteBuckets.get(address));
176 * Returns versions for all buckets known
178 void receiveGetBucketVersions(){
179 final ActorRef sender = getSender();
180 GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
181 sender.tell(reply, getSelf());
185 * Update local copy of remote buckets where local copy's version is older
187 * @param receivedBuckets buckets sent by remote
188 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
190 void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
191 LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
192 if (receivedBuckets == null || receivedBuckets.isEmpty()) {
197 final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
198 for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
199 if (selfAddress.equals(entry.getKey())) {
200 // Remote cannot update our bucket
204 final Bucket<T> receivedBucket = entry.getValue();
205 if (receivedBucket == null) {
206 LOG.debug("Ignoring null bucket from {}", entry.getKey());
210 // update only if remote version is newer
211 final long remoteVersion = receivedBucket.getVersion();
212 final Long localVersion = versions.get(entry.getKey());
213 if (localVersion != null && remoteVersion <= localVersion.longValue()) {
214 LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion,
219 newBuckets.put(entry.getKey(), receivedBucket);
220 remoteBuckets.put(entry.getKey(), receivedBucket);
221 versions.put(entry.getKey(), remoteVersion);
222 LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
225 LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
227 onBucketsUpdated(newBuckets);
230 private void removeBucket(final Address address) {
231 final Bucket<T> bucket = remoteBuckets.remove(address);
232 if (bucket != null) {
233 onBucketRemoved(address, bucket);
238 * Callback to subclasses invoked when a bucket is removed.
240 * @param address Remote address
241 * @param bucket Bucket removed
243 protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
248 * Callback to subclasses invoked when the set of remote buckets is updated.
250 * @param newBuckets Map of address to new bucket. Never null, but can be empty.
252 protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
256 public BucketImpl<T> getLocalBucket() {
260 protected void updateLocalBucket(final T data) {
261 localBucket.setData(data);
262 versions.put(selfAddress, localBucket.getVersion());
265 public Map<Address, Bucket<T>> getRemoteBuckets() {
266 return remoteBuckets;
269 public Map<Address, Long> getVersions() {