/* * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.clustering.it.provider; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.sharding.DistributedShardFactory; import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService; import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService; import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener; import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler; import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler; import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask; import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService; import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService; import org.opendaylight.controller.clustering.it.provider.impl.WriteTransactionsHandler; import org.opendaylight.controller.clustering.it.provider.impl.YnlListener; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.NotificationService; import 
org.opendaylight.controller.md.sal.dom.api.DOMDataBroker; import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration; import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; import org.opendaylight.controller.sal.core.api.model.SchemaService; import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer; import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier; import org.opendaylight.mdsal.dom.api.DOMDataTreeListener; import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException; import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException; import org.opendaylight.mdsal.dom.api.DOMDataTreeService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.AddShardReplicaInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CheckPublishNotificationsOutputBuilder; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.IsClientAbortedOutput; 
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.OdlMdsalLowlevelControlService; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterDefaultConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RegisterSingletonConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemoveShardReplicaInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeYnlInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterBoundConstantInput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutput; import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterFlappingSingletonOutputBuilder; import 
org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeYnlOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.WriteTransactionsOutput;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implementation of the {@link OdlMdsalLowlevelControlService} RPCs.
 *
 * <p>The instance registers itself with the supplied {@link RpcProviderRegistry} on construction and keeps
 * track of the registrations (constant RPCs, notification listeners, data tree listener, ...) that the
 * individual RPCs create, so that the matching "un-" RPC can close them again.
 *
 * <p>RPCs that have no implementation yet answer with a failed {@link RpcResult} carrying a
 * {@code not-implemented} error instead of returning {@code null}.
 */
public class MdsalLowLevelTestProvider implements OdlMdsalLowlevelControlService {

    private static final Logger LOG = LoggerFactory.getLogger(MdsalLowLevelTestProvider.class);

    private final RpcProviderRegistry rpcRegistry;
    private final BindingAwareBroker.RpcRegistration<OdlMdsalLowlevelControlService> registration;
    private final DistributedShardFactory distributedShardFactory;
    private final DOMDataTreeService domDataTreeService;
    private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
    private final DOMDataBroker domDataBroker;
    private final NotificationPublishService notificationPublishService;
    private final NotificationService notificationService;
    private final SchemaService schemaService;
    private final ClusterSingletonServiceProvider singletonService;
    private final DOMRpcProviderService domRpcService;
    private final PrefixShardHandler prefixShardHandler;

    // Routed get-constant registrations, keyed by the context they were bound to.
    // NOTE(review): the registration type arguments below were reconstructed from the factory class names
    // (RoutedGetConstantService / GetConstantService) - confirm against their registerNew() signatures.
    private final Map<InstanceIdentifier<?>, DOMRpcImplementationRegistration<RoutedGetConstantService>>
            routedRegistrations = new HashMap<>();

    // Notification listener registrations, keyed by the id supplied in subscribe-ynl.
    private final Map<String, ListenerRegistration<YnlListener>> ynlRegistrations = new HashMap<>();

    private DOMRpcImplementationRegistration<GetConstantService> globalGetConstantRegistration = null;
    private ClusterSingletonServiceRegistration getSingletonConstantRegistration;
    private FlappingSingletonService flappingSingletonService;

    // Notification publishing tasks, keyed by the id supplied in start-publish-notifications.
    private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();

    private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
    private IdIntsDOMDataTreeLIstener idIntsDdtl;

