package org.opendaylight.bgpcep.programming.impl;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.Timer;
+import java.util.Collection;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.Executors;
+import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
+import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
+import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
+import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
+import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.OdlProgramming;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.OdlProgrammingBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.odl.programming.OdlProgrammingConfig;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.odl.programming.OdlProgrammingConfigBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.odl.programming.OdlProgrammingConfigKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.InstructionsQueueKey;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class InstructionDeployedImpl implements IntructionDeployer, AutoCloseable {
+public final class InstructionDeployedImpl implements IntructionDeployer,
+ ClusteredDataTreeChangeListener<OdlProgramming>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(InstructionDeployedImpl.class);
private final RpcProviderRegistry rpcProviderRegistry;
private final BundleContext bundleContext;
@GuardedBy("this")
private final Map<String, ProgrammingServiceImpl> programmingServices = new HashMap<>();
+ private final ListenerRegistration<InstructionDeployedImpl> registration;
+ private final InstanceIdentifier<OdlProgramming> iid;
public InstructionDeployedImpl(final DataBroker dataProvider, final RpcProviderRegistry rpcProviderRegistry,
final NotificationProviderService notifs, final Timer timer, final BundleContext bundleContext) {
this.timer = Preconditions.checkNotNull(timer);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.bundleContext = Preconditions.checkNotNull(bundleContext);
+ this.iid = InstanceIdentifier.create(OdlProgramming.class);
+
+ final WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
+ wTx.merge(LogicalDatastoreType.CONFIGURATION, this.iid, new OdlProgrammingBuilder().build());
+ Futures.addCallback(wTx.submit(), new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("Instruction Instance {} initialized successfully.", InstructionDeployedImpl.this.iid);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("Failed to initialize Instruction Instance {}.", InstructionDeployedImpl.this.iid, t);
+ }
+ });
+
+ this.registration = dataProvider.registerDataTreeChangeListener(
+ new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, this.iid), this);
}
- @Override
- public synchronized void createInstruction(final String instructionId) {
+ private synchronized void createInstruction(final String instructionId) {
if (this.programmingServices.containsKey(instructionId)) {
LOG.warn("Instruction Scheduler {} already exist. New instance won't be created", instructionId);
return;
programmingInst.setServiceRegistration(serviceRegistration);
}
- @Override
- public synchronized void removeInstruction(final String instructionId) {
+ private synchronized void removeInstruction(final String instructionId) {
final ProgrammingServiceImpl service = this.programmingServices.remove(instructionId);
if (service != null) {
LOG.debug("Closing Instruction Scheduler {}.", instructionId);
}
}
+ @Override
+ public ListenableFuture<Void> writeConfiguration(final String instructionId) {
+ final OdlProgrammingConfig instruction = new OdlProgrammingConfigBuilder()
+ .setInstructionQueueId(instructionId).build();
+ final WriteTransaction wTx = this.dataProvider.newWriteOnlyTransaction();
+ wTx.put(LogicalDatastoreType.CONFIGURATION, this.iid.child(OdlProgrammingConfig.class,
+ new OdlProgrammingConfigKey(instructionId)), instruction, true);
+ final CheckedFuture<Void, TransactionCommitFailedException> future = wTx.submit();
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("Instruction Instance {} initialized successfully.", instructionId);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("Failed to initialize Instruction Instance {}.", instructionId, t);
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public ListenableFuture<Void> removeConfiguration(final String instructionId) {
+ final WriteTransaction wTx = this.dataProvider.newWriteOnlyTransaction();
+ wTx.delete(LogicalDatastoreType.CONFIGURATION, this.iid.child(OdlProgrammingConfig.class,
+ new OdlProgrammingConfigKey(instructionId)));
+ final CheckedFuture<Void, TransactionCommitFailedException> future = wTx.submit();
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(final Void result) {
+ LOG.debug("Instruction Instance {} removed successfully.", instructionId);
+ }
+
+ @Override
+ public void onFailure(final Throwable t) {
+ LOG.error("Failed to remove Instruction Instance {}.", instructionId, t);
+ }
+ });
+ return future;
+ }
+
@Override
public synchronized void close() throws Exception {
+ this.registration.close();
this.exec.shutdown();
this.programmingServices.values().forEach(ProgrammingServiceImpl::close);
}
+
+ @Override
+ public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<OdlProgramming>> changes) {
+ final DataTreeModification<OdlProgramming> dataTreeModification = Iterables.getOnlyElement(changes);
+ final DataObjectModification<OdlProgramming> rootNode = dataTreeModification.getRootNode();
+ rootNode.getModifiedChildren()
+ .forEach(dto->handleModification((DataObjectModification<OdlProgrammingConfig>) dto));
+ }
+
+ private void handleModification(final DataObjectModification<OdlProgrammingConfig> config) {
+ final ModificationType modificationType = config.getModificationType();
+ LOG.trace("Programming configuration has changed: {}, type modification {}", config, modificationType);
+ switch (modificationType) {
+ case DELETE:
+ removeInstruction(config.getDataBefore().getInstructionQueueId());
+ break;
+ case SUBTREE_MODIFIED:
+ case WRITE:
+ createInstruction(config.getDataAfter().getInstructionQueueId());
+ break;
+ default:
+ break;
+ }
+ }
}
import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
import org.opendaylight.controller.config.api.osgi.WaitingServiceTracker;
import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @deprecated Replaced by blueprint wiring
*/
public final class InstructionSchedulerImplModule extends
org.opendaylight.controller.config.yang.programming.impl.AbstractInstructionSchedulerImplModule {
+ private static final Logger LOG = LoggerFactory.getLogger(InstructionSchedulerImplModule.class);
private BundleContext bundleContext;
final String instructionId = getInstructionQueueId() != null ? getInstructionQueueId() :
getIdentifier().getInstanceName();
- intructionDeployer.createInstruction(instructionId);
+ try {
+ intructionDeployer.writeConfiguration(instructionId).get();
+ } catch (final Exception e) {
+ LOG.error("Failed to instantiate Instruction at {}", instructionId, e);
+ throw new IllegalStateException("Failed to instantiate provider", e);
+ }
final WaitingServiceTracker<InstructionScheduler> instructionSchedulerTracker = WaitingServiceTracker
.create(InstructionScheduler.class,
- this.bundleContext, "(" + InstructionScheduler.class.getName() + "=" + instructionId + ")");
+ this.bundleContext, "(" + InstructionScheduler.class.getName() + "=" + instructionId + ")");
final InstructionScheduler instructionScheduler = instructionSchedulerTracker
.waitForService(WaitingServiceTracker.FIVE_MINUTES);
@Override
protected Object handleInvocation(final Object proxy, final Method method, final Object[] args) throws Throwable {
if (method.getName().equals("close")) {
- intructionDeployer.removeInstruction(instructionId);
+ intructionDeployer.removeConfiguration(instructionId);
intructionDeployerTracker.close();
return null;
} else {