/* * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.netconf.sal.connect.netconf.sal; import static com.google.common.base.Preconditions.checkArgument; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; import org.opendaylight.mdsal.common.api.CommitInfo; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.common.api.TransactionCommitFailedException; import org.opendaylight.mdsal.dom.api.DOMRpcResult; import org.opendaylight.mdsal.dom.api.DOMRpcService; import org.opendaylight.netconf.api.DocumentedException; import org.opendaylight.netconf.api.ModifyAction; import org.opendaylight.netconf.api.NetconfDocumentedException; import org.opendaylight.netconf.dom.api.NetconfDataTreeService; import org.opendaylight.netconf.sal.connect.netconf.listener.NetconfSessionPreferences; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfBaseOps; import org.opendaylight.netconf.sal.connect.netconf.util.NetconfRpcFutureCallback; import org.opendaylight.netconf.sal.connect.util.RemoteDeviceId; import org.opendaylight.yangtools.rfc8528.data.api.MountPointContext; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NetconfDataTreeServiceImpl implements NetconfDataTreeService { private static final Logger LOG = LoggerFactory.getLogger(NetconfDataTreeServiceImpl.class); private final RemoteDeviceId id; private final NetconfBaseOps netconfOps; private final boolean rollbackSupport; private final boolean candidateSupported; private final boolean runningWritable; private boolean isLockAllowed = true; public NetconfDataTreeServiceImpl(final RemoteDeviceId id, final MountPointContext mountContext, final DOMRpcService rpc, final NetconfSessionPreferences netconfSessionPreferences) { this.id = id; this.netconfOps = new NetconfBaseOps(rpc, mountContext); // get specific attributes from netconf preferences and get rid of it // no need to keep the entire preferences object, its quite big with all the capability QNames candidateSupported = netconfSessionPreferences.isCandidateSupported(); runningWritable = netconfSessionPreferences.isRunningWritable(); rollbackSupport = netconfSessionPreferences.isRollbackSupported(); Preconditions.checkArgument(candidateSupported || runningWritable, "Device %s has advertised neither :writable-running nor :candidate capability." + "At least one of these should be advertised. Failed to establish a session.", id.getName()); } @Override public synchronized List> lock() { final List> resultsFutures = new ArrayList<>(); if (candidateSupported) { lockCandidate(resultsFutures); if (runningWritable) { lockRunning(resultsFutures); } } else { lockRunning(resultsFutures); } return resultsFutures; } @Override public synchronized void unlock() { if (candidateSupported) { unlockCandidate(); if (runningWritable) { unlockRunning(); } } else { unlockRunning(); } } /** * This has to be non blocking since it is called from a callback on commit * and its netty threadpool that is really sensitive to blocking calls. */ @Override public void discardChanges() { if (candidateSupported) { netconfOps.discardChanges(new NetconfRpcFutureCallback("Discarding candidate", id)); } } @Override public ListenableFuture>> get(YangInstanceIdentifier path) { return netconfOps.getData(new NetconfRpcFutureCallback("Data read", id), Optional.ofNullable(path)); } @Override public ListenableFuture>> getConfig(final YangInstanceIdentifier path) { return netconfOps.getConfigRunningData( new NetconfRpcFutureCallback("Data read", id), Optional.ofNullable(path)); } @Override public synchronized ListenableFuture merge(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data, final Optional defaultOperation) { checkEditable(store); final DataContainerChild editStructure = netconfOps.createEditConfigStrcture(Optional.ofNullable(data), Optional.of(ModifyAction.MERGE), path); return editConfig(defaultOperation, editStructure); } @Override public synchronized ListenableFuture replace( final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data, final Optional defaultOperation) { checkEditable(store); final DataContainerChild editStructure = netconfOps.createEditConfigStrcture(Optional.ofNullable(data), Optional.of(ModifyAction.REPLACE), path); return editConfig(defaultOperation, editStructure); } @Override public synchronized ListenableFuture create(final LogicalDatastoreType store, final YangInstanceIdentifier path, final NormalizedNode data, final Optional defaultOperation) { checkEditable(store); final DataContainerChild editStructure = netconfOps.createEditConfigStrcture(Optional.ofNullable(data), Optional.of(ModifyAction.CREATE), path); return editConfig(defaultOperation, editStructure); } @Override public synchronized ListenableFuture delete(final LogicalDatastoreType store, final YangInstanceIdentifier path) { final DataContainerChild editStructure = netconfOps.createEditConfigStrcture(Optional.empty(), Optional.of(ModifyAction.DELETE), path); return editConfig(Optional.empty(), editStructure); } @Override public synchronized ListenableFuture remove(final LogicalDatastoreType store, final YangInstanceIdentifier path) { final DataContainerChild editStructure = netconfOps.createEditConfigStrcture(Optional.empty(), Optional.of(ModifyAction.REMOVE), path); return editConfig(Optional.empty(), editStructure); } @Override public ListenableFuture commit( List> resultsFutures) { final SettableFuture resultFuture = SettableFuture.create(); Futures.addCallback(performCommit(resultsFutures), new FutureCallback<>() { @Override public void onSuccess(final RpcResult result) { if (!result.isSuccessful()) { final Collection errors = result.getErrors(); resultFuture.setException(new TransactionCommitFailedException( String.format("Commit of transaction %s failed", this), errors.toArray(new RpcError[errors.size()]))); return; } resultFuture.set(CommitInfo.empty()); } @Override public void onFailure(final Throwable failure) { resultFuture.setException(new TransactionCommitFailedException( String.format("Commit of transaction %s failed", this), failure)); } }, MoreExecutors.directExecutor()); return resultFuture; } @Override public Object getDeviceId() { return id; } void setLockAllowed(final boolean isLockAllowedOrig) { this.isLockAllowed = isLockAllowedOrig; } private ListenableFuture editConfig(final Optional defaultOperation, final DataContainerChild editStructure) { if (candidateSupported) { return editConfigCandidate(defaultOperation, editStructure); } else { return editConfigRunning(defaultOperation, editStructure); } } private ListenableFuture editConfigRunning(final Optional defaultOperation, final DataContainerChild editStructure) { final NetconfRpcFutureCallback callback = new NetconfRpcFutureCallback("Edit running", id); if (defaultOperation.isPresent()) { return netconfOps.editConfigRunning(callback, editStructure, defaultOperation.get(), rollbackSupport); } else { return netconfOps.editConfigRunning(callback, editStructure, rollbackSupport); } } private ListenableFuture editConfigCandidate(final Optional defaultOperation, final DataContainerChild editStructure) { final NetconfRpcFutureCallback callback = new NetconfRpcFutureCallback("Edit candidate", id); if (defaultOperation.isPresent()) { return netconfOps.editConfigCandidate(callback, editStructure, defaultOperation.get(), rollbackSupport); } else { return netconfOps.editConfigCandidate(callback, editStructure, rollbackSupport); } } private void lockRunning(List> resultsFutures) { if (isLockAllowed) { resultsFutures.add(netconfOps.lockRunning(new NetconfRpcFutureCallback("Lock running", id))); } else { LOG.trace("Lock is not allowed: {}", id); } } private void unlockRunning() { if (isLockAllowed) { netconfOps.unlockRunning(new NetconfRpcFutureCallback("Unlock running", id)); } else { LOG.trace("Unlock is not allowed: {}", id); } } private void lockCandidate(List> resultsFutures) { if (isLockAllowed) { resultsFutures.add(netconfOps.lockCandidate(new NetconfRpcFutureCallback("Lock candidate", id) { @Override public void onFailure(Throwable throwable) { super.onFailure(throwable); discardChanges(); } })); } else { LOG.trace("Lock is not allowed: {}", id); } } private void unlockCandidate() { if (isLockAllowed) { netconfOps.unlockCandidate(new NetconfRpcFutureCallback("Unlock candidate", id)); } else { LOG.trace("Unlock is not allowed: {}", id); } } private void checkEditable(final LogicalDatastoreType store) { checkArgument(store == LogicalDatastoreType.CONFIGURATION, "Can edit only configuration data, not %s", store); } private synchronized ListenableFuture> performCommit( final List> resultsFutures) { resultsFutures.add(netconfOps.commit(new NetconfRpcFutureCallback("Commit", id))); final ListenableFuture> result = resultsToStatus(id, resultsFutures); Futures.addCallback(result, new FutureCallback<>() { @Override public void onSuccess(final RpcResult result) { unlock(); } @Override public void onFailure(final Throwable throwable) { discardChanges(); unlock(); } }, MoreExecutors.directExecutor()); return result; } private static ListenableFuture> resultsToStatus( final RemoteDeviceId id, List> resultsFutures) { final SettableFuture> transformed = SettableFuture.create(); Futures.addCallback(Futures.allAsList(resultsFutures), new FutureCallback<>() { @Override public void onSuccess(final List domRpcResults) { if (!transformed.isDone()) { extractResult(domRpcResults, transformed, id); } } @Override public void onFailure(final Throwable throwable) { final NetconfDocumentedException exception = new NetconfDocumentedException( id + ":RPC during tx returned an exception" + throwable.getMessage(), new Exception(throwable), DocumentedException.ErrorType.APPLICATION, DocumentedException.ErrorTag.OPERATION_FAILED, DocumentedException.ErrorSeverity.ERROR); transformed.setException(exception); } }, MoreExecutors.directExecutor()); return transformed; } @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD", justification = "https://github.com/spotbugs/spotbugs/issues/811") private static void extractResult(final List domRpcResults, final SettableFuture> transformed, final RemoteDeviceId id) { DocumentedException.ErrorType errType = DocumentedException.ErrorType.APPLICATION; DocumentedException.ErrorSeverity errSeverity = DocumentedException.ErrorSeverity.ERROR; StringBuilder msgBuilder = new StringBuilder(); boolean errorsEncouneterd = false; String errorTag = "operation-failed"; for (final DOMRpcResult domRpcResult : domRpcResults) { if (!domRpcResult.getErrors().isEmpty()) { errorsEncouneterd = true; final RpcError error = domRpcResult.getErrors().iterator().next(); final RpcError.ErrorType errorType = error.getErrorType(); switch (errorType) { case RPC: errType = DocumentedException.ErrorType.RPC; break; case PROTOCOL: errType = DocumentedException.ErrorType.PROTOCOL; break; case TRANSPORT: errType = DocumentedException.ErrorType.TRANSPORT; break; case APPLICATION: default: errType = DocumentedException.ErrorType.APPLICATION; break; } final RpcError.ErrorSeverity severity = error.getSeverity(); switch (severity) { case WARNING: errSeverity = DocumentedException.ErrorSeverity.WARNING; break; case ERROR: default: errSeverity = DocumentedException.ErrorSeverity.ERROR; break; } msgBuilder.append(error.getMessage()); msgBuilder.append(error.getInfo()); errorTag = error.getTag(); } } if (errorsEncouneterd) { final NetconfDocumentedException exception = new NetconfDocumentedException(id + ":RPC during tx failed. " + msgBuilder.toString(), errType, DocumentedException.ErrorTag.from(errorTag), errSeverity); transformed.setException(exception); return; } transformed.set(RpcResultBuilder.success().build()); } }