2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Terminated;
15 import akka.cluster.ClusterActorRefProvider;
16 import com.google.common.base.Preconditions;
17 import com.google.common.collect.HashMultimap;
18 import com.google.common.collect.SetMultimap;
19 import java.util.HashMap;
21 import java.util.Map.Entry;
22 import java.util.Optional;
24 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
25 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
30 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
31 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
32 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
33 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
34 import org.opendaylight.controller.utils.ConditionalProbe;
37 * A store that syncs its data across nodes in the cluster.
38 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
39 * A node can write ONLY to its bucket. This way, write conflicts are avoided.
42 * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
43 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
46 public class BucketStore<T extends BucketData<T>> extends AbstractUntypedActorWithMetering {
48 * Bucket owned by the node.
50 private final BucketImpl<T> localBucket;
53 * Buckets owned by other known nodes in the cluster.
55 private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
58 * Bucket version for every known node in the cluster including this node.
60 private final Map<Address, Long> versions = new HashMap<>();
63 * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
64 * once, possibly being tied to multiple addresses (and by extension, buckets).
66 private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
69 * Cluster address for this node.
71 private Address selfAddress;
73 // FIXME: should be part of test-specific subclass
74 private ConditionalProbe probe;
76 private final RemoteRpcProviderConfig config;
78 public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
79 this.config = Preconditions.checkNotNull(config);
80 this.localBucket = new BucketImpl<>(initialData);
84 public void preStart() {
85 ActorRefProvider provider = getContext().provider();
86 selfAddress = provider.getDefaultAddress();
88 if (provider instanceof ClusterActorRefProvider) {
89 getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
93 @SuppressWarnings("unchecked")
95 protected void handleReceive(final Object message) throws Exception {
97 probe.tell(message, getSelf());
100 if (message instanceof GetBucketsByMembers) {
101 receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
102 } else if (message instanceof GetBucketVersions) {
103 receiveGetBucketVersions();
104 } else if (message instanceof UpdateRemoteBuckets) {
105 receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
106 } else if (message instanceof RemoveRemoteBucket) {
107 removeBucket(((RemoveRemoteBucket) message).getAddress());
108 } else if (message instanceof Terminated) {
109 actorTerminated((Terminated) message);
110 } else if (message instanceof GetAllBuckets) {
111 // GetAllBuckets is used only for unit tests.
112 receiveGetAllBuckets();
113 } else if (message instanceof ConditionalProbe) {
114 // The ConditionalProbe is only used for unit tests.
115 LOG.info("Received probe {} {}", getSelf(), message);
116 probe = (ConditionalProbe) message;
117 // Send back any message to tell the caller we got the probe.
118 getSender().tell("Got it", getSelf());
120 LOG.debug("Unhandled message [{}]", message);
125 protected RemoteRpcProviderConfig getConfig() {
130 * Returns all the buckets the this node knows about, self owned + remote.
132 void receiveGetAllBuckets() {
133 final ActorRef sender = getSender();
134 sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
138 * Helper to collect all known buckets.
140 * @return self owned + remote buckets
142 Map<Address, Bucket<T>> getAllBuckets() {
143 Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
145 //first add the local bucket
146 all.put(selfAddress, new BucketImpl<>(localBucket));
148 //then get all remote buckets
149 all.putAll(remoteBuckets);
155 * Returns buckets for requested members that this node knows about.
157 * @param members requested members
159 void receiveGetBucketsByMembers(final Set<Address> members) {
160 final ActorRef sender = getSender();
161 Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
162 sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
166 * Helper to collect buckets for requested members.
168 * @param members requested members
169 * @return buckets for requested members
171 Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
172 Map<Address, Bucket<T>> buckets = new HashMap<>();
174 //first add the local bucket if asked
175 if (members.contains(selfAddress)) {
176 buckets.put(selfAddress, new BucketImpl<>(localBucket));
179 //then get buckets for requested remote nodes
180 for (Address address : members) {
181 if (remoteBuckets.containsKey(address)) {
182 buckets.put(address, remoteBuckets.get(address));
190 * Returns versions for all buckets known.
192 void receiveGetBucketVersions() {
193 final ActorRef sender = getSender();
194 GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
195 sender.tell(reply, getSelf());
199 * Update local copy of remote buckets where local copy's version is older.
201 * @param receivedBuckets buckets sent by remote
202 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
204 void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
205 LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
206 if (receivedBuckets == null || receivedBuckets.isEmpty()) {
211 final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
212 for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
213 final Address addr = entry.getKey();
215 if (selfAddress.equals(addr)) {
216 // Remote cannot update our bucket
220 final Bucket<T> receivedBucket = entry.getValue();
221 if (receivedBucket == null) {
222 LOG.debug("Ignoring null bucket from {}", addr);
226 // update only if remote version is newer
227 final long remoteVersion = receivedBucket.getVersion();
228 final Long localVersion = versions.get(addr);
229 if (localVersion != null && remoteVersion <= localVersion.longValue()) {
230 LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
234 newBuckets.put(addr, receivedBucket);
235 versions.put(addr, remoteVersion);
236 final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
238 // Deal with DeathWatch subscriptions
239 final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
240 final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
241 if (!curRef.equals(prevRef)) {
242 prevRef.ifPresent(ref -> removeWatch(addr, ref));
243 curRef.ifPresent(ref -> addWatch(addr, ref));
246 LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
249 LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
251 onBucketsUpdated(newBuckets);
254 private void addWatch(final Address addr, final ActorRef ref) {
255 if (!watchedActors.containsKey(ref)) {
256 getContext().watch(ref);
257 LOG.debug("Watching {}", ref);
259 watchedActors.put(ref, addr);
262 private void removeWatch(final Address addr, final ActorRef ref) {
263 watchedActors.remove(ref, addr);
264 if (!watchedActors.containsKey(ref)) {
265 getContext().unwatch(ref);
266 LOG.debug("No longer watching {}", ref);
270 private void removeBucket(final Address addr) {
271 final Bucket<T> bucket = remoteBuckets.remove(addr);
272 if (bucket != null) {
273 bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
274 onBucketRemoved(addr, bucket);
278 private void actorTerminated(final Terminated message) {
279 LOG.info("Actor termination {} received", message);
281 for (Address addr : watchedActors.removeAll(message.getActor())) {
282 versions.remove(addr);
283 final Bucket<T> bucket = remoteBuckets.remove(addr);
284 if (bucket != null) {
285 LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
286 onBucketRemoved(addr, bucket);
292 * Callback to subclasses invoked when a bucket is removed.
294 * @param address Remote address
295 * @param bucket Bucket removed
297 protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
302 * Callback to subclasses invoked when the set of remote buckets is updated.
304 * @param newBuckets Map of address to new bucket. Never null, but can be empty.
306 protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
310 public BucketImpl<T> getLocalBucket() {
314 protected void updateLocalBucket(final T data) {
315 localBucket.setData(data);
316 versions.put(selfAddress, localBucket.getVersion());
319 public Map<Address, Bucket<T>> getRemoteBuckets() {
320 return remoteBuckets;
323 public Map<Address, Long> getVersions() {