import java.io.IOException;
import java.util.List;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
private RemoteOperationTxProcessor operationsProcessor;
private List<SourceIdentifier> sourceIdentifiers;
+ private DOMRpcService deviceRpc;
private SlaveSalFacade slaveSalManager;
public static Props props(final NetconfTopologySetup setup,
operationsProcessor =
new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
id);
+ this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
+
sender().tell(new MasterActorDataInitialized(), self());
LOG.debug("{}: Master is ready.", id);
final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
+ } else if (message instanceof InvokeRpcMessage) {
+
+ final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
+ invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
+
} else if (message instanceof RegisterMountPoint) { //slaves
sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
});
}
- private void registerSlaveMountPoint(final ActorRef masterReference) {
+ private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
+ final ActorRef recipient) {
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
+ deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
+
+ Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
+ @Override
+ public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
+ if (domRpcResult == null) {
+ recipient.tell(new EmptyResultResponse(), getSender());
+ return;
+ }
+ NormalizedNodeMessage nodeMessageReply = null;
+ if (domRpcResult.getResult() != null) {
+ nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
+ domRpcResult.getResult());
+ }
+ recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ recipient.tell(throwable, getSelf());
+ }
+ });
+ }
+
+ private void registerSlaveMountPoint(ActorRef masterReference) {
if (this.slaveSalManager != null) {
slaveSalManager.close();
}
final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
getSchemaContext(masterReference);
- final DOMRpcService deviceRpc = getDOMRpcService();
+ final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
@Override
});
}
- private DOMRpcService getDOMRpcService() {
- return new ProxyDOMRpcService();
+ private DOMRpcService getDOMRpcService(ActorRef masterReference) {
+ return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id);
}
private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {