X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=netconf%2Fnetconf-topology-singleton%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fnetconf%2Ftopology%2Fsingleton%2Fimpl%2FProxyDOMRpcService.java;h=5c5fd4a77e56a31d942ae9a36682fe8351a9dfea;hb=6975248489c083942ec93131dac8e6eb95d66806;hp=c1c843014b00e395ac8697d77080b958dfbea9fe;hpb=dd80556b7d2d71f32cb3eeb4ed1489b520535eaf;p=netconf.git diff --git a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMRpcService.java b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMRpcService.java index c1c843014b..5c5fd4a77e 100644 --- a/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMRpcService.java +++ b/netconf/netconf-topology-singleton/src/main/java/org/opendaylight/netconf/topology/singleton/impl/ProxyDOMRpcService.java @@ -1,37 +1,105 @@ /* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ - package org.opendaylight.netconf.topology.singleton.impl; -import com.google.common.util.concurrent.CheckedFuture; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcException; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult; -import org.opendaylight.controller.md.sal.dom.api.DOMRpcService; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.SettableFuture; +import java.util.Collection; +import org.opendaylight.mdsal.dom.api.DOMRpcAvailabilityListener; +import org.opendaylight.mdsal.dom.api.DOMRpcResult; +import org.opendaylight.mdsal.dom.api.DOMRpcService; +import org.opendaylight.mdsal.dom.spi.DefaultDOMRpcResult; +import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; +import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException; +import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage; +import org.opendaylight.netconf.topology.singleton.messages.SchemaPathMessage; +import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage; +import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply; +import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; public class ProxyDOMRpcService implements DOMRpcService { + private static final Logger LOG = LoggerFactory.getLogger(ProxyDOMRpcService.class); + + private final ActorRef masterActorRef; + private final ActorSystem actorSystem; + private final RemoteDeviceId id; + private final Timeout actorResponseWaitTime; + + public ProxyDOMRpcService(final ActorSystem actorSystem, final ActorRef masterActorRef, + final RemoteDeviceId remoteDeviceId, final Timeout actorResponseWaitTime) { + this.actorSystem = actorSystem; + this.masterActorRef = masterActorRef; + id = remoteDeviceId; + this.actorResponseWaitTime = actorResponseWaitTime; + } - @Nonnull @Override - public CheckedFuture invokeRpc(@Nonnull final SchemaPath type, - @Nullable final NormalizedNode input) { - throw new UnsupportedOperationException("InvokeRpc: DOMRpc service not working in cluster."); + public FluentFuture invokeRpc(final SchemaPath type, final NormalizedNode input) { + LOG.trace("{}: Rpc operation invoked with schema type: {} and node: {}.", id, type, input); + + final NormalizedNodeMessage normalizedNodeMessage = input != null + ? new NormalizedNodeMessage(YangInstanceIdentifier.empty(), input) : null; + final Future scalaFuture = Patterns.ask(masterActorRef, + new InvokeRpcMessage(new SchemaPathMessage(type), normalizedNodeMessage), actorResponseWaitTime); + + final SettableFuture settableFuture = SettableFuture.create(); + + scalaFuture.onComplete(new OnComplete() { + @Override + public void onComplete(final Throwable failure, final Object response) { + if (failure != null) { + if (failure instanceof ClusteringRpcException) { + settableFuture.setException(failure); + } else { + settableFuture.setException( + new ClusteringRpcException(id + ": Exception during remote rpc invocation.", failure)); + } + return; + } + + if (response instanceof EmptyResultResponse) { + settableFuture.set(null); + return; + } + + final Collection errors = ((InvokeRpcMessageReply) response).getRpcErrors(); + final NormalizedNodeMessage normalizedNodeMessageResult = + ((InvokeRpcMessageReply) response).getNormalizedNodeMessage(); + final DOMRpcResult result; + if (normalizedNodeMessageResult == null) { + result = new DefaultDOMRpcResult(ImmutableList.copyOf(errors)); + } else { + result = new DefaultDOMRpcResult(normalizedNodeMessageResult.getNode(), errors); + } + settableFuture.set(result); + } + }, actorSystem.dispatcher()); + + return FluentFuture.from(settableFuture); } - @Nonnull @Override - public ListenerRegistration registerRpcListener( - @Nonnull final T listener) { + public ListenerRegistration registerRpcListener(final T listener) { + // NOOP, only proxy throw new UnsupportedOperationException("RegisterRpcListener: DOMRpc service not working in cluster."); } }