2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.dispatch.Mapper;
17 import akka.util.Timeout;
18 import com.google.common.base.Optional;
19 import com.google.common.base.Preconditions;
20 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
21 import org.opendaylight.controller.cluster.datastore.Configuration;
22 import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
23 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
24 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
25 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
26 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
27 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
28 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
29 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
30 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
31 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
32 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import scala.concurrent.Await;
37 import scala.concurrent.Future;
38 import scala.concurrent.duration.Duration;
39 import scala.concurrent.duration.FiniteDuration;
40 import java.util.concurrent.TimeUnit;
41 import static akka.pattern.Patterns.ask;
/**
 * The ActorContext class contains utility methods which can be used by
 * non-actors (like DistributedDataStore) to work with actors a little more
 * easily. An ActorContext can be freely passed around to local object instances
 * but should not be passed to actors, especially remote actors.
 */
49 public class ActorContext {
50 private static final Logger
51 LOG = LoggerFactory.getLogger(ActorContext.class);
53 private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
55 public static final String MAILBOX = "bounded-mailbox";
57 private final ActorSystem actorSystem;
58 private final ActorRef shardManager;
59 private final ClusterWrapper clusterWrapper;
60 private final Configuration configuration;
61 private volatile SchemaContext schemaContext;
62 private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
63 private Timeout operationTimeout = new Timeout(operationDuration);
/**
 * Creates a context that wraps the given collaborators. No validation is
 * performed; the references are stored as supplied.
 *
 * @param actorSystem    the actor system used for selections and dispatching
 * @param shardManager   the shard manager actor that receives lookup messages
 * @param clusterWrapper provides the current member name and self address
 * @param configuration  provides the list of shard names
 */
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
        ClusterWrapper clusterWrapper, Configuration configuration) {
    this.configuration = configuration;
    this.clusterWrapper = clusterWrapper;
    this.shardManager = shardManager;
    this.actorSystem = actorSystem;
}
74 public ActorSystem getActorSystem() {
78 public ActorRef getShardManager() {
82 public ActorSelection actorSelection(String actorPath) {
83 return actorSystem.actorSelection(actorPath);
86 public ActorSelection actorSelection(ActorPath actorPath) {
87 return actorSystem.actorSelection(actorPath);
90 public void setSchemaContext(SchemaContext schemaContext) {
91 this.schemaContext = schemaContext;
93 if(shardManager != null) {
94 shardManager.tell(new UpdateSchemaContext(schemaContext), null);
98 public void setOperationTimeout(int timeoutInSeconds) {
99 operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
100 operationTimeout = new Timeout(operationDuration);
103 public SchemaContext getSchemaContext() {
104 return schemaContext;
108 * Finds the primary shard for the given shard name
113 public Optional<ActorSelection> findPrimaryShard(String shardName) {
114 String path = findPrimaryPathOrNull(shardName);
116 return Optional.absent();
118 return Optional.of(actorSystem.actorSelection(path));
122 * Finds a local shard given its shard name and return it's ActorRef
124 * @param shardName the name of the local shard that needs to be found
125 * @return a reference to a local shard actor which represents the shard
126 * specified by the shardName
128 public Optional<ActorRef> findLocalShard(String shardName) {
129 Object result = executeOperation(shardManager, new FindLocalShard(shardName, false));
131 if (result instanceof LocalShardFound) {
132 LocalShardFound found = (LocalShardFound) result;
133 LOG.debug("Local shard found {}", found.getPath());
134 return Optional.of(found.getPath());
137 return Optional.absent();
141 * Finds a local shard async given its shard name and return a Future from which to obtain the
144 * @param shardName the name of the local shard that needs to be found
146 public Future<ActorRef> findLocalShardAsync( final String shardName, Timeout timeout) {
147 Future<Object> future = executeOperationAsync(shardManager,
148 new FindLocalShard(shardName, true), timeout);
150 return future.map(new Mapper<Object, ActorRef>() {
152 public ActorRef checkedApply(Object response) throws Throwable {
153 if(response instanceof LocalShardFound) {
154 LocalShardFound found = (LocalShardFound)response;
155 LOG.debug("Local shard found {}", found.getPath());
156 return found.getPath();
157 } else if(response instanceof ActorNotInitialized) {
158 throw new NotInitializedException(
159 String.format("Found local shard for %s but it's not initialized yet.",
161 } else if(response instanceof LocalShardNotFound) {
162 throw new LocalShardNotFoundException(
163 String.format("Local shard for %s does not exist.", shardName));
166 throw new UnknownMessageException(String.format(
167 "FindLocalShard returned unkown response: %s", response));
169 }, getActorSystem().dispatcher());
172 private String findPrimaryPathOrNull(String shardName) {
173 Object result = executeOperation(shardManager, new FindPrimary(shardName, false).toSerializable());
175 if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
176 PrimaryFound found = PrimaryFound.fromSerializable(result);
178 LOG.debug("Primary found {}", found.getPrimaryPath());
179 return found.getPrimaryPath();
181 } else if (result.getClass().equals(ActorNotInitialized.class)){
182 throw new NotInitializedException(
183 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
193 * Executes an operation on a local actor and wait for it's response
197 * @return The response of the operation
199 public Object executeOperation(ActorRef actor, Object message) {
200 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
203 return Await.result(future, operationDuration);
204 } catch (Exception e) {
205 throw new TimeoutException("Sending message " + message.getClass().toString() +
206 " to actor " + actor.toString() + " failed. Try again later.", e);
210 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
211 Preconditions.checkArgument(actor != null, "actor must not be null");
212 Preconditions.checkArgument(message != null, "message must not be null");
214 LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
215 return ask(actor, message, timeout);
219 * Execute an operation on a remote actor and wait for it's response
225 public Object executeOperation(ActorSelection actor, Object message) {
226 Future<Object> future = executeOperationAsync(actor, message);
229 return Await.result(future, operationDuration);
230 } catch (Exception e) {
231 throw new TimeoutException("Sending message " + message.getClass().toString() +
232 " to actor " + actor.toString() + " failed. Try again later.", e);
237 * Execute an operation on a remote actor asynchronously.
239 * @param actor the ActorSelection
240 * @param message the message to send
241 * @return a Future containing the eventual result
243 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
244 Preconditions.checkArgument(actor != null, "actor must not be null");
245 Preconditions.checkArgument(message != null, "message must not be null");
247 LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
249 return ask(actor, message, operationTimeout);
253 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
254 * reply (essentially set and forget).
256 * @param actor the ActorSelection
257 * @param message the message to send
259 public void sendOperationAsync(ActorSelection actor, Object message) {
260 Preconditions.checkArgument(actor != null, "actor must not be null");
261 Preconditions.checkArgument(message != null, "message must not be null");
263 LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
265 actor.tell(message, ActorRef.noSender());
268 public void shutdown() {
269 shardManager.tell(PoisonPill.getInstance(), null);
270 actorSystem.shutdown();
273 public ClusterWrapper getClusterWrapper() {
274 return clusterWrapper;
277 public String getCurrentMemberName(){
278 return clusterWrapper.getCurrentMemberName();
282 * Send the message to each and every shard
286 public void broadcast(Object message){
287 for(String shardName : configuration.getAllShardNames()){
289 Optional<ActorSelection> primary = findPrimaryShard(shardName);
290 if (primary.isPresent()) {
291 primary.get().tell(message, ActorRef.noSender());
293 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
294 message.getClass().getSimpleName(), shardName);
299 public FiniteDuration getOperationDuration() {
300 return operationDuration;
303 public boolean isLocalPath(String path) {
304 String selfAddress = clusterWrapper.getSelfAddress();
305 if (path == null || selfAddress == null) {
309 int atIndex1 = path.indexOf("@");
310 int atIndex2 = selfAddress.indexOf("@");
312 if (atIndex1 == -1 || atIndex2 == -1) {
316 int slashIndex1 = path.indexOf("/", atIndex1);
317 int slashIndex2 = selfAddress.indexOf("/", atIndex2);
319 if (slashIndex1 == -1 || slashIndex2 == -1) {
323 String hostPort1 = path.substring(atIndex1, slashIndex1);
324 String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
326 return hostPort1.equals(hostPort2);