<feature name='odl-bgpcep-programming-api' version='${project.version}'>
<feature version='${project.version}'>odl-bgpcep-pcep-dependencies</feature>
<feature version='${config.version}'>odl-config-api</feature>
+ <bundle>mvn:org.opendaylight.mdsal/mdsal-singleton-common-api/{{VERSION}}</bundle>
<bundle>mvn:org.opendaylight.bgpcep/topology-api/{{VERSION}}</bundle>
<bundle>mvn:org.opendaylight.bgpcep/topology-tunnel-api/{{VERSION}}</bundle>
<bundle>mvn:org.opendaylight.bgpcep/programming-api/{{VERSION}}</bundle>
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.OdlProgramming;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.OdlProgrammingBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.odl.programming.OdlProgrammingConfig;
private final Map<String, ProgrammingServiceImpl> programmingServices = new HashMap<>();
private final ListenerRegistration<InstructionDeployedImpl> registration;
private final InstanceIdentifier<OdlProgramming> iid;
+ private final ClusterSingletonServiceProvider cssp;
public InstructionDeployedImpl(final DataBroker dataProvider, final RpcProviderRegistry rpcProviderRegistry,
- final NotificationProviderService notifs, final Timer timer, final BundleContext bundleContext) {
+ final NotificationProviderService notifs, final Timer timer, final ClusterSingletonServiceProvider cssp,
+ final BundleContext bundleContext) {
this.dataProvider = Preconditions.checkNotNull(dataProvider);
this.notifs = Preconditions.checkNotNull(notifs);
this.timer = Preconditions.checkNotNull(timer);
this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.bundleContext = Preconditions.checkNotNull(bundleContext);
+ this.cssp = Preconditions.checkNotNull(cssp);
this.iid = InstanceIdentifier.create(OdlProgramming.class);
final WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
LOG.debug("Creating Instruction Scheduler {}.", instructionId);
final ProgrammingServiceImpl programmingInst =
- new ProgrammingServiceImpl(this.dataProvider, this.notifs, this.exec, this.rpcProviderRegistry,
- this.timer, new InstructionsQueueKey(instructionId));
+ new ProgrammingServiceImpl(this.dataProvider, this.notifs, this.exec, this.rpcProviderRegistry, this.cssp,
+ this.timer, instructionId);
this.programmingServices.put(instructionId, programmingInst);
final Dictionary<String, String> properties = new Hashtable<>();
properties.put(InstructionScheduler.class.getName(), instructionId);
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionOutputBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public final class ProgrammingServiceImpl implements AutoCloseable, InstructionScheduler, ProgrammingService {
+public final class ProgrammingServiceImpl implements AutoCloseable, ClusterSingletonService, InstructionScheduler,
+ ProgrammingService {
private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
private final Map<InstructionId, InstructionImpl> insns = new HashMap<>();
private final ListeningExecutorService executor;
private final DataBroker dataProvider;
private final Timer timer;
- private final RpcRegistration<ProgrammingService> reg;
+ private final String instructionId;
+ private final ServiceGroupIdentifier sgi;
+ private final ClusterSingletonServiceRegistration csspReg;
+ private final RpcProviderRegistry rpcProviderRegistry;
+ private RpcRegistration<ProgrammingService> reg;
private ServiceRegistration<?> serviceRegistration;
private final class InstructionPusher implements QueueInstruction {
public void onFailure(final Throwable t) {
LOG.error("Failed to update Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
}
- });;
+ });
}
ProgrammingServiceImpl.this.notifs.publish(new InstructionStatusChangedBuilder().setId(this.builder.getId()).setStatus(status).setDetails(details).build());
}
}
- public ProgrammingServiceImpl(final DataBroker dataProvider, final NotificationProviderService notifs,
+ ProgrammingServiceImpl(final DataBroker dataProvider, final NotificationProviderService notifs,
final ListeningExecutorService executor, final RpcProviderRegistry rpcProviderRegistry,
- final Timer timer, final InstructionsQueueKey instructionsQueueKey) {
+ final ClusterSingletonServiceProvider cssp, final Timer timer, final String instructionId) {
this.dataProvider = Preconditions.checkNotNull(dataProvider);
+ this.instructionId = Preconditions.checkNotNull(instructionId);
this.notifs = Preconditions.checkNotNull(notifs);
this.executor = Preconditions.checkNotNull(executor);
+ this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
this.timer = Preconditions.checkNotNull(timer);
- this.qid = KeyedInstanceIdentifier.builder(InstructionsQueue.class, instructionsQueueKey).build();
- this.reg = rpcProviderRegistry.addRpcImplementation(ProgrammingService.class, this);
+ this.qid = KeyedInstanceIdentifier.builder(InstructionsQueue.class, new InstructionsQueueKey(this.instructionId)).build();
+ this.sgi = ServiceGroupIdentifier.create("programming-"+ this.instructionId + "-service-group");
+ this.csspReg = cssp.registerClusterSingletonService(this);
+ }
+
+ @Override
+ public void instantiateServiceInstance() {
+ LOG.info("Instruction Queue service {} instantiated", this.sgi.getValue());
+
+ this.reg = this.rpcProviderRegistry.addRpcImplementation(ProgrammingService.class, this);
final WriteTransaction t = this.dataProvider.newWriteOnlyTransaction();
t.put(LogicalDatastoreType.OPERATIONAL, this.qid, new InstructionsQueueBuilder()
- .setKey(instructionsQueueKey).setInstruction(Collections.emptyList()).build());
+ .setKey(new InstructionsQueueKey(this.instructionId)).setInstruction(Collections.emptyList()).build());
Futures.addCallback(t.submit(), new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
});
}
+ @Override
+ public ServiceGroupIdentifier getIdentifier() {
+ return this.sgi;
+ }
+
@Override
public ListenableFuture<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
return this.executor.submit(() -> realCancelInstruction(input));
return ret;
}
+ public String getInstructionID() {
+ return this.instructionId;
+ }
+
private synchronized void timeoutInstruction(final InstructionId id) {
final InstructionImpl i = this.insns.get(id);
if (i == null) {
}
@Override
- public synchronized void close() {
+ public ListenableFuture<Void> closeServiceInstance() {
+ LOG.info("Closing Instruction Queue service {}", this.sgi.getValue());
+
this.reg.close();
for (final InstructionImpl i : this.insns.values()) {
i.tryCancel(null);
LOG.error("Failed to shutdown Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
}
});
+ return future;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (this.csspReg != null) {
+ try {
+ this.csspReg.close();
+ } catch (final Exception e) {
+ LOG.debug("Failed to close Instruction Scheduler service");
+ }
+ }
if (this.serviceRegistration != null) {
this.serviceRegistration.unregister();
this.serviceRegistration = null;
<reference id="notificationService"
interface="org.opendaylight.controller.sal.binding.api.NotificationProviderService" />
<reference id="timer" interface="io.netty.util.Timer" odl:type="global-timer"/>
+ <reference id="clusterSingletonServiceProvider"
+ interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"/>
<bean id="IntructionDeployerImpl" class="org.opendaylight.bgpcep.programming.impl.InstructionDeployedImpl"
destroy-method="close">
<argument ref="rpcRegistry"/>
<argument ref="notificationService"/>
<argument ref="timer"/>
+ <argument ref="clusterSingletonServiceProvider"/>
<argument ref="blueprintBundleContext"/>
</bean>
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsInput;
public class ProgrammingServiceImplTest extends AbstractDataBrokerTest {
private static final int INSTRUCTION_DEADLINE_OFFSET_IN_SECONDS = 3;
- private static final InstructionsQueueKey INSTRUCTIONS_QUEUE_KEY = new InstructionsQueueKey("test-instraction-queue");
+ private static final String INSTRUCTIONS_QUEUE_KEY = "test-instraction-queue";
private final Timer timer = new HashedWheelTimer();
private MockedExecutorWrapper mockedExecutorWrapper;
private MockedNotificationServiceWrapper mockedNotificationServiceWrapper;
private ProgrammingServiceImpl testedProgrammingService;
@Mock
+ private ClusterSingletonServiceProvider cssp;
+ @Mock
+ private ClusterSingletonServiceRegistration singletonServiceRegistration;
+ @Mock
private RpcProviderRegistry rpcRegistry;
@Mock
private RoutedRpcRegistration<ProgrammingService> registration;
+ private ClusterSingletonService singletonService;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
+ doAnswer(invocationOnMock -> {
+ this.singletonService = (ClusterSingletonService) invocationOnMock.getArguments()[0];
+ return this.singletonServiceRegistration;
+ }).when(this.cssp).registerClusterSingletonService(any(ClusterSingletonService.class));
+
+ doAnswer(invocationOnMock -> {
+ this.singletonService.closeServiceInstance();
+ return null;
+ }).when(this.singletonServiceRegistration).close();
doReturn(this.registration).when(this.rpcRegistry).addRpcImplementation(Mockito.any(),
Mockito.any(ProgrammingService.class));
doNothing().when(this.registration).close();
this.testedProgrammingService = new ProgrammingServiceImpl(getDataBroker(),
this.mockedNotificationServiceWrapper.getMockedNotificationService(),
- this.mockedExecutorWrapper.getMockedExecutor(), this.rpcRegistry, this.timer, INSTRUCTIONS_QUEUE_KEY);
+ this.mockedExecutorWrapper.getMockedExecutor(), this.rpcRegistry, this.cssp, this.timer,
+ INSTRUCTIONS_QUEUE_KEY);
+ this.singletonService.instantiateServiceInstance();
}
@After
public void tearDown() throws Exception {
+ this.singletonService.closeServiceInstance();
this.testedProgrammingService.close();
}
private boolean assertInstructionExists(final InstructionId id) {
try {
- return getDataBroker().newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(InstructionsQueue.class, INSTRUCTIONS_QUEUE_KEY).build().child(org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.Instruction.class,
+ return getDataBroker().newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL,
+ InstanceIdentifier.builder(InstructionsQueue.class, new InstructionsQueueKey(INSTRUCTIONS_QUEUE_KEY))
+ .build().child(org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.Instruction.class,
new InstructionKey(id))).get().isPresent();
} catch (InterruptedException | ExecutionException e) {
return false;
<groupId>org.opendaylight.mdsal</groupId>
<artifactId>yang-binding</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opendaylight.mdsal</groupId>
+ <artifactId>mdsal-singleton-common-api</artifactId>
+ </dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-common</artifactId>
package org.opendaylight.bgpcep.programming.spi;
import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.SubmitInstructionInput;
+import org.opendaylight.yangtools.concepts.Identifiable;
-public interface InstructionScheduler {
+public interface InstructionScheduler extends Identifiable<ServiceGroupIdentifier> {
/**
* Schedule a new instruction for execution. This method tries to enqueue an instruction. It will return a Future
* which represents the scheduling progress. When the future becomes successful, the requestor is expected to start
*
* @param input Instruction scheduling information
* @return Scheduling future.
- *
* @throws SchedulerException if a failure to schedule the instruction occurs.
*/
ListenableFuture<Instruction> scheduleInstruction(SubmitInstructionInput input) throws SchedulerException;
+
+ /**
+ * Returns InstructionID
+ *
+ * @return Instruction ID
+ */
+ @Nonnull
+ String getInstructionID();
}