Merge "Snapshot changes"
[controller.git] / opendaylight / md-sal / sal-remoterpc-connector / src / main / java / org / opendaylight / controller / remote / rpc / registry / RpcRegistry.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.remote.rpc.registry;
9
10 import akka.actor.ActorSelection;
11 import akka.actor.Address;
12 import akka.actor.Props;
13 import akka.cluster.ClusterEvent;
14 import akka.cluster.Member;
15 import akka.japi.Creator;
16 import org.opendaylight.controller.remote.rpc.AbstractUntypedActor;
17 import org.opendaylight.controller.remote.rpc.ActorConstants;
18 import org.opendaylight.controller.remote.rpc.messages.AddRoutedRpc;
19 import org.opendaylight.controller.remote.rpc.messages.AddRpc;
20 import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpc;
21 import org.opendaylight.controller.remote.rpc.messages.GetRoutedRpcReply;
22 import org.opendaylight.controller.remote.rpc.messages.GetRpc;
23 import org.opendaylight.controller.remote.rpc.messages.GetRpcReply;
24 import org.opendaylight.controller.remote.rpc.messages.RemoveRoutedRpc;
25 import org.opendaylight.controller.remote.rpc.messages.RemoveRpc;
26 import org.opendaylight.controller.remote.rpc.messages.RoutingTableData;
27 import org.opendaylight.controller.sal.connector.api.RpcRouter;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30 import scala.collection.JavaConversions;
31
32 import java.util.LinkedHashSet;
33 import java.util.Map;
34 import java.util.Random;
35 import java.util.Set;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.ScheduledExecutorService;
38 import java.util.concurrent.ScheduledFuture;
39 import java.util.concurrent.TimeUnit;
40
41 /**
42  * This Actor maintains the routing table state and sync it with other nodes in the cluster.
43  *
44  * A scheduler runs after an interval of time, which pick a random member from the cluster
45  * and send the current state of routing table to the member.
46  *
47  * when a message of routing table data is received, it gets merged with the local routing table
48  * to keep the latest data.
49  */
50
51 public class RpcRegistry extends AbstractUntypedActor {
52
53   private static final Logger LOG = LoggerFactory.getLogger(RpcRegistry.class);
54   private RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String> routingTable;
55   private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
56   private final ClusterWrapper clusterWrapper;
57   private final ScheduledFuture<?> syncScheduler;
58
59   private RpcRegistry(ClusterWrapper clusterWrapper){
60     this.routingTable = new RoutingTable<>();
61     this.clusterWrapper = clusterWrapper;
62     this.syncScheduler = scheduler.scheduleAtFixedRate(new SendRoutingTable(), 10, 10, TimeUnit.SECONDS);
63   }
64
65   public static Props props(final ClusterWrapper clusterWrapper){
66     return Props.create(new Creator<RpcRegistry>(){
67
68       @Override
69       public RpcRegistry create() throws Exception {
70         return new RpcRegistry(clusterWrapper);
71       }
72     });
73   }
74
75   @Override
76   protected void handleReceive(Object message) throws Exception {
77     LOG.debug("Received message {}", message);
78     if(message instanceof RoutingTableData) {
79       syncRoutingTable((RoutingTableData) message);
80     } else if(message instanceof GetRoutedRpc) {
81       getRoutedRpc((GetRoutedRpc) message);
82     } else if(message instanceof GetRpc) {
83       getRpc((GetRpc) message);
84     } else if(message instanceof AddRpc) {
85       addRpc((AddRpc) message);
86     } else if(message instanceof RemoveRpc) {
87       removeRpc((RemoveRpc) message);
88     } else if(message instanceof AddRoutedRpc) {
89       addRoutedRpc((AddRoutedRpc) message);
90     } else if(message instanceof RemoveRoutedRpc) {
91       removeRoutedRpc((RemoveRoutedRpc) message);
92     }
93   }
94
95   private void getRoutedRpc(GetRoutedRpc rpcMsg){
96     LOG.debug("Get latest routed Rpc location from routing table {}", rpcMsg);
97     String remoteActorPath = routingTable.getLastAddedRoutedRpc(rpcMsg.getRouteId());
98     GetRoutedRpcReply routedRpcReply = new GetRoutedRpcReply(remoteActorPath);
99
100     getSender().tell(routedRpcReply, self());
101   }
102
103   private void getRpc(GetRpc rpcMsg) {
104     LOG.debug("Get global Rpc location from routing table {}", rpcMsg);
105     String remoteActorPath = routingTable.getGlobalRoute(rpcMsg.getRouteId());
106     GetRpcReply rpcReply = new GetRpcReply(remoteActorPath);
107
108     getSender().tell(rpcReply, self());
109   }
110
111   private void addRpc(AddRpc rpcMsg) {
112     LOG.debug("Add Rpc to routing table {}", rpcMsg);
113     routingTable.addGlobalRoute(rpcMsg.getRouteId(), rpcMsg.getActorPath());
114
115     getSender().tell("Success", self());
116   }
117
118   private void removeRpc(RemoveRpc rpcMsg) {
119     LOG.debug("Removing Rpc to routing table {}", rpcMsg);
120     routingTable.removeGlobalRoute(rpcMsg.getRouteId());
121
122     getSender().tell("Success", self());
123   }
124
125   private void addRoutedRpc(AddRoutedRpc rpcMsg) {
126     routingTable.addRoutedRpcs(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
127     getSender().tell("Success", self());
128   }
129
130   private void removeRoutedRpc(RemoveRoutedRpc rpcMsg) {
131     routingTable.removeRoutes(rpcMsg.getAnnouncements(), rpcMsg.getActorPath());
132     getSender().tell("Success", self());
133   }
134
135   private void syncRoutingTable(RoutingTableData routingTableData) {
136     LOG.debug("Syncing routing table {}", routingTableData);
137
138     Map<RpcRouter.RouteIdentifier<?, ?, ?>, String> newRpcMap = routingTableData.getRpcMap();
139     Set<RpcRouter.RouteIdentifier<?, ?, ?>> routeIds = newRpcMap.keySet();
140     for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
141       routingTable.addGlobalRoute(routeId, newRpcMap.get(routeId));
142     }
143
144     Map<RpcRouter.RouteIdentifier<?, ?, ?>, LinkedHashSet<String>> newRoutedRpcMap =
145         routingTableData.getRoutedRpcMap();
146     routeIds = newRoutedRpcMap.keySet();
147
148     for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : routeIds) {
149       Set<String> routeAddresses = newRoutedRpcMap.get(routeId);
150       for(String routeAddress : routeAddresses) {
151         routingTable.addRoutedRpc(routeId, routeAddress);
152       }
153     }
154   }
155
156   private ActorSelection getRandomRegistryActor() {
157     ClusterEvent.CurrentClusterState clusterState = clusterWrapper.getState();
158     ActorSelection actor = null;
159     Set<Member> members = JavaConversions.asJavaSet(clusterState.members());
160     int memberSize = members.size();
161     // Don't select yourself
162     if(memberSize > 1) {
163       Address currentNodeAddress = clusterWrapper.getAddress();
164       int index = new Random().nextInt(memberSize);
165       int i = 0;
166       // keeping previous member, in case when random index member is same as current actor
167       // and current actor member is last in set
168       Member previousMember = null;
169       for(Member member : members){
170         if(i == index-1) {
171           previousMember = member;
172         }
173         if(i == index) {
174           if(!currentNodeAddress.equals(member.address())) {
175             actor = this.context().actorSelection(member.address() + ActorConstants.RPC_REGISTRY_PATH);
176             break;
177           } else if(index < memberSize-1){ // pick the next element in the set
178             index++;
179           }
180         }
181         i++;
182       }
183       if(actor == null && previousMember != null) {
184         actor = this.context().actorSelection(previousMember.address() + ActorConstants.RPC_REGISTRY_PATH);
185       }
186     }
187     return actor;
188   }
189
190   private class SendRoutingTable implements Runnable {
191
192     @Override
193     public void run() {
194       RoutingTableData routingTableData =
195           new RoutingTableData(routingTable.getGlobalRpcMap(), routingTable.getRoutedRpcMap());
196       LOG.debug("Sending routing table for sync {}", routingTableData);
197       ActorSelection actor = getRandomRegistryActor();
198       if(actor != null) {
199         actor.tell(routingTableData, self());
200       }
201     }
202   }
203 }