    /**
     * Stores the supplied services, registers this instance as the implementation of
     * {@link OdlMdsalLowlevelControlService} and creates the {@link PrefixShardHandler}.
     */
    public MdsalLowLevelTestProvider(final RpcProviderRegistry rpcRegistry,
                                     final DOMRpcProviderService domRpcService,
                                     final ClusterSingletonServiceProvider singletonService,
                                     final SchemaService schemaService,
                                     final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer,
                                     final NotificationPublishService notificationPublishService,
                                     final NotificationService notificationService,
                                     final DOMDataBroker domDataBroker,
                                     final DOMDataTreeService domDataTreeService,
                                     final DistributedShardFactory distributedShardFactory) {
        this.rpcRegistry = rpcRegistry;
        this.domRpcService = domRpcService;
        this.singletonService = singletonService;
        this.schemaService = schemaService;
        this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
        this.notificationPublishService = notificationPublishService;
        this.notificationService = notificationService;
        this.domDataBroker = domDataBroker;
        this.domDataTreeService = domDataTreeService;
        this.distributedShardFactory = distributedShardFactory;

        registration = rpcRegistry.addRpcImplementation(OdlMdsalLowlevelControlService.class, this);

        prefixShardHandler = new PrefixShardHandler(distributedShardFactory, bindingNormalizedNodeSerializer);
    }

    /** Builds an already-completed future holding a failed result with a single error. */
    private static <T> Future<RpcResult<T>> failure(final ErrorType type, final String tag,
                                                    final String message) {
        final RpcError error = RpcResultBuilder.newError(type, tag, message);
        return Futures.immediateFuture(RpcResultBuilder.<T>failed().withRpcError(error).build());
    }

    /** Builds an already-completed future holding a successful result without payload. */
    private static Future<RpcResult<Void>> voidSuccess() {
        return Futures.immediateFuture(RpcResultBuilder.<Void>success().build());
    }

    /** Builds the failed result returned by RPCs that have no implementation yet. */
    private static <T> Future<RpcResult<T>> notImplemented(final String rpcName) {
        LOG.debug("{} invoked, but it is not implemented.", rpcName);
        return failure(ErrorType.APPLICATION, "not-implemented", "RPC " + rpcName + " is not implemented.");
    }

    /**
     * Closes the singleton get-constant registration, if there is one.
     *
     * @return success when the registration was closed; a failed result when no registration is present or
     *         closing it threw
     */
    @Override
    public Future<RpcResult<Void>> unregisterSingletonConstant() {
        LOG.debug("unregister-singleton-constant");

        if (getSingletonConstantRegistration == null) {
            LOG.debug("No get-singleton-constant registration present.");
            return failure(ErrorType.APPLICATION, "missing-registration",
                    "No get-singleton-constant rpc registration present.");
        }

        try {
            getSingletonConstantRegistration.close();
            getSingletonConstantRegistration = null;
            return voidSuccess();
        } catch (final Exception e) {
            LOG.debug("There was a problem closing the singleton constant service", e);
            return failure(ErrorType.APPLICATION, "error-closing",
                    "There was a problem closing get-singleton-constant");
        }
    }

    /**
     * Creates a {@link PublishNotificationsTask} from the input's id, seconds and notifications-per-second,
     * remembers it under the id and starts it.
     */
    @Override
    public Future<RpcResult<Void>> startPublishNotifications(final StartPublishNotificationsInput input) {
        LOG.debug("publish-notifications, input: {}", input);

        final PublishNotificationsTask task = new PublishNotificationsTask(notificationPublishService,
                input.getId(), input.getSeconds(), input.getNotificationsPerSecond());

        publishNotificationsTasks.put(input.getId(), task);
        task.start();

        return voidSuccess();
    }

    /** Not implemented; always returns a failed result. */
    @Override
    public Future<RpcResult<Void>> subscribeDtcl() {
        return notImplemented("subscribe-dtcl");
    }

    /**
     * Hands the input to a new {@link WriteTransactionsHandler}; the returned future is completed by that
     * handler.
     */
    @Override
    public Future<RpcResult<WriteTransactionsOutput>> writeTransactions(final WriteTransactionsInput input) {
        LOG.debug("write-transactions, input: {}", input);

        final WriteTransactionsHandler writeTransactionsHandler = new WriteTransactionsHandler(domDataBroker, input);
        final SettableFuture<RpcResult<WriteTransactionsOutput>> settableFuture = SettableFuture.create();
        writeTransactionsHandler.start(settableFuture);

        return settableFuture;
    }

