/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import com.google.common.base.Optional;
18 import com.google.common.base.Preconditions;
19 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
20 import org.opendaylight.controller.cluster.datastore.Configuration;
21 import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
22 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
23 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
24 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
25 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
26 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
27 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
28 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
29 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32 import scala.concurrent.Await;
33 import scala.concurrent.Future;
34 import scala.concurrent.duration.Duration;
35 import scala.concurrent.duration.FiniteDuration;
36 import java.util.concurrent.TimeUnit;
37 import static akka.pattern.Patterns.ask;
/**
 * The ActorContext class contains utility methods which can be used by
 * non-actors (like DistributedDataStore) to work with actors a little more
 * easily. An ActorContext can be freely passed around to local object instances
 * but should not be passed to actors, especially remote actors.
 */
45 public class ActorContext {
46 private static final Logger
47 LOG = LoggerFactory.getLogger(ActorContext.class);
49 private static final FiniteDuration DEFAULT_OPER_DURATION = Duration.create(5, TimeUnit.SECONDS);
51 public static final String MAILBOX = "bounded-mailbox";
53 private final ActorSystem actorSystem;
54 private final ActorRef shardManager;
55 private final ClusterWrapper clusterWrapper;
56 private final Configuration configuration;
57 private volatile SchemaContext schemaContext;
58 private FiniteDuration operationDuration = DEFAULT_OPER_DURATION;
59 private Timeout operationTimeout = new Timeout(operationDuration);
/**
 * Creates a context around an existing actor system and its collaborators.
 *
 * @param actorSystem the actor system used to resolve actor selections
 * @param shardManager the actor that receives shard lookups and schema context updates
 * @param clusterWrapper source of the current member name and the self address
 * @param configuration source of the shard names iterated by {@code broadcast}
 */
public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
        ClusterWrapper clusterWrapper, Configuration configuration) {
    this.configuration = configuration;
    this.clusterWrapper = clusterWrapper;
    this.shardManager = shardManager;
    this.actorSystem = actorSystem;
}
70 public ActorSystem getActorSystem() {
74 public ActorRef getShardManager() {
78 public ActorSelection actorSelection(String actorPath) {
79 return actorSystem.actorSelection(actorPath);
82 public ActorSelection actorSelection(ActorPath actorPath) {
83 return actorSystem.actorSelection(actorPath);
86 public void setSchemaContext(SchemaContext schemaContext) {
87 this.schemaContext = schemaContext;
89 if(shardManager != null) {
90 shardManager.tell(new UpdateSchemaContext(schemaContext), null);
94 public void setOperationTimeout(int timeoutInSeconds) {
95 operationDuration = Duration.create(timeoutInSeconds, TimeUnit.SECONDS);
96 operationTimeout = new Timeout(operationDuration);
99 public SchemaContext getSchemaContext() {
100 return schemaContext;
104 * Finds the primary shard for the given shard name
109 public Optional<ActorSelection> findPrimaryShard(String shardName) {
110 String path = findPrimaryPathOrNull(shardName);
112 return Optional.absent();
114 return Optional.of(actorSystem.actorSelection(path));
118 * Finds a local shard given it's shard name and return it's ActorRef
120 * @param shardName the name of the local shard that needs to be found
121 * @return a reference to a local shard actor which represents the shard
122 * specified by the shardName
124 public Optional<ActorRef> findLocalShard(String shardName) {
125 Object result = executeOperation(shardManager, new FindLocalShard(shardName));
127 if (result instanceof LocalShardFound) {
128 LocalShardFound found = (LocalShardFound) result;
129 LOG.debug("Local shard found {}", found.getPath());
130 return Optional.of(found.getPath());
133 return Optional.absent();
137 private String findPrimaryPathOrNull(String shardName) {
138 Object result = executeOperation(shardManager, new FindPrimary(shardName).toSerializable());
140 if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
141 PrimaryFound found = PrimaryFound.fromSerializable(result);
143 LOG.debug("Primary found {}", found.getPrimaryPath());
144 return found.getPrimaryPath();
146 } else if (result.getClass().equals(ActorNotInitialized.class)){
147 throw new NotInitializedException(
148 String.format("Found primary shard[%s] but its not initialized yet. Please try again later", shardName)
158 * Executes an operation on a local actor and wait for it's response
162 * @return The response of the operation
164 public Object executeOperation(ActorRef actor, Object message) {
165 Future<Object> future = executeOperationAsync(actor, message, operationTimeout);
168 return Await.result(future, operationDuration);
169 } catch (Exception e) {
170 throw new TimeoutException("Sending message " + message.getClass().toString() +
171 " to actor " + actor.toString() + " failed. Try again later.", e);
175 public Future<Object> executeOperationAsync(ActorRef actor, Object message, Timeout timeout) {
176 Preconditions.checkArgument(actor != null, "actor must not be null");
177 Preconditions.checkArgument(message != null, "message must not be null");
179 LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
180 return ask(actor, message, timeout);
184 * Execute an operation on a remote actor and wait for it's response
190 public Object executeOperation(ActorSelection actor, Object message) {
191 Future<Object> future = executeOperationAsync(actor, message);
194 return Await.result(future, operationDuration);
195 } catch (Exception e) {
196 throw new TimeoutException("Sending message " + message.getClass().toString() +
197 " to actor " + actor.toString() + " failed. Try again later.", e);
202 * Execute an operation on a remote actor asynchronously.
204 * @param actor the ActorSelection
205 * @param message the message to send
206 * @return a Future containing the eventual result
208 public Future<Object> executeOperationAsync(ActorSelection actor, Object message) {
209 Preconditions.checkArgument(actor != null, "actor must not be null");
210 Preconditions.checkArgument(message != null, "message must not be null");
212 LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
214 return ask(actor, message, operationTimeout);
218 * Sends an operation to be executed by a remote actor asynchronously without waiting for a
219 * reply (essentially set and forget).
221 * @param actor the ActorSelection
222 * @param message the message to send
224 public void sendOperationAsync(ActorSelection actor, Object message) {
225 Preconditions.checkArgument(actor != null, "actor must not be null");
226 Preconditions.checkArgument(message != null, "message must not be null");
228 LOG.debug("Sending message {} to {}", message.getClass().toString(), actor.toString());
230 actor.tell(message, ActorRef.noSender());
233 public void shutdown() {
234 shardManager.tell(PoisonPill.getInstance(), null);
235 actorSystem.shutdown();
238 public ClusterWrapper getClusterWrapper() {
239 return clusterWrapper;
242 public String getCurrentMemberName(){
243 return clusterWrapper.getCurrentMemberName();
247 * Send the message to each and every shard
251 public void broadcast(Object message){
252 for(String shardName : configuration.getAllShardNames()){
254 Optional<ActorSelection> primary = findPrimaryShard(shardName);
255 if (primary.isPresent()) {
256 primary.get().tell(message, ActorRef.noSender());
258 LOG.warn("broadcast failed to send message {} to shard {}. Primary not found",
259 message.getClass().getSimpleName(), shardName);
264 public FiniteDuration getOperationDuration() {
265 return operationDuration;
268 public boolean isLocalPath(String path) {
269 String selfAddress = clusterWrapper.getSelfAddress();
270 if (path == null || selfAddress == null) {
274 int atIndex1 = path.indexOf("@");
275 int atIndex2 = selfAddress.indexOf("@");
277 if (atIndex1 == -1 || atIndex2 == -1) {
281 int slashIndex1 = path.indexOf("/", atIndex1);
282 int slashIndex2 = selfAddress.indexOf("/", atIndex2);
284 if (slashIndex1 == -1 || slashIndex2 == -1) {
288 String hostPort1 = path.substring(atIndex1, slashIndex1);
289 String hostPort2 = selfAddress.substring(atIndex2, slashIndex2);
291 return hostPort1.equals(hostPort2);