2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
11 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorRefProvider;
15 import akka.actor.Address;
16 import akka.actor.PoisonPill;
17 import akka.actor.Terminated;
18 import akka.cluster.ClusterActorRefProvider;
19 import akka.persistence.DeleteSnapshotsFailure;
20 import akka.persistence.DeleteSnapshotsSuccess;
21 import akka.persistence.RecoveryCompleted;
22 import akka.persistence.SaveSnapshotFailure;
23 import akka.persistence.SaveSnapshotSuccess;
24 import akka.persistence.SnapshotOffer;
25 import akka.persistence.SnapshotSelectionCriteria;
26 import com.google.common.annotations.VisibleForTesting;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Verify;
29 import com.google.common.collect.HashMultimap;
30 import com.google.common.collect.SetMultimap;
31 import java.util.HashMap;
33 import java.util.Map.Entry;
34 import java.util.Optional;
36 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
37 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
40 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
41 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
42 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
43 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
44 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
47 * A store that syncs its data across nodes in the cluster.
48 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
49 * A node can write ONLY to its bucket. This way, write conflicts are avoided.
52 * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
53 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
56 public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersistentActorWithMetering {
58 * Buckets owned by other known nodes in the cluster.
60 private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
63 * Bucket version for every known node in the cluster including this node.
65 private final Map<Address, Long> versions = new HashMap<>();
68 * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
69 * once, possibly being tied to multiple addresses (and by extension, buckets).
71 private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
73 private final RemoteRpcProviderConfig config;
74 private final String persistenceId;
77 * Cluster address for this node.
79 private Address selfAddress;
82 * Bucket owned by the node. Initialized during recovery (due to incarnation number).
84 private LocalBucket<T> localBucket;
85 private T initialData;
86 private Integer incarnation;
87 private boolean persisting;
89 public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
90 this.config = Preconditions.checkNotNull(config);
91 this.initialData = Preconditions.checkNotNull(initialData);
92 this.persistenceId = Preconditions.checkNotNull(persistenceId);
96 public String persistenceId() {
101 public void preStart() {
102 ActorRefProvider provider = getContext().provider();
103 selfAddress = provider.getDefaultAddress();
105 if (provider instanceof ClusterActorRefProvider) {
106 getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
110 @SuppressWarnings("unchecked")
112 protected void handleCommand(final Object message) throws Exception {
113 if (message instanceof GetAllBuckets) {
114 // GetAllBuckets is used only in testing
115 receiveGetAllBuckets();
120 handleSnapshotMessage(message);
124 if (message instanceof GetBucketsByMembers) {
125 receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
126 } else if (message instanceof GetBucketVersions) {
127 receiveGetBucketVersions();
128 } else if (message instanceof UpdateRemoteBuckets) {
129 receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
130 } else if (message instanceof RemoveRemoteBucket) {
131 removeBucket(((RemoveRemoteBucket) message).getAddress());
132 } else if (message instanceof Terminated) {
133 actorTerminated((Terminated) message);
134 } else if (message instanceof DeleteSnapshotsSuccess) {
135 LOG.debug("{}: got command: {}", persistenceId(), message);
136 } else if (message instanceof DeleteSnapshotsFailure) {
137 LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
138 ((DeleteSnapshotsFailure) message).cause());
140 LOG.debug("Unhandled message [{}]", message);
145 private void handleSnapshotMessage(final Object message) {
146 if (message instanceof SaveSnapshotFailure) {
147 LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
149 self().tell(PoisonPill.getInstance(), ActorRef.noSender());
150 } else if (message instanceof SaveSnapshotSuccess) {
151 LOG.debug("{}: got command: {}", persistenceId(), message);
152 SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
153 deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
154 saved.metadata().timestamp() - 1, 0L, 0L));
158 LOG.debug("{}: stashing command {}", persistenceId(), message);
164 protected void handleRecover(final Object message) throws Exception {
165 if (message instanceof RecoveryCompleted) {
166 if (incarnation != null) {
167 incarnation = incarnation + 1;
172 this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
174 LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
176 saveSnapshot(incarnation);
177 } else if (message instanceof SnapshotOffer) {
178 incarnation = (Integer) ((SnapshotOffer)message).snapshot();
179 LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
181 LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
185 protected RemoteRpcProviderConfig getConfig() {
190 * Returns all the buckets the this node knows about, self owned + remote.
193 protected void receiveGetAllBuckets() {
194 final ActorRef sender = getSender();
195 sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
199 * Helper to collect all known buckets.
201 * @return self owned + remote buckets
203 Map<Address, Bucket<T>> getAllBuckets() {
204 Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
206 //first add the local bucket
207 all.put(selfAddress, getLocalBucket().snapshot());
209 //then get all remote buckets
210 all.putAll(remoteBuckets);
216 * Returns buckets for requested members that this node knows about.
218 * @param members requested members
220 void receiveGetBucketsByMembers(final Set<Address> members) {
221 final ActorRef sender = getSender();
222 Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
223 sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
227 * Helper to collect buckets for requested members.
229 * @param members requested members
230 * @return buckets for requested members
232 Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
233 Map<Address, Bucket<T>> buckets = new HashMap<>();
235 //first add the local bucket if asked
236 if (members.contains(selfAddress)) {
237 buckets.put(selfAddress, getLocalBucket().snapshot());
240 //then get buckets for requested remote nodes
241 for (Address address : members) {
242 if (remoteBuckets.containsKey(address)) {
243 buckets.put(address, remoteBuckets.get(address));
251 * Returns versions for all buckets known.
253 void receiveGetBucketVersions() {
254 final ActorRef sender = getSender();
255 GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
256 sender.tell(reply, getSelf());
260 * Update local copy of remote buckets where local copy's version is older.
262 * @param receivedBuckets buckets sent by remote
263 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
265 void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
266 LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
267 if (receivedBuckets == null || receivedBuckets.isEmpty()) {
272 final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
273 for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
274 final Address addr = entry.getKey();
276 if (selfAddress.equals(addr)) {
277 // Remote cannot update our bucket
281 final Bucket<T> receivedBucket = entry.getValue();
282 if (receivedBucket == null) {
283 LOG.debug("Ignoring null bucket from {}", addr);
287 // update only if remote version is newer
288 final long remoteVersion = receivedBucket.getVersion();
289 final Long localVersion = versions.get(addr);
290 if (localVersion != null && remoteVersion <= localVersion.longValue()) {
291 LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
295 newBuckets.put(addr, receivedBucket);
296 versions.put(addr, remoteVersion);
297 final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
299 // Deal with DeathWatch subscriptions
300 final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
301 final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
302 if (!curRef.equals(prevRef)) {
303 prevRef.ifPresent(ref -> removeWatch(addr, ref));
304 curRef.ifPresent(ref -> addWatch(addr, ref));
307 LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
310 LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
312 onBucketsUpdated(newBuckets);
315 private void addWatch(final Address addr, final ActorRef ref) {
316 if (!watchedActors.containsKey(ref)) {
317 getContext().watch(ref);
318 LOG.debug("Watching {}", ref);
320 watchedActors.put(ref, addr);
323 private void removeWatch(final Address addr, final ActorRef ref) {
324 watchedActors.remove(ref, addr);
325 if (!watchedActors.containsKey(ref)) {
326 getContext().unwatch(ref);
327 LOG.debug("No longer watching {}", ref);
331 private void removeBucket(final Address addr) {
332 final Bucket<T> bucket = remoteBuckets.remove(addr);
333 if (bucket != null) {
334 bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
335 onBucketRemoved(addr, bucket);
339 private void actorTerminated(final Terminated message) {
340 LOG.info("Actor termination {} received", message);
342 for (Address addr : watchedActors.removeAll(message.getActor())) {
343 versions.remove(addr);
344 final Bucket<T> bucket = remoteBuckets.remove(addr);
345 if (bucket != null) {
346 LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
347 onBucketRemoved(addr, bucket);
353 * Callback to subclasses invoked when a bucket is removed.
355 * @param address Remote address
356 * @param bucket Bucket removed
358 protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
363 * Callback to subclasses invoked when the set of remote buckets is updated.
365 * @param newBuckets Map of address to new bucket. Never null, but can be empty.
367 protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
372 protected boolean isPersisting() {
376 public T getLocalData() {
377 return getLocalBucket().getData();
380 private LocalBucket<T> getLocalBucket() {
381 Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
385 protected void updateLocalBucket(final T data) {
386 final LocalBucket<T> local = getLocalBucket();
387 final boolean bumpIncarnation = local.setData(data);
388 versions.put(selfAddress, local.getVersion());
390 if (bumpIncarnation) {
391 LOG.debug("Version wrapped. incrementing incarnation");
393 Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
394 incarnation = incarnation + 1;
397 saveSnapshot(incarnation);
401 public Map<Address, Bucket<T>> getRemoteBuckets() {
402 return remoteBuckets;
405 public Map<Address, Long> getVersions() {