126b9b669df473d46cb8f9c8cfa2b4eb6786ffd3
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.cluster.ClusterActorRefProvider;
15 import com.google.common.base.Preconditions;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Map.Entry;
19 import java.util.Set;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
21 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.RemoveRemoteBucket;
29 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
30 import org.opendaylight.controller.utils.ConditionalProbe;
31
32 /**
33  * A store that syncs its data across nodes in the cluster.
34  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
35  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
36  * <p>
37  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
38  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
39  *
40  */
41 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
42     /**
43      * Bucket owned by the node
44      */
45     private final BucketImpl<T> localBucket;
46
47     /**
48      * Buckets owned by other known nodes in the cluster.
49      */
50     private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
51
52     /**
53      * Bucket version for every known node in the cluster including this node
54      */
55     private final Map<Address, Long> versions = new HashMap<>();
56
57     /**
58      * Cluster address for this node
59      */
60     private Address selfAddress;
61
62     // FIXME: should be part of test-specific subclass
63     private ConditionalProbe probe;
64
65     private final RemoteRpcProviderConfig config;
66
67     public BucketStore(final RemoteRpcProviderConfig config, final T initialData) {
68         this.config = Preconditions.checkNotNull(config);
69         this.localBucket = new BucketImpl<>(initialData);
70     }
71
72     @Override
73     public void preStart(){
74         ActorRefProvider provider = getContext().provider();
75         selfAddress = provider.getDefaultAddress();
76
77         if (provider instanceof ClusterActorRefProvider) {
78             getContext().actorOf(Gossiper.props(config).withMailbox(config.getMailBoxName()), "gossiper");
79         }
80     }
81
82     @Override
83     protected void handleReceive(final Object message) throws Exception {
84         if (probe != null) {
85             probe.tell(message, getSelf());
86         }
87
88         if (message instanceof GetBucketsByMembers) {
89             receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
90         } else if (message instanceof GetBucketVersions) {
91             receiveGetBucketVersions();
92         } else if (message instanceof UpdateRemoteBuckets) {
93             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
94         } else if (message instanceof RemoveRemoteBucket) {
95             removeBucket(((RemoveRemoteBucket) message).getAddress());
96         } else if (message instanceof GetAllBuckets) {
97             // GetAllBuckets is used only for unit tests.
98             receiveGetAllBuckets();
99         } else if (message instanceof ConditionalProbe) {
100             // The ConditionalProbe is only used for unit tests.
101             LOG.info("Received probe {} {}", getSelf(), message);
102             probe = (ConditionalProbe) message;
103             // Send back any message to tell the caller we got the probe.
104             getSender().tell("Got it", getSelf());
105         } else {
106             LOG.debug("Unhandled message [{}]", message);
107             unhandled(message);
108         }
109     }
110
111     protected RemoteRpcProviderConfig getConfig() {
112         return config;
113     }
114
115     /**
116      * Returns all the buckets the this node knows about, self owned + remote
117      */
118     void receiveGetAllBuckets(){
119         final ActorRef sender = getSender();
120         sender.tell(new GetAllBucketsReply<T>(getAllBuckets()), getSelf());
121     }
122
123     /**
124      * Helper to collect all known buckets
125      *
126      * @return self owned + remote buckets
127      */
128     Map<Address, Bucket<T>> getAllBuckets(){
129         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
130
131         //first add the local bucket
132         all.put(selfAddress, new BucketImpl<>(localBucket));
133
134         //then get all remote buckets
135         all.putAll(remoteBuckets);
136
137         return all;
138     }
139
140     /**
141      * Returns buckets for requested members that this node knows about
142      *
143      * @param members requested members
144      */
145     void receiveGetBucketsByMembers(final Set<Address> members) {
146         final ActorRef sender = getSender();
147         Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
148         sender.tell(new GetBucketsByMembersReply<T>(buckets), getSelf());
149     }
150
151     /**
152      * Helper to collect buckets for requested memebers
153      *
154      * @param members requested members
155      * @return buckets for requested memebers
156      */
157     Map<Address, Bucket<T>> getBucketsByMembers(final Set<Address> members) {
158         Map<Address, Bucket<T>> buckets = new HashMap<>();
159
160         //first add the local bucket if asked
161         if (members.contains(selfAddress)) {
162             buckets.put(selfAddress, new BucketImpl<>(localBucket));
163         }
164
165         //then get buckets for requested remote nodes
166         for (Address address : members){
167             if (remoteBuckets.containsKey(address)) {
168                 buckets.put(address, remoteBuckets.get(address));
169             }
170         }
171
172         return buckets;
173     }
174
175     /**
176      * Returns versions for all buckets known
177      */
178     void receiveGetBucketVersions(){
179         final ActorRef sender = getSender();
180         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
181         sender.tell(reply, getSelf());
182     }
183
184     /**
185      * Update local copy of remote buckets where local copy's version is older
186      *
187      * @param receivedBuckets buckets sent by remote
188      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
189      */
190     void receiveUpdateRemoteBuckets(final Map<Address, Bucket<T>> receivedBuckets) {
191         LOG.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
192         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
193             //nothing to do
194             return;
195         }
196
197         final Map<Address, Bucket<T>> newBuckets = new HashMap<>(receivedBuckets.size());
198         for (Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
199             if (selfAddress.equals(entry.getKey())) {
200                 // Remote cannot update our bucket
201                 continue;
202             }
203
204             final Bucket<T> receivedBucket = entry.getValue();
205             if (receivedBucket == null) {
206                 LOG.debug("Ignoring null bucket from {}", entry.getKey());
207                 continue;
208             }
209
210             // update only if remote version is newer
211             final long remoteVersion = receivedBucket.getVersion();
212             final Long localVersion = versions.get(entry.getKey());
213             if (localVersion != null && remoteVersion <= localVersion.longValue()) {
214                 LOG.debug("Ignoring down-versioned bucket from {} ({} local {} remote)", entry.getKey(), localVersion,
215                     remoteVersion);
216                 continue;
217             }
218
219             newBuckets.put(entry.getKey(), receivedBucket);
220             remoteBuckets.put(entry.getKey(), receivedBucket);
221             versions.put(entry.getKey(), remoteVersion);
222             LOG.debug("Updating bucket from {} to version {}", entry.getKey(), remoteVersion);
223         }
224
225         LOG.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
226
227         onBucketsUpdated(newBuckets);
228     }
229
230     private void removeBucket(final Address address) {
231         final Bucket<T> bucket = remoteBuckets.remove(address);
232         if (bucket != null) {
233             onBucketRemoved(address, bucket);
234         }
235     }
236
237     /**
238      * Callback to subclasses invoked when a bucket is removed.
239      *
240      * @param address Remote address
241      * @param bucket Bucket removed
242      */
243     protected void onBucketRemoved(final Address address, final Bucket<T> bucket) {
244         // Default noop
245     }
246
247     /**
248      * Callback to subclasses invoked when the set of remote buckets is updated.
249      *
250      * @param newBuckets Map of address to new bucket. Never null, but can be empty.
251      */
252     protected void onBucketsUpdated(final Map<Address, Bucket<T>> newBuckets) {
253         // Default noop
254     }
255
256     public BucketImpl<T> getLocalBucket() {
257         return localBucket;
258     }
259
260     protected void updateLocalBucket(final T data) {
261         localBucket.setData(data);
262         versions.put(selfAddress, localBucket.getVersion());
263     }
264
265     public Map<Address, Bucket<T>> getRemoteBuckets() {
266         return remoteBuckets;
267     }
268
269     public Map<Address, Long> getVersions() {
270         return versions;
271     }
272 }