2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.remote.rpc.registry.gossip;
11 import akka.actor.ActorRef;
12 import akka.actor.ActorRefProvider;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.cluster.ClusterActorRefProvider;
16 import java.util.HashMap;
19 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
20 import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
21 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBuckets;
22 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetAllBucketsReply;
23 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersions;
24 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketVersionsReply;
25 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembers;
26 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.GetBucketsByMembersReply;
27 import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.BucketStoreMessages.UpdateRemoteBuckets;
28 import org.opendaylight.controller.utils.ConditionalProbe;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * A store that syncs its data across nodes in the cluster.
34 * It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
35 * A node can write ONLY to its bucket. This way, write conflicts are avoided.
37 * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
38 * This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
41 public class BucketStore<T extends Copier<T>> extends AbstractUntypedActorWithMetering {
// Sentinel used in receiveUpdateRemoteBuckets when no version is recorded locally or the
// received bucket reports a null version; being negative, it loses the "newer than" comparison there.
43 private static final Long NO_VERSION = -1L;
// Instance logger named after the runtime class (getClass()), so a subclass logs under its own name.
45 protected final Logger log = LoggerFactory.getLogger(getClass());
48 * Bucket owned by the node
50 private final BucketImpl<T> localBucket = new BucketImpl<>();
53 * Buckets owned by other known nodes in the cluster
55 private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
58 * Bucket version for every known node in the cluster including this node
60 private final Map<Address, Long> versions = new HashMap<>();
63 * Cluster address for this node
65 private Address selfAddress;
// Unit-test hook: assigned in handleReceive when a ConditionalProbe message arrives;
// handleReceive also forwards incoming messages to it.
67 private ConditionalProbe probe;
// Remote RPC provider settings; preStart reads the gossiper's mailbox name from it.
69 private final RemoteRpcProviderConfig config;
// Builds the config from the actor system's settings.
// NOTE(review): the enclosing constructor header is not visible in this excerpt - confirm against the full file.
72 config = new RemoteRpcProviderConfig(getContext().system().settings().config());
// Start hook: records this node's default address and, when the actor-ref provider is a
// ClusterActorRefProvider, creates the Gossiper child actor.
76 public void preStart(){
77 ActorRefProvider provider = getContext().provider();
// Remember our own address; it keys the local bucket in every map this store hands out.
78 selfAddress = provider.getDefaultAddress();
// The gossiper is only created for a ClusterActorRefProvider.
80 if ( provider instanceof ClusterActorRefProvider) {
// Child is named "gossiper" and uses the mailbox named by config.getMailBoxName().
81 getContext().actorOf(Props.create(Gossiper.class).withMailbox(config.getMailBoxName()), "gossiper");
// Message dispatcher for this actor: forwards the message to the test probe, then routes it
// by type to the matching receive* handler. Messages of no known type are logged at debug level.
86 protected void handleReceive(Object message) throws Exception {
// NOTE(review): probe is only assigned once a ConditionalProbe has been received, so a guard
// (e.g. a null check) presumably precedes this call but is not visible in this excerpt - confirm.
88 probe.tell(message, getSelf());
91 if (message instanceof ConditionalProbe) {
92 // The ConditionalProbe is only used for unit tests.
93 log.info("Received probe {} {}", getSelf(), message);
94 probe = (ConditionalProbe) message;
95 // Send back any message to tell the caller we got the probe.
96 getSender().tell("Got it", getSelf());
97 } else if (message instanceof GetAllBuckets) {
98 receiveGetAllBuckets();
99 } else if (message instanceof GetBucketsByMembers) {
100 receiveGetBucketsByMembers(((GetBucketsByMembers) message).getMembers());
101 } else if (message instanceof GetBucketVersions) {
102 receiveGetBucketVersions();
103 } else if (message instanceof UpdateRemoteBuckets) {
104 receiveUpdateRemoteBuckets(((UpdateRemoteBuckets) message).getBuckets());
// NOTE(review): the "else" that introduces this fallback branch is not visible in this excerpt.
106 if(log.isDebugEnabled()) {
107 log.debug("Unhandled message [{}]", message);
114 * Returns all the buckets that this node knows about, self owned + remote
// Handles GetAllBuckets: replies to the sender with a GetAllBucketsReply wrapping getAllBuckets().
116 void receiveGetAllBuckets(){
// Capture the sender before building the reply.
117 final ActorRef sender = getSender();
118 sender.tell(new GetAllBucketsReply(getAllBuckets()), getSelf());
122 * Helper to collect all known buckets
124 * @return self owned + remote buckets
// Builds a fresh map holding the local bucket (keyed by selfAddress) plus every remote bucket.
// NOTE(review): the return statement is not visible in this excerpt; presumably the map built here is returned.
126 @SuppressWarnings("rawtypes")
127 Map<Address, Bucket> getAllBuckets(){
// Sized for all remote buckets plus the single local one.
128 Map<Address, Bucket> all = new HashMap<>(remoteBuckets.size() + 1);
130 //first add the local bucket
// A new BucketImpl is constructed from localBucket (presumably a copy constructor - confirm),
// so the live local bucket itself is not placed in the map.
131 all.put(selfAddress, new BucketImpl<>(localBucket));
133 //then get all remote buckets
// Remote buckets are added by reference, not copied.
134 all.putAll(remoteBuckets);
140 * Returns buckets for requested members that this node knows about
142 * @param members requested members
// Handles GetBucketsByMembers: looks up the buckets for the requested addresses and replies
// to the sender with a GetBucketsByMembersReply.
144 @SuppressWarnings("rawtypes")
145 void receiveGetBucketsByMembers(Set<Address> members){
// Capture the sender before doing the lookup.
146 final ActorRef sender = getSender();
147 Map<Address, Bucket> buckets = getBucketsByMembers(members);
148 sender.tell(new GetBucketsByMembersReply(buckets), getSelf());
152 * Helper to collect buckets for requested members
154 * @param members requested members
155 * @return buckets for requested members
// Collects buckets for the requested addresses into a new map. Addresses this store has no
// bucket for are left out.
// NOTE(review): the return statement is not visible in this excerpt; presumably the map built here is returned.
157 @SuppressWarnings("rawtypes")
158 Map<Address, Bucket> getBucketsByMembers(Set<Address> members) {
159 Map<Address, Bucket> buckets = new HashMap<>();
161 //first add the local bucket if asked
162 if (members.contains(selfAddress)) {
// A new BucketImpl is constructed from localBucket (presumably a copy - confirm).
163 buckets.put(selfAddress, new BucketImpl<>(localBucket));
166 //then get buckets for requested remote nodes
167 for (Address address : members){
// Only addresses present in remoteBuckets are added; the stored bucket is put by reference.
168 if (remoteBuckets.containsKey(address)) {
169 buckets.put(address, remoteBuckets.get(address));
177 * Returns versions for all buckets known
// Handles GetBucketVersions: replies to the sender with a GetBucketVersionsReply built from
// the versions map (one entry per known node, including this one).
179 void receiveGetBucketVersions(){
180 final ActorRef sender = getSender();
// NOTE(review): the actor's own mutable versions map is passed to the reply constructor;
// confirm GetBucketVersionsReply copies it, otherwise live state is shared with the receiver.
181 GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
182 sender.tell(reply, getSelf());
186 * Update local copy of remote buckets where local copy's version is older
188 * @param receivedBuckets buckets sent by remote
189 * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
// Handles UpdateRemoteBuckets: for each received bucket whose version is strictly greater than
// the version recorded locally for that address, stores the bucket in remoteBuckets and
// records its version in versions. A null or empty input is ignored.
191 @SuppressWarnings({ "rawtypes", "unchecked" })
192 void receiveUpdateRemoteBuckets(Map<Address, Bucket> receivedBuckets){
193 log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
194 if (receivedBuckets == null || receivedBuckets.isEmpty())
196 return; //nothing to do
199 //Remote can't update self's bucket
// This removes the entry from the map that was passed in, i.e. it mutates the caller's map.
200 receivedBuckets.remove(selfAddress);
202 for (Map.Entry<Address, Bucket> entry : receivedBuckets.entrySet()){
// An address with no recorded version is treated as NO_VERSION.
204 Long localVersion = versions.get(entry.getKey());
205 if (localVersion == null) {
206 localVersion = NO_VERSION;
209 Bucket<T> receivedBucket = entry.getValue();
// NOTE(review): the body of this null check is not visible in this excerpt; presumably the
// entry is skipped, since the next statement dereferences receivedBucket - confirm.
211 if (receivedBucket == null) {
// A received bucket with a null version is treated as NO_VERSION.
215 Long remoteVersion = receivedBucket.getVersion();
216 if (remoteVersion == null) {
217 remoteVersion = NO_VERSION;
220 //update only if remote version is newer
// Strict comparison: an equal version leaves the stored bucket untouched.
221 if ( remoteVersion.longValue() > localVersion.longValue() ) {
222 remoteBuckets.put(entry.getKey(), receivedBucket);
223 versions.put(entry.getKey(), remoteVersion);
227 if(log.isDebugEnabled()) {
228 log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
// Accessor returning a BucketImpl<T>.
// NOTE(review): the body is not visible in this excerpt; presumably it returns the localBucket field - confirm.
232 public BucketImpl<T> getLocalBucket() {
// Stores new data in the local bucket and then records the bucket's current version
// under selfAddress in the versions map.
236 protected void updateLocalBucket(T data) {
// NOTE(review): presumably setData also advances the bucket's version - confirm in BucketImpl.
237 localBucket.setData(data);
238 versions.put(selfAddress, localBucket.getVersion());
// Returns the internal remoteBuckets map itself (not a copy), so callers see, and can change,
// this store's live state.
241 public Map<Address, Bucket<T>> getRemoteBuckets() {
242 return remoteBuckets;
245 public Map<Address, Long> getVersions() {