- master invoke RPC and send back to slave.
Change-Id: Ica407f800da3d902f722d835bf6d658163c01bb5
Signed-off-by: Rudolf Brisuda <rbrisuda@cisco.com>
<artifactId>akka-testkit_2.11</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.yangtools</groupId>
+ <artifactId>yang-data-codec-xml</artifactId>
+ </dependency>
</dependencies>
</project>
.collect(Collectors.toList());
// send initial data to master actor and create actor for providing it
- return Patterns.ask(masterActorRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers),
- NetconfTopologyUtils.TIMEOUT);
+ return Patterns.ask(masterActorRef, new CreateInitialMasterActorData(deviceDataBroker, sourceIdentifiers,
+ deviceRpc), NetconfTopologyUtils.TIMEOUT);
}
private void updateDeviceData() {
/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
package org.opendaylight.netconf.topology.singleton.impl;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import com.google.common.base.Function;
import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.SettableFuture;
+import java.util.Collection;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
+import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
+import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.SchemaPathMessage;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
public class ProxyDOMRpcService implements DOMRpcService {
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfTopologyManager.class);
+
+ private final ActorRef masterActorRef;
+ private final ActorSystem actorSystem;
+ private final RemoteDeviceId id;
+
+ public ProxyDOMRpcService(final ActorSystem actorSystem, final ActorRef masterActorRef,
+ final RemoteDeviceId remoteDeviceId) {
+ this.actorSystem = actorSystem;
+ this.masterActorRef = masterActorRef;
+ id = remoteDeviceId;
+ }
+
@Nonnull
@Override
public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull final SchemaPath type,
@Nullable final NormalizedNode<?, ?> input) {
- throw new UnsupportedOperationException("InvokeRpc: DOMRpc service not working in cluster.");
+ LOG.trace("{}: Rpc operation invoked with schema type: {} and node: {}.", id, type, input);
+
+ final NormalizedNodeMessage normalizedNodeMessage =
+ new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY, input);
+ final Future<Object> scalaFuture =
+ Patterns.ask(masterActorRef,
+ new InvokeRpcMessage(new SchemaPathMessage(type), normalizedNodeMessage),
+ NetconfTopologyUtils.TIMEOUT);
+
+ final SettableFuture<DOMRpcResult> settableFuture = SettableFuture.create();
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> checkedFuture = Futures.makeChecked(settableFuture,
+ new Function<Exception, DOMRpcException>() {
+
+ @Nullable
+ @Override
+ public DOMRpcException apply(@Nullable final Exception exception) {
+ return new ClusteringRpcException(id + ": Exception during remote rpc invocation.",
+ exception);
+ }
+ });
+
+ scalaFuture.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) throws Throwable {
+ if (failure != null) {
+ settableFuture.setException(failure);
+ return;
+ }
+ if (success instanceof Throwable) {
+ settableFuture.setException((Throwable) success);
+ return;
+ }
+ if (success instanceof EmptyResultResponse || success == null) {
+ settableFuture.set(null);
+ return;
+ }
+ final Collection<RpcError> errors = ((InvokeRpcMessageReply) success).getRpcErrors();
+ final NormalizedNodeMessage normalizedNodeMessageResult =
+ ((InvokeRpcMessageReply) success).getNormalizedNodeMessage();
+ final DOMRpcResult result;
+ if (normalizedNodeMessageResult == null ){
+ result = new DefaultDOMRpcResult(errors);
+ } else {
+ if (errors == null) {
+ result = new DefaultDOMRpcResult(normalizedNodeMessageResult.getNode());
+ } else {
+ result = new DefaultDOMRpcResult(normalizedNodeMessageResult.getNode(), errors);
+ }
+ }
+ settableFuture.set(result);
+ }
+ }, actorSystem.dispatcher());
+
+ return checkedFuture;
+
}
@Nonnull
@Override
public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(
@Nonnull final T listener) {
+ // NOOP, only proxy
throw new UnsupportedOperationException("RegisterRpcListener: DOMRpc service not working in cluster.");
}
}
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitReply;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.RemoteOperationTxProcessor;
import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
+import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ExistsRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.TransactionRequest;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaContextFactory;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaResolutionException;
private RemoteOperationTxProcessor operationsProcessor;
private List<SourceIdentifier> sourceIdentifiers;
+ private DOMRpcService deviceRpc;
private SlaveSalFacade slaveSalManager;
public static Props props(final NetconfTopologySetup setup,
operationsProcessor =
new RemoteOperationTxProcessorImpl(((CreateInitialMasterActorData) message).getDeviceDataBroker(),
id);
+ this.deviceRpc = ((CreateInitialMasterActorData) message).getDeviceRpc();
+
sender().tell(new MasterActorDataInitialized(), self());
LOG.debug("{}: Master is ready.", id);
final YangTextSchemaSourceRequest yangTextSchemaSourceRequest = (YangTextSchemaSourceRequest) message;
sendYangTextSchemaSourceProxy(yangTextSchemaSourceRequest.getSourceIdentifier(), sender());
+ } else if (message instanceof InvokeRpcMessage) {
+
+ final InvokeRpcMessage invokeRpcMessage = ((InvokeRpcMessage) message);
+ invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
+
} else if (message instanceof RegisterMountPoint) { //slaves
sourceIdentifiers = ((RegisterMountPoint) message).getSourceIndentifiers();
});
}
- private void registerSlaveMountPoint(final ActorRef masterReference) {
+ private void invokeSlaveRpc(final SchemaPath schemaPath, final NormalizedNodeMessage normalizedNodeMessage,
+ final ActorRef recipient) {
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> rpcResult =
+ deviceRpc.invokeRpc(schemaPath, normalizedNodeMessage.getNode());
+
+ Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
+ @Override
+ public void onSuccess(@Nullable final DOMRpcResult domRpcResult) {
+ if (domRpcResult == null) {
+ recipient.tell(new EmptyResultResponse(), getSender());
+ return;
+ }
+ NormalizedNodeMessage nodeMessageReply = null;
+ if (domRpcResult.getResult() != null) {
+ nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
+ domRpcResult.getResult());
+ }
+ recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
+ }
+
+ @Override
+ public void onFailure(@Nonnull final Throwable throwable) {
+ recipient.tell(throwable, getSelf());
+ }
+ });
+ }
+
+ private void registerSlaveMountPoint(ActorRef masterReference) {
if (this.slaveSalManager != null) {
slaveSalManager.close();
}
final CheckedFuture<SchemaContext, SchemaResolutionException> remoteSchemaContext =
getSchemaContext(masterReference);
- final DOMRpcService deviceRpc = getDOMRpcService();
+ final DOMRpcService deviceRpc = getDOMRpcService(masterReference);
Futures.addCallback(remoteSchemaContext, new FutureCallback<SchemaContext>() {
@Override
});
}
- private DOMRpcService getDOMRpcService() {
- return new ProxyDOMRpcService();
+ private DOMRpcService getDOMRpcService(ActorRef masterReference) {
+ return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id);
}
private CheckedFuture<SchemaContext, SchemaResolutionException> getSchemaContext(ActorRef masterReference) {
import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
-import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.CancelRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.DeleteRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyReadResponse;
import org.opendaylight.netconf.topology.singleton.messages.transactions.MergeRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.PutRequest;
import org.opendaylight.netconf.topology.singleton.messages.transactions.ReadRequest;
+import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitFailedReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.SubmitRequest;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
promise.success(Optional.absent());
return;
}
-
promise.success(Optional.of((NormalizedNodeMessage) success));
}
}, actorSystem.dispatcher());
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.impl.utils;
+
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+
+public class ClusteringRpcException extends DOMRpcException {
+ public ClusteringRpcException(String message) {
+ super(message);
+ }
+
+ public ClusteringRpcException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
import java.io.Serializable;
import java.util.List;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
/**
private final DOMDataBroker deviceDataBroker;
private final List<SourceIdentifier> allSourceIdentifiers;
+ private final DOMRpcService deviceRpc;
public CreateInitialMasterActorData(final DOMDataBroker deviceDataBroker,
- final List<SourceIdentifier> allSourceIdentifiers) {
+ final List<SourceIdentifier> allSourceIdentifiers,
+ final DOMRpcService deviceRpc) {
this.deviceDataBroker = deviceDataBroker;
this.allSourceIdentifiers = allSourceIdentifiers;
+ this.deviceRpc = deviceRpc;
}
public DOMDataBroker getDeviceDataBroker() {
public List<SourceIdentifier> getSourceIndentifiers() {
return allSourceIdentifiers;
}
+
+ public DOMRpcService getDeviceRpc() {
+ return deviceRpc;
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages;
+
+import com.google.common.collect.Iterables;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class SchemaPathMessage implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private SchemaPath schemaPath;
+
+ public SchemaPathMessage(final SchemaPath schemaPath) {
+ this.schemaPath = schemaPath;
+ }
+
+ public SchemaPath getSchemaPath() {
+ return schemaPath;
+ }
+
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 2L;
+
+ private SchemaPathMessage schemaPathMessage;
+
+ public Proxy() {
+ //due to Externalizable
+ }
+
+ Proxy(final SchemaPathMessage schemaPathMessage) {
+ this.schemaPathMessage = schemaPathMessage;
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeInt(Iterables.size(schemaPathMessage.getSchemaPath().getPathTowardsRoot()));
+
+ for (final QName qName : schemaPathMessage.getSchemaPath().getPathTowardsRoot()) {
+ out.writeObject(qName);
+ }
+
+ out.writeBoolean(schemaPathMessage.getSchemaPath().isAbsolute());
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ final int sizePath = in.readInt();
+ final QName[] paths = new QName[sizePath];
+ for (int i = 0; i < sizePath; i++) {
+ paths[i] = (QName) in.readObject();
+ }
+ final boolean absolute = in.readBoolean();
+ schemaPathMessage = new SchemaPathMessage(SchemaPath.create(absolute, paths));
+ }
+
+ private Object readResolve() {
+ return schemaPathMessage;
+ }
+ }
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.rpc;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.netconf.topology.singleton.messages.SchemaPathMessage;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
+
+public class InvokeRpcMessage implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final SchemaPathMessage schemaPathMessage;
+ private final NormalizedNodeMessage normalizedNodeMessage;
+
+ public InvokeRpcMessage(final SchemaPathMessage schemaPathMessage,
+ final NormalizedNodeMessage normalizedNodeMessage) {
+ this.schemaPathMessage = schemaPathMessage;
+ this.normalizedNodeMessage = normalizedNodeMessage;
+ }
+ private SchemaPathMessage getSchemaPathMessage() {
+ return schemaPathMessage;
+ }
+
+ public SchemaPath getSchemaPath() {
+ return schemaPathMessage.getSchemaPath();
+ }
+
+ public NormalizedNodeMessage getNormalizedNodeMessage() {
+ return normalizedNodeMessage;
+ }
+
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 2L;
+
+ private InvokeRpcMessage invokeRpcMessage;
+
+ public Proxy() {
+ //due to Externalizable
+ }
+
+ Proxy(final InvokeRpcMessage invokeRpcMessage) {
+ this.invokeRpcMessage = invokeRpcMessage;
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeObject(invokeRpcMessage.getSchemaPathMessage());
+ out.writeObject(invokeRpcMessage.getNormalizedNodeMessage());
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ invokeRpcMessage = new InvokeRpcMessage((SchemaPathMessage) in.readObject(),
+ (NormalizedNodeMessage) in.readObject());
+ }
+
+ private Object readResolve() {
+ return invokeRpcMessage;
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.rpc;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.LinkedList;
+import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
+import org.opendaylight.yangtools.yang.common.RpcError;
+
+public class InvokeRpcMessageReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final Collection<RpcError> rpcErrors;
+ private final NormalizedNodeMessage normalizedNodeMessage;
+
+ public InvokeRpcMessageReply(final NormalizedNodeMessage normalizedNodeMessage,
+ final Collection<RpcError> rpcErrors) {
+ this.normalizedNodeMessage = normalizedNodeMessage;
+ this.rpcErrors = rpcErrors;
+ }
+
+ public NormalizedNodeMessage getNormalizedNodeMessage() {
+ return normalizedNodeMessage;
+ }
+
+ public Collection<RpcError> getRpcErrors() {
+ return rpcErrors;
+ }
+
+ private Object writeReplace() {
+ return new Proxy(this);
+ }
+
+ private static class Proxy implements Externalizable {
+ private static final long serialVersionUID = 2L;
+
+ private InvokeRpcMessageReply invokeRpcMessageReply;
+
+ public Proxy() {
+ //due to Externalizable
+ }
+
+ Proxy(final InvokeRpcMessageReply invokeRpcMessageReply) {
+ this.invokeRpcMessageReply = invokeRpcMessageReply;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(invokeRpcMessageReply.getRpcErrors().size());
+ for (final RpcError rpcError : invokeRpcMessageReply.getRpcErrors()) {
+ out.writeObject(rpcError);
+ }
+ out.writeObject(invokeRpcMessageReply.getNormalizedNodeMessage());
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ final int size = in.readInt();
+ final Collection<RpcError> rpcErrors = new LinkedList<>();
+ for (int i = 0; i < size; i++) {
+ rpcErrors.add((RpcError) in.readObject());
+ }
+ final NormalizedNodeMessage normalizedNodeMessage = (NormalizedNodeMessage) in.readObject();
+ invokeRpcMessageReply = new InvokeRpcMessageReply(normalizedNodeMessage, rpcErrors);
+ }
+
+ private Object readResolve() {
+ return invokeRpcMessageReply;
+ }
+ }
+
+}
*/
public class EmptyReadResponse implements Serializable {
private static final long serialVersionUID = 1L;
+
}
--- /dev/null
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.netconf.topology.singleton.messages.transactions;
+
+import java.io.Serializable;
+
+/**
+ * Message is sended when RPC result does not exist or is empty
+ */
+public class EmptyResultResponse implements Serializable {
+ private static final long serialVersionUID = 1L;
+}
+
package org.opendaylight.netconf.topology.singleton.impl;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.MockitoAnnotations.initMocks;
import static org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologyUtils.DEFAULT_SCHEMA_REPOSITORY;
import akka.actor.ActorContext;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.mockito.Mock;
import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.spi.DefaultDOMRpcResult;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.impl.actors.NetconfNodeActor;
+import org.opendaylight.netconf.topology.singleton.impl.utils.ClusteringRpcException;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
import org.opendaylight.netconf.topology.singleton.messages.RefreshSetupMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.opendaylight.yangtools.yang.model.repo.api.MissingSchemaSourceException;
+import org.opendaylight.yangtools.yang.model.repo.api.RevisionSourceIdentifier;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaSourceException;
import org.opendaylight.yangtools.yang.model.repo.api.SourceIdentifier;
private ActorRef masterRef;
private RemoteDeviceId remoteDeviceId;
+ @Mock
+ private DOMRpcService domRpcService;
+
@Before
public void setup() throws UnknownHostException {
-
+ initMocks
+ (this);
remoteDeviceId = new RemoteDeviceId("netconf-topology",
new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9999));
-
final NetconfTopologySetup setup = mock(NetconfTopologySetup.class);
final Props props = NetconfNodeActor.props(setup, remoteDeviceId, DEFAULT_SCHEMA_REPOSITORY,
/* Test init master data */
final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers),
- TIMEOUT);
+ Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
+ domRpcService), TIMEOUT);
final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
assertTrue(success instanceof MasterActorDataInitialized);
final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
final List<SourceIdentifier> sourceIdentifiers =
- Lists.newArrayList(SourceIdentifier.create("testID", Optional.absent()));
+ Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent()));
// init master data
final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers),
- TIMEOUT);
+ Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
+ domRpcService), TIMEOUT);
final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
@Test
public void testYangTextSchemaSourceRequestMessage() throws Exception {
final SchemaRepository schemaRepository = mock(SchemaRepository.class);
- final SourceIdentifier sourceIdentifier = SourceIdentifier.create("testID", Optional.absent());
+ final SourceIdentifier sourceIdentifier = RevisionSourceIdentifier.create("testID", Optional.absent());
final Props props = NetconfNodeActor.props(mock(NetconfTopologySetup.class), remoteDeviceId,
DEFAULT_SCHEMA_REPOSITORY, schemaRepository);
}
+ @Test
+ public void testProxyDOMRpcService() throws Exception {
+
+ final DOMDataBroker domDataBroker = mock(DOMDataBroker.class);
+ final List<SourceIdentifier> sourceIdentifiers =
+ Lists.newArrayList(RevisionSourceIdentifier.create("testID", Optional.absent()));
+
+ // init master data
+
+ final Future<Object> initialDataToActor =
+ Patterns.ask(masterRef, new CreateInitialMasterActorData(domDataBroker, sourceIdentifiers,
+ domRpcService), TIMEOUT);
+
+ final Object successInit = Await.result(initialDataToActor, TIMEOUT.duration());
+
+ assertTrue(successInit instanceof MasterActorDataInitialized);
+
+ // test if slave get right identifiers from master
+
+ final ProxyDOMRpcService slaveDomRPCService = new ProxyDOMRpcService(system, masterRef, remoteDeviceId);
+
+ final SchemaPath schemaPath = SchemaPath.create(true, QName.create("TestQname"));
+ final NormalizedNode<?, ?> outputNode = ImmutableContainerNodeBuilder.create()
+ .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(QName.create("TestQname")))
+ .withChild(ImmutableNodes.leafNode(QName.create("NodeQname"), "foo")).build();
+ final RpcError rpcError = RpcResultBuilder.newError(RpcError.ErrorType.RPC, null, "Rpc invocation failed.");
+ // EmptyResultResponse message
+
+ doReturn(Futures.immediateCheckedFuture(null)).when(domRpcService).invokeRpc(any(), any());
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureEmpty =
+ slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+
+ final Object resultNull = resultFutureEmpty.checkedGet(2, TimeUnit.SECONDS);
+
+ assertEquals(null, resultNull);
+
+ // InvokeRpcMessageReply message
+
+ doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode))).
+ when(domRpcService).invokeRpc(any(), any());
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureNn =
+ slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+
+ final DOMRpcResult resultNn = resultFutureNn.checkedGet(2, TimeUnit.SECONDS);
+
+ assertEquals(outputNode, resultNn.getResult());
+ assertTrue(resultNn.getErrors().isEmpty());
+
+ // InvokeRpcMessageReply message only error
+
+ doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(rpcError)))
+ .when(domRpcService).invokeRpc(any(), any());
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureError =
+ slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+
+ final DOMRpcResult resultError = resultFutureError.checkedGet(2, TimeUnit.SECONDS);
+
+ assertNull(resultError.getResult());
+ assertEquals(rpcError, resultError.getErrors().iterator().next());
+
+ // InvokeRpcMessageReply message error + result
+
+ doReturn(Futures.immediateCheckedFuture(new DefaultDOMRpcResult(outputNode, rpcError)))
+ .when(domRpcService).invokeRpc(any(), any());
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureOutputError =
+ slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+
+ final DOMRpcResult resultOutputError = resultFutureOutputError.checkedGet(2, TimeUnit.SECONDS);
+
+ assertEquals(outputNode, resultOutputError.getResult());
+ assertEquals(rpcError, resultOutputError.getErrors().iterator().next());
+
+ // Throwable message
+
+ exception.expect(DOMRpcException.class);
+
+ doReturn(Futures.immediateFailedCheckedFuture(new ClusteringRpcException("")))
+ .when(domRpcService).invokeRpc(any(), any());
+
+ final CheckedFuture<DOMRpcResult, DOMRpcException> resultFutureThrowable =
+ slaveDomRPCService.invokeRpc(schemaPath, outputNode);
+
+ resultFutureThrowable.checkedGet(2, TimeUnit.SECONDS);
+
+ }
+
private String convertStreamToString(java.io.InputStream is) {
java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
return s.hasNext() ? s.next() : "";
@Override
public NetconfConnectorDTO createDeviceCommunicator(final NodeId nodeId, final NetconfNode node,
final ActorRef deviceContextActorRef) {
+
final NetconfConnectorDTO connectorDTO = new NetconfConnectorDTO(communicator, salFacade);
doReturn(Futures.immediateCheckedFuture(null)).when(communicator).initializeRemoteConnection(any(), any());
-
return connectorDTO;
}
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
@Mock
private DOMDataReadOnlyTransaction readTx;
+ @Mock
+ private DOMRpcService domRpcService;
+
@Before
public void setup() throws UnknownHostException {
initMocks(this);
private void initializeDataTest() throws Exception {
final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers),
- TIMEOUT);
+ Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers,
+ domRpcService), TIMEOUT);
final Object success = Await.result(initialDataToActor, TIMEOUT.duration());
import org.opendaylight.controller.md.sal.dom.api.DOMDataBroker;
import org.opendaylight.controller.md.sal.dom.api.DOMDataReadOnlyTransaction;
import org.opendaylight.controller.md.sal.dom.api.DOMDataWriteTransaction;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
import org.opendaylight.netconf.topology.singleton.api.NetconfDOMTransaction;
import org.opendaylight.netconf.topology.singleton.impl.NetconfDOMDataBroker;
@Mock
private DOMDataWriteTransaction writeTx;
+ @Mock
+ private DOMRpcService domRpcService;
+
@Before
public void setup() throws UnknownHostException {
initMocks(this);
private void initializeDataTest() throws Exception {
final Future<Object> initialDataToActor =
- Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers),
- TIMEOUT);
+ Patterns.ask(masterRef, new CreateInitialMasterActorData(masterDataBroker, sourceIdentifiers,
+ domRpcService), TIMEOUT);
final Object success = Await.result(initialDataToActor, TIMEOUT.duration());