2 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.remote.rpc.registry.gossip;
10 import static com.google.common.base.Preconditions.checkState;
11 import static com.google.common.base.Verify.verify;
12 import static java.util.Objects.requireNonNull;
13 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
14 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
16 import akka.actor.ActorRef;
17 import akka.actor.ActorRefProvider;
18 import akka.actor.Address;
19 import akka.actor.PoisonPill;
20 import akka.actor.Terminated;
21 import akka.cluster.ClusterActorRefProvider;
22 import akka.persistence.DeleteSnapshotsFailure;
23 import akka.persistence.DeleteSnapshotsSuccess;
24 import akka.persistence.RecoveryCompleted;
25 import akka.persistence.SaveSnapshotFailure;
26 import akka.persistence.SaveSnapshotSuccess;
27 import akka.persistence.SnapshotOffer;
28 import akka.persistence.SnapshotSelectionCriteria;
29 import com.google.common.annotations.VisibleForTesting;
30 import com.google.common.collect.HashMultimap;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.SetMultimap;
33 import java.util.Collection;
34 import java.util.HashMap;
36 import java.util.Map.Entry;
37 import java.util.Optional;
38 import java.util.function.Consumer;
39 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
40 import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
43 * A store that syncs its data across nodes in the cluster.
44 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
45 * A node can write ONLY to its bucket. This way, write conflicts are avoided.
48 * Buckets are synchronized across nodes using a gossip protocol (https://en.wikipedia.org/wiki/Gossip_protocol).
49 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
// NOTE(review): this copy of the file carries a numeric prefix (apparently the original line number) on
// every line and is missing lines, including all closing braces, Javadoc delimiters and some statements;
// it does not compile as-is and should be restored from version control before any code change.
// Abstract persistent actor that keeps one versioned Bucket per known cluster node: the local bucket, which
// only this node writes, plus copies of remote buckets received from peers via the Gossiper.
51 public abstract class BucketStoreActor<T extends BucketData<T>> extends
52 AbstractUntypedPersistentActorWithMetering {
53 // Internal marker interface for messages which are just bridges to execute a method
// handleCommand() runs any ExecuteInActor message by calling accept(this), so the bridged method executes
// while the actor processes that message. Instances are created by the static *Message() factories.
55 private interface ExecuteInActor extends Consumer<BucketStoreActor<?>> {
60 * Buckets owned by other known nodes in the cluster.
62 private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
65 * Bucket version for every known node in the cluster including this node.
// The entry for selfAddress is refreshed by updateLocalBucket(); remote entries by updateRemoteBuckets().
67 private final Map<Address, Long> versions = new HashMap<>();
70 * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
71 * once, possibly being tied to multiple addresses (and by extension, buckets).
// Created with initial capacity hints of one expected key and one expected value per key.
73 private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
// Provider configuration; preStart() uses it to build the Gossiper's props and to pick its mailbox.
75 private final RemoteOpsProviderConfig config;
// Persistence identifier, returned by persistenceId().
76 private final String persistenceId;
79 * Cluster address for this node.
// Assigned in preStart() from the actor ref provider's default address.
81 private Address selfAddress;
84 * Bucket owned by the node. Initialized during recovery (due to incarnation number).
86 private LocalBucket<T> localBucket;
// Seed data for localBucket; used when recovery completes (see handleRecover()).
87 private T initialData;
// Incarnation number persisted via saveSnapshot(). Restored from a SnapshotOffer, incremented on
// RecoveryCompleted when a snapshot was recovered, and incremented again whenever the local bucket's
// version wraps (see updateLocalBucket()).
88 private Integer incarnation;
// NOTE(review): no read or write of this flag is visible in this extract; presumably it is set while a
// snapshot save is pending and read by isPersisting() and handleCommand() — confirm.
89 private boolean persisting;
// Creates the store actor. All three arguments are mandatory (requireNonNull throws NullPointerException
// otherwise). initialData is kept until recovery completes, when the local bucket is created from it.
91 protected BucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) {
92 this.config = requireNonNull(config);
93 this.initialData = requireNonNull(initialData);
94 this.persistenceId = requireNonNull(persistenceId);
// Builds a message that makes the actor reply to its sender with the buckets of the given members
// (see getBucketsByMembers()).
97 static ExecuteInActor getBucketsByMembersMessage(final Collection<Address> members) {
98 return actor -> actor.getBucketsByMembers(members);
// Builds a message that makes the actor drop the remote bucket and version for the given address
// (see removeBucket()).
101 static ExecuteInActor removeBucketMessage(final Address addr) {
102 return actor -> actor.removeBucket(addr);
// Builds a message that makes the actor merge buckets received from a peer (see updateRemoteBuckets()).
105 static ExecuteInActor updateRemoteBucketsMessage(final Map<Address, Bucket<?>> buckets) {
106 return actor -> actor.updateRemoteBuckets(buckets);
// Builds a message that makes the actor reply to its sender with the local bucket's data.
109 static ExecuteInActor getLocalDataMessage() {
110 return actor -> actor.getSender().tell(actor.getLocalData(), actor.getSelf());
// Builds a message that makes the actor reply to its sender with an ImmutableMap copy of its remote
// buckets, so the internal map is never shared outside the actor.
113 static ExecuteInActor getRemoteBucketsMessage() {
114 return actor -> actor.getSender().tell(ImmutableMap.copyOf(actor.getRemoteBuckets()), actor.getSelf());
// Returns the data held in this node's own bucket.
// Throws IllegalStateException (from getLocalBucket()) if called before recovery has completed.
117 public final T getLocalData() {
118 return getLocalBucket().getData();
// Returns the internal map of remote buckets keyed by node address (the live field, not a copy).
// NOTE(review): callers should neither modify nor retain this map; getRemoteBucketsMessage() sends an
// ImmutableMap copy instead.
121 public final Map<Address, Bucket<T>> getRemoteBuckets() {
122 return remoteBuckets;
// Returns the per-node bucket versions.
// NOTE(review): the body of this method is missing from this extract; presumably it exposes the versions
// field (or a copy of it) — confirm.
125 public final Map<Address, Long> getVersions() {
// Returns the persistence identifier supplied to the constructor.
130 public final String persistenceId() {
131 return persistenceId;
// Records this node's address. When the actor system uses a ClusterActorRefProvider, also starts the
// "gossiper" child actor with the configured mailbox.
// NOTE(review): any lines after the actorOf() call are missing from this extract.
135 public void preStart() {
136 ActorRefProvider provider = getContext().provider();
137 selfAddress = provider.getDefaultAddress();
// The gossiper is only created for a cluster provider; otherwise no gossiper child is started.
139 if (provider instanceof ClusterActorRefProvider) {
140 getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
// Processes commands after recovery. GET_ALL_BUCKETS (test-only) is answered first. Next, under a guard
// that is missing from this extract (presumably the persisting flag), messages go to
// handleSnapshotMessage(). Everything else is dispatched by type below.
// NOTE(review): the early returns and the guard preceding handleSnapshotMessage() are not visible here.
145 protected void handleCommand(final Object message) throws Exception {
// GET_ALL_BUCKETS and GET_BUCKET_VERSIONS are singleton markers from BucketStoreAccess.Singletons, so an
// identity comparison is used.
146 if (GET_ALL_BUCKETS == message) {
147 // GetAllBuckets is used only in testing
148 getSender().tell(getAllBuckets(), self());
153 handleSnapshotMessage(message);
// Bridges created by the static *Message() factories run directly against this actor.
157 if (message instanceof ExecuteInActor) {
158 ((ExecuteInActor) message).accept(this);
159 } else if (GET_BUCKET_VERSIONS == message) {
160 // FIXME: do we need to send ourselves?
// The copied map also includes this node's own entry (written by updateLocalBucket()); presumably that
// is what the FIXME above refers to.
161 getSender().tell(ImmutableMap.copyOf(versions), getSelf());
162 } else if (message instanceof Terminated) {
// DeathWatch notification, presumably for an actor registered via addWatch().
163 actorTerminated((Terminated) message);
164 } else if (message instanceof DeleteSnapshotsSuccess) {
165 LOG.debug("{}: got command: {}", persistenceId(), message);
166 } else if (message instanceof DeleteSnapshotsFailure) {
// The cause is a trailing argument with no placeholder; assuming LOG is an SLF4J logger, it is logged
// as the exception.
167 LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
168 ((DeleteSnapshotsFailure) message).cause());
170 LOG.debug("Unhandled message [{}]", message);
// Handles messages received while, presumably, a snapshot save is outstanding:
// - a failed save stops the actor;
// - a successful save triggers deletion of older snapshots;
// - anything else is logged as stashed.
// NOTE(review): several statements (flag resets, the stash/unstash calls and the final else header) are
// missing from this extract.
175 private void handleSnapshotMessage(final Object message) {
176 if (message instanceof SaveSnapshotFailure) {
177 LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
// The incarnation snapshot could not be saved, so the actor stops itself.
179 self().tell(PoisonPill.getInstance(), ActorRef.noSender());
180 } else if (message instanceof SaveSnapshotSuccess) {
181 LOG.debug("{}: got command: {}", persistenceId(), message);
182 SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
// Delete every snapshot taken strictly before this one. The criteria arguments are (maxSequenceNr,
// maxTimestamp, minSequenceNr, minTimestamp): any sequence number qualifies, and only the timestamp
// bound (saved timestamp - 1) limits the selection.
183 deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(),
184 saved.metadata().timestamp() - 1, 0L, 0L));
188 LOG.debug("{}: stashing command {}", persistenceId(), message);
// Recovery handler. A SnapshotOffer restores the last saved incarnation. On RecoveryCompleted the
// incarnation is advanced, the local bucket is created from initialData, and the new incarnation is saved.
// NOTE(review): the branch taken when no snapshot was recovered (incarnation == null), the final else
// header and a few other lines are missing from this extract.
194 protected final void handleRecover(final Object message) {
195 if (message instanceof RecoveryCompleted) {
196 if (incarnation != null) {
197 incarnation = incarnation + 1;
202 this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
204 LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
206 saveSnapshot(incarnation);
207 } else if (message instanceof SnapshotOffer) {
// Snapshots are saved with the Integer incarnation as their payload (saveSnapshot(incarnation)), hence
// the cast.
208 incarnation = (Integer) ((SnapshotOffer)message).snapshot();
209 LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
211 LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
// Exposes the provider configuration to subclasses.
// NOTE(review): the body of this method is missing from this extract; presumably it returns the config
// field — confirm.
215 protected final RemoteOpsProviderConfig getConfig() {
// Replaces the local bucket's data and records the resulting version for this node. When setData() reports
// that the version counter wrapped, the incarnation is incremented and persisted with saveSnapshot()
// (presumably within the same branch; braces are missing from this extract).
// Throws IllegalStateException before recovery completes, and VerifyException once incarnations run out.
// NOTE(review): some lines between the increment and saveSnapshot() are missing from this extract.
219 protected final void updateLocalBucket(final T data) {
220 final LocalBucket<T> local = getLocalBucket();
221 final boolean bumpIncarnation = local.setData(data);
222 versions.put(selfAddress, local.getVersion());
224 if (bumpIncarnation) {
225 LOG.debug("Version wrapped. incrementing incarnation");
// Guard against overflowing the Integer incarnation itself.
227 verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
228 incarnation = incarnation + 1;
231 saveSnapshot(incarnation);
236 * Callback to subclasses invoked when a bucket is removed.
238 * @param address Remote address
239 * @param bucket Bucket removed
// Invoked by removeBucket() and actorTerminated() after the bucket has been taken out of remoteBuckets.
241 protected abstract void onBucketRemoved(Address address, Bucket<T> bucket);
244 * Callback to subclasses invoked when the set of remote buckets is updated.
246 * @param newBuckets Map of address to new bucket. Never null, but can be empty.
// Invoked at the end of updateRemoteBuckets() with only the received buckets that were newer than the
// locally known version. remoteBuckets and versions are already updated when this runs.
248 protected abstract void onBucketsUpdated(Map<Address, Bucket<T>> newBuckets);
251 * Helper to collect all known buckets.
253 * @return self owned + remote buckets
// The initial capacity covers every remote bucket plus the local one. The local bucket is added as
// snapshot() rather than the live LocalBucket.
// NOTE(review): the return statement is missing from this extract.
255 private Map<Address, Bucket<T>> getAllBuckets() {
256 Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
258 //first add the local bucket
259 all.put(selfAddress, getLocalBucket().snapshot());
261 //then get all remote buckets
262 all.putAll(remoteBuckets);
268 * Helper to collect buckets for requested members.
270 * @param members requested members
// Replies to the sender with a map of the requested members' buckets:
// - a snapshot of the local bucket, if this node was requested;
// - the stored remote buckets for the other requested members.
// Members with no known bucket are left out of the reply.
272 private void getBucketsByMembers(final Collection<Address> members) {
273 Map<Address, Bucket<T>> buckets = new HashMap<>();
275 //first add the local bucket if asked
276 if (members.contains(selfAddress)) {
277 buckets.put(selfAddress, getLocalBucket().snapshot());
280 //then get buckets for requested remote nodes
281 for (Address address : members) {
// NOTE(review): containsKey() followed by get() does two lookups. Stored values appear never to be null
// (updateRemoteBuckets() rejects null buckets), so a single get() with a null check would suffice.
282 if (remoteBuckets.containsKey(address)) {
283 buckets.put(address, remoteBuckets.get(address));
287 getSender().tell(buckets, getSelf());
// Drops the remote bucket stored for addr. If one existed, it also releases that bucket's DeathWatch
// registration and notifies the subclass. The node's recorded version is then discarded.
290 private void removeBucket(final Address addr) {
291 final Bucket<T> bucket = remoteBuckets.remove(addr);
292 if (bucket != null) {
293 bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
294 onBucketRemoved(addr, bucket);
296 versions.remove(addr);
300 * Update local copy of remote buckets where local copy's version is older.
302 * @param receivedBuckets buckets sent by remote
303 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
// Merges gossip-received buckets.
// - Skipped: entries for this node, null buckets, and buckets not newer than the locally known version.
// - Accepted buckets replace the stored bucket and version, and DeathWatch follows the new bucket's watch
//   actor.
// - onBucketsUpdated() is then called with the accepted buckets.
// NOTE(review): the early return for null/empty input and the per-entry skip statements (presumably
// continue) are missing from this extract.
306 void updateRemoteBuckets(final Map<Address, Bucket<?>> receivedBuckets) {
307 LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
308 if (receivedBuckets == null || receivedBuckets.isEmpty()) {
313 final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
314 for (Entry<Address, Bucket<?>> entry : receivedBuckets.entrySet()) {
315 final Address addr = entry.getKey();
317 if (selfAddress.equals(addr)) {
318 // Remote cannot update our bucket
// Unchecked cast: presumably safe because peers only gossip buckets of this store's data type T — confirm.
322 @SuppressWarnings("unchecked")
323 final Bucket<T> receivedBucket = (Bucket<T>) entry.getValue();
324 if (receivedBucket == null) {
325 LOG.debug("Ignoring null bucket from {}", addr);
329 // update only if remote version is newer
330 final long remoteVersion = receivedBucket.getVersion();
331 final Long localVersion = versions.get(addr);
332 if (localVersion != null && remoteVersion <= localVersion.longValue()) {
333 LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
337 newBuckets.put(addr, receivedBucket);
338 versions.put(addr, remoteVersion);
339 final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
341 // Deal with DeathWatch subscriptions
// Watches change only when the watch actor itself changed; an unchanged actor keeps its registration.
342 final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
343 final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
344 if (!curRef.equals(prevRef)) {
345 prevRef.ifPresent(ref -> removeWatch(addr, ref));
346 curRef.ifPresent(ref -> addWatch(addr, ref));
// NOTE(review): entry.getKey() is the same value as addr; using addr would be more consistent.
349 LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
352 LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
354 onBucketsUpdated(newBuckets);
// Ties addr to ref. DeathWatch on ref is started only the first time any address references it, so each
// actor is watched once.
357 private void addWatch(final Address addr, final ActorRef ref) {
358 if (!watchedActors.containsKey(ref)) {
359 getContext().watch(ref);
360 LOG.debug("Watching {}", ref);
362 watchedActors.put(ref, addr);
// Unties addr from ref. DeathWatch on ref is stopped once no address references it any more.
365 private void removeWatch(final Address addr, final ActorRef ref) {
366 watchedActors.remove(ref, addr);
367 if (!watchedActors.containsKey(ref)) {
368 getContext().unwatch(ref);
369 LOG.debug("No longer watching {}", ref);
// Handles the death of a watched actor. Every address tied to that actor loses its recorded version and
// remote bucket, and the subclass is notified for each bucket actually removed. removeAll() clears the
// actor's watchedActors entries; no unwatch() call is visible here.
// NOTE(review): lines after onBucketRemoved() are missing from this extract.
373 private void actorTerminated(final Terminated message) {
374 LOG.info("Actor termination {} received", message);
376 for (Address addr : watchedActors.removeAll(message.getActor())) {
377 versions.remove(addr);
378 final Bucket<T> bucket = remoteBuckets.remove(addr);
379 if (bucket != null) {
380 LOG.debug("Source actor dead, removing bucket {} from {}", bucket, addr);
381 onBucketRemoved(addr, bucket);
// NOTE(review): the body of this method is missing from this extract; presumably it returns the persisting
// field — confirm.
387 protected boolean isPersisting() {
// Returns this node's bucket. checkState throws IllegalStateException if recovery has not yet created it
// (handleRecover() assigns localBucket on RecoveryCompleted).
// NOTE(review): the return statement is missing from this extract.
391 private LocalBucket<T> getLocalBucket() {
392 checkState(localBucket != null, "Attempted to access local bucket before recovery completed");