2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
18 import org.opendaylight.controller.cluster.datastore.Configuration;
19 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
20 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
21 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
23 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import scala.concurrent.Await;
27 import scala.concurrent.Future;
28 import scala.concurrent.duration.Duration;
29 import scala.concurrent.duration.FiniteDuration;
31 import java.util.concurrent.TimeUnit;
33 import static akka.pattern.Patterns.ask;
36 * The ActorContext class contains utility methods which could be used by
37 * non-actors (like DistributedDataStore) to work with actors a little more
38 * easily. An ActorContext can be freely passed around to local object instances
39 * but should not be passed to actors especially remote actors
41 public class ActorContext {
42 private static final Logger
43 LOG = LoggerFactory.getLogger(ActorContext.class);
45 public static final FiniteDuration ASK_DURATION =
46 Duration.create(5, TimeUnit.SECONDS);
47 public static final Duration AWAIT_DURATION =
48 Duration.create(5, TimeUnit.SECONDS);
50 private final ActorSystem actorSystem;
51 private final ActorRef shardManager;
52 private final ClusterWrapper clusterWrapper;
53 private final Configuration configuration;
55 private SchemaContext schemaContext = null;
57 public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
58 ClusterWrapper clusterWrapper,
59 Configuration configuration) {
60 this.actorSystem = actorSystem;
61 this.shardManager = shardManager;
62 this.clusterWrapper = clusterWrapper;
63 this.configuration = configuration;
66 public ActorSystem getActorSystem() {
70 public ActorRef getShardManager() {
74 public ActorSelection actorSelection(String actorPath) {
75 return actorSystem.actorSelection(actorPath);
78 public ActorSelection actorSelection(ActorPath actorPath) {
79 return actorSystem.actorSelection(actorPath);
84 * Finds the primary for a given shard
89 public ActorSelection findPrimary(String shardName) {
90 String path = findPrimaryPath(shardName);
91 return actorSystem.actorSelection(path);
94 public String findPrimaryPath(String shardName) {
95 Object result = executeLocalOperation(shardManager,
96 new FindPrimary(shardName).toSerializable(), ASK_DURATION);
98 if (result.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
99 PrimaryFound found = PrimaryFound.fromSerializable(result);
101 LOG.debug("Primary found {}", found.getPrimaryPath());
103 return found.getPrimaryPath();
105 throw new PrimaryNotFoundException("Could not find primary for shardName " + shardName);
110 * Executes an operation on a local actor and wait for it's response
115 * @return The response of the operation
117 public Object executeLocalOperation(ActorRef actor, Object message,
118 FiniteDuration duration) {
119 Future<Object> future =
120 ask(actor, message, new Timeout(duration));
123 return Await.result(future, AWAIT_DURATION);
124 } catch (Exception e) {
125 throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
130 * Execute an operation on a remote actor and wait for it's response
137 public Object executeRemoteOperation(ActorSelection actor, Object message,
138 FiniteDuration duration) {
140 LOG.debug("Sending remote message {} to {}", message.getClass().toString(), actor.toString());
142 Future<Object> future =
143 ask(actor, message, new Timeout(duration));
146 return Await.result(future, AWAIT_DURATION);
147 } catch (Exception e) {
148 throw new TimeoutException("Sending message " + message.getClass().toString() + " to actor " + actor.toString() + " failed" , e);
153 * Execute an operation on the primary for a given shard
155 * This method first finds the primary for a given shard ,then sends
156 * the message to the remote shard and waits for a response
163 * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
164 * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
166 public Object executeShardOperation(String shardName, Object message,
167 FiniteDuration duration) {
168 ActorSelection primary = findPrimary(shardName);
170 return executeRemoteOperation(primary, message, duration);
173 public void shutdown() {
174 shardManager.tell(PoisonPill.getInstance(), null);
175 actorSystem.shutdown();
179 * @deprecated Need to stop using this method. There are ways to send a
180 * remote ActorRef as a string which should be used instead of this hack
183 * @param localPathOfRemoteActor
187 public String resolvePath(final String primaryPath,
188 final String localPathOfRemoteActor) {
189 StringBuilder builder = new StringBuilder();
190 String[] primaryPathElements = primaryPath.split("/");
191 builder.append(primaryPathElements[0]).append("//")
192 .append(primaryPathElements[1]).append(primaryPathElements[2]);
193 String[] remotePathElements = localPathOfRemoteActor.split("/");
194 for (int i = 3; i < remotePathElements.length; i++) {
195 builder.append("/").append(remotePathElements[i]);
198 return builder.toString();
202 public ActorPath actorFor(String path){
203 return actorSystem.actorFor(path).path();
206 public String getCurrentMemberName(){
207 return clusterWrapper.getCurrentMemberName();