    /** Not implemented; always returns a failed result. */
    @Override
    public Future<RpcResult<IsClientAbortedOutput>> isClientAborted() {
        return notImplemented("is-client-aborted");
    }

    /** Not implemented; always returns a failed result. */
    @Override
    public Future<RpcResult<Void>> removeShardReplica(final RemoveShardReplicaInput input) {
        return notImplemented("remove-shard-replica");
    }

    /**
     * Registers a new {@link YnlListener} for the given id.
     *
     * @return success, or a failed result when a listener is already registered for that id
     */
    @Override
    public Future<RpcResult<Void>> subscribeYnl(final SubscribeYnlInput input) {
        LOG.debug("subscribe-ynl, input: {}", input);

        if (ynlRegistrations.containsKey(input.getId())) {
            return failure(ErrorType.RPC, "Registration present.",
                    "There is already ynl listener registered for this id: " + input.getId());
        }

        ynlRegistrations.put(input.getId(),
                notificationService.registerNotificationListener(new YnlListener(input.getId())));

        return voidSuccess();
    }

    /** Delegates to {@link PrefixShardHandler#onRemovePrefixShard}. */
    @Override
    public Future<RpcResult<Void>> removePrefixShard(final RemovePrefixShardInput input) {
        LOG.debug("remove-prefix-shard, input: {}", input);

        return prefixShardHandler.onRemovePrefixShard(input);
    }

    /** Not implemented; always returns a failed result. */
    @Override
    public Future<RpcResult<Void>> becomePrefixLeader(final BecomePrefixLeaderInput input) {
        return notImplemented("become-prefix-leader");
    }

    /**
     * Removes and closes the routed get-constant registration bound to the input's context.
     *
     * @return success, or a failed result when nothing is registered for that context
     */
    @Override
    public Future<RpcResult<Void>> unregisterBoundConstant(final UnregisterBoundConstantInput input) {
        LOG.debug("unregister-bound-constant, {}", input);

        final DOMRpcImplementationRegistration<RoutedGetConstantService> routedReg =
                routedRegistrations.remove(input.getContext());

        if (routedReg == null) {
            LOG.debug("No get-contexted-constant registration for context: {}", input.getContext());
            return failure(ErrorType.APPLICATION, "missing-registration",
                    "No get-constant rpc registration present.");
        }

        routedReg.close();
        return voidSuccess();
    }

    /**
     * Registers a singleton get-constant service for the input's constant.
     *
     * @return success, or a failed result when the constant is {@code null}
     */
    @Override
    public Future<RpcResult<Void>> registerSingletonConstant(final RegisterSingletonConstantInput input) {
        LOG.debug("Received register-singleton-constant rpc, input: {}", input);

        if (input.getConstant() == null) {
            return failure(ErrorType.RPC, "Invalid input.", "Constant value is null");
        }

        getSingletonConstantRegistration =
                SingletonGetConstantService.registerNew(singletonService, domRpcService, input.getConstant());

        return voidSuccess();
    }

    /** Not implemented; always returns a failed result. */
    @Override
    public Future<RpcResult<Void>> registerDefaultConstant(final RegisterDefaultConstantInput input) {
        return notImplemented("register-default-constant");
    }

    /**
     * Closes the global get-constant registration.
     *
     * @return success, or a failed result when no registration is present
     */
    @Override
    public Future<RpcResult<Void>> unregisterConstant() {
        if (globalGetConstantRegistration == null) {
            return failure(ErrorType.APPLICATION, "missing-registration",
                    "No get-constant rpc registration present.");
        }

        globalGetConstantRegistration.close();
        globalGetConstantRegistration = null;

        return voidSuccess();
    }

