Further Guava Optional cleanups
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStoreActor.java
1 /*
2  * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_ALL_BUCKETS;
12 import static org.opendaylight.controller.remote.rpc.registry.gossip.BucketStoreAccess.Singletons.GET_BUCKET_VERSIONS;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorRefProvider;
16 import akka.actor.Address;
17 import akka.actor.PoisonPill;
18 import akka.actor.Terminated;
19 import akka.cluster.ClusterActorRefProvider;
20 import akka.persistence.DeleteSnapshotsFailure;
21 import akka.persistence.DeleteSnapshotsSuccess;
22 import akka.persistence.RecoveryCompleted;
23 import akka.persistence.SaveSnapshotFailure;
24 import akka.persistence.SaveSnapshotSuccess;
25 import akka.persistence.SnapshotOffer;
26 import akka.persistence.SnapshotSelectionCriteria;
27 import com.google.common.annotations.VisibleForTesting;
28 import com.google.common.base.Preconditions;
29 import com.google.common.base.Verify;
30 import com.google.common.collect.HashMultimap;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.collect.SetMultimap;
33 import java.util.Collection;
34 import java.util.HashMap;
35 import java.util.Map;
36 import java.util.Map.Entry;
37 import java.util.Optional;
38 import java.util.function.Consumer;
39 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
40 import org.opendaylight.controller.remote.rpc.RemoteOpsProviderConfig;
41
42 /**
43  * A store that syncs its data across nodes in the cluster.
44  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
45  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
46  *
47  * <p>
48  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
49  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
50  */
51 public abstract class BucketStoreActor<T extends BucketData<T>> extends
52         AbstractUntypedPersistentActorWithMetering {
53     // Internal marker interface for messages which are just bridges to execute a method
54     @FunctionalInterface
55     private interface ExecuteInActor extends Consumer<BucketStoreActor<?>> {
56
57     }
58
59     /**
60      * Buckets owned by other known nodes in the cluster.
61      */
62     private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
63
64     /**
65      * Bucket version for every known node in the cluster including this node.
66      */
67     private final Map<Address, Long> versions = new HashMap<>();
68
69     /**
70      * {@link ActorRef}s being watched for liveness due to being referenced in bucket data. Each actor is monitored
71      * once, possibly being tied to multiple addresses (and by extension, buckets).
72      */
73     private final SetMultimap<ActorRef, Address> watchedActors = HashMultimap.create(1, 1);
74
75     private final RemoteOpsProviderConfig config;
76     private final String persistenceId;
77
78     /**
79      * Cluster address for this node.
80      */
81     private Address selfAddress;
82
83     /**
84      * Bucket owned by the node. Initialized during recovery (due to incarnation number).
85      */
86     private LocalBucket<T> localBucket;
87     private T initialData;
88     private Integer incarnation;
89     private boolean persisting;
90
91     protected BucketStoreActor(final RemoteOpsProviderConfig config, final String persistenceId, final T initialData) {
92         this.config = Preconditions.checkNotNull(config);
93         this.initialData = Preconditions.checkNotNull(initialData);
94         this.persistenceId = Preconditions.checkNotNull(persistenceId);
95     }
96
97     static ExecuteInActor getBucketsByMembersMessage(final Collection<Address> members) {
98         return actor -> actor.getBucketsByMembers(members);
99     }
100
101     static ExecuteInActor removeBucketMessage(final Address addr) {
102         return actor -> actor.removeBucket(addr);
103     }
104
105     static ExecuteInActor updateRemoteBucketsMessage(final Map<Address, Bucket<?>> buckets) {
106         return actor -> actor.updateRemoteBuckets(buckets);
107     }
108
109     static ExecuteInActor getLocalDataMessage() {
110         return actor -> actor.getSender().tell(actor.getLocalData(), actor.getSelf());
111     }
112
113     static ExecuteInActor getRemoteBucketsMessage() {
114         return actor -> actor.getSender().tell(ImmutableMap.copyOf(actor.getRemoteBuckets()), actor.getSelf());
115     }
116
117     public final T getLocalData() {
118         return getLocalBucket().getData();
119     }
120
121     public final Map<Address, Bucket<T>> getRemoteBuckets() {
122         return remoteBuckets;
123     }
124
125     public final Map<Address, Long> getVersions() {
126         return versions;
127     }
128
129     @Override
130     public final String persistenceId() {
131         return persistenceId;
132     }
133
134     @Override
135     public void preStart() {
136         ActorRefProvider provider = getContext().provider();
137         selfAddress = provider.getDefaultAddress();
138
139         if (provider instanceof ClusterActorRefProvider) {
140             getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
141         }
142     }
143
144     @Override
145     protected void handleCommand(final Object message) throws Exception {
146         if (GET_ALL_BUCKETS == message) {
147             // GetAllBuckets is used only in testing
148             getSender().tell(getAllBuckets(), self());
149             return;
150         }
151
152         if (persisting) {
153             handleSnapshotMessage(message);
154             return;
155         }
156
157         if (message instanceof ExecuteInActor) {
158             ((ExecuteInActor) message).accept(this);
159         } else if (GET_BUCKET_VERSIONS == message) {
160             // FIXME: do we need to send ourselves?
161             getSender().tell(ImmutableMap.copyOf(versions), getSelf());
162         } else if (message instanceof Terminated) {
163             actorTerminated((Terminated) message);
164         } else if (message instanceof DeleteSnapshotsSuccess) {
165             LOG.debug("{}: got command: {}", persistenceId(), message);
166         } else if (message instanceof DeleteSnapshotsFailure) {
167             LOG.warn("{}: failed to delete prior snapshots", persistenceId(),
168                 ((DeleteSnapshotsFailure) message).cause());
169         } else {
170             LOG.debug("Unhandled message [{}]", message);
171             unhandled(message);
172         }
173     }
174
175     private void handleSnapshotMessage(final Object message) {
176         if (message instanceof SaveSnapshotFailure) {
177             LOG.error("{}: failed to persist state", persistenceId(), ((SaveSnapshotFailure) message).cause());
178             persisting = false;
179             self().tell(PoisonPill.getInstance(), ActorRef.noSender());
180         } else if (message instanceof SaveSnapshotSuccess) {
181             LOG.debug("{}: got command: {}", persistenceId(), message);
182             SaveSnapshotSuccess saved = (SaveSnapshotSuccess)message;
183             deleteSnapshots(new SnapshotSelectionCriteria(saved.metadata().sequenceNr(),
184                     saved.metadata().timestamp() - 1, 0L, 0L));
185             persisting = false;
186             unstash();
187         } else {
188             LOG.debug("{}: stashing command {}", persistenceId(), message);
189             stash();
190         }
191     }
192
193     @Override
194     protected final void handleRecover(final Object message) {
195         if (message instanceof RecoveryCompleted) {
196             if (incarnation != null) {
197                 incarnation = incarnation + 1;
198             } else {
199                 incarnation = 0;
200             }
201
202             this.localBucket = new LocalBucket<>(incarnation.intValue(), initialData);
203             initialData = null;
204             LOG.debug("{}: persisting new incarnation {}", persistenceId(), incarnation);
205             persisting = true;
206             saveSnapshot(incarnation);
207         } else if (message instanceof SnapshotOffer) {
208             incarnation = (Integer) ((SnapshotOffer)message).snapshot();
209             LOG.debug("{}: recovered incarnation {}", persistenceId(), incarnation);
210         } else {
211             LOG.warn("{}: ignoring recovery message {}", persistenceId(), message);
212         }
213     }
214
215     protected final RemoteOpsProviderConfig getConfig() {
216         return config;
217     }
218
219     protected final void updateLocalBucket(final T data) {
220         final LocalBucket<T> local = getLocalBucket();
221         final boolean bumpIncarnation = local.setData(data);
222         versions.put(selfAddress, local.getVersion());
223
224         if (bumpIncarnation) {
225             LOG.debug("Version wrapped. incrementing incarnation");
226
227             Verify.verify(incarnation < Integer.MAX_VALUE, "Ran out of incarnations, cannot continue");
228             incarnation = incarnation + 1;
229
230             persisting = true;
231             saveSnapshot(incarnation);
232         }
233     }
234
235     /**
236      * Callback to subclasses invoked when a bucket is removed.
237      *
238      * @param address Remote address
239      * @param bucket Bucket removed
240      */
241     protected abstract void onBucketRemoved(Address address, Bucket<T> bucket);
242
243     /**
244      * Callback to subclasses invoked when the set of remote buckets is updated.
245      *
246      * @param newBuckets Map of address to new bucket. Never null, but can be empty.
247      */
248     protected abstract void onBucketsUpdated(Map<Address, Bucket<T>> newBuckets);
249
250     /**
251      * Helper to collect all known buckets.
252      *
253      * @return self owned + remote buckets
254      */
255     private Map<Address, Bucket<T>> getAllBuckets() {
256         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
257
258         //first add the local bucket
259         all.put(selfAddress, getLocalBucket().snapshot());
260
261         //then get all remote buckets
262         all.putAll(remoteBuckets);
263
264         return all;
265     }
266
267     /**
268      * Helper to collect buckets for requested members.
269      *
270      * @param members requested members
271      */
272     private void getBucketsByMembers(final Collection<Address> members) {
273         Map<Address, Bucket<T>> buckets = new HashMap<>();
274
275         //first add the local bucket if asked
276         if (members.contains(selfAddress)) {
277             buckets.put(selfAddress, getLocalBucket().snapshot());
278         }
279
280         //then get buckets for requested remote nodes
281         for (Address address : members) {
282             if (remoteBuckets.containsKey(address)) {
283                 buckets.put(address, remoteBuckets.get(address));
284             }
285         }
286
287         getSender().tell(buckets, getSelf());
288     }
289
290     private void removeBucket(final Address addr) {
291         final Bucket<T> bucket = remoteBuckets.remove(addr);
292         if (bucket != null) {
293             bucket.getWatchActor().ifPresent(ref -> removeWatch(addr, ref));
294             onBucketRemoved(addr, bucket);
295         }
296         versions.remove(addr);
297     }
298
299     /**
300      * Update local copy of remote buckets where local copy's version is older.
301      *
302      * @param receivedBuckets buckets sent by remote
303      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
304      */
305     @VisibleForTesting
306     void updateRemoteBuckets(final Map<Address, Bucket<?>> receivedBuckets) {
307         LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
308         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
309             //nothing to do
310             return;
311         }
312
313         final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
314         for (Entry<Address, Bucket<?>> entry : receivedBuckets.entrySet()) {
315             final Address addr = entry.getKey();
316
317             if (selfAddress.equals(addr)) {
318                 // Remote cannot update our bucket
319                 continue;
320             }
321
322             @SuppressWarnings("unchecked")
323             final Bucket<T> receivedBucket = (Bucket<T>) entry.getValue();
324             if (receivedBucket == null) {
325                 LOG.debug("Ignoring null bucket from {}", addr);
326                 continue;
327             }
328
329             // update only if remote version is newer
330             final long remoteVersion = receivedBucket.getVersion();
331             final Long localVersion = versions.get(addr);
332             if (localVersion != null && remoteVersion <= localVersion.longValue()) {
333                 LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", addr, localVersion,
334                     remoteVersion);
335                 continue;
336             }
337             newBuckets.put(addr, receivedBucket);
338             versions.put(addr, remoteVersion);
339             final Bucket<T> prevBucket = remoteBuckets.put(addr, receivedBucket);
340
341             // Deal with DeathWatch subscriptions
342             final Optional<ActorRef> prevRef = prevBucket != null ? prevBucket.getWatchActor() : Optional.empty();
343             final Optional<ActorRef> curRef = receivedBucket.getWatchActor();
344             if (!curRef.equals(prevRef)) {
345                 prevRef.ifPresent(ref -> removeWatch(addr, ref));
346                 curRef.ifPresent(ref -> addWatch(addr, ref));
347             }
348
349             LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
350         }
351
352         LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
353
354         onBucketsUpdated(newBuckets);
355     }
356
357     private void addWatch(final Address addr, final ActorRef ref) {
358         if (!watchedActors.containsKey(ref)) {
359             getContext().watch(ref);
360             LOG.debug("Watching {}", ref);
361         }
362         watchedActors.put(ref, addr);
363     }
364
365     private void removeWatch(final Address addr, final ActorRef ref) {
366         watchedActors.remove(ref, addr);
367         if (!watchedActors.containsKey(ref)) {
368             getContext().unwatch(ref);
369             LOG.debug("No longer watching {}", ref);
370         }
371     }
372
373     private void actorTerminated(final Terminated message) {
374         LOG.info("Actor termination {} received", message);
375
376         for (Address addr : watchedActors.removeAll(message.getActor())) {
377             versions.remove(addr);
378             final Bucket<T> bucket = remoteBuckets.remove(addr);
379             if (bucket != null) {
380                 LOG.debug("Source actor dead, removing bucket {} from {}", bucket, addr);
381                 onBucketRemoved(addr, bucket);
382             }
383         }
384     }
385
386     @VisibleForTesting
387     protected boolean isPersisting() {
388         return persisting;
389     }
390
391     private LocalBucket<T> getLocalBucket() {
392         Preconditions.checkState(localBucket != null, "Attempted to access local bucket before recovery completed");
393         return localBucket;
394     }
395 }