2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.bgpcep.programming.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.collect.Iterables;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.ListeningExecutorService;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import io.netty.util.Timer;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.Dictionary;
23 import java.util.HashMap;
24 import java.util.Hashtable;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.Executors;
28 import javax.annotation.Nonnull;
29 import javax.annotation.concurrent.GuardedBy;
30 import org.opendaylight.bgpcep.programming.spi.InstructionScheduler;
31 import org.opendaylight.controller.md.sal.binding.api.ClusteredDataTreeChangeListener;
32 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
33 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification;
34 import org.opendaylight.controller.md.sal.binding.api.DataObjectModification.ModificationType;
35 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
36 import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
37 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
38 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
39 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
40 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
41 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.OdlProgramming;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.OdlProgrammingBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.odl.programming.OdlProgrammingConfig;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.odl.programming.OdlProgrammingConfigBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.odl.programming.OdlProgrammingConfigKey;
47 import org.opendaylight.yangtools.concepts.ListenerRegistration;
48 import org.opendaylight.yangtools.yang.binding.DataObject;
49 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
50 import org.osgi.framework.BundleContext;
51 import org.osgi.framework.ServiceRegistration;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 public final class InstructionDeployerImpl implements IntructionDeployer,
56 ClusteredDataTreeChangeListener<OdlProgramming>, AutoCloseable {
57 private static final Logger LOG = LoggerFactory.getLogger(InstructionDeployerImpl.class);
59 private final RpcProviderRegistry rpcProviderRegistry;
60 private final ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
61 private final DataBroker dataProvider;
62 private final NotificationPublishService notifs;
63 private final Timer timer;
64 private final BundleContext bundleContext;
66 private final Map<String, ProgrammingServiceImpl> programmingServices = new HashMap<>();
67 private final ListenerRegistration<InstructionDeployerImpl> registration;
68 private final InstanceIdentifier<OdlProgramming> iid;
69 private final ClusterSingletonServiceProvider cssp;
72 class WriteConfiguration {
73 private final String instructionId;
75 WriteConfiguration(final String instructionId) {
76 this.instructionId = instructionId;
80 final OdlProgrammingConfig instruction = new OdlProgrammingConfigBuilder()
81 .setInstructionQueueId(this.instructionId).build();
82 final WriteTransaction wTx = InstructionDeployerImpl.this.dataProvider.newWriteOnlyTransaction();
83 wTx.put(LogicalDatastoreType.CONFIGURATION, InstructionDeployerImpl.this.iid.child(
84 OdlProgrammingConfig.class, new OdlProgrammingConfigKey(this.instructionId)), instruction, true);
87 LOG.debug("Instruction Instance {} initialized successfully.", WriteConfiguration.this.instructionId);
88 } catch (final ExecutionException |InterruptedException e) {
89 LOG.error("Failed to initialize Instruction Instance {}.", WriteConfiguration.this.instructionId, e);
93 ListenableFuture<Void> remove() {
94 final WriteTransaction wTx = InstructionDeployerImpl.this.dataProvider.newWriteOnlyTransaction();
95 wTx.delete(LogicalDatastoreType.CONFIGURATION, InstructionDeployerImpl.this.iid.child(
96 OdlProgrammingConfig.class, new OdlProgrammingConfigKey(this.instructionId)));
97 ListenableFuture<Void> futureRemove = wTx.submit();
98 Futures.addCallback(futureRemove, new FutureCallback<Void>() {
100 public void onSuccess(final Void result) {
101 LOG.debug("Instruction Instance {} removed successfully.", WriteConfiguration.this.instructionId);
105 public void onFailure(final Throwable t) {
106 LOG.error("Failed to remove Instruction Instance {}.", WriteConfiguration.this.instructionId, t);
108 }, MoreExecutors.directExecutor());
113 public InstructionDeployerImpl(final DataBroker dataProvider, final RpcProviderRegistry rpcProviderRegistry,
114 final NotificationPublishService notifs, final Timer timer, final ClusterSingletonServiceProvider cssp,
115 final BundleContext bundleContext) {
116 this.dataProvider = requireNonNull(dataProvider);
117 this.notifs = requireNonNull(notifs);
118 this.timer = requireNonNull(timer);
119 this.rpcProviderRegistry = requireNonNull(rpcProviderRegistry);
120 this.bundleContext = requireNonNull(bundleContext);
121 this.cssp = requireNonNull(cssp);
122 this.iid = InstanceIdentifier.create(OdlProgramming.class);
124 final WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
125 wTx.merge(LogicalDatastoreType.CONFIGURATION, this.iid, new OdlProgrammingBuilder()
126 .setOdlProgrammingConfig(Collections.emptyList()).build());
127 Futures.addCallback(wTx.submit(), new FutureCallback<Void>() {
129 public void onSuccess(final Void result) {
130 LOG.debug("Instruction Instance {} initialized successfully.", InstructionDeployerImpl.this.iid);
134 public void onFailure(final Throwable t) {
135 LOG.error("Failed to initialize Instruction Instance {}.", InstructionDeployerImpl.this.iid, t);
137 }, MoreExecutors.directExecutor());
139 this.registration = dataProvider.registerDataTreeChangeListener(
140 new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, this.iid), this);
144 InstanceIdentifier<OdlProgramming> getInstructionIID(){
149 private synchronized void createInstruction(final String instructionId,
150 final WriteConfiguration writeConfiguration) {
151 if (this.programmingServices.containsKey(instructionId)) {
152 LOG.warn("Instruction Scheduler {} already exist. New instance won't be created", instructionId);
155 LOG.debug("Creating Instruction Scheduler {}.", instructionId);
157 final ProgrammingServiceImpl programmingInst = new ProgrammingServiceImpl(this.dataProvider, this.notifs,
158 this.exec, this.rpcProviderRegistry, this.cssp, this.timer, instructionId, writeConfiguration);
159 this.programmingServices.put(instructionId, programmingInst);
160 final Dictionary<String, String> properties = new Hashtable<>();
161 properties.put(InstructionScheduler.class.getName(), instructionId);
162 final ServiceRegistration<?> serviceRegistration = this.bundleContext
163 .registerService(InstructionScheduler.class.getName(), programmingInst, properties);
164 programmingInst.setServiceRegistration(serviceRegistration);
167 private synchronized void removeInstruction(final String instructionId) {
168 final ProgrammingServiceImpl service = this.programmingServices.remove(instructionId);
169 if (service != null) {
170 LOG.debug("Closing Instruction Scheduler {}.", instructionId);
176 public void writeConfiguration(final String instructionId) {
177 createInstruction(instructionId, new WriteConfiguration(instructionId));
181 public void removeConfiguration(final String instructionId) {
182 removeInstruction(instructionId);
186 public synchronized void close() throws Exception {
187 this.registration.close();
188 this.exec.shutdown();
189 this.programmingServices.values().forEach(ProgrammingServiceImpl::close);
193 public synchronized void onDataTreeChanged(@Nonnull final Collection<DataTreeModification<OdlProgramming>> changes) {
194 final DataTreeModification<OdlProgramming> dataTreeModification = Iterables.getOnlyElement(changes);
195 final Collection<DataObjectModification<? extends DataObject>> rootNode = dataTreeModification.getRootNode()
196 .getModifiedChildren();
197 if (rootNode.isEmpty()) {
200 rootNode.forEach(dto->handleModification((DataObjectModification<OdlProgrammingConfig>) dto));
203 private void handleModification(final DataObjectModification<OdlProgrammingConfig> config) {
204 final ModificationType modificationType = config.getModificationType();
205 LOG.trace("Programming configuration has changed: {}, type modification {}", config, modificationType);
206 switch (modificationType) {
208 removeInstruction(config.getDataBefore().getInstructionQueueId());
210 case SUBTREE_MODIFIED:
212 createInstruction(config.getDataAfter().getInstructionQueueId(), null);