    /**
     * Deactivates the flapping singleton service and reports the flap count it returned.
     *
     * @return the flap count, or a failed result when no flapping singleton is registered
     */
    @Override
    public Future<RpcResult<UnregisterFlappingSingletonOutput>> unregisterFlappingSingleton() {
        LOG.debug("unregister-flapping-singleton received.");

        if (flappingSingletonService == null) {
            return failure(ErrorType.APPLICATION, "missing-registration",
                    "No flapping-singleton registration present.");
        }

        final long flapCount = flappingSingletonService.setInactive();
        flappingSingletonService = null;

        final UnregisterFlappingSingletonOutput output =
                new UnregisterFlappingSingletonOutputBuilder().setFlapCount(flapCount).build();

        return Futures.immediateFuture(RpcResultBuilder.success(output).build());
    }

    /** Not implemented; always returns a failed result. */
    @Override
    public Future<RpcResult<Void>> addShardReplica(final AddShardReplicaInput input) {
        return notImplemented("add-shard-replica");
    }

    /**
     * Registers an {@link IdIntsDOMDataTreeLIstener} on the id-ints subtree of the CONFIGURATION datastore.
     *
     * @return success; a failed result when a listener is already registered or the registration is
     *         rejected with a {@link DOMDataTreeLoopException}
     */
    @Override
    public Future<RpcResult<Void>> subscribeDdtl() {
        if (ddtlReg != null) {
            return failure(ErrorType.RPC, "Registration present.",
                    "There is already dataTreeChangeListener registered on id-ints list.");
        }

        final IdIntsDOMDataTreeLIstener listener = new IdIntsDOMDataTreeLIstener();

        try {
            ddtlReg = domDataTreeService.registerListener(listener,
                    Collections.singleton(new DOMDataTreeIdentifier(
                            LogicalDatastoreType.CONFIGURATION, ProduceTransactionsHandler.ID_INTS_YID)),
                    true, Collections.emptyList());
        } catch (final DOMDataTreeLoopException e) {
            LOG.error("Failed to register DOMDataTreeListener.", e);
            // Report the failure instead of claiming success with no registration in place.
            return failure(ErrorType.APPLICATION, "Registration failed.",
                    "Failed to register DOMDataTreeListener.");
        }

        // Only remember the listener once it is actually registered.
        idIntsDdtl = listener;
        return voidSuccess();
    }

    /**
     * Registers a routed get-constant service for the input's constant, bound to the input's context.
     *
     * @return success; a failed result when the context or constant is {@code null}, or when a
     *         registration for the context already exists
     */
    @Override
    public Future<RpcResult<Void>> registerBoundConstant(final RegisterBoundConstantInput input) {
        LOG.debug("register-bound-constant: {}", input);

        if (input.getContext() == null) {
            return failure(ErrorType.RPC, "Invalid input.", "Context value is null");
        }

        if (input.getConstant() == null) {
            return failure(ErrorType.RPC, "Invalid input.", "Constant value is null");
        }

        if (routedRegistrations.containsKey(input.getContext())) {
            return failure(ErrorType.RPC, "Registration present.",
                    "There is already a rpc registered for context: " + input.getContext());
        }

        final DOMRpcImplementationRegistration<RoutedGetConstantService> routedReg =
                RoutedGetConstantService.registerNew(bindingNormalizedNodeSerializer, domRpcService,
                        input.getConstant(), input.getContext());

        routedRegistrations.put(input.getContext(), routedReg);
        return voidSuccess();
    }

    /**
     * Creates the {@link FlappingSingletonService}.
     *
     * @return success, or a failed result when one is already registered
     */
    @Override
    public Future<RpcResult<Void>> registerFlappingSingleton() {
        LOG.debug("Received register-flapping-singleton.");

        if (flappingSingletonService != null) {
            return failure(ErrorType.RPC, "Registration present.", "flappin-singleton already registered");
        }

        flappingSingletonService = new FlappingSingletonService(singletonService);

        return voidSuccess();
    }

    /** Not implemented; always returns a failed result. */
    @Override
    public Future<RpcResult<UnsubscribeDtclOutput>> unsubscribeDtcl() {
        return notImplemented("unsubscribe-dtcl");
    }

