2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore.utils;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSelection;
14 import akka.actor.ActorSystem;
15 import akka.util.Timeout;
16 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
17 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20 import scala.concurrent.Await;
21 import scala.concurrent.Future;
22 import scala.concurrent.duration.Duration;
23 import scala.concurrent.duration.FiniteDuration;
25 import java.util.concurrent.TimeUnit;
27 import static akka.pattern.Patterns.ask;
30 * The ActorContext class contains utility methods which could be used by
31 * non-actors (like DistributedDataStore) to work with actors a little more
32 * easily. An ActorContext can be freely passed around to local object instances
33 * but should not be passed to actors especially remote actors
35 public class ActorContext {
36 private static final Logger
37 LOG = LoggerFactory.getLogger(ActorContext.class);
39 public static final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
40 public static final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
42 private final ActorSystem actorSystem;
43 private final ActorRef shardManager;
45 public ActorContext(ActorSystem actorSystem, ActorRef shardManager){
46 this.actorSystem = actorSystem;
47 this.shardManager = shardManager;
50 public ActorSystem getActorSystem() {
54 public ActorRef getShardManager() {
58 public ActorSelection actorSelection(String actorPath){
59 return actorSystem.actorSelection(actorPath);
62 public ActorSelection actorSelection(ActorPath actorPath){
63 return actorSystem.actorSelection(actorPath);
68 * Finds the primary for a given shard
73 public ActorSelection findPrimary(String shardName) {
74 Object result = executeLocalOperation(shardManager,
75 new FindPrimary(shardName), ASK_DURATION);
77 if(result instanceof PrimaryFound){
78 PrimaryFound found = (PrimaryFound) result;
80 LOG.error("Primary found {}", found.getPrimaryPath());
82 return actorSystem.actorSelection(found.getPrimaryPath());
84 throw new RuntimeException("primary was not found");
88 * Executes an operation on a local actor and wait for it's response
92 * @return The response of the operation
94 public Object executeLocalOperation(ActorRef actor, Object message,
95 FiniteDuration duration){
96 Future<Object> future =
97 ask(actor, message, new Timeout(duration));
100 return Await.result(future, AWAIT_DURATION);
101 } catch (Exception e) {
102 throw new RuntimeException(e);
107 * Execute an operation on a remote actor and wait for it's response
113 public Object executeRemoteOperation(ActorSelection actor, Object message,
114 FiniteDuration duration){
115 Future<Object> future =
116 ask(actor, message, new Timeout(duration));
119 return Await.result(future, AWAIT_DURATION);
120 } catch (Exception e) {
121 throw new RuntimeException(e);
126 * Execute an operation on the primary for a given shard
128 * This method first finds the primary for a given shard ,then sends
129 * the message to the remote shard and waits for a response
134 * @throws java.lang.RuntimeException when a primary is not found or if the message to the remote shard fails or times out
138 public Object executeShardOperation(String shardName, Object message, FiniteDuration duration){
139 ActorSelection primary = findPrimary(shardName);
141 return executeRemoteOperation(primary, message, duration);