import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.opendaylight.controller.cluster.schema.provider.RemoteYangTextSourceProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.RemoteSchemaProvider;
import org.opendaylight.controller.cluster.schema.provider.impl.YangTextSchemaSourceSerializationProxy;
+import org.opendaylight.mdsal.dom.api.DOMActionResult;
+import org.opendaylight.mdsal.dom.api.DOMActionService;
import org.opendaylight.mdsal.dom.api.DOMDataBroker;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteTransaction;
import org.opendaylight.mdsal.dom.api.DOMRpcResult;
import org.opendaylight.mdsal.dom.api.DOMRpcService;
import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId;
+import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMActionService;
import org.opendaylight.netconf.topology.singleton.impl.ProxyDOMRpcService;
import org.opendaylight.netconf.topology.singleton.impl.ProxyYangTextSourceProvider;
import org.opendaylight.netconf.topology.singleton.impl.SlaveSalFacade;
import org.opendaylight.netconf.topology.singleton.impl.utils.NetconfTopologySetup;
import org.opendaylight.netconf.topology.singleton.messages.AskForMasterMountPoint;
+import org.opendaylight.netconf.topology.singleton.messages.ContainerNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.CreateInitialMasterActorData;
import org.opendaylight.netconf.topology.singleton.messages.MasterActorDataInitialized;
import org.opendaylight.netconf.topology.singleton.messages.NormalizedNodeMessage;
import org.opendaylight.netconf.topology.singleton.messages.RegisterMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.UnregisterSlaveMountPoint;
import org.opendaylight.netconf.topology.singleton.messages.YangTextSchemaSourceRequest;
+import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessage;
+import org.opendaylight.netconf.topology.singleton.messages.action.InvokeActionMessageReply;
import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessage;
import org.opendaylight.netconf.topology.singleton.messages.rpc.InvokeRpcMessageReply;
import org.opendaylight.netconf.topology.singleton.messages.transactions.EmptyResultResponse;
import org.opendaylight.netconf.topology.singleton.messages.transactions.NewWriteTransactionRequest;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.opendaylight.yangtools.yang.model.repo.api.EffectiveModelContextFactory;
import org.opendaylight.yangtools.yang.model.repo.api.SchemaRepository;
private NetconfTopologySetup setup;
private List<SourceIdentifier> sourceIdentifiers;
private DOMRpcService deviceRpc;
+ private DOMActionService deviceAction;
private SlaveSalFacade slaveSalManager;
private DOMDataBroker deviceDataBroker;
//readTxActor can be shared
final DOMDataTreeReadTransaction tx = deviceDataBroker.newReadOnlyTransaction();
readTxActor = context().actorOf(ReadTransactionActor.props(tx));
this.deviceRpc = masterActorData.getDeviceRpc();
+ this.deviceAction = masterActorData.getDeviceAction();
sender().tell(new MasterActorDataInitialized(), self());
LOG.debug("{}: Master is ready.", id);
- } else if (message instanceof RefreshSetupMasterActorData) {
+ } else if (message instanceof RefreshSetupMasterActorData) {
setup = ((RefreshSetupMasterActorData) message).getNetconfTopologyDeviceSetup();
id = ((RefreshSetupMasterActorData) message).getRemoteDeviceId();
sender().tell(new MasterActorDataInitialized(), self());
} else if (message instanceof AskForMasterMountPoint) { // master
- AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint)message;
+ AskForMasterMountPoint askForMasterMountPoint = (AskForMasterMountPoint) message;
// only master contains reference to deviceDataBroker
if (deviceDataBroker != null) {
LOG.debug("{}: Sending RegisterMountPoint reply to {}", id, askForMasterMountPoint.getSlaveActorRef());
askForMasterMountPoint.getSlaveActorRef().tell(new RegisterMountPoint(sourceIdentifiers, self()),
- sender());
+ sender());
} else {
LOG.warn("{}: Received {} but we don't appear to be the master", id, askForMasterMountPoint);
sender().tell(new Failure(new NotMasterException(self())), self());
} else if (message instanceof InvokeRpcMessage) { // master
final InvokeRpcMessage invokeRpcMessage = (InvokeRpcMessage) message;
invokeSlaveRpc(invokeRpcMessage.getSchemaPath(), invokeRpcMessage.getNormalizedNodeMessage(), sender());
-
+ } else if (message instanceof InvokeActionMessage) { // master
+ final InvokeActionMessage invokeActionMessage = (InvokeActionMessage) message;
+ LOG.info("InvokeActionMessage Details : {}", invokeActionMessage.toString());
+ invokeSlaveAction(invokeActionMessage.getSchemaPath(), invokeActionMessage.getContainerNodeMessage(),
+ invokeActionMessage.getDOMDataTreeIdentifier(), sender());
} else if (message instanceof RegisterMountPoint) { //slaves
- RegisterMountPoint registerMountPoint = (RegisterMountPoint)message;
+ RegisterMountPoint registerMountPoint = (RegisterMountPoint) message;
sourceIdentifiers = registerMountPoint.getSourceIndentifiers();
registerSlaveMountPoint(registerMountPoint.getMasterActorRef());
sender().tell(new Success(null), self());
setup = ((RefreshSlaveActor) message).getSetup();
schemaRepository = ((RefreshSlaveActor) message).getSchemaRepository();
}
-
}
@Override
LOG.debug("{}: invokeSlaveRpc for {}, input: {} on rpc service {}", id, schemaPath, normalizedNodeMessage,
deviceRpc);
- final ListenableFuture<DOMRpcResult> rpcResult = deviceRpc.invokeRpc(schemaPath,
+ final ListenableFuture<? extends DOMRpcResult> rpcResult = deviceRpc.invokeRpc(schemaPath,
normalizedNodeMessage != null ? normalizedNodeMessage.getNode() : null);
Futures.addCallback(rpcResult, new FutureCallback<DOMRpcResult>() {
}
NormalizedNodeMessage nodeMessageReply = null;
if (domRpcResult.getResult() != null) {
- nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.EMPTY,
+ nodeMessageReply = new NormalizedNodeMessage(YangInstanceIdentifier.empty(),
domRpcResult.getResult());
}
recipient.tell(new InvokeRpcMessageReply(nodeMessageReply, domRpcResult.getErrors()), getSelf());
}, MoreExecutors.directExecutor());
}
+ /**
+ * Invoking Action on Slave Node in Odl Cluster Environment.
+ *
+ * @param schemaPath {@link SchemaPath}
+ * @param containerNodeMessage {@link ContainerNodeMessage}
+ * @param domDataTreeIdentifier {@link DOMDataTreeIdentifier}
+ * @param recipient {@link ActorRef}
+ */
+ private void invokeSlaveAction(final SchemaPath schemaPath, final ContainerNodeMessage containerNodeMessage,
+ final DOMDataTreeIdentifier domDataTreeIdentifier, final ActorRef recipient) {
+ LOG.info("{}: invokeSlaveAction for {}, input: {}, identifier: {} on action service {}", id, schemaPath,
+ containerNodeMessage, domDataTreeIdentifier, deviceAction);
+
+ final ListenableFuture<? extends DOMActionResult> actionResult = deviceAction.invokeAction(schemaPath,
+ domDataTreeIdentifier, containerNodeMessage != null ? containerNodeMessage.getNode() : null);
+
+ Futures.addCallback(actionResult, new FutureCallback<DOMActionResult>() {
+
+ @Override
+ public void onSuccess(final DOMActionResult domActionResult) {
+ LOG.debug("{}: invokeSlaveAction for {}, domActionResult: {}", id, schemaPath, domActionResult);
+ if (domActionResult == null) {
+ recipient.tell(new EmptyResultResponse(), getSender());
+ return;
+ }
+
+ //Check DomActionResult containing Ok onSuccess pass empty nodeMessageReply
+ ContainerNodeMessage nodeMessageReply = domActionResult.getOutput().map(ContainerNodeMessage::new)
+ .orElse(null);
+ recipient.tell(new InvokeActionMessageReply(nodeMessageReply, domActionResult.getErrors()), getSelf());
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ recipient.tell(new Failure(throwable), getSelf());
+ }
+ }, MoreExecutors.directExecutor());
+ }
+
private void registerSlaveMountPoint(final ActorRef masterReference) {
unregisterSlaveMountPoint();
resolveSchemaContext(createSchemaContextFactory(masterReference), slaveSalManager, masterReference, 1);
}
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
private DOMRpcService getDOMRpcService(final ActorRef masterReference) {
return new ProxyDOMRpcService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
}
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
+ private DOMActionService getDOMActionService(final ActorRef masterReference) {
+ return new ProxyDOMActionService(setup.getActorSystem(), masterReference, id, actorResponseWaitTime);
+ }
+
private EffectiveModelContextFactory createSchemaContextFactory(final ActorRef masterReference) {
final RemoteYangTextSourceProvider remoteYangTextSourceProvider =
new ProxyYangTextSourceProvider(masterReference, getContext().dispatcher(), actorResponseWaitTime);
final SlaveSalFacade localSlaveSalManager, final ActorRef masterReference, final int tries) {
final ListenableFuture<EffectiveModelContext> schemaContextFuture =
schemaContextFactory.createEffectiveModelContext(sourceIdentifiers);
- Futures.addCallback(schemaContextFuture, new FutureCallback<SchemaContext>() {
+ Futures.addCallback(schemaContextFuture, new FutureCallback<EffectiveModelContext>() {
@Override
- public void onSuccess(final SchemaContext result) {
+ public void onSuccess(final EffectiveModelContext result) {
executeInSelf(() -> {
// Make sure the slaveSalManager instance hasn't changed since we initiated the schema context
// resolution.
LOG.info("{}: Schema context resolved: {} - registering slave mount point",
id, result.getModules());
slaveSalManager.registerSlaveMountPoint(result, getDOMRpcService(masterReference),
- masterReference);
+ getDOMActionService(masterReference), masterReference);
}
});
}
registeredSchemas = null;
}
}
-}
+}
\ No newline at end of file