70f4053723d4c426be572de0c9c69fbb3e6950c8
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorRefProvider;
15 import akka.actor.Address;
16 import akka.actor.PoisonPill;
17 import akka.actor.Terminated;
18 import akka.cluster.ClusterActorRefProvider;
19 import akka.persistence.DeleteSnapshotsFailure;
20 import akka.persistence.DeleteSnapshotsSuccess;
21 import akka.persistence.RecoveryCompleted;
22 import akka.persistence.SaveSnapshotFailure;
23 import akka.persistence.SaveSnapshotSuccess;
24 import akka.persistence.SnapshotOffer;
25 import akka.persistence.SnapshotSelectionCriteria;
26 import com.google.common.annotations.VisibleForTesting;
27 import com.google.common.base.Preconditions;
28 import com.google.common.base.Verify;
29 import com.google.common.collect.HashMultimap;
30 import com.google.common.collect.SetMultimap;
31 import java.util.HashMap;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.Optional;
35 import java.util.Set;
36 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
37 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
38 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
39 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
40 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
41 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
42 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
43 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
44 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
45
46 /**
47  * A store that syncs its data across nodes in the cluster.
48  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
49  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
50  *
51  * <p>
52  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
53  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
54  *
55  */
56 public class BucketStore<T extends BucketData<T>> extends AbstractUntypedPersistentActorWithMetering {
57     /**
58      * Buckets owned by other known nodes in the cluster.
59      */
60     private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
61
62     /**
63      * Bucket version for every known node in the cluster including this node.
64      */
65     private final Map<Address, Long> versions = new HashMap<>();
66
67     /**
68      * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
69      * once, possibly being tied to multiple addresses (and by extension, buckets).
70      */
71     private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
72
73     private final RemoteRpcProviderConfig config;
74     private final String persistenceId;
75
76     /**
77      * Cluster address for this node.
78      */
79     private Address selfAddress;
80
81     /**
82      * Bucket owned by the node. Initialized during recovery (due to incarnation number).
83      */
84     private LocalBucket<T> localBucket;
85     private T initialData;
86     private Integer incarnation;
87     private boolean persisting;
88
89     public BucketStore(final RemoteRpcProviderConfig config, final String persistenceId, final T initialData) {
90         this.config = Preconditions.checkNotNull(config);
91         this.initialData = Preconditions.checkNotNull(initialData);
92         this.persistenceId = Preconditions.checkNotNull(persistenceId);
93     }
94
95     @Override
96     public String persistenceId() {
97         return persistenceId;
98     }
99
100     @Override
101     public void preStart() {
102         ActorRefProvider provider = getContext().provider();
103         selfAddress = provider.getDefaultAddress();
104
105         if (provider instanceof ClusterActorRefProvider) {
106             getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
107         }
108     }
109
110     @SuppressWarnings("unchecked")
111     @Override
112     protected void handleCommand(final Object message) throws Exception {
113         if (message instanceof GetAllBuckets) {
114             // GetAllBuckets is used only in testing
115             receiveGetAllBuckets();
116             return;
117         }
118
119         if (persisting) {
120             handleSnapshotMessage(message);
121             return;
122         }
123
124         if (message instanceof GetBucketsByMembers) {
125             receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
126         } else if (message instanceof GetBucketVersions) {
127             receiveGetBucketVersions();
128         } else if (message instanceof UpdateRemoteBuckets) {
129             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
130         } else if (message instanceof RemoveRemoteBucket) {
131             removeBucket(((RemoveRemoteBucket) message).getAddress());
132         } else if (message instanceof Terminated) {
133             actorTerminated((Terminated) message);
134         } else if (message instanceof DeleteSnapshotsSuccess) {
135             LOG.debug("{}: got command: {}", persistenceId(), message);
136         } else if (message instanceof DeleteSnapshotsFailure) {
137             LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
138                 ((DeleteSnapshotsFailure) message).cause());
139         } else {
140             LOG.debug("Unhandled message [{}]", message);
141             unhandled(message);
142         }
143     }
144
145     private void handleSnapshotMessage(final Object message) {
146         if (message instanceof SaveSnapshotFailure) {
147             LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
148             persisting = false;
149             self().tell(PoisonPill.getInstance(), ActorRef.noSender());
150         } else if (message instanceof SaveSnapshotSuccess) {
151             LOG.debug("{}: got command: {}", persistenceId(), message);
152             SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
153             deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
154                     saved.metadata().timestamp() - 1, 0L, 0L));
155             persisting = false;
156             unstash();
157         } else {
158             LOG.debug("{}: stashing command {}", persistenceId(), message);
159             stash();
160         }
161     }
162
163     @Override
164     protected void handleRecover(final Object message) throws Exception {
165         if (message instanceof RecoveryCompleted) {
166             if (incarnation != null) {
167                 incarnation = incarnation + 1;
168             } else {
169                 incarnation = 0;
170             }
171
172             this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
173             initialData = null;
174             LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
175             persisting = true;
176             saveSnapshot(incarnation);
177         } else if (message instanceof SnapshotOffer) {
178             incarnation = (Integer) ((SnapshotOffer)message).snapshot();
179             LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
180         } else {
181             LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
182         }
183     }
184
185     protected RemoteRpcProviderConfig getConfig() {
186         return config;
187     }
188
189     /**
190      * Returns all the buckets the this node knows about, self owned + remote.
191      */
192     @VisibleForTesting
193     protected void receiveGetAllBuckets() {
194         final ActorRef sender = getSender();
195         sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
196     }
197
198     /**
199      * Helper to collect all known buckets.
200      *
201      * @return self owned + remote buckets
202      */
203     Map<Address, Bucket<T>> getAllBuckets() {
204         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
205
206         //first add the local bucket
207         all.put(selfAddress, getLocalBucket().snapshot());
208
209         //then get all remote buckets
210         all.putAll(remoteBuckets);
211
212         return all;
213     }
214
215     /**
216      * Returns buckets for requested members that this node knows about.
217      *
218      * @param members requested members
219      */
220     void receiveGetBucketsByMembers(final Set<Address> members) {
221         final ActorRef sender = getSender();
222         Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
223         sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
224     }
225
226     /**
227      * Helper to collect buckets for requested members.
228      *
229      * @param members requested members
230      * @return buckets for requested members
231      */
232     Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
233         Map<Address, Bucket<T>> buckets = new HashMap<>();
234
235         //first add the local bucket if asked
236         if (members.contains(selfAddress)) {
237             buckets.put(selfAddress, getLocalBucket().snapshot());
238         }
239
240         //then get buckets for requested remote nodes
241         for (Address address : members) {
242             if (remoteBuckets.containsKey(address)) {
243                 buckets.put(address, remoteBuckets.get(address));
244             }
245         }
246
247         return buckets;
248     }
249
250     /**
251      * Returns versions for all buckets known.
252      */
253     void receiveGetBucketVersions() {
254         final ActorRef sender = getSender();
255         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
256         sender.tell(reply, getSelf());
257     }
258
259     /**
260      * Update local copy of remote buckets where local copy's version is older.
261      *
262      * @param receivedBuckets buckets sent by remote
263      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
264      */
265     void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
266         LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
267         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
268             //nothing to do
269             return;
270         }
271
272         final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
273         for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
274             final Address addr = entry.getKey();
275
276             if (selfAddress.equals(addr)) {
277                 // Remote cannot update our bucket
278                 continue;
279             }
280
281             final Bucket<T> receivedBucket = entry.getValue();
282             if (receivedBucket == null) {
283                 LOG.debug("Ignoring null bucket from {}", addr);
284                 continue;
285             }
286
287             // update only if remote version is newer
288             final long remoteVersion = receivedBucket.getVersion();
289             final Long localVersion = versions.get(addr);
290             if (localVersion != null && remoteVersion <= localVersion.longValue()) {
291                 LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
292                     remoteVersion);
293                 continue;
294             }
295             newBuckets.put(addr, receivedBucket);
296             versions.put(addr, remoteVersion);
297             final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
298
299             // Deal with DeathWatch subscriptions
300             final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
301             final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
302             if (!curRef.equals(prevRef)) {
303                 prevRef.ifPresent(ref -> removeWatch(addr, ref));
304                 curRef.ifPresent(ref -> addWatch(addr, ref));
305             }
306
307             LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
308         }
309
310         LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
311
312         onBucketsUpdated(newBuckets);
313     }
314
315     private void addWatch(final Address addr, final ActorRef ref) {
316         if (!watchedActors.containsKey(ref)) {
317             getContext().watch(ref);
318             LOG.debug("Watching {}", ref);
319         }
320         watchedActors.put(ref, addr);
321     }
322
323     private void removeWatch(final Address addr, final ActorRef ref) {
324         watchedActors.remove(ref, addr);
325         if (!watchedActors.containsKey(ref)) {
326             getContext().unwatch(ref);
327             LOG.debug("No longer watching {}", ref);
328         }
329     }
330
331     private void removeBucket(final Address addr) {
332         final Bucket<T> bucket = remoteBuckets.remove(addr);
333         if (bucket != null) {
334             bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
335             onBucketRemoved(addr, bucket);
336         }
337     }
338
339     private void actorTerminated(final Terminated message) {
340         LOG.info("Actor termination {} received", message);
341
342         for (Address addr : watchedActors.removeAll(message.getActor())) {
343             versions.remove(addr);
344             final Bucket<T> bucket = remoteBuckets.remove(addr);
345             if (bucket != null) {
346                 LOG.debug("Source actor dead, removing bucket {} from ", bucket, addr);
347                 onBucketRemoved(addr, bucket);
348             }
349         }
350     }
351
352     /**
353      * Callback to subclasses invoked when a bucket is removed.
354      *
355      * @param address Remote address
356      * @param bucket Bucket removed
357      */
358     protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
359         // Default noop
360     }
361
362     /**
363      * Callback to subclasses invoked when the set of remote buckets is updated.
364      *
365      * @param newBuckets Map of address to new bucket. Never null, but can be empty.
366      */
367     protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
368         // Default noop
369     }
370
371     @VisibleForTesting
372     protected boolean isPersisting() {
373         return persisting;
374     }
375
376     public T getLocalData() {
377         return getLocalBucket().getData();
378     }
379
380     private LocalBucket<T> getLocalBucket() {
381         Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
382         return localBucket;
383     }
384
385     protected void updateLocalBucket(final T data) {
386         final LocalBucket<T> local = getLocalBucket();
387         final boolean bumpIncarnation = local.setData(data);
388         versions.put(selfAddress, local.getVersion());
389
390         if (bumpIncarnation) {
391             LOG.debug("Version wrapped. incrementing incarnation");
392
393             Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
394             incarnation = incarnation + 1;
395
396             persisting = true;
397             saveSnapshot(incarnation);
398         }
399     }
400
401     public Map<Address, Bucket<T>> getRemoteBuckets() {
402         return remoteBuckets;
403     }
404
405     public Map<Address, Long> getVersions() {
406         return versions;
407     }
408 }