</instructions>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <propertyExpansion>checkstyle.violationSeverity=error</propertyExpansion>
+ </configuration>
+ </plugin>
</plugins>
</build>
private static final long serialVersionUID = 1L;
- public RemoteDOMRpcException(final String message,final Throwable cause) {
+ RemoteDOMRpcException(final String message, final Throwable cause) {
super(message,cause);
}
}
import akka.dispatch.OnComplete;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.CheckedFuture;
import java.util.concurrent.ExecutionException;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
-/**
- * @author tony
- *
- */
class RemoteDOMRpcFuture extends AbstractFuture<DOMRpcResult> implements CheckedFuture<DOMRpcResult, DOMRpcException> {
private static final Logger LOG = LoggerFactory.getLogger(RemoteDOMRpcFuture.class);
} catch (final ExecutionException e) {
throw mapException(e);
} catch (final InterruptedException e) {
- throw Throwables.propagate(e);
+ throw new RemoteDOMRpcException("Interruped while invoking RPC", e);
}
}
} catch (final ExecutionException e) {
throw mapException(e);
} catch (final InterruptedException e) {
- throw Throwables.propagate(e);
+ throw new RemoteDOMRpcException("Interruped while invoking RPC", e);
}
}
- private DOMRpcException mapException(final ExecutionException e) {
- final Throwable cause = e.getCause();
+ private DOMRpcException mapException(final ExecutionException ex) {
+ final Throwable cause = ex.getCause();
if (cause instanceof DOMRpcException) {
return (DOMRpcException) cause;
}
- return new RemoteDOMRpcException("Exception during invoking RPC", e);
+ return new RemoteDOMRpcException("Exception during invoking RPC", ex);
}
private final class FutureUpdater extends OnComplete<Object> {
final NormalizedNode<?, ?> input) {
if (input instanceof RemoteRpcInput) {
LOG.warn("Rpc {} was removed during execution or there is loop present. Failing received rpc.", rpc);
- return Futures
- .<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcImplementationNotAvailableException(
+ return Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(
+ new DOMRpcImplementationNotAvailableException(
"Rpc implementation for {} was removed during processing.", rpc));
}
final RemoteDOMRpcFuture frontEndFuture = RemoteDOMRpcFuture.create(rpc.getType().getLastComponent());
} else {
final ActorRef remoteImplRef = new LatestEntryRoutingLogic(routePairs).select();
final Object executeRpcMessage = ExecuteRpc.from(rpc, input);
- LOG.debug("Found remote actor {} for rpc {} - sending {}", remoteImplRef, rpc.getType(), executeRpcMessage);
+ LOG.debug("Found remote actor {} for rpc {} - sending {}", remoteImplRef, rpc.getType(),
+ executeRpcMessage);
frontEndFuture.completeWith(ask(remoteImplRef, executeRpcMessage, config.getAskDuration()));
}
}
import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
class RemoteRpcInput implements ContainerNode {
private final ContainerNode delegate;
}
protected static RemoteRpcInput from(@Nullable final NormalizedNode<?, ?> node) {
- if(node == null) {
+ if (node == null) {
return null;
}
*/
public class RemoteRpcProvider implements AutoCloseable, Provider, SchemaContextListener {
- private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
-
- private final DOMRpcProviderService rpcProvisionRegistry;
-
- private ListenerRegistration<SchemaContextListener> schemaListenerRegistration;
- private final ActorSystem actorSystem;
- private SchemaService schemaService;
- private DOMRpcService rpcService;
- private SchemaContext schemaContext;
- private ActorRef rpcManager;
- private final RemoteRpcProviderConfig config;
-
-
- public RemoteRpcProvider(final ActorSystem actorSystem,
- final DOMRpcProviderService rpcProvisionRegistry,
- final RemoteRpcProviderConfig config) {
- this.actorSystem = actorSystem;
- this.rpcProvisionRegistry = rpcProvisionRegistry;
- this.config = Preconditions.checkNotNull(config);
- }
-
- public void setRpcService(DOMRpcService rpcService) {
- this.rpcService = rpcService;
- }
-
- public void setSchemaService(SchemaService schemaService) {
- this.schemaService = schemaService;
- }
-
- @Override
- public void close() throws Exception {
- if (schemaListenerRegistration != null) {
- schemaListenerRegistration.close();
- schemaListenerRegistration = null;
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteRpcProvider.class);
+
+ private final DOMRpcProviderService rpcProvisionRegistry;
+
+ private ListenerRegistration<SchemaContextListener> schemaListenerRegistration;
+ private final ActorSystem actorSystem;
+ private SchemaService schemaService;
+ private DOMRpcService rpcService;
+ private SchemaContext schemaContext;
+ private ActorRef rpcManager;
+ private final RemoteRpcProviderConfig config;
+
+ public RemoteRpcProvider(final ActorSystem actorSystem, final DOMRpcProviderService rpcProvisionRegistry,
+ final RemoteRpcProviderConfig config) {
+ this.actorSystem = actorSystem;
+ this.rpcProvisionRegistry = rpcProvisionRegistry;
+ this.config = Preconditions.checkNotNull(config);
+ }
+
+ public void setRpcService(DOMRpcService rpcService) {
+ this.rpcService = rpcService;
+ }
+
+ public void setSchemaService(SchemaService schemaService) {
+ this.schemaService = schemaService;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (schemaListenerRegistration != null) {
+ schemaListenerRegistration.close();
+ schemaListenerRegistration = null;
+ }
+ }
+
+ @Override
+ public void onSessionInitiated(final Broker.ProviderSession session) {
+ schemaService = session.getService(SchemaService.class);
+ rpcService = session.getService(DOMRpcService.class);
+ start();
+ }
+
+ @Override
+ public Collection<ProviderFunctionality> getProviderFunctionality() {
+ return null;
+ }
+
+ public void start() {
+ LOG.info("Starting remote rpc service...");
+
+ schemaContext = schemaService.getGlobalContext();
+ rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext, rpcProvisionRegistry, rpcService, config),
+ config.getRpcManagerName());
+ schemaListenerRegistration = schemaService.registerSchemaContextListener(this);
+ LOG.debug("rpc manager started");
+ }
+
+ @Override
+ public void onGlobalContextUpdated(final SchemaContext newSchemaContext) {
+ this.schemaContext = newSchemaContext;
+ rpcManager.tell(new UpdateSchemaContext(newSchemaContext), null);
}
- }
-
- @Override
- public void onSessionInitiated(final Broker.ProviderSession session) {
- schemaService = session.getService(SchemaService.class);
- rpcService = session.getService(DOMRpcService.class);
- start();
- }
-
- @Override
- public Collection<ProviderFunctionality> getProviderFunctionality() {
- return null;
- }
-
- public void start() {
- LOG.info("Starting remote rpc service...");
-
- schemaContext = schemaService.getGlobalContext();
- rpcManager = actorSystem.actorOf(RpcManager.props(schemaContext,
- rpcProvisionRegistry, rpcService, config), config.getRpcManagerName());
- schemaListenerRegistration = schemaService.registerSchemaContextListener(this);
- LOG.debug("rpc manager started");
- }
-
- @Override
- public void onGlobalContextUpdated(final SchemaContext schemaContext) {
- this.schemaContext = schemaContext;
- rpcManager.tell(new UpdateSchemaContext(schemaContext), null);
- }
}
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import scala.concurrent.duration.FiniteDuration;
-/**
- */
public class RemoteRpcProviderConfig extends CommonConfig {
protected static final String TAG_RPC_BROKER_NAME = "rpc-broker-name";
private Timeout cachedAskDuration;
private FiniteDuration cachedGossipTickInterval;
- public RemoteRpcProviderConfig(Config config){
+ public RemoteRpcProviderConfig(Config config) {
super(config);
}
- public String getRpcBrokerName(){
+ public String getRpcBrokerName() {
return get().getString(TAG_RPC_BROKER_NAME);
}
- public String getRpcRegistryName(){
+ public String getRpcRegistryName() {
return get().getString(TAG_RPC_REGISTRY_NAME);
}
- public String getRpcManagerName(){
+ public String getRpcManagerName() {
return get().getString(TAG_RPC_MGR_NAME);
}
- public String getRpcBrokerPath(){
+ public String getRpcBrokerPath() {
return get().getString(TAG_RPC_BROKER_PATH);
}
- public String getRpcRegistryPath(){
+ public String getRpcRegistryPath() {
return get().getString(TAG_RPC_REGISTRY_PATH);
}
- public String getRpcManagerPath(){
+ public String getRpcManagerPath() {
return get().getString(TAG_RPC_MGR_PATH);
}
-
- public Timeout getAskDuration(){
- if (cachedAskDuration != null){
+ public Timeout getAskDuration() {
+ if (cachedAskDuration != null) {
return cachedAskDuration;
}
return cachedAskDuration;
}
- public FiniteDuration getGossipTickInterval(){
+ public FiniteDuration getGossipTickInterval() {
if (cachedGossipTickInterval != null) {
return cachedGossipTickInterval;
}
*/
public static RemoteRpcProviderConfig newInstance(String actorSystemName, boolean metricCaptureEnabled,
int mailboxCapacity) {
- return new Builder(actorSystemName).metricCaptureEnabled(metricCaptureEnabled).
- mailboxCapacity(mailboxCapacity).build();
+ return new Builder(actorSystemName).metricCaptureEnabled(metricCaptureEnabled)
+ .mailboxCapacity(mailboxCapacity).build();
}
- public static class Builder extends CommonConfig.Builder<Builder>{
+ public static class Builder extends CommonConfig.Builder<Builder> {
- public Builder(String actorSystemName){
+ public Builder(String actorSystemName) {
super(actorSystemName);
//Actor names
}
@Override
- public RemoteRpcProviderConfig build(){
+ public RemoteRpcProviderConfig build() {
return new RemoteRpcProviderConfig(merge());
}
}
-
-
}
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, YangInstanceIdentifier>,Serializable {
- private static final long serialVersionUID = 1L;
-
- private final QName context;
- private final QName type;
- private final YangInstanceIdentifier route;
-
- public RouteIdentifierImpl(final QName context, final QName type, final YangInstanceIdentifier route) {
- Preconditions.checkNotNull(type, "Rpc type should not be null");
- this.context = context;
- this.type = type;
- this.route = route;
- }
-
- @Override
- public QName getContext() {
- return context;
- }
-
- @Override
- public QName getType() {
- return type;
- }
-
- @Override
- public YangInstanceIdentifier getRoute() {
- return route;
- }
-
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- final RouteIdentifierImpl that = (RouteIdentifierImpl) o;
-
- if (context == null){
- if (that.getContext() != null) return false;
- }else
- if (!context.equals(that.context)) return false;
-
- if (route == null){
- if (that.getRoute() != null) return false;
- }else
- if (!route.equals(that.route)) return false;
-
- if (type == null){
- if (that.getType() != null) return false;
- }else
- if (!type.equals(that.type)) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 0;
- result = prime * result + (context == null ? 0:context.hashCode());
- result = prime * result + (type == null ? 0:type.hashCode());
- result = prime * result + (route == null ? 0:route.hashCode());
- return result;
- }
-
- @Override
- public String toString() {
- return "RouteIdentifierImpl{" +
- "context=" + context +
- ", type=" + type +
- ", route=" + route +
- '}';
- }
+public class RouteIdentifierImpl
+ implements RpcRouter.RouteIdentifier<QName, QName, YangInstanceIdentifier>, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final QName context;
+ private final QName type;
+ private final YangInstanceIdentifier route;
+
+ public RouteIdentifierImpl(final QName context, final QName type, final YangInstanceIdentifier route) {
+ Preconditions.checkNotNull(type, "Rpc type should not be null");
+ this.context = context;
+ this.type = type;
+ this.route = route;
+ }
+
+ @Override
+ public QName getContext() {
+ return context;
+ }
+
+ @Override
+ public QName getType() {
+ return type;
+ }
+
+ @Override
+ public YangInstanceIdentifier getRoute() {
+ return route;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ final RouteIdentifierImpl that = (RouteIdentifierImpl) obj;
+
+ if (context == null) {
+ if (that.getContext() != null) {
+ return false;
+ }
+ } else if (!context.equals(that.context)) {
+ return false;
+ }
+
+ if (route == null) {
+ if (that.getRoute() != null) {
+ return false;
+ }
+ } else if (!route.equals(that.route)) {
+ return false;
+ }
+
+ if (type == null) {
+ if (that.getType() != null) {
+ return false;
+ }
+ } else if (!type.equals(that.type)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 0;
+ result = prime * result + (context == null ? 0 : context.hashCode());
+ result = prime * result + (type == null ? 0 : type.hashCode());
+ result = prime * result + (route == null ? 0 : route.hashCode());
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "RouteIdentifierImpl{" + "context=" + context + ", type=" + type + ", route=" + route + '}';
+ }
}
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Actor to initiate execution of remote RPC on other nodes of the cluster.
*/
public class RpcBroker extends AbstractUntypedActor {
-
- private static final Logger LOG = LoggerFactory.getLogger(RpcBroker.class);
private final DOMRpcService rpcService;
private RpcBroker(final DOMRpcService rpcService) {
}
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
private void executeRpc(final ExecuteRpc msg) {
LOG.debug("Executing rpc {}", msg.getRpc());
final NormalizedNode<?, ?> input = RemoteRpcInput.from(msg.getInputNormalizedNode());
}
@Override
- public void onFailure(final Throwable t) {
- LOG.error("executeRpc for {} failed with root cause: {}. For exception details, enable Debug logging.",
- msg.getRpc(), Throwables.getRootCause(t));
- if(LOG.isDebugEnabled()) {
- LOG.debug("Detailed exception for execute RPC failure :{}", t);
+ public void onFailure(final Throwable failure) {
+ LOG.error(
+ "executeRpc for {} failed with root cause: {}. For exception details, enable Debug logging.",
+ msg.getRpc(), Throwables.getRootCause(failure));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Detailed exception for execute RPC failure :{}", failure);
}
- sender.tell(new akka.actor.Status.Failure(t), self);
+ sender.tell(new akka.actor.Status.Failure(failure), self);
}
});
- } catch (final Exception e) {
+ } catch (final RuntimeException e) {
sender.tell(new akka.actor.Status.Failure(e), sender);
}
}
public RpcErrorsException(final String message, final Iterable<RpcError> rpcErrors) {
super(message);
- for(final RpcError rpcError: rpcErrors) {
+ for (final RpcError rpcError: rpcErrors) {
rpcErrorDataList.add(new RpcErrorData(rpcError.getSeverity(), rpcError.getErrorType(),
rpcError.getTag(), rpcError.getApplicationTag(), rpcError.getMessage(),
rpcError.getInfo(), rpcError.getCause()));
public Collection<RpcError> getRpcErrors() {
final Collection<RpcError> rpcErrors = new ArrayList<>();
- for(final RpcErrorData ed: rpcErrorDataList) {
- final RpcError rpcError = ed.severity == ErrorSeverity.ERROR ?
- RpcResultBuilder.newError(ed.errorType, ed.tag, ed.message, ed.applicationTag,
+ for (final RpcErrorData ed: rpcErrorDataList) {
+ final RpcError rpcError = ed.severity == ErrorSeverity.ERROR
+ ? RpcResultBuilder.newError(ed.errorType, ed.tag, ed.message, ed.applicationTag,
ed.info, ed.cause) :
- RpcResultBuilder.newWarning(ed.errorType, ed.tag, ed.message, ed.applicationTag,
+ RpcResultBuilder.newWarning(ed.errorType, ed.tag, ed.message, ed.applicationTag,
ed.info, ed.cause);
rpcErrors.add(rpcError);
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RpcListener implements DOMRpcAvailabilityListener{
+public class RpcListener implements DOMRpcAvailabilityListener {
- private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
- private final ActorRef rpcRegistry;
+ private static final Logger LOG = LoggerFactory.getLogger(RpcListener.class);
+ private final ActorRef rpcRegistry;
- public RpcListener(final ActorRef rpcRegistry) {
- this.rpcRegistry = rpcRegistry;
- }
+ public RpcListener(final ActorRef rpcRegistry) {
+ this.rpcRegistry = rpcRegistry;
+ }
@Override
public void onRpcAvailable(@Nonnull final Collection<DOMRpcIdentifier> rpcs) {
final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
for (final DOMRpcIdentifier rpc : rpcs) {
- final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
+ final RpcRouter.RouteIdentifier<?,?,?> routeId =
+ new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
routeIds.add(routeId);
}
final RpcRegistry.Messages.AddOrUpdateRoutes addRpcMsg = new RpcRegistry.Messages.AddOrUpdateRoutes(routeIds);
@Override
public void onRpcUnavailable(@Nonnull final Collection<DOMRpcIdentifier> rpcs) {
Preconditions.checkArgument(rpcs != null, "Input Collection of DOMRpcIdentifier can not be null.");
- if(LOG.isDebugEnabled()) {
- LOG.debug("Removing registration for [{}]", rpcs);
- }
+
+ LOG.debug("Removing registration for [{}]", rpcs);
+
final List<RpcRouter.RouteIdentifier<?,?,?>> routeIds = new ArrayList<>();
for (final DOMRpcIdentifier rpc : rpcs) {
- final RpcRouter.RouteIdentifier<?,?,?> routeId = new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
+ final RpcRouter.RouteIdentifier<?,?,?> routeId =
+ new RouteIdentifierImpl(null, rpc.getType().getLastComponent(), rpc.getContextReference());
routeIds.add(routeId);
}
final RpcRegistry.Messages.RemoveRoutes removeRpcMsg = new RpcRegistry.Messages.RemoveRoutes(routeIds);
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
+import akka.actor.SupervisorStrategy.Directive;
import akka.japi.Function;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.RpcDefinition;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
/**
- * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown.
- *
- * It also starts the rpc listeners
+ * This class acts as a supervisor, creates all the actors, resumes them, if an exception is thrown. It also starts
+ * the rpc listeners
*/
public class RpcManager extends AbstractUntypedActor {
-
- private static final Logger LOG = LoggerFactory.getLogger(RpcManager.class);
-
private SchemaContext schemaContext;
private ActorRef rpcBroker;
private ActorRef rpcRegistry;
}
- public static Props props(final SchemaContext schemaContext,
- final DOMRpcProviderService rpcProvisionRegistry, final DOMRpcService rpcServices,
- final RemoteRpcProviderConfig config) {
- Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
- Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
- Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
- return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
- }
+ public static Props props(final SchemaContext schemaContext, final DOMRpcProviderService rpcProvisionRegistry,
+ final DOMRpcService rpcServices, final RemoteRpcProviderConfig config) {
+ Preconditions.checkNotNull(schemaContext, "SchemaContext can not be null!");
+ Preconditions.checkNotNull(rpcProvisionRegistry, "RpcProviderService can not be null!");
+ Preconditions.checkNotNull(rpcServices, "RpcService can not be null!");
+ return Props.create(RpcManager.class, schemaContext, rpcProvisionRegistry, rpcServices, config);
+ }
private void createRpcActors() {
LOG.debug("Create rpc registry and broker actors");
- rpcRegistry =
- getContext().actorOf(RpcRegistry.props(config).
- withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
+ rpcRegistry = getContext().actorOf(RpcRegistry.props(config)
+ .withMailbox(config.getMailBoxName()), config.getRpcRegistryName());
- rpcBroker =
- getContext().actorOf(RpcBroker.props(rpcServices).
- withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
+ rpcBroker = getContext().actorOf(RpcBroker.props(rpcServices)
+ .withMailbox(config.getMailBoxName()), config.getRpcBrokerName());
final RpcRegistry.Messages.SetLocalRouter localRouter = new RpcRegistry.Messages.SetLocalRouter(rpcBroker);
rpcRegistry.tell(localRouter, self());
private void registerRoutedRpcDelegate() {
final Set<DOMRpcIdentifier> rpcIdentifiers = new HashSet<>();
final Set<Module> modules = schemaContext.getModules();
- for(final Module module : modules){
- for(final RpcDefinition rpcDefinition : module.getRpcs()){
- if(RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
+ for (final Module module : modules) {
+ for (final RpcDefinition rpcDefinition : module.getRpcs()) {
+ if (RpcRoutingStrategy.from(rpcDefinition).isContextBasedRouted()) {
LOG.debug("Adding routed rpcDefinition for path {}", rpcDefinition.getPath());
rpcIdentifiers.add(DOMRpcIdentifier.create(rpcDefinition.getPath(), YangInstanceIdentifier.EMPTY));
}
}
/**
- * Add all the locally registered RPCs in the clustered routing table
+ * Add all the locally registered RPCs in the clustered routing table.
*/
- private void announceSupportedRpcs(){
+ private void announceSupportedRpcs() {
LOG.debug("Adding all supported rpcs to routing table");
final Set<RpcDefinition> currentlySupportedRpc = schemaContext.getOperations();
final List<DOMRpcIdentifier> rpcs = new ArrayList<>();
for (final RpcDefinition rpcDef : currentlySupportedRpc) {
rpcs.add(DOMRpcIdentifier.create(rpcDef.getPath()));
}
- if(!rpcs.isEmpty()) {
+
+ if (!rpcs.isEmpty()) {
rpcListener.onRpcAvailable(rpcs);
}
}
@Override
protected void handleReceive(final Object message) throws Exception {
- if(message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
- }
-
+ if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ }
}
private void updateSchemaContext(final UpdateSchemaContext message) {
- schemaContext = message.getSchemaContext();
- registerRoutedRpcDelegate();
- rpcBroker.tell(message, ActorRef.noSender());
+ schemaContext = message.getSchemaContext();
+ registerRoutedRpcDelegate();
+ rpcBroker.tell(message, ActorRef.noSender());
}
@Override
public SupervisorStrategy supervisorStrategy() {
- return new OneForOneStrategy(10, Duration.create("1 minute"),
- new Function<Throwable, SupervisorStrategy.Directive>() {
- @Override
- public SupervisorStrategy.Directive apply(final Throwable t) {
- LOG.error("An exception happened actor will be resumed", t);
+ return new OneForOneStrategy(10, Duration.create("1 minute"), (Function<Throwable, Directive>) t -> {
+ LOG.error("An exception happened actor will be resumed", t);
- return SupervisorStrategy.resume();
- }
- }
- );
+ return SupervisorStrategy.resume();
+ });
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TerminationMonitor extends UntypedActor{
+public class TerminationMonitor extends UntypedActor {
private static final Logger LOG = LoggerFactory.getLogger(TerminationMonitor.class);
- public TerminationMonitor(){
+ public TerminationMonitor() {
LOG.debug("Created TerminationMonitor");
}
@Override public void onReceive(Object message) throws Exception {
- if(message instanceof Terminated){
+ if (message instanceof Terminated) {
Terminated terminated = (Terminated) message;
- if(LOG.isDebugEnabled()) {
- LOG.debug("Actor terminated : {}", terminated.actor());
- }
- }else if(message instanceof Monitor){
- Monitor monitor = (Monitor) message;
- getContext().watch(monitor.getActorRef());
+ LOG.debug("Actor terminated : {}", terminated.actor());
+ } else if (message instanceof Monitor) {
+ Monitor monitor = (Monitor) message;
+ getContext().watch(monitor.getActorRef());
}
}
}
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-/**
- * @author tony
- *
- */
public class ExecuteRpc implements Serializable {
private static final long serialVersionUID = 1128904894827335676L;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public class UpdateSchemaContext {
+ private final SchemaContext schemaContext;
- private final SchemaContext schemaContext;
+ public UpdateSchemaContext(final SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
+ }
- public UpdateSchemaContext(final SchemaContext schemaContext) {
- this.schemaContext = schemaContext;
- }
-
- public SchemaContext getSchemaContext() {
- return schemaContext;
- }
+ public SchemaContext getSchemaContext() {
+ return schemaContext;
+ }
}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
-
import org.opendaylight.controller.remote.rpc.registry.gossip.Copier;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
return copy;
}
- public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+ public Option<Pair<ActorRef, Long>> getRouterFor(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
Long updatedTime = table.get(routeId);
if (updatedTime == null || router == null) {
return table.keySet();
}
- public void addRoute(RpcRouter.RouteIdentifier<?,?,?> routeId){
+ public void addRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
table.put(routeId, System.currentTimeMillis());
}
- public void removeRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+ public void removeRoute(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
table.remove(routeId);
}
- public boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+ public boolean contains(RpcRouter.RouteIdentifier<?, ?, ?> routeId) {
return table.containsKey(routeId);
}
- public boolean isEmpty(){
+ public boolean isEmpty() {
return table.isEmpty();
}
@Override
public String toString() {
- return "RoutingTable{" +
- "table=" + table +
- ", router=" + router +
- '}';
+ return "RoutingTable{" + "table=" + table + ", router=" + router + '}';
}
}
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry.Messages.SetLocalRouter;
import org.opendaylight.controller.remote.rpc.registry.gossip.Bucket;
import org.opendaylight.controller.remote.rpc.registry.gossip.BucketStore;
-import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBean;
import org.opendaylight.controller.remote.rpc.registry.mbeans.RemoteRpcRegistryMXBeanImpl;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
}
/**
- * Register's rpc broker
+ * Registers a rpc broker.
*
* @param message contains {@link akka.actor.ActorRef} for rpc broker
*/
getLocalBucket().getData().setRouter(message.getRouter());
}
- /**
- * @param msg
- */
private void receiveAddRoutes(AddOrUpdateRoutes msg) {
log.debug("AddOrUpdateRoutes: {}", msg.getRouteIdentifiers());
RoutingTable table = getLocalBucket().getData().copy();
- for(RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> routeId : msg.getRouteIdentifiers()) {
table.addRoute(routeId);
}
}
/**
+ * Processes a RemoveRoutes message.
+ *
* @param msg contains list of route ids to remove
*/
private void receiveRemoveRoutes(RemoveRoutes msg) {
/**
* Finds routers for the given rpc.
*
- * @param findRouters
+ * @param findRouters the FindRouters request
*/
private void receiveGetRouter(final FindRouters findRouters) {
log.debug("receiveGetRouter for {}", findRouters.getRouteIdentifier());
final ActorRef sender = getSender();
- if(!findRouters(findRouters, sender)) {
+ if (!findRouters(findRouters, sender)) {
log.debug("No routers found for {} - scheduling {} ms timer", findRouters.getRouteIdentifier(),
findRouterTimeout.toMillis());
final Runnable routesUpdatedRunnable = new Runnable() {
@Override
public void run() {
- if(findRouters(findRouters, sender)) {
+ if (findRouters(findRouters, sender)) {
routesUpdatedCallbacks.remove(this);
timer.get().cancel();
}
routesUpdatedCallbacks.add(routesUpdatedRunnable);
- Runnable timerRunnable = new Runnable() {
- @Override
- public void run() {
- log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
+ Runnable timerRunnable = () -> {
+ log.warn("Timed out finding routers for {}", findRouters.getRouteIdentifier());
- routesUpdatedCallbacks.remove(routesUpdatedRunnable);
- sender.tell(new Messages.FindRoutersReply(
- Collections.<Pair<ActorRef, Long>>emptyList()), self());
- }
+ routesUpdatedCallbacks.remove(routesUpdatedRunnable);
+ sender.tell(new Messages.FindRoutersReply(
+ Collections.<Pair<ActorRef, Long>>emptyList()), self());
};
timer.set(getContext().system().scheduler().scheduleOnce(findRouterTimeout, self(), timerRunnable,
RouteIdentifier<?, ?, ?> routeId = findRouters.getRouteIdentifier();
findRoutes(getLocalBucket().getData(), routeId, routers);
- for(Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
+ for (Bucket<RoutingTable> bucket : getRemoteBuckets().values()) {
findRoutes(bucket.getData(), routeId, routers);
}
log.debug("Found {} routers for {}", routers.size(), findRouters.getRouteIdentifier());
boolean foundRouters = !routers.isEmpty();
- if(foundRouters) {
+ if (foundRouters) {
sender.tell(new Messages.FindRoutersReply(routers), getSelf());
}
}
Option<Pair<ActorRef, Long>> routerWithUpdateTime = table.getRouterFor(routeId);
- if(!routerWithUpdateTime.isEmpty()) {
+ if (!routerWithUpdateTime.isEmpty()) {
routers.add(routerWithUpdateTime.get());
}
}
@Override
protected void onBucketsUpdated() {
- if(routesUpdatedCallbacks.isEmpty()) {
+ if (routesUpdatedCallbacks.isEmpty()) {
return;
}
- for(Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
+ for (Runnable callBack: routesUpdatedCallbacks.toArray(new Runnable[routesUpdatedCallbacks.size()])) {
callBack.run();
}
}
/**
- * All messages used by the RpcRegistry
+ * All messages used by the RpcRegistry.
*/
public static class Messages {
final List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers;
public ContainsRoute(List<RpcRouter.RouteIdentifier<?, ?, ?>> routeIdentifiers) {
- Preconditions.checkArgument(routeIdentifiers != null &&
- !routeIdentifiers.isEmpty(),
- "Route Identifiers must be supplied");
+ Preconditions.checkArgument(routeIdentifiers != null && !routeIdentifiers.isEmpty(),
+ "Route Identifiers must be supplied");
this.routeIdentifiers = routeIdentifiers;
}
@Override
public String toString() {
- return "ContainsRoute{" +
- "routeIdentifiers=" + routeIdentifiers +
- '}';
+ return "ContainsRoute{" + "routeIdentifiers=" + routeIdentifiers + '}';
}
}
@Override
public String toString() {
- return "SetLocalRouter{" +
- "router=" + router +
- '}';
+ return "SetLocalRouter{" + "router=" + router + '}';
}
}
@Override
public String toString() {
- return "FindRouters{" +
- "routeIdentifier=" + routeIdentifier +
- '}';
+ return "FindRouters{" + "routeIdentifier=" + routeIdentifier + '}';
}
}
@Override
public String toString() {
- return "FindRoutersReply{" +
- "routerWithUpdateTime=" + routerWithUpdateTime +
- '}';
+ return "FindRoutersReply{" + "routerWithUpdateTime=" + routerWithUpdateTime + '}';
}
}
}
@Override
public RpcRegistry create() throws Exception {
RpcRegistry registry = new RpcRegistry(config);
- RemoteRpcRegistryMXBean mxBean = new RemoteRpcRegistryMXBeanImpl(registry);
+ new RemoteRpcRegistryMXBeanImpl(registry);
return registry;
}
}
*/
package org.opendaylight.controller.remote.rpc.registry.gossip;
-
public interface Bucket<T extends Copier<T>> {
Long getVersion();
+
T getData();
}
public void setData(T data) {
this.data = data;
- this.version = System.currentTimeMillis()+1;
+ this.version = System.currentTimeMillis() + 1;
}
@Override
@Override
public String toString() {
- return "BucketImpl{" +
- "version=" + version +
- ", data=" + data +
- '}';
+ return "BucketImpl{" + "version=" + version + ", data=" + data + '}';
}
}
* A store that syncs its data across nodes in the cluster.
* It maintains a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Bucket} per node. Buckets are versioned.
* A node can write ONLY to its bucket. This way, write conflicts are avoided.
+ *
* <p>
- * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol)<p>
+ * Buckets are sync'ed across nodes using Gossip protocol (http://en.wikipedia.org/wiki/Gossip_protocol).
* This store uses a {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}.
*
*/
protected final Logger log = LoggerFactory.getLogger(getClass());
/**
- * Bucket owned by the node
+ * Bucket owned by the node.
*/
private final BucketImpl<T> localBucket = new BucketImpl<>();
/**
- * Buckets ownded by other known nodes in the cluster
+ * Buckets ownded by other known nodes in the cluster.
*/
private final Map<Address, Bucket<T>> remoteBuckets = new HashMap<>();
/**
- * Bucket version for every known node in the cluster including this node
+ * Bucket version for every known node in the cluster including this node.
*/
private final Map<Address, Long> versions = new HashMap<>();
/**
- * Cluster address for this node
+ * Cluster address for this node.
*/
private Address selfAddress;
private final RemoteRpcProviderConfig config;
- public BucketStore(RemoteRpcProviderConfig config){
+ public BucketStore(RemoteRpcProviderConfig config) {
this.config = Preconditions.checkNotNull(config);
}
@Override
- public void preStart(){
+ public void preStart() {
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
}
}
+ @SuppressWarnings("unchecked")
@Override
protected void handleReceive(Object message) throws Exception {
if (probe != null) {
} else if (message instanceof UpdateRemoteBuckets) {
receiveUpdateRemoteBuckets(((UpdateRemoteBuckets<T>) message).getBuckets());
} else {
- if(log.isDebugEnabled()) {
- log.debug("Unhandled message [{}]", message);
- }
+ log.debug("Unhandled message [{}]", message);
unhandled(message);
}
}
}
/**
- * Returns all the buckets the this node knows about, self owned + remote
+ * Returns all the buckets the this node knows about, self owned + remote.
*/
- void receiveGetAllBuckets(){
+ void receiveGetAllBuckets() {
final ActorRef sender = getSender();
sender.tell(new GetAllBucketsReply<>(getAllBuckets()), getSelf());
}
/**
- * Helper to collect all known buckets
+ * Helper to collect all known buckets.
*
* @return self owned + remote buckets
*/
- Map<Address, Bucket<T>> getAllBuckets(){
+ Map<Address, Bucket<T>> getAllBuckets() {
Map<Address, Bucket<T>> all = new HashMap<>(remoteBuckets.size() + 1);
//first add the local bucket
}
/**
- * Returns buckets for requested members that this node knows about
+ * Returns buckets for requested members that this node knows about.
*
* @param members requested members
*/
- void receiveGetBucketsByMembers(Set<Address> members){
+ void receiveGetBucketsByMembers(Set<Address> members) {
final ActorRef sender = getSender();
Map<Address, Bucket<T>> buckets = getBucketsByMembers(members);
sender.tell(new GetBucketsByMembersReply<>(buckets), getSelf());
}
/**
- * Helper to collect buckets for requested memebers
+ * Helper to collect buckets for requested members.
*
* @param members requested members
- * @return buckets for requested memebers
+ * @return buckets for requested members
*/
Map<Address, Bucket<T>> getBucketsByMembers(Set<Address> members) {
Map<Address, Bucket<T>> buckets = new HashMap<>();
}
//then get buckets for requested remote nodes
- for (Address address : members){
+ for (Address address : members) {
if (remoteBuckets.containsKey(address)) {
buckets.put(address, remoteBuckets.get(address));
}
}
/**
- * Returns versions for all buckets known
+ * Returns versions for all buckets known.
*/
- void receiveGetBucketVersions(){
+ void receiveGetBucketVersions() {
final ActorRef sender = getSender();
GetBucketVersionsReply reply = new GetBucketVersionsReply(versions);
sender.tell(reply, getSelf());
}
/**
- * Update local copy of remote buckets where local copy's version is older
+ * Update local copy of remote buckets where local copy's version is older.
*
* @param receivedBuckets buckets sent by remote
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Gossiper}
*/
- void receiveUpdateRemoteBuckets(Map<Address, Bucket<T>> receivedBuckets){
+ void receiveUpdateRemoteBuckets(Map<Address, Bucket<T>> receivedBuckets) {
log.debug("{}: receiveUpdateRemoteBuckets: {}", selfAddress, receivedBuckets);
- if (receivedBuckets == null || receivedBuckets.isEmpty())
- {
+ if (receivedBuckets == null || receivedBuckets.isEmpty()) {
return; //nothing to do
}
//Remote cant update self's bucket
receivedBuckets.remove(selfAddress);
- for (Map.Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()){
+ for (Map.Entry<Address, Bucket<T>> entry : receivedBuckets.entrySet()) {
Long localVersion = versions.get(entry.getKey());
if (localVersion == null) {
}
}
- if(log.isDebugEnabled()) {
- log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
- }
+ log.debug("State after update - Local Bucket [{}], Remote Buckets [{}]", localBucket, remoteBuckets);
onBucketsUpdated();
}
import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashSet;
* for update.
*
*/
-
public class Gossiper extends AbstractUntypedActorWithMetering {
private final Logger log = LoggerFactory.getLogger(getClass());
private Address selfAddress;
/**
- * All known cluster members
+ * All known cluster members.
*/
private List<Address> clusterMembers = new ArrayList<>();
private final RemoteRpcProviderConfig config;
- public Gossiper(RemoteRpcProviderConfig config){
+ public Gossiper(RemoteRpcProviderConfig config) {
this.config = Preconditions.checkNotNull(config);
}
/**
- * Helpful for testing
+ * Constructor for testing.
+ *
* @param autoStartGossipTicks used for turning off gossip ticks during testing.
* Gossip tick can be manually sent.
*/
- public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config){
+ @VisibleForTesting
+ public Gossiper(Boolean autoStartGossipTicks, RemoteRpcProviderConfig config) {
this(config);
this.autoStartGossipTicks = autoStartGossipTicks;
}
@Override
- public void preStart(){
+ public void preStart() {
ActorRefProvider provider = getContext().provider();
selfAddress = provider.getDefaultAddress();
}
@Override
- public void postStop(){
+ public void postStop() {
if (cluster != null) {
cluster.unsubscribe(getSelf());
}
}
}
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void handleReceive(Object message) throws Exception {
//Usually sent by self via gossip task defined above. But its not enforced.
} else if (message instanceof ClusterEvent.MemberRemoved) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.MemberRemoved) message).member());
- } else if ( message instanceof ClusterEvent.UnreachableMember){
+ } else if ( message instanceof ClusterEvent.UnreachableMember) {
receiveMemberRemoveOrUnreachable(((ClusterEvent.UnreachableMember) message).member());
} else {
*/
void receiveMemberRemoveOrUnreachable(Member member) {
//if its self, then stop itself
- if (selfAddress.equals(member.address())){
+ if (selfAddress.equals(member.address())) {
getContext().stop(getSelf());
return;
}
clusterMembers.remove(member.address());
- if(log.isDebugEnabled()) {
- log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
- }
+ log.debug("Removed member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
- * Add member to the local copy of member list if it doesnt already
- * @param member
+ * Add member to the local copy of member list if it doesn't already.
+ *
+ * @param member the member to add
*/
void receiveMemberUp(Member member) {
if (!clusterMembers.contains(member.address())) {
clusterMembers.add(member.address());
}
- if(log.isDebugEnabled()) {
- log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
- }
+
+ log.debug("Added member [{}], Active member list [{}]", member.address(), clusterMembers);
}
/**
- * Sends Gossip status to other members in the cluster. <br/>
- * 1. If there are no member, ignore the tick. </br>
- * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br/>
+ * Sends Gossip status to other members in the cluster.
+ * <br>
+ * 1. If there are no member, ignore the tick. <br>
+ * 2. If there's only 1 member, send gossip status (bucket versions) to it. <br>
* 3. If there are more than one member, randomly pick one and send gossip status (bucket versions) to it.
*/
- void receiveGossipTick(){
+ void receiveGossipTick() {
if (clusterMembers.size() == 0) {
return; //no members to send gossip status to
}
Integer randomIndex = ThreadLocalRandom.current().nextInt(0, clusterMembers.size());
remoteMemberToGossipTo = clusterMembers.get(randomIndex);
}
- if(log.isTraceEnabled()) {
- log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
- }
+
+ log.trace("Gossiping to [{}]", remoteMemberToGossipTo);
getLocalStatusAndSendTo(remoteMemberToGossipTo);
}
/**
* Process gossip status received from a remote gossiper. Remote versions are compared with
- * the local copy. <p>
- *
+ * the local copy.
+ * <p/>
* For each bucket
* <ul>
* <li>If local copy is newer, the newer buckets are sent in GossipEnvelope to remote</li>
*
* @param status bucket versions from a remote member
*/
- void receiveGossipStatus(GossipStatus status){
+ void receiveGossipStatus(GossipStatus status) {
//Don't accept messages from non-members
if (!clusterMembers.contains(status.from())) {
return;
*
* @param envelope contains buckets from a remote gossiper
*/
- void receiveGossip(GossipEnvelope envelope){
+ <T extends Copier<T>> void receiveGossip(GossipEnvelope<T> envelope) {
//TODO: Add more validations
if (!selfAddress.equals(envelope.to())) {
- if(log.isTraceEnabled()) {
- log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(), envelope.to());
+ if (log.isTraceEnabled()) {
+ log.trace("Ignoring message intended for someone else. From [{}] to [{}]", envelope.from(),
+ envelope.to());
}
return;
}
}
/**
- * Helper to send received buckets to bucket store
+ * Helper to send received buckets to bucket store.
*
- * @param buckets
+ * @param buckets map of Buckets to update
*/
- void updateRemoteBuckets(Map<Address, Bucket> buckets) {
-
- UpdateRemoteBuckets updateRemoteBuckets = new UpdateRemoteBuckets(buckets);
+ <T extends Copier<T>> void updateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
+ UpdateRemoteBuckets<T> updateRemoteBuckets = new UpdateRemoteBuckets<>(buckets);
getContext().parent().tell(updateRemoteBuckets, getSelf());
}
/**
- * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper
+ * Gets the buckets from bucket store for the given node addresses and sends them to remote gossiper.
*
* @param remote remote node to send Buckets to
* @param addresses node addresses whose buckets needs to be sent
*/
- void sendGossipTo(final ActorRef remote, final Set<Address> addresses){
+ void sendGossipTo(final ActorRef remote, final Set<Address> addresses) {
Future<Object> futureReply =
Patterns.ask(getContext().parent(), new GetBucketsByMembers(addresses), config.getAskDuration());
}
/**
- * Gets bucket versions from bucket store and sends to the supplied address
+ * Gets bucket versions from bucket store and sends to the supplied address.
*
* @param remoteActorSystemAddress remote gossiper to send to
*/
- void getLocalStatusAndSendTo(Address remoteActorSystemAddress){
+ void getLocalStatusAndSendTo(Address remoteActorSystemAddress) {
//Get local status from bucket store and send to remote
Future<Object> futureReply =
ActorSelection remoteRef = getContext().system().actorSelection(
remoteActorSystemAddress.toString() + getSelf().path().toStringWithoutAddress());
- if(log.isTraceEnabled()) {
- log.trace("Sending bucket versions to [{}]", remoteRef);
- }
+ log.trace("Sending bucket versions to [{}]", remoteRef);
futureReply.map(getMapperToSendLocalStatus(remoteRef), getContext().dispatcher());
}
/**
- * Helper to send bucket versions received from local store
+ * Helper to send bucket versions received from local store.
+ *
* @param remote remote gossiper to send versions to
* @param localVersions bucket versions received from local store
*/
- void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions){
+ void sendGossipStatusTo(ActorRef remote, Map<Address, Long> localVersions) {
GossipStatus status = new GossipStatus(selfAddress, localVersions);
remote.tell(status, getSelf());
}
- void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions){
+ void sendGossipStatusTo(ActorSelection remote, Map<Address, Long> localVersions) {
GossipStatus status = new GossipStatus(selfAddress, localVersions);
remote.tell(status, getSelf());
/// Private factories to create mappers
///
- private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote){
+ private Mapper<Object, Void> getMapperToSendLocalStatus(final ActorSelection remote) {
return new Mapper<Object, Void>() {
@Override
* @return a {@link akka.dispatch.Mapper} that gets evaluated in future
*
*/
- private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status){
+ private Mapper<Object, Void> getMapperToProcessRemoteStatus(final ActorRef sender, final GossipStatus status) {
final Map<Address, Long> remoteVersions = status.getVersions();
localIsNewer.removeAll(remoteVersions.keySet());
- for (Address address : remoteVersions.keySet()){
+ for (Address address : remoteVersions.keySet()) {
if (localVersions.get(address) == null || remoteVersions.get(address) == null) {
continue; //this condition is taken care of by above diffs
* {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope}
*
* @param sender the remote member that sent
- * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
- * in reply to which bucket is being sent back
+ * {@link org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus}
+ * in reply to which bucket is being sent back
* @return a {@link akka.dispatch.Mapper} that gets evaluated in future
*
*/
private Mapper<Object, Void> getMapperToSendGossip(final ActorRef sender) {
return new Mapper<Object, Void>() {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Void apply(Object msg) {
if (msg instanceof GetBucketsByMembersReply) {
- Map<Address, Bucket> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
- if(log.isTraceEnabled()) {
- log.trace("Buckets to send from {}: {}", selfAddress, buckets);
- }
+ Map<Address, Bucket<?>> buckets = ((GetBucketsByMembersReply) msg).getBuckets();
+ log.trace("Buckets to send from {}: {}", selfAddress, buckets);
GossipEnvelope envelope = new GossipEnvelope(selfAddress, sender.path().address(), buckets);
sender.tell(envelope, getSelf());
}
*/
public class Messages {
- public static class BucketStoreMessages{
+ public static class BucketStoreMessages {
public static class GetAllBuckets implements Serializable {
private static final long serialVersionUID = 1L;
}
- public static class GetBucketsByMembers implements Serializable{
+ public static class GetBucketsByMembers implements Serializable {
private static final long serialVersionUID = 1L;
private final Set<Address> members;
- public GetBucketsByMembers(Set<Address> members){
+ public GetBucketsByMembers(Set<Address> members) {
Preconditions.checkArgument(members != null, "members can not be null");
this.members = members;
}
}
}
- public static class ContainsBuckets<T extends Copier<T>> implements Serializable{
+ public static class ContainsBuckets<T extends Copier<T>> implements Serializable {
private static final long serialVersionUID = -4940160367495308286L;
private final Map<Address, Bucket<T>> buckets;
- public ContainsBuckets(Map<Address, Bucket<T>> buckets){
+ public ContainsBuckets(Map<Address, Bucket<T>> buckets) {
Preconditions.checkArgument(buckets != null, "buckets can not be null");
this.buckets = buckets;
}
public Map<Address, Bucket<T>> getBuckets() {
Map<Address, Bucket<T>> copy = new HashMap<>(buckets.size());
- for (Map.Entry<Address, Bucket<T>> entry : buckets.entrySet()){
+ for (Map.Entry<Address, Bucket<T>> entry : buckets.entrySet()) {
//ignore null entries
- if ( (entry.getKey() == null) || (entry.getValue() == null) ) {
+ if ( entry.getKey() == null || entry.getValue() == null ) {
continue;
}
copy.put(entry.getKey(), entry.getValue());
}
}
- public static class GetAllBucketsReply<T extends Copier<T>> extends ContainsBuckets<T> implements Serializable{
+ public static class GetAllBucketsReply<T extends Copier<T>> extends ContainsBuckets<T> implements Serializable {
private static final long serialVersionUID = 1L;
+
public GetAllBucketsReply(Map<Address, Bucket<T>> buckets) {
super(buckets);
}
}
- public static class GetBucketsByMembersReply<T extends Copier<T>> extends ContainsBuckets<T> implements Serializable{
+ public static class GetBucketsByMembersReply<T extends Copier<T>> extends ContainsBuckets<T>
+ implements Serializable {
private static final long serialVersionUID = 1L;
+
public GetBucketsByMembersReply(Map<Address, Bucket<T>> buckets) {
super(buckets);
}
private static final long serialVersionUID = 1L;
}
- public static class ContainsBucketVersions implements Serializable{
+ public static class ContainsBucketVersions implements Serializable {
private static final long serialVersionUID = -8172148925383801613L;
Map<Address, Long> versions;
}
- public static class GetBucketVersionsReply extends ContainsBucketVersions implements Serializable{
+ public static class GetBucketVersionsReply extends ContainsBucketVersions implements Serializable {
private static final long serialVersionUID = 1L;
+
public GetBucketVersionsReply(Map<Address, Long> versions) {
super(versions);
}
}
- public static class UpdateRemoteBuckets<T extends Copier<T>> extends ContainsBuckets<T> implements Serializable{
+ public static class UpdateRemoteBuckets<T extends Copier<T>> extends ContainsBuckets<T>
+ implements Serializable {
private static final long serialVersionUID = 1L;
+
public UpdateRemoteBuckets(Map<Address, Bucket<T>> buckets) {
super(buckets);
}
}
}
- public static class GossiperMessages{
+ public static class GossiperMessages {
public static class Tick implements Serializable {
private static final long serialVersionUID = -4770935099506366773L;
}
private static final long serialVersionUID = 5803354404380026143L;
}
- public static final class GossipStatus extends ContainsBucketVersions implements Serializable{
+ public static final class GossipStatus extends ContainsBucketVersions implements Serializable {
private static final long serialVersionUID = -593037395143883265L;
private final Address from;
}
}
- public static final class GossipEnvelope<T extends Copier<T>> extends ContainsBuckets<T> implements Serializable {
+ public static final class GossipEnvelope<T extends Copier<T>> extends ContainsBuckets<T>
+ implements Serializable {
private static final long serialVersionUID = 8346634072582438818L;
private final Address from;
import java.util.Set;
/**
- * JMX bean to check remote rpc registry
+ * JMX bean to check remote rpc registry.
*/
-
public interface RemoteRpcRegistryMXBean {
Set<String> getGlobalRpc();
package org.opendaylight.controller.remote.rpc.registry.mbeans;
import akka.actor.Address;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
import org.opendaylight.controller.remote.rpc.registry.RoutingTable;
import org.opendaylight.controller.remote.rpc.registry.RpcRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
public class RemoteRpcRegistryMXBeanImpl extends AbstractMXBean implements RemoteRpcRegistryMXBean {
protected final Logger log = LoggerFactory.getLogger(getClass());
- private final String NULL_CONSTANT = "null";
+ private static final String NULL_CONSTANT = "null";
- private final String LOCAL_CONSTANT = "local";
+ private static final String LOCAL_CONSTANT = "local";
- private final String ROUTE_CONSTANT = "route:";
+ private static final String ROUTE_CONSTANT = "route:";
- private final String NAME_CONSTANT = " | name:";
+ private static final String NAME_CONSTANT = " | name:";
private final RpcRegistry rpcRegistry;
public Set<String> getGlobalRpc() {
RoutingTable table = rpcRegistry.getLocalBucket().getData();
Set<String> globalRpc = new HashSet<>(table.getRoutes().size());
- for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
- if(route.getRoute() == null) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
+ if (route.getRoute() == null) {
globalRpc.add(route.getType() != null ? route.getType().toString() : NULL_CONSTANT);
}
}
- if(log.isDebugEnabled()) {
- log.debug("Locally registered global RPCs {}", globalRpc);
- }
+
+ log.debug("Locally registered global RPCs {}", globalRpc);
return globalRpc;
}
public Set<String> getLocalRegisteredRoutedRpc() {
RoutingTable table = rpcRegistry.getLocalBucket().getData();
Set<String> routedRpc = new HashSet<>(table.getRoutes().size());
- for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
- if(route.getRoute() != null) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
+ if (route.getRoute() != null) {
StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
- builder.append(route.getRoute().toString()).append(NAME_CONSTANT).append(route.getType() != null ?
- route.getType().toString() : NULL_CONSTANT);
+ builder.append(route.getRoute().toString()).append(NAME_CONSTANT).append(route.getType() != null
+ ? route.getType().toString() : NULL_CONSTANT);
routedRpc.add(builder.toString());
}
}
- if(log.isDebugEnabled()) {
- log.debug("Locally registered routed RPCs {}", routedRpc);
- }
+
+ log.debug("Locally registered routed RPCs {}", routedRpc);
return routedRpc;
}
// Get all RPCs from remote bucket
Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
- for(Address address : buckets.keySet()) {
+ for (Address address : buckets.keySet()) {
RoutingTable table = buckets.get(address).getData();
rpcMap.putAll(getRpcMemberMapByName(table, name, address.toString()));
}
- if(log.isDebugEnabled()) {
- log.debug("list of RPCs {} searched by name {}", rpcMap, name);
- }
+
+ log.debug("list of RPCs {} searched by name {}", rpcMap, name);
return rpcMap;
}
Map<String, String> rpcMap = new HashMap<>(getRpcMemberMapByRoute(localTable, routeId, LOCAL_CONSTANT));
Map<Address, Bucket<RoutingTable>> buckets = rpcRegistry.getRemoteBuckets();
- for(Address address : buckets.keySet()) {
+ for (Address address : buckets.keySet()) {
RoutingTable table = buckets.get(address).getData();
rpcMap.putAll(getRpcMemberMapByRoute(table, routeId, address.toString()));
}
- if(log.isDebugEnabled()) {
- log.debug("list of RPCs {} searched by route {}", rpcMap, routeId);
- }
+
+ log.debug("list of RPCs {} searched by route {}", rpcMap, routeId);
return rpcMap;
}
/**
- * Search if the routing table route String contains routeName
+ * Search if the routing table route String contains routeName.
*/
-
private Map<String,String> getRpcMemberMapByRoute(final RoutingTable table, final String routeName,
final String address) {
Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
Map<String, String> rpcMap = new HashMap<>(routes.size());
- for(RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()){
- if(route.getRoute() != null) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> route : table.getRoutes()) {
+ if (route.getRoute() != null) {
String routeString = route.getRoute().toString();
- if(routeString.contains(routeName)) {
+ if (routeString.contains(routeName)) {
StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
- builder.append(routeString).append(NAME_CONSTANT).append(route.getType() != null ?
- route.getType().toString() : NULL_CONSTANT);
+ builder.append(routeString).append(NAME_CONSTANT).append(route.getType() != null
+ ? route.getType().toString() : NULL_CONSTANT);
rpcMap.put(builder.toString(), address);
}
}
}
/**
- * Search if the routing table route type contains name
+ * Search if the routing table route type contains name.
*/
private Map<String, String> getRpcMemberMapByName(final RoutingTable table, final String name,
final String address) {
Set<RpcRouter.RouteIdentifier<?, ?, ?>> routes = table.getRoutes();
Map<String, String> rpcMap = new HashMap<>(routes.size());
- for(RpcRouter.RouteIdentifier<?, ?, ?> route : routes){
- if(route.getType() != null) {
+ for (RpcRouter.RouteIdentifier<?, ?, ?> route : routes) {
+ if (route.getType() != null) {
String type = route.getType().toString();
- if(type.contains(name)) {
+ if (type.contains(name)) {
StringBuilder builder = new StringBuilder(ROUTE_CONSTANT);
- builder.append(route.getRoute() != null ? route.getRoute().toString(): NULL_CONSTANT)
+ builder.append(route.getRoute() != null ? route.getRoute().toString() : NULL_CONSTANT)
.append(NAME_CONSTANT).append(type);
rpcMap.put(builder.toString(), address);
}
return rpcMap;
}
-
-
@Override
public String getBucketVersions() {
return rpcRegistry.getVersions().toString();
}
-
-}
\ No newline at end of file
+}
import akka.actor.ActorRef;
import akka.japi.Pair;
import com.google.common.base.Preconditions;
-
import java.util.Collection;
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;
/**
- * This class will return First Entry
+ * This class will return First Entry.
*/
-public class LatestEntryRoutingLogic implements RoutingLogic{
+public class LatestEntryRoutingLogic implements RoutingLogic {
- private SortedSet<Pair<ActorRef, Long>> actorRefSet;
+ private final SortedSet<Pair<ActorRef, Long>> actorRefSet;
- public LatestEntryRoutingLogic(Collection<Pair<ActorRef, Long>> entries) {
- Preconditions.checkNotNull(entries, "Entries should not be null");
- Preconditions.checkArgument(!entries.isEmpty(), "Entries collection should not be empty");
+ public LatestEntryRoutingLogic(Collection<Pair<ActorRef, Long>> entries) {
+ Preconditions.checkNotNull(entries, "Entries should not be null");
+ Preconditions.checkArgument(!entries.isEmpty(), "Entries collection should not be empty");
- actorRefSet = new TreeSet<>(new LatestEntryComparator());
- actorRefSet.addAll(entries);
- }
+ actorRefSet = new TreeSet<>(new LatestEntryComparator());
+ actorRefSet.addAll(entries);
+ }
- @Override
- public ActorRef select() {
- return actorRefSet.last().first();
- }
+ @Override
+ public ActorRef select() {
+ return actorRefSet.last().first();
+ }
+ private class LatestEntryComparator implements Comparator<Pair<ActorRef, Long>> {
- private class LatestEntryComparator implements Comparator<Pair<ActorRef, Long>> {
+ @Override
+ public int compare(Pair<ActorRef, Long> o1, Pair<ActorRef, Long> o2) {
+ if (o1 == null && o2 == null) {
+ return 0;
+ }
+ if (o1 == null && o2 != null) {
+ return -1;
+ }
+ if (o1 != null && o2 == null) {
+ return 1;
+ }
- @Override
- public int compare(Pair<ActorRef, Long> o1, Pair<ActorRef, Long> o2) {
- if(o1 == null && o2 == null) {
- return 0;
- }
- if(o1 == null && o2 != null) {
- return -1;
- }
- if(o1 != null && o2 == null) {
- return 1;
- }
-
- return o1.second().compareTo(o2.second());
+ return o1.second().compareTo(o2.second());
+ }
}
-
- }
}
/**
* This Interface is added to abstract out the way rpc execution could be
* routed, if more than one node in cluster is capable of executing the rpc.
- *
* We can pick node randomly, round robin manner or based on last updated time etc.
*/
public interface RoutingLogic {
-
- ActorRef select();
+ ActorRef select();
}
this.predicate = predicate;
}
- public void tell(Object message, ActorRef sender){
- if(predicate.apply(message)) {
+ public void tell(Object message, ActorRef sender) {
+ if (predicate.apply(message)) {
log.info("sending message to probe {}", message);
actorRef.tell(message, sender);
}
static final SchemaPath TEST_RPC_TYPE = SchemaPath.create(true, TEST_RPC);
- static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create(new YangInstanceIdentifier.NodeIdentifier(TEST_RPC));
+ static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.create(
+ new YangInstanceIdentifier.NodeIdentifier(TEST_RPC));
public static final DOMRpcIdentifier TEST_RPC_ID = DOMRpcIdentifier.create(TEST_RPC_TYPE, TEST_PATH);
static ActorSystem node1;
}
static void assertRpcErrorEquals(final RpcError rpcError, final ErrorSeverity severity,
- final ErrorType errorType, final String tag, final String message, final String applicationTag, final String info,
- final String causeMsg) {
+ final ErrorType errorType, final String tag, final String message, final String applicationTag,
+ final String info, final String causeMsg) {
assertEquals("getSeverity", severity, rpcError.getSeverity());
assertEquals("getErrorType", errorType, rpcError.getErrorType());
assertEquals("getTag", tag, rpcError.getTag());
assertEquals("getApplicationTag", applicationTag, rpcError.getApplicationTag());
assertEquals("getInfo", info, rpcError.getInfo());
- if(causeMsg == null) {
+ if (causeMsg == null) {
assertNull("Unexpected cause " + rpcError.getCause(), rpcError.getCause());
} else {
assertEquals("Cause message", causeMsg, rpcError.getCause().getMessage());
}
}
- static void assertCompositeNodeEquals(final NormalizedNode<? , ?> exp, final NormalizedNode<? , ? > actual) {
+ static void assertCompositeNodeEquals(final NormalizedNode<? , ?> exp, final NormalizedNode<? , ?> actual) {
assertEquals(exp, actual);
}
}
static void assertFailedRpcResult(final DOMRpcResult rpcResult, final ErrorSeverity severity,
- final ErrorType errorType, final String tag, final String message, final String applicationTag, final String info,
- final String causeMsg) {
-
+ final ErrorType errorType, final String tag, final String message, final String applicationTag,
+ final String info, final String causeMsg) {
assertNotNull("RpcResult was null", rpcResult);
final Collection<RpcError> rpcErrors = rpcResult.getErrors();
assertEquals("RpcErrors count", 1, rpcErrors.size());
import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-/***
+/**
* Unit tests for RemoteRpcImplementation.
*
* @author Thomas Pantelis
/**
- * This test method invokes and executes the remote rpc
+ * This test method invokes and executes the remote rpc.
*/
@Test
public void testInvokeRpc() throws Exception {
}
/**
- * This test method invokes and executes the remote rpc
+ * This test method invokes and executes the remote rpc.
*/
@Test
public void testInvokeRpcWithNullInput() throws Exception {
/**
- * This test method invokes and executes the remote rpc
+ * This test method invokes and executes the remote rpc.
*/
@Test
public void testInvokeRpcWithNoOutput() throws Exception {
/**
- * This test method invokes and executes the remote rpc
+ * This test method invokes and executes the remote rpc.
*/
@Test(expected = DOMRpcException.class)
public void testInvokeRpcWithRemoteFailedFuture() throws Exception {
- final ContainerNode rpcOutput = null;
- final DOMRpcResult rpcResult = new DefaultDOMRpcResult(rpcOutput);
-
final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
@SuppressWarnings({"unchecked", "rawtypes"})
final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
(ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
when(domRpcService2.invokeRpc(eq(TEST_RPC_TYPE), inputCaptor.capture())).thenReturn(
- Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcException(
- "Test Exception") {}));
+ Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new RemoteDOMRpcException(
+ "Test Exception", null)));
final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
/**
* This test method invokes and tests exceptions when akka timeout occured
- *
- * Currently ignored since this test with current config takes around 15 seconds
- * to complete.
- *
+ * Currently ignored since this test with current config takes around 15 seconds to complete.
*/
@Ignore
@Test(expected = RemoteDOMRpcException.class)
public void testInvokeRpcWithAkkaTimeoutException() throws Exception {
final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
- @SuppressWarnings({"unchecked", "rawtypes"})
- final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
- (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
@Test(expected = DOMRpcException.class)
public void testInvokeRpcWithLookupException() throws Exception {
final NormalizedNode<?, ?> invokeRpcInput = makeRPCInput("foo");
- @SuppressWarnings({"unchecked", "rawtypes"})
- final ArgumentCaptor<NormalizedNode<?, ?>> inputCaptor =
- (ArgumentCaptor) ArgumentCaptor.forClass(NormalizedNode.class);
final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
assertTrue(frontEndFuture instanceof RemoteDOMRpcFuture);
}
/**
- * This test method invokes and executes the remote rpc
+ * This test method invokes and executes the remote rpc.
*/
@Test(expected = DOMRpcImplementationNotAvailableException.class)
public void testInvokeRpcWithLoopException() throws Exception {
final NormalizedNode<?, ?> invokeRpcInput = RemoteRpcInput.from(makeRPCInput("foo"));
- final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture = remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
+ final CheckedFuture<DOMRpcResult, DOMRpcException> frontEndFuture =
+ remoteRpcImpl1.invokeRpc(TEST_RPC_ID, invokeRpcInput);
frontEndFuture.checkedGet(5, TimeUnit.SECONDS);
}
-
-
- private RemoteRpcProviderConfig getConfig() {
- return new RemoteRpcProviderConfig.Builder("unit-test").build();
- }
}
import akka.testkit.TestActorRef;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.opendaylight.controller.cluster.common.actor.AkkaConfigurationReader;
import scala.concurrent.duration.FiniteDuration;
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-
public class RemoteRpcProviderConfigTest {
@Test
public void testConfigDefaults() {
-
- Config c = ConfigFactory.parseFile(new File("application.conf"));
RemoteRpcProviderConfig config = new RemoteRpcProviderConfig.Builder("unit-test").build();
//Assert on configurations from common config
Assert.assertNotNull(config.getRpcRegistryPath());
Assert.assertNotNull(config.getAskDuration());
Assert.assertNotNull(config.getGossipTickInterval());
-
-
-
}
@Test
public static class ConfigTestActor extends UntypedActor {
- private Config actorSystemConfig;
+ private final Config actorSystemConfig;
public ConfigTestActor() {
this.actorSystemConfig = getContext().system().settings().config();
/**
* Only for testing. NEVER expose actor's internal state like this.
- *
- * @return
*/
public Config getConfig() {
return actorSystemConfig;
}
}
-}
\ No newline at end of file
+}
import scala.concurrent.duration.Duration;
public class RemoteRpcProviderTest {
+ static ActorSystem system;
+ static RemoteRpcProviderConfig moduleConfig;
- static ActorSystem system;
- static RemoteRpcProviderConfig moduleConfig;
+ @BeforeClass
+ public static void setup() throws InterruptedException {
+ moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
+ final Config config = moduleConfig.get();
+ system = ActorSystem.create("odl-cluster-rpc", config);
- @BeforeClass
- public static void setup() throws InterruptedException {
- moduleConfig = new RemoteRpcProviderConfig.Builder("odl-cluster-rpc").build();
- final Config config = moduleConfig.get();
- system = ActorSystem.create("odl-cluster-rpc", config);
-
- }
+ }
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
@Test
public void testRemoteRpcProvider() throws Exception {
@Test
public void testExecuteRpcFailureWithException() {
-
new JavaTestKit(node1) {
{
-
when(domRpcService1.invokeRpc(eq(TEST_RPC_TYPE), Mockito.<NormalizedNode<?, ?>>any()))
- .thenReturn(
- Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(new DOMRpcImplementationNotAvailableException(
- "NOT FOUND")));
+ .thenReturn(Futures.<DOMRpcResult, DOMRpcException>immediateFailedCheckedFuture(
+ new DOMRpcImplementationNotAvailableException("NOT FOUND")));
final ExecuteRpc executeMsg = ExecuteRpc.from(TEST_RPC_ID, null);
Assert.assertTrue(rpcResponse.cause() instanceof DOMRpcException);
}
};
-
}
-
}
package org.opendaylight.controller.remote.rpc.registry;
import static org.junit.Assert.fail;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.net.URI;
import java.net.URISyntaxException;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.api.RpcRouter.RouteIdentifier;
import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
public class RpcRegistryTest {
+ private static final Logger LOG = LoggerFactory.getLogger(RpcRegistryTest.class);
private static ActorSystem node1;
private static ActorSystem node2;
@BeforeClass
public static void staticSetup() throws InterruptedException {
- AkkaConfigurationReader reader = new AkkaConfigurationReader() {
- @Override
- public Config read() {
- return ConfigFactory.load();
- }
- };
+ AkkaConfigurationReader reader = () -> ConfigFactory.load();
- RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms").
- withConfigReader(reader).build();
+ RemoteRpcProviderConfig config1 = new RemoteRpcProviderConfig.Builder("memberA").gossipTickInterval("200ms")
+ .withConfigReader(reader).build();
RemoteRpcProviderConfig config2 = new RemoteRpcProviderConfig.Builder("memberB").gossipTickInterval("200ms")
.withConfigReader(reader).build();
RemoteRpcProviderConfig config3 = new RemoteRpcProviderConfig.Builder("memberC").gossipTickInterval("200ms")
static void waitForMembersUp(ActorSystem node, UniqueAddress... addresses) {
Set<UniqueAddress> otherMembersSet = Sets.newHashSet(addresses);
Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
CurrentClusterState state = Cluster.get(node).state();
- for(Member m: state.getMembers()) {
- if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress()) &&
- otherMembersSet.isEmpty()) {
+ for (Member m : state.getMembers()) {
+ if (m.status() == MemberStatus.up() && otherMembersSet.remove(m.uniqueAddress())
+ && otherMembersSet.isEmpty()) {
return;
}
}
registry3 = node3.actorOf(Props.create(RpcRegistry.class, config(node3)));
}
- private RemoteRpcProviderConfig config(ActorSystem node){
+ private RemoteRpcProviderConfig config(ActorSystem node) {
return new RemoteRpcProviderConfig(node.settings().config());
}
/**
* One node cluster. 1. Register rpc, ensure router can be found 2. Then remove rpc, ensure its
* deleted
- *
- * @throws URISyntaxException
- * @throws InterruptedException
*/
@Test
public void testAddRemoveRpcOnSameNode() throws Exception {
-
- System.out.println("testAddRemoveRpcOnSameNode starting");
+ LOG.info("testAddRemoveRpcOnSameNode starting");
final JavaTestKit mockBroker = new JavaTestKit(node1);
verifyEmptyBucket(mockBroker, registry1, nodeAddress);
- System.out.println("testAddRemoveRpcOnSameNode ending");
+ LOG.info("testAddRemoveRpcOnSameNode ending");
}
/**
* Three node cluster. 1. Register rpc on 1 node, ensure 2nd node gets updated 2. Remove rpc on
* 1 node, ensure 2nd node gets updated
- *
- * @throws URISyntaxException
- * @throws InterruptedException
*/
@Test
public void testRpcAddRemoveInCluster() throws Exception {
- System.out.println("testRpcAddRemoveInCluster starting");
+ LOG.info("testRpcAddRemoveInCluster starting");
final JavaTestKit mockBroker1 = new JavaTestKit(node1);
final JavaTestKit mockBroker2 = new JavaTestKit(node2);
verifyEmptyBucket(mockBroker2, registry2, node1Address);
- System.out.println("testRpcAddRemoveInCluster ending");
+ LOG.info("testRpcAddRemoveInCluster ending");
}
private void verifyEmptyBucket(JavaTestKit testKit, ActorRef registry, Address address)
throws AssertionError {
Map<Address, Bucket<RoutingTable>> buckets;
- int nTries = 0;
- while(true) {
+ int numTries = 0;
+ while (true) {
buckets = retrieveBuckets(registry1, testKit, address);
try {
verifyBucket(buckets.get(address), Collections.<RouteIdentifier<?, ?, ?>>emptyList());
break;
} catch (AssertionError e) {
- if(++nTries >= 50) {
+ if (++numTries >= 50) {
throw e;
}
}
/**
* Three node cluster. Register rpc on 2 nodes. Ensure 3rd gets updated.
- *
- * @throws Exception
*/
@Test
public void testRpcAddedOnMultiNodes() throws Exception {
private void verifyBucket(Bucket<RoutingTable> bucket, List<RouteIdentifier<?, ?, ?>> expRouteIds) {
RoutingTable table = bucket.getData();
Assert.assertNotNull("Bucket RoutingTable is null", table);
- for(RouteIdentifier<?, ?, ?> r: expRouteIds) {
- if(!table.contains(r)) {
+ for (RouteIdentifier<?, ?, ?> r : expRouteIds) {
+ if (!table.contains(r)) {
Assert.fail("RoutingTable does not contain " + r + ". Actual: " + table);
}
}
private Map<Address, Bucket<RoutingTable>> retrieveBuckets(ActorRef bucketStore, JavaTestKit testKit,
Address... addresses) {
- int nTries = 0;
- while(true) {
+ int numTries = 0;
+ while (true) {
bucketStore.tell(new GetAllBuckets(), testKit.getRef());
@SuppressWarnings("unchecked")
GetAllBucketsReply<RoutingTable> reply = testKit.expectMsgClass(Duration.create(3, TimeUnit.SECONDS),
Map<Address, Bucket<RoutingTable>> buckets = reply.getBuckets();
boolean foundAll = true;
- for(Address addr: addresses) {
+ for (Address addr : addresses) {
Bucket<RoutingTable> bucket = buckets.get(addr);
- if(bucket == null) {
+ if (bucket == null) {
foundAll = false;
break;
}
}
- if(foundAll) {
+ if (foundAll) {
return buckets;
}
- if(++nTries >= 50) {
+ if (++numTries >= 50) {
Assert.fail("Missing expected buckets for addresses: " + Arrays.toString(addresses)
+ ", Actual: " + buckets);
}
final int nRoutes = 500;
final RouteIdentifier<?, ?, ?>[] added = new RouteIdentifier<?, ?, ?>[nRoutes];
- for(int i = 0; i < nRoutes; i++) {
+ for (int i = 0; i < nRoutes; i++) {
final RouteIdentifierImpl routeId = new RouteIdentifierImpl(null,
new QName(new URI("/mockrpc"), "type" + i), null);
added[i] = routeId;
GetAllBuckets getAllBuckets = new GetAllBuckets();
FiniteDuration duration = Duration.create(3, TimeUnit.SECONDS);
- int nTries = 0;
- while(true) {
+ int numTries = 0;
+ while (true) {
registry1.tell(getAllBuckets, testKit.getRef());
@SuppressWarnings("unchecked")
GetAllBucketsReply<RoutingTable> reply = testKit.expectMsgClass(duration, GetAllBucketsReply.class);
Bucket<RoutingTable> localBucket = reply.getBuckets().values().iterator().next();
RoutingTable table = localBucket.getData();
- if(table != null && table.size() == nRoutes) {
- for(RouteIdentifier<?, ?, ?> r: added) {
+ if (table != null && table.size() == nRoutes) {
+ for (RouteIdentifier<?, ?, ?> r : added) {
Assert.assertEquals("RoutingTable contains " + r, true, table.contains(r));
}
break;
}
- if(++nTries >= 50) {
+ if (++numTries >= 50) {
Assert.fail("Expected # routes: " + nRoutes + ", Actual: " + table.size());
}
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
+import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.typesafe.config.ConfigFactory;
import java.util.HashMap;
@BeforeClass
public static void setup() {
-
system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("unit-test"));
system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
}
@AfterClass
public static void teardown() {
- system.shutdown();
+ JavaTestKit.shutdownActorSystem(system);
}
/**
- * Given remote buckets
- * Should merge with local copy of remote buckets
+ * Given remote buckets, should merge with local copy of remote buckets.
*/
@Test
- public void testReceiveUpdateRemoteBuckets(){
+ public void testReceiveUpdateRemoteBuckets() {
- BucketStore<T> store = createStore();
+ final BucketStore<T> store = createStore();
Address localAddress = system.provider().getDefaultAddress();
Bucket<T> localBucket = new BucketImpl<>();
Assert.assertTrue(remoteBucketsInStore.size() == 4);
//Update a bucket
- Bucket<T> b3_new = new BucketImpl<>();
+ Bucket<T> b3New = new BucketImpl<>();
remoteBuckets.clear();
- remoteBuckets.put(a3, b3_new);
+ remoteBuckets.put(a3, b3New);
remoteBuckets.put(a1, null);
remoteBuckets.put(a2, null);
store.receiveUpdateRemoteBuckets(remoteBuckets);
//Should only update a3
remoteBucketsInStore = store.getRemoteBuckets();
- Bucket<T> b3_inStore = remoteBucketsInStore.get(a3);
- Assert.assertEquals(b3_new.getVersion(), b3_inStore.getVersion());
+ Bucket<T> b3InStore = remoteBucketsInStore.get(a3);
+ Assert.assertEquals(b3New.getVersion(), b3InStore.getVersion());
//Should NOT update a1 and a2
- Bucket<T> b1_inStore = remoteBucketsInStore.get(a1);
- Bucket<T> b2_inStore = remoteBucketsInStore.get(a2);
- Assert.assertEquals(b1.getVersion(), b1_inStore.getVersion());
- Assert.assertEquals(b2.getVersion(), b2_inStore.getVersion());
+ Bucket<T> b1InStore = remoteBucketsInStore.get(a1);
+ Bucket<T> b2InStore = remoteBucketsInStore.get(a2);
+ Assert.assertEquals(b1.getVersion(), b1InStore.getVersion());
+ Assert.assertEquals(b2.getVersion(), b2InStore.getVersion());
Assert.assertTrue(remoteBucketsInStore.size() == 4);
//Should update versions map
Assert.assertEquals(4, versionsInStore.size());
Assert.assertEquals(b1.getVersion(), versionsInStore.get(a1));
Assert.assertEquals(b2.getVersion(), versionsInStore.get(a2));
- Assert.assertEquals(b3_new.getVersion(), versionsInStore.get(a3));
+ Assert.assertEquals(b3New.getVersion(), versionsInStore.get(a3));
Assert.assertEquals(b4.getVersion(), versionsInStore.get(a4));
//Send older version of bucket
//Should NOT update a3
remoteBucketsInStore = store.getRemoteBuckets();
- b3_inStore = remoteBucketsInStore.get(a3);
- Assert.assertTrue(b3_inStore.getVersion().longValue() == b3_new.getVersion().longValue());
+ b3InStore = remoteBucketsInStore.get(a3);
+ Assert.assertTrue(b3InStore.getVersion().longValue() == b3New.getVersion().longValue());
}
*
* @return instance of BucketStore class
*/
- private static BucketStore<T> createStore(){
+ private static BucketStore<T> createStore() {
final Props props = Props.create(BucketStore.class, new RemoteRpcProviderConfig(system.settings().config()));
final TestActorRef<BucketStore<T>> testRef = TestActorRef.create(system, props, "testStore");
return testRef.underlyingActor();
}
-
}
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
-import static org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
+
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Props;
+import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import com.typesafe.config.ConfigFactory;
import java.util.ArrayList;
import org.junit.Test;
import org.opendaylight.controller.remote.rpc.RemoteRpcProviderConfig;
import org.opendaylight.controller.remote.rpc.TerminationMonitor;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipEnvelope;
+import org.opendaylight.controller.remote.rpc.registry.gossip.Messages.GossiperMessages.GossipStatus;
public class GossiperTest {
@AfterClass
public static void teardown() {
- if (system != null)
- system.shutdown();
+ JavaTestKit.shutdownActorSystem(system);
}
@Before
- public void createMocks(){
+ public void createMocks() {
mockGossiper = spy(gossiper);
}
@After
- public void resetMocks(){
+ public void resetMocks() {
reset(mockGossiper);
}
@Test
- public void testReceiveGossipTick_WhenNoRemoteMemberShouldIgnore(){
+ public void testReceiveGossipTick_WhenNoRemoteMemberShouldIgnore() {
mockGossiper.setClusterMembers(Collections.<Address>emptyList());
doNothing().when(mockGossiper).getLocalStatusAndSendTo(any(Address.class));
}
@Test
- public void testReceiveGossipTick_WhenRemoteMemberExistsShouldSendStatus(){
+ public void testReceiveGossipTick_WhenRemoteMemberExistsShouldSendStatus() {
List<Address> members = new ArrayList<>();
Address remote = new Address("tcp", "member");
members.add(remote);
verify(mockGossiper, times(1)).getLocalStatusAndSendTo(any(Address.class));
}
+ @SuppressWarnings("unchecked")
@Test
- public void testReceiveGossipStatus_WhenSenderIsNonMemberShouldIgnore(){
+ public void testReceiveGossipStatus_WhenSenderIsNonMemberShouldIgnore() {
Address nonMember = new Address("tcp", "non-member");
GossipStatus remoteStatus = new GossipStatus(nonMember, mock(Map.class));
verify(mockGossiper, times(0)).getSender();
}
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
- public void testReceiveGossip_WhenNotAddressedToSelfShouldIgnore(){
+ public void testReceiveGossipWhenNotAddressedToSelfShouldIgnore() {
Address notSelf = new Address("tcp", "not-self");
GossipEnvelope envelope = new GossipEnvelope(notSelf, notSelf, mock(Map.class));
*
* @return instance of Gossiper class
*/
- private static Gossiper createGossiper(){
-
- final Props props = Props.create(Gossiper.class, false, new RemoteRpcProviderConfig(system.settings().config()));
+ private static Gossiper createGossiper() {
+ final Props props = Props.create(Gossiper.class, false,
+ new RemoteRpcProviderConfig(system.settings().config()));
final TestActorRef<Gossiper> testRef = TestActorRef.create(system, props, "testGossiper");
return testRef.underlyingActor();
}
-}
\ No newline at end of file
+}
package org.opendaylight.controller.remote.rpc.utils;
import static org.junit.Assert.assertTrue;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import com.typesafe.config.ConfigFactory;
+import java.util.ArrayList;
+import java.util.List;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.List;
-
public class LatestEntryRoutingLogicTest {
-
- static ActorSystem system;
-
- @BeforeClass
- public static void setup() throws InterruptedException {
- system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- system = null;
- }
-
- @Test
- public void testRoutingLogic() {
- List<Pair<ActorRef, Long>> pairList = new ArrayList<>();
- TestProbe probe1 = new TestProbe(system);
- TestProbe probe2 = new TestProbe(system);
- TestProbe probe3 = new TestProbe(system);
- ActorRef actor1 = probe1.ref();
- ActorRef actor2 = probe2.ref();
- ActorRef actor3 = probe3.ref();
- pairList.add(new Pair<>(actor1, 1000L));
- pairList.add(new Pair<>(actor2, 3000L));
- pairList.add(new Pair<>(actor3, 2000L));
- RoutingLogic logic = new LatestEntryRoutingLogic(pairList);
- assertTrue(logic.select().equals(actor2));
- }
+ static ActorSystem system;
+
+ @BeforeClass
+ public static void setup() throws InterruptedException {
+ system = ActorSystem.create("opendaylight-rpc", ConfigFactory.load().getConfig("odl-cluster-rpc"));
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(system);
+ system = null;
+ }
+
+ @Test
+ public void testRoutingLogic() {
+ List<Pair<ActorRef, Long>> pairList = new ArrayList<>();
+ TestProbe probe1 = new TestProbe(system);
+ TestProbe probe2 = new TestProbe(system);
+ TestProbe probe3 = new TestProbe(system);
+ ActorRef actor1 = probe1.ref();
+ ActorRef actor2 = probe2.ref();
+ ActorRef actor3 = probe3.ref();
+ pairList.add(new Pair<>(actor1, 1000L));
+ pairList.add(new Pair<>(actor2, 3000L));
+ pairList.add(new Pair<>(actor3, 2000L));
+ RoutingLogic logic = new LatestEntryRoutingLogic(pairList);
+ assertTrue(logic.select().equals(actor2));
+ }
}