    /** Delegates to {@link PrefixShardHandler#onCreatePrefixShard}. */
    @Override
    public Future<RpcResult<Void>> createPrefixShard(final CreatePrefixShardInput input) {
        LOG.debug("create-prefix-shard, input: {}", input);

        return prefixShardHandler.onCreatePrefixShard(input);
    }

    /** Not implemented; always returns a failed result. */
    @Override
    public Future<RpcResult<Void>> deconfigureIdIntsShard() {
        return notImplemented("deconfigure-id-ints-shard");
    }

    /**
     * Removes and closes the notification listener registered for the input's id.
     *
     * @return the output collected by the listener, or a failed result when no listener is registered for
     *         that id
     */
    @Override
    public Future<RpcResult<UnsubscribeYnlOutput>> unsubscribeYnl(final UnsubscribeYnlInput input) {
        LOG.debug("Received unsubscribe-ynl, input: {}", input);

        final ListenerRegistration<YnlListener> ynlReg = ynlRegistrations.remove(input.getId());
        if (ynlReg == null) {
            return failure(ErrorType.APPLICATION, "missing-registration",
                    "No ynl listener with this id registered.");
        }

        // Read the output before closing the registration.
        final UnsubscribeYnlOutput output = ynlReg.getInstance().getOutput();
        ynlReg.close();

        return Futures.immediateFuture(RpcResultBuilder.<UnsubscribeYnlOutput>success().withResult(output).build());
    }

    /**
     * Reports the state of the publishing task started under the input's id: whether it is still active,
     * its current notification count and, if present, its last error including the stack trace.
     *
     * @return the task state; {@code active == false} when no task is known for the id
     */
    @Override
    public Future<RpcResult<CheckPublishNotificationsOutput>> checkPublishNotifications(
            final CheckPublishNotificationsInput input) {

        final PublishNotificationsTask task = publishNotificationsTasks.get(input.getId());

        if (task == null) {
            return Futures.immediateFuture(RpcResultBuilder.success(
                    new CheckPublishNotificationsOutputBuilder().setActive(false)).build());
        }

        final CheckPublishNotificationsOutputBuilder checkPublishNotificationsOutputBuilder =
                new CheckPublishNotificationsOutputBuilder().setActive(!task.isFinished());

        if (task.getLastError() != null) {
            // Render the stack trace into a string so it can be returned in the RPC output.
            final StringWriter sw = new StringWriter();
            final PrintWriter pw = new PrintWriter(sw);
            task.getLastError().printStackTrace(pw);
            checkPublishNotificationsOutputBuilder.setLastError(task.getLastError().toString() + sw.toString());
        }

        final CheckPublishNotificationsOutput output =
                checkPublishNotificationsOutputBuilder.setPublishCount(task.getCurrentNotif()).build();

        return Futures.immediateFuture(RpcResultBuilder.success(output).build());
    }

    /**
     * Hands the input to a new {@link ProduceTransactionsHandler}; the returned future is completed by
     * that handler.
     */
    @Override
    public Future<RpcResult<ProduceTransactionsOutput>> produceTransactions(final ProduceTransactionsInput input) {
        LOG.debug("producer-transactions, input: {}", input);

        final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(domDataTreeService, input);
        final SettableFuture<RpcResult<ProduceTransactionsOutput>> settableFuture = SettableFuture.create();
        handler.start(settableFuture);

        return settableFuture;
    }

    /**
     * Registers the global get-constant service for the input's constant.
     *
     * @return success; a failed result when the constant is {@code null} or a registration already exists
     */
    @Override
    public Future<RpcResult<Void>> registerConstant(final RegisterConstantInput input) {
        LOG.debug("Received register-constant rpc, input: {}", input);

        if (input.getConstant() == null) {
            return failure(ErrorType.RPC, "Invalid input.", "Constant value is null");
        }

        if (globalGetConstantRegistration != null) {
            return failure(ErrorType.RPC, "Registration present.",
                    "There is already a get-constant rpc registered.");
        }

        globalGetConstantRegistration = GetConstantService.registerNew(domRpcService, input.getConstant());
        return voidSuccess();
    }

    /** Not implemented; always returns a failed result. */
    @Override
    public Future<RpcResult<Void>> unregisterDefaultConstant() {
        return notImplemented("unregister-default-constant");
    }

    /**
     * Closes the data tree listener registration, then reads the id-ints subtree once more through a
     * temporary {@link ReadListener} and compares that final read with what the closed listener saw.
     *
     * <p>This call blocks until the temporary listener receives its first notification or fails.
     *
     * @return the comparison outcome ({@code copy-matches}); a failed result when no listener is
     *         registered, the final read yields no data, or the read fails or is interrupted
     */
    @Override
    public Future<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl() {
        LOG.debug("Received unsubscribe-ddtl.");

        if (idIntsDdtl == null || ddtlReg == null) {
            return failure(ErrorType.RPC, "Ddtl missing.", "No DOMDataTreeListener registered.");
        }

        ddtlReg.close();
        ddtlReg = null;

        final ReadListener readListener = new ReadListener();
        try {
            final ListenerRegistration<ReadListener> readReg = domDataTreeService.registerListener(readListener,
                    Collections.singleton(new DOMDataTreeIdentifier(
                            LogicalDatastoreType.CONFIGURATION, ProduceTransactionsHandler.ID_INTS_YID)),
                    true, Collections.emptyList());

            final DataTreeCandidate dataTreeCandidate;
            try {
                dataTreeCandidate = readListener.getFirstNotif().get();
            } finally {
                // Always drop the temporary listener, even when waiting for the read fails.
                readReg.close();
            }

            if (!dataTreeCandidate.getRootNode().getDataAfter().isPresent()) {
                return failure(ErrorType.APPLICATION, "Final read empty.", "No data read from id-ints list.");
            }

            final NormalizedNode<?, ?> lastRead = dataTreeCandidate.getRootNode().getDataAfter().get();

            return Futures.immediateFuture(
                    RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder()
                            .setCopyMatches(idIntsDdtl.checkEqual(lastRead))).build());

        } catch (final InterruptedException e) {
            // Restore the interrupt flag so that callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            LOG.error("Unable to read data to verify ddtl data.", e);
            return failure(ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
        } catch (final DOMDataTreeLoopException | ExecutionException e) {
            LOG.error("Unable to read data to verify ddtl data.", e);
            return failure(ErrorType.APPLICATION, "Read failed.", "Final read from id-ints failed.");
        }
    }

    /**
     * Listener that captures the first data tree notification it receives and exposes it as a future.
     *
     * <p>The future is created eagerly, so a notification that arrives before {@link #getFirstNotif()} is
     * called is not lost and cannot hit a {@code null} future.
     */
    private static class ReadListener implements DOMDataTreeListener {

        private final SettableFuture<DataTreeCandidate> readFuture = SettableFuture.create();

        @Override
        public void onDataTreeChanged(@Nonnull final Collection<DataTreeCandidate> changes,
                                      @Nonnull final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
            Preconditions.checkArgument(changes.size() == 1);

            // set() has no effect once the future is completed, so only the first notification is kept.
            readFuture.set(changes.iterator().next());
        }

        @Override
        public void onDataTreeFailed(@Nonnull final Collection<DOMDataTreeListeningException> causes) {
            LOG.error("Read Listener failed. {}", causes);

            // Fail the future so that a caller blocked in get() is released instead of waiting forever.
            // This has no effect when a notification has already completed the future.
            final Throwable cause = causes.isEmpty()
                    ? new IllegalStateException("Read listener failed without a reported cause.")
                    : causes.iterator().next();
            readFuture.setException(cause);
        }

        /**
         * Returns the future completed with the first received notification, or failed when the listener
         * reported a failure first.
         */
        public ListenableFuture<DataTreeCandidate> getFirstNotif() {
            return readFuture;
        }
    }
}