2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import com.google.common.cache.CacheBuilder;
18 import com.google.common.cache.CacheLoader;
19 import com.google.common.cache.LoadingCache;
20 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
21 import org.opendaylight.controller.cluster.datastore.Configuration;
22 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
23 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
24 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
25 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
26 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import scala.concurrent.Await;
30 import scala.concurrent.Future;
31 import scala.concurrent.duration.Duration;
32 import scala.concurrent.duration.FiniteDuration;
34 import java.util.concurrent.TimeUnit;
36 import static akka.pattern.Patterns.ask;
39 * The ActorContext class contains utility methods which could be used by
40 * non-actors (like DistributedDataStore) to work with actors a little more
41 * easily. An ActorContext can be freely passed around to local object instances
42 * but should not be passed to actors especially remote actors
44 public class ActorContext {
45 private static final Logger
46 LOG = LoggerFactory.getLogger(ActorContext.class);
48 public static final FiniteDuration ASK_DURATION =
49 Duration.create(5, TimeUnit.SECONDS);
50 public static final Duration AWAIT_DURATION =
51 Duration.create(5, TimeUnit.SECONDS);
53 private final ActorSystem actorSystem;
54 private final ActorRef shardManager;
55 private final ClusterWrapper clusterWrapper;
56 private final Configuration configuration;
58 private SchemaContext schemaContext = null;
60 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
61 ClusterWrapper clusterWrapper,
62 Configuration configuration) {
63 this.actorSystem = actorSystem;
64 this.shardManager = shardManager;
65 this.clusterWrapper = clusterWrapper;
66 this.configuration = configuration;
69 public ActorSystem getActorSystem() {
73 public ActorRef getShardManager() {
77 public ActorSelection actorSelection(String actorPath) {
78 return actorSystem.actorSelection(actorPath);
81 public ActorSelection actorSelection(ActorPath actorPath) {
82 return actorSystem.actorSelection(actorPath);
87 * Finds the primary for a given shard
92 public ActorSelection findPrimary(String shardName) {
93 String path = findPrimaryPath(shardName);
94 return actorSystem.actorSelection(path);
97 public String findPrimaryPath(String shardName) {
98 Object result = executeLocalOperation(shardManager,
99 new FindPrimary(shardName).toSerializable(), ASK_DURATION);
101 if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
102 PrimaryFound found = PrimaryFound.fromSerializable(result);
104 LOG.debug("Primary found {}", found.getPrimaryPath());
106 return found.getPrimaryPath();
108 throw new PrimaryNotFoundException();
113 * Executes an operation on a local actor and wait for it's response
118 * @return The response of the operation
120 public Object executeLocalOperation(ActorRef actor, Object message,
121 FiniteDuration duration) {
122 Future<Object> future =
123 ask(actor, message, new Timeout(duration));
126 return Await.result(future, AWAIT_DURATION);
127 } catch (Exception e) {
128 throw new TimeoutException(e);
133 * Execute an operation on a remote actor and wait for it's response
140 public Object executeRemoteOperation(ActorSelection actor, Object message,
141 FiniteDuration duration) {
143 LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
145 Future<Object> future =
146 ask(actor, message, new Timeout(duration));
149 return Await.result(future, AWAIT_DURATION);
150 } catch (Exception e) {
151 throw new TimeoutException(e);
156 * Execute an operation on the primary for a given shard
158 * This method first finds the primary for a given shard ,then sends
159 * the message to the remote shard and waits for a response
166 * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
167 * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
169 public Object executeShardOperation(String shardName, Object message,
170 FiniteDuration duration) {
171 ActorSelection primary = findPrimary(shardName);
173 return executeRemoteOperation(primary, message, duration);
176 public void shutdown() {
177 shardManager.tell(PoisonPill.getInstance(), null);
178 actorSystem.shutdown();
181 public String getRemoteActorPath(final String shardName,
182 final String localPathOfRemoteActor) {
183 final String path = findPrimaryPath(shardName);
185 LoadingCache<String, String> graphs = CacheBuilder.newBuilder()
186 .expireAfterAccess(2, TimeUnit.SECONDS)
188 new CacheLoader<String, String>() {
189 public String load(String key) {
190 return resolvePath(path, localPathOfRemoteActor);
194 return graphs.getUnchecked(localPathOfRemoteActor);
197 public String resolvePath(final String primaryPath,
198 final String localPathOfRemoteActor) {
199 StringBuilder builder = new StringBuilder();
200 String[] primaryPathElements = primaryPath.split("/");
201 builder.append(primaryPathElements[0]).append("//")
202 .append(primaryPathElements[1]).append(primaryPathElements[2]);
203 String[] remotePathElements = localPathOfRemoteActor.split("/");
204 for (int i = 3; i < remotePathElements.length; i++) {
205 builder.append("/").append(remotePathElements[i]);
208 return builder.toString();
212 public ActorPath actorFor(String path){
213 return actorSystem.actorFor(path).path();
216 public String getCurrentMemberName(){
217 return clusterWrapper.getCurrentMemberName();