2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.util.Timeout;
17 import org.opendaylight.controller.cluster.datastore.Configuration;
18 import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
19 import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
20 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
21 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
22 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25 import scala.concurrent.Await;
26 import scala.concurrent.Future;
27 import scala.concurrent.duration.Duration;
28 import scala.concurrent.duration.FiniteDuration;
30 import java.util.concurrent.TimeUnit;
32 import static akka.pattern.Patterns.ask;
35 * The ActorContext class contains utility methods which could be used by
36 * non-actors (like DistributedDataStore) to work with actors a little more
37 * easily. An ActorContext can be freely passed around to local object instances
38 * but should not be passed to actors especially remote actors
40 public class ActorContext {
41 private static final Logger
42 LOG = LoggerFactory.getLogger(ActorContext.class);
44 public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
45 public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
47 private final ActorSystem actorSystem;
48 private final ActorRef shardManager;
49 private final Configuration configuration;
51 private SchemaContext schemaContext = null;
53 public ActorContext(ActorSystem actorSystem, ActorRef shardManager, Configuration configuration){
54 this.actorSystem = actorSystem;
55 this.shardManager = shardManager;
56 this.configuration = configuration;
59 public ActorSystem getActorSystem() {
63 public ActorRef getShardManager() {
67 public ActorSelection actorSelection(String actorPath){
68 return actorSystem.actorSelection(actorPath);
71 public ActorSelection actorSelection(ActorPath actorPath){
72 return actorSystem.actorSelection(actorPath);
77 * Finds the primary for a given shard
82 public ActorSelection findPrimary(String shardName) {
83 Object result = executeLocalOperation(shardManager,
84 new FindPrimary(shardName), ASK_DURATION);
86 if(result instanceof PrimaryFound){
87 PrimaryFound found = (PrimaryFound) result;
89 LOG.error("Primary found {}", found.getPrimaryPath());
91 return actorSystem.actorSelection(found.getPrimaryPath());
93 throw new PrimaryNotFoundException();
97 * Executes an operation on a local actor and wait for it's response
101 * @return The response of the operation
103 public Object executeLocalOperation(ActorRef actor, Object message,
104 FiniteDuration duration){
105 Future<Object> future =
106 ask(actor, message, new Timeout(duration));
109 return Await.result(future, AWAIT_DURATION);
110 } catch (Exception e) {
111 throw new TimeoutException(e);
116 * Execute an operation on a remote actor and wait for it's response
122 public Object executeRemoteOperation(ActorSelection actor, Object message,
123 FiniteDuration duration){
124 Future<Object> future =
125 ask(actor, message, new Timeout(duration));
128 return Await.result(future, AWAIT_DURATION);
129 } catch (Exception e) {
130 throw new TimeoutException(e);
135 * Execute an operation on the primary for a given shard
137 * This method first finds the primary for a given shard ,then sends
138 * the message to the remote shard and waits for a response
143 * @throws org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException if the message to the remote shard times out
144 * @throws org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException if the primary shard is not found
148 public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
149 ActorSelection primary = findPrimary(shardName);
151 return executeRemoteOperation(primary, message, duration);
154 public void shutdown() {
155 shardManager.tell(PoisonPill.getInstance(), null);
156 actorSystem.shutdown();