Merge "BUG 3057 - notify added event source by topics created before"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / gossip / BucketStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
10
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.cluster.ClusterActorRefProvider;
16 import java.util.HashMap;
17 import java.util.Map;
18 import java.util.Set;
19 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
20 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
28 import org.opendaylight.controller.utils.ConditionalProbe;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33  * A store that syncs its data across nodes in the cluster.
34  * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
35  * A node can write ONLY to its bucket. This way, write conflicts are avoided.
36  * <p>
37  * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
38  * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
39  *
40  */
41 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
42
43     private static final Long NO_VERSION = -1L;
44
45     protected final Logger log = LoggerFactory.getLogger(getClass());
46
47     /**
48      * Bucket owned by the node
49      */
50     private final BucketImpl<T> localBucket = new BucketImpl<>();
51
52     /**
53      * Buckets ownded by other known nodes in the cluster
54      */
55     private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
56
57     /**
58      * Bucket version for every known node in the cluster including this node
59      */
60     private final Map<Address, Long> versions = new HashMap<>();
61
62     /**
63      * Cluster address for this node
64      */
65     private Address selfAddress;
66
67     private ConditionalProbe probe;
68
69     private final RemoteRpcProviderConfig config;
70
71     public BucketStore(){
72         config = new RemoteRpcProviderConfig(getContext().system().settings().config());
73     }
74
75     @Override
76     public void preStart(){
77         ActorRefProvider provider = getContext().provider();
78         selfAddress = provider.getDefaultAddress();
79
80         if ( provider instanceof ClusterActorRefProvider) {
81             getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
82         }
83     }
84
85     @Override
86     protected void handleReceive(Object message) throws Exception {
87         if (probe != null) {
88             probe.tell(message, getSelf());
89         }
90
91         if (message instanceof ConditionalProbe) {
92             // The ConditionalProbe is only used for unit tests.
93             log.info("Received probe {} {}", getSelf(), message);
94             probe = (ConditionalProbe) message;
95             // Send back any message to tell the caller we got the probe.
96             getSender().tell("Got it", getSelf());
97         } else if (message instanceof GetAllBuckets) {
98             receiveGetAllBuckets();
99         } else if (message instanceof GetBucketsByMembers) {
100             receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
101         } else if (message instanceof GetBucketVersions) {
102             receiveGetBucketVersions();
103         } else if (message instanceof UpdateRemoteBuckets) {
104             receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
105         } else {
106             if(log.isDebugEnabled()) {
107                 log.debug("Unhandled message [{}]", message);
108             }
109             unhandled(message);
110         }
111     }
112
113     /**
114      * Returns all the buckets the this node knows about, self owned + remote
115      */
116     void receiveGetAllBuckets(){
117         final ActorRef sender = getSender();
118         sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
119     }
120
121     /**
122      * Helper to collect all known buckets
123      *
124      * @return self owned + remote buckets
125      */
126     @SuppressWarnings("rawtypes")
127     Map<Address, Bucket> getAllBuckets(){
128         Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
129
130         //first add the local bucket
131         all.put(selfAddress, new BucketImpl<>(localBucket));
132
133         //then get all remote buckets
134         all.putAll(remoteBuckets);
135
136         return all;
137     }
138
139     /**
140      * Returns buckets for requested members that this node knows about
141      *
142      * @param members requested members
143      */
144     @SuppressWarnings("rawtypes")
145     void receiveGetBucketsByMembers(Set<Address> members){
146         final ActorRef sender = getSender();
147         Map<Address, Bucket> buckets = getBucketsByMembers(members);
148         sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
149     }
150
151     /**
152      * Helper to collect buckets for requested memebers
153      *
154      * @param members requested members
155      * @return buckets for requested memebers
156      */
157     @SuppressWarnings("rawtypes")
158     Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
159         Map<Address, Bucket> buckets = new HashMap<>();
160
161         //first add the local bucket if asked
162         if (members.contains(selfAddress)) {
163             buckets.put(selfAddress, new BucketImpl<>(localBucket));
164         }
165
166         //then get buckets for requested remote nodes
167         for (Address address : members){
168             if (remoteBuckets.containsKey(address)) {
169                 buckets.put(address, remoteBuckets.get(address));
170             }
171         }
172
173         return buckets;
174     }
175
176     /**
177      * Returns versions for all buckets known
178      */
179     void receiveGetBucketVersions(){
180         final ActorRef sender = getSender();
181         GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
182         sender.tell(reply, getSelf());
183     }
184
185     /**
186      * Update local copy of remote buckets where local copy's version is older
187      *
188      * @param receivedBuckets buckets sent by remote
189      *                        {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
190      */
191     @SuppressWarnings({ "rawtypes", "unchecked" })
192     void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
193         log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
194         if (receivedBuckets == null || receivedBuckets.isEmpty())
195          {
196             return; //nothing to do
197         }
198
199         //Remote cant update self's bucket
200         receivedBuckets.remove(selfAddress);
201
202         for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
203
204             Long localVersion = versions.get(entry.getKey());
205             if (localVersion == null) {
206                 localVersion = NO_VERSION;
207             }
208
209             Bucket<T> receivedBucket = entry.getValue();
210
211             if (receivedBucket == null) {
212                 continue;
213             }
214
215             Long remoteVersion = receivedBucket.getVersion();
216             if (remoteVersion == null) {
217                 remoteVersion = NO_VERSION;
218             }
219
220             //update only if remote version is newer
221             if ( remoteVersion.longValue() > localVersion.longValue() ) {
222                 remoteBuckets.put(entry.getKey(), receivedBucket);
223                 versions.put(entry.getKey(), remoteVersion);
224             }
225         }
226
227         if(log.isDebugEnabled()) {
228             log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
229         }
230     }
231
232     public BucketImpl<T> getLocalBucket() {
233         return localBucket;
234     }
235
236     protected void updateLocalBucket(T data) {
237         localBucket.setData(data);
238         versions.put(selfAddress, localBucket.getVersion());
239     }
240
241     public Map<Address, Bucket<T>> getRemoteBuckets() {
242         return remoteBuckets;
243     }
244
245     public Map<Address, Long> getVersions() {
246         return versions;
247     }
248 }