81e6a9ccc3c253e0e409e37e75f8d7ad1a4af888
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.cluster.ClusterActorRefProvider;
16 import com.google.common.base.Preconditions;
17 import java.util.HashMap;
18 import java.util.Map;
19 import java.util.Set;
20 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
21 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
28 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
29 import org.opendaylight.controller.utils.ConditionalProbe;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * A store that syncs its data across nodes in the cluster.
35  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
36  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
37  *
38  * <p>
39  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
40  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
41  *
42  */
43 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
44
45     private static final Long NO_VERSION = -1L;
46
47     protected final Logger log = LoggerFactory.getLogger(getClass());
48
49     /**
50      * Bucket owned by the node.
51      */
52     private final BucketImpl<T> localBucket = new BucketImpl<>();
53
54     /**
55      * Buckets ownded by other known nodes in the cluster.
56      */
57     private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
58
59     /**
60      * Bucket version for every known node in the cluster including this node.
61      */
62     private final Map<Address, Long> versions = new HashMap<>();
63
64     /**
65      * Cluster address for this node.
66      */
67     private Address selfAddress;
68
69     private ConditionalProbe probe;
70
71     private final RemoteRpcProviderConfig config;
72
73     public BucketStore(RemoteRpcProviderConfig config) {
74         this.config = Preconditions.checkNotNull(config);
75     }
76
77     @Override
78     public void preStart() {
79         ActorRefProvider provider = getContext().provider();
80         selfAddress = provider.getDefaultAddress();
81
82         if ( provider instanceof ClusterActorRefProvider) {
83             getContext().actorOf(Props.create(Gossiper.class, config).withMailbox(config.getMailBoxName()), "gossiper");
84         }
85     }
86
87     @SuppressWarnings("unchecked")
88     @Override
89     protected void handleReceive(Object message) throws Exception {
90         if (probe != null) {
91             probe.tell(message, getSelf());
92         }
93
94         if (message instanceof ConditionalProbe) {
95             // The ConditionalProbe is only used for unit tests.
96             log.info("Received probe {} {}", getSelf(), message);
97             probe = (ConditionalProbe) message;
98             // Send back any message to tell the caller we got the probe.
99             getSender().tell("Got it", getSelf());
100         } else if (message instanceof GetAllBuckets) {
101             receiveGetAllBuckets();
102         } else if (message instanceof GetBucketsByMembers) {
103             receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
104         } else if (message instanceof GetBucketVersions) {
105             receiveGetBucketVersions();
106         } else if (message instanceof UpdateRemoteBuckets) {
107             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
108         } else {
109             log.debug("Unhandled message [{}]", message);
110             unhandled(message);
111         }
112     }
113
114     protected RemoteRpcProviderConfig getConfig() {
115         return config;
116     }
117
118     /**
119      * Returns all the buckets the this node knows about, self owned + remote.
120      */
121     void receiveGetAllBuckets() {
122         final ActorRef sender = getSender();
123         sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
124     }
125
126     /**
127      * Helper to collect all known buckets.
128      *
129      * @return self owned + remote buckets
130      */
131     Map<Address, Bucket<T>> getAllBuckets() {
132         Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
133
134         //first add the local bucket
135         all.put(selfAddress, new BucketImpl<>(localBucket));
136
137         //then get all remote buckets
138         all.putAll(remoteBuckets);
139
140         return all;
141     }
142
143     /**
144      * Returns buckets for requested members that this node knows about.
145      *
146      * @param members requested members
147      */
148     void receiveGetBucketsByMembers(Set<Address> members) {
149         final ActorRef sender = getSender();
150         Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
151         sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
152     }
153
154     /**
155      * Helper to collect buckets for requested members.
156      *
157      * @param members requested members
158      * @return buckets for requested members
159      */
160     Map<Address, Bucket<T>> getBucketsByMembers(Set<Address> members) {
161         Map<Address, Bucket<T>> buckets = new HashMap<>();
162
163         //first add the local bucket if asked
164         if (members.contains(selfAddress)) {
165             buckets.put(selfAddress, new BucketImpl<>(localBucket));
166         }
167
168         //then get buckets for requested remote nodes
169         for (Address address : members) {
170             if (remoteBuckets.containsKey(address)) {
171                 buckets.put(address, remoteBuckets.get(address));
172             }
173         }
174
175         return buckets;
176     }
177
178     /**
179      * Returns versions for all buckets known.
180      */
181     void receiveGetBucketVersions() {
182         final ActorRef sender = getSender();
183         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
184         sender.tell(reply, getSelf());
185     }
186
187     /**
188      * Update local copy of remote buckets where local copy's version is older.
189      *
190      * @param receivedBuckets buckets sent by remote
191      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
192      */
193     void receiveUpdateRemoteBuckets(Map<Address, Bucket<T>> receivedBuckets) {
194         log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
195         if (receivedBuckets == null || receivedBuckets.isEmpty()) {
196             return; //nothing to do
197         }
198
199         //Remote cant update self's bucket
200         receivedBuckets.remove(selfAddress);
201
202         for (Map.Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
203
204             Long localVersion = versions.get(entry.getKey());
205             if (localVersion == null) {
206                 localVersion = NO_VERSION;
207             }
208
209             Bucket<T> receivedBucket = entry.getValue();
210
211             if (receivedBucket == null) {
212                 continue;
213             }
214
215             Long remoteVersion = receivedBucket.getVersion();
216             if (remoteVersion == null) {
217                 remoteVersion = NO_VERSION;
218             }
219
220             //update only if remote version is newer
221             if ( remoteVersion.longValue() > localVersion.longValue() ) {
222                 remoteBuckets.put(entry.getKey(), receivedBucket);
223                 versions.put(entry.getKey(), remoteVersion);
224             }
225         }
226
227         log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
228
229         onBucketsUpdated();
230     }
231
232     protected void onBucketsUpdated() {
233     }
234
235     public BucketImpl<T> getLocalBucket() {
236         return localBucket;
237     }
238
239     protected void updateLocalBucket(T data) {
240         localBucket.setData(data);
241         versions.put(selfAddress, localBucket.getVersion());
242     }
243
244     public Map<Address, Bucket<T>> getRemoteBuckets() {
245         return remoteBuckets;
246     }
247
248     public Map<Address, Long> getVersions() {
249         return versions;
250     }
251 }