BUG-3128: rework sal-remoterpc-connector
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.cluster.ClusterActorRefProvider;
15 import com.google.common.base.Preconditions;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.Set;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
21 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
30 import org.opendaylight.controller.utils.ConditionalProbe;
31
32 /**
33  * A store that syncs its data across nodes in the cluster.
34  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
35  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
36  *
37  * <p>
38  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
39  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
40  *
41  */
42 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
43     /**
44      * Bucket owned by the node.
45      */
46     private final BucketImpl<T> localBucket;
47
48     /**
49      * Buckets owned by other known nodes in the cluster.
50      */
51     private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
52
53     /**
54      * Bucket version for every known node in the cluster including this node.
55      */
56     private final Map<Address, Long> versions = new HashMap<>();
57
58     /**
59      * Cluster address for this node.
60      */
61     private Address selfAddress;
62
63     // FIXME: should be part of test-specific subclass
64     private ConditionalProbe probe;
65
66     private final RemoteRpcProviderConfig config;
67
68     public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
69         this.config = Preconditions.checkNotNull(config);
70         this.localBucket = new BucketImpl<>(initialData);
71     }
72
73     @Override
74     public void preStart() {
75         ActorRefProvider provider = getContext().provider();
76         selfAddress = provider.getDefaultAddress();
77
78         if (provider instanceof ClusterActorRefProvider) {
79             getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
80         }
81     }
82
83     @SuppressWarnings("unchecked")
84     @Override
85     protected void handleReceive(final Object message) throws Exception {
86         if (probe != null) {
87             probe.tell(message, getSelf());
88         }
89
90         if (message instanceof GetBucketsByMembers) {
91             receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
92         } else if (message instanceof GetBucketVersions) {
93             receiveGetBucketVersions();
94         } else if (message instanceof UpdateRemoteBuckets) {
95             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
96         } else if (message instanceof RemoveRemoteBucket) {
97             removeBucket(((RemoveRemoteBucket) message).getAddress());
98         } else if (message instanceof GetAllBuckets) {
99             // GetAllBuckets is used only for unit tests.
100             receiveGetAllBuckets();
101         } else if (message instanceof ConditionalProbe) {
102             // The ConditionalProbe is only used for unit tests.
103             LOG.info("Received probe {} {}", getSelf(), message);
104             probe = (ConditionalProbe) message;
105             // Send back any message to tell the caller we got the probe.
106             getSender().tell("Got it", getSelf());
107         } else {
108             LOG.debug("Unhandled message [{}]", message);
109             unhandled(message);
110         }
111     }
112
113     protected RemoteRpcProviderConfig getConfig() {
114         return config;
115     }
116
117     /**
118      * Returns all the buckets the this node knows about, self owned + remote.
119      */
120     void receiveGetAllBuckets() {
121         final ActorRef sender = getSender();
122         sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
123     }
124
125     /**
126      * Helper to collect all known buckets.
127      *
128      * @return self owned + remote buckets
129      */
130     Map<Address, Bucket<T>> getAllBuckets() {
131         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
132
133         //first add the local bucket
134         all.put(selfAddress, new BucketImpl<>(localBucket));
135
136         //then get all remote buckets
137         all.putAll(remoteBuckets);
138
139         return all;
140     }
141
142     /**
143      * Returns buckets for requested members that this node knows about.
144      *
145      * @param members requested members
146      */
147     void receiveGetBucketsByMembers(final Set<Address> members) {
148         final ActorRef sender = getSender();
149         Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
150         sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
151     }
152
153     /**
154      * Helper to collect buckets for requested members.
155      *
156      * @param members requested members
157      * @return buckets for requested members
158      */
159     Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
160         Map<Address, Bucket<T>> buckets = new HashMap<>();
161
162         //first add the local bucket if asked
163         if (members.contains(selfAddress)) {
164             buckets.put(selfAddress, new BucketImpl<>(localBucket));
165         }
166
167         //then get buckets for requested remote nodes
168         for (Address address : members) {
169             if (remoteBuckets.containsKey(address)) {
170                 buckets.put(address, remoteBuckets.get(address));
171             }
172         }
173
174         return buckets;
175     }
176
177     /**
178      * Returns versions for all buckets known.
179      */
180     void receiveGetBucketVersions() {
181         final ActorRef sender = getSender();
182         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
183         sender.tell(reply, getSelf());
184     }
185
186     /**
187      * Update local copy of remote buckets where local copy's version is older.
188      *
189      * @param receivedBuckets buckets sent by remote
190      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
191      */
192     void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
193         LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
194         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
195             //nothing to do
196             return;
197         }
198
199         final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
200         for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
201             if (selfAddress.equals(entry.getKey())) {
202                 // Remote cannot update our bucket
203                 continue;
204             }
205
206             final Bucket<T> receivedBucket = entry.getValue();
207             if (receivedBucket == null) {
208                 LOG.debug("Ignoring null bucket from {}", entry.getKey());
209                 continue;
210             }
211
212             // update only if remote version is newer
213             final long remoteVersion = receivedBucket.getVersion();
214             final Long localVersion = versions.get(entry.getKey());
215             if (localVersion != null && remoteVersion <= localVersion.longValue()) {
216                 LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion,
217                     remoteVersion);
218                 continue;
219             }
220
221             newBuckets.put(entry.getKey(), receivedBucket);
222             remoteBuckets.put(entry.getKey(), receivedBucket);
223             versions.put(entry.getKey(), remoteVersion);
224             LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
225         }
226
227         LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
228
229         onBucketsUpdated(newBuckets);
230     }
231
232     private void removeBucket(final Address address) {
233         final Bucket<T> bucket = remoteBuckets.remove(address);
234         if (bucket != null) {
235             onBucketRemoved(address, bucket);
236         }
237     }
238
239     /**
240      * Callback to subclasses invoked when a bucket is removed.
241      *
242      * @param address Remote address
243      * @param bucket Bucket removed
244      */
245     protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
246         // Default noop
247     }
248
249     /**
250      * Callback to subclasses invoked when the set of remote buckets is updated.
251      *
252      * @param newBuckets Map of address to new bucket. Never null, but can be empty.
253      */
254     protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
255         // Default noop
256     }
257
258     public BucketImpl<T> getLocalBucket() {
259         return localBucket;
260     }
261
262     protected void updateLocalBucket(final T data) {
263         localBucket.setData(data);
264         versions.put(selfAddress, localBucket.getVersion());
265     }
266
267     public Map<Address, Bucket<T>> getRemoteBuckets() {
268         return remoteBuckets;
269     }
270
271     public Map<Address, Long> getVersions() {
272         return versions;
273     }
274 }