BUG-6975: Integrate Programming with Cluster Singleton Service 16/52516/6
authorClaudio D. Gasparini <claudio.gasparini@pantheon.tech>
Wed, 1 Mar 2017 16:25:53 +0000 (17:25 +0100)
committerRobert Varga <rovarga@cisco.com>
Tue, 7 Mar 2017 07:18:58 +0000 (08:18 +0100)
Integrate Programming with Cluster Singleton Service

Change-Id: I95e3c8c1dc1e596c060256d3f59fec9cfe882456
Signed-off-by: Claudio D. Gasparini <claudio.gasparini@pantheon.tech>
features/pcep/features-pcep/src/main/features/features.xml
programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/InstructionDeployedImpl.java
programming/impl/src/main/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImpl.java
programming/impl/src/main/resources/org/opendaylight/blueprint/programming.xml
programming/impl/src/test/java/org/opendaylight/bgpcep/programming/impl/ProgrammingServiceImplTest.java
programming/spi/pom.xml
programming/spi/src/main/java/org/opendaylight/bgpcep/programming/spi/InstructionScheduler.java

index e4484882763eff07e6b9831863852d8a6f4f3b17..e43f22337c2492df39ff498f29744e8e69a8659a 100644 (file)
@@ -64,6 +64,7 @@
     <feature name='odl-bgpcep-programming-api' version='${project.version}'>
         <feature version='${project.version}'>odl-bgpcep-pcep-dependencies</feature>
         <feature version='${config.version}'>odl-config-api</feature>
+        <bundle>mvn:org.opendaylight.mdsal/mdsal-singleton-common-api/{{VERSION}}</bundle>
         <bundle>mvn:org.opendaylight.bgpcep/topology-api/{{VERSION}}</bundle>
         <bundle>mvn:org.opendaylight.bgpcep/topology-tunnel-api/{{VERSION}}</bundle>
         <bundle>mvn:org.opendaylight.bgpcep/programming-api/{{VERSION}}</bundle>
index 03fdfd22b770b4927a9660645cb15a3590ddd603..27f1a614b9e608955ec88aad2c278b7e1d5be14e 100644 (file)
@@ -36,6 +36,7 @@ import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.OdlProgramming;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.OdlProgrammingBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.odl.programming.config.rev170301.odl.programming.OdlProgrammingConfig;
@@ -63,14 +64,17 @@ public final class InstructionDeployedImpl implements IntructionDeployer,
     private final Map<String, ProgrammingServiceImpl> programmingServices = new HashMap<>();
     private final ListenerRegistration<InstructionDeployedImpl> registration;
     private final InstanceIdentifier<OdlProgramming> iid;
+    private final ClusterSingletonServiceProvider cssp;
 
     public InstructionDeployedImpl(final DataBroker dataProvider, final RpcProviderRegistry rpcProviderRegistry,
-        final NotificationProviderService notifs, final Timer timer, final BundleContext bundleContext) {
+        final NotificationProviderService notifs, final Timer timer, final ClusterSingletonServiceProvider cssp,
+        final BundleContext bundleContext) {
         this.dataProvider = Preconditions.checkNotNull(dataProvider);
         this.notifs = Preconditions.checkNotNull(notifs);
         this.timer = Preconditions.checkNotNull(timer);
         this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
         this.bundleContext = Preconditions.checkNotNull(bundleContext);
+        this.cssp = Preconditions.checkNotNull(cssp);
         this.iid = InstanceIdentifier.create(OdlProgramming.class);
 
         final WriteTransaction wTx = dataProvider.newWriteOnlyTransaction();
@@ -99,8 +103,8 @@ public final class InstructionDeployedImpl implements IntructionDeployer,
         LOG.debug("Creating Instruction Scheduler {}.", instructionId);
 
         final ProgrammingServiceImpl programmingInst =
-            new ProgrammingServiceImpl(this.dataProvider, this.notifs, this.exec, this.rpcProviderRegistry,
-                this.timer, new InstructionsQueueKey(instructionId));
+            new ProgrammingServiceImpl(this.dataProvider, this.notifs, this.exec, this.rpcProviderRegistry, this.cssp,
+                this.timer, instructionId);
         this.programmingServices.put(instructionId, programmingInst);
         final Dictionary<String, String> properties = new Hashtable<>();
         properties.put(InstructionScheduler.class.getName(), instructionId);
index c87466dfde9822de0d82992e5dd15203e31e9b92..d61745fec1c25176fab6e296efc76d55a60b5a5a 100644 (file)
@@ -37,6 +37,10 @@ import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFaile
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RpcRegistration;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionOutputBuilder;
@@ -67,7 +71,8 @@ import org.osgi.framework.ServiceRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class ProgrammingServiceImpl implements AutoCloseable, InstructionScheduler, ProgrammingService {
+public final class ProgrammingServiceImpl implements AutoCloseable, ClusterSingletonService, InstructionScheduler,
+    ProgrammingService {
     private static final Logger LOG = LoggerFactory.getLogger(ProgrammingServiceImpl.class);
 
     private final Map<InstructionId, InstructionImpl> insns = new HashMap<>();
@@ -76,7 +81,11 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS
     private final ListeningExecutorService executor;
     private final DataBroker dataProvider;
     private final Timer timer;
-    private final RpcRegistration<ProgrammingService> reg;
+    private final String instructionId;
+    private final ServiceGroupIdentifier sgi;
+    private final ClusterSingletonServiceRegistration csspReg;
+    private final RpcProviderRegistry rpcProviderRegistry;
+    private RpcRegistration<ProgrammingService> reg;
     private ServiceRegistration<?> serviceRegistration;
 
     private final class InstructionPusher implements QueueInstruction {
@@ -109,7 +118,7 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS
                     public void onFailure(final Throwable t) {
                         LOG.error("Failed to update Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
                     }
-                });;
+                });
             }
 
             ProgrammingServiceImpl.this.notifs.publish(new InstructionStatusChangedBuilder().setId(this.builder.getId()).setStatus(status).setDetails(details).build());
@@ -135,19 +144,29 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS
         }
     }
 
-    public ProgrammingServiceImpl(final DataBroker dataProvider, final NotificationProviderService notifs,
+    ProgrammingServiceImpl(final DataBroker dataProvider, final NotificationProviderService notifs,
         final ListeningExecutorService executor, final RpcProviderRegistry rpcProviderRegistry,
-        final Timer timer, final InstructionsQueueKey instructionsQueueKey) {
+        final ClusterSingletonServiceProvider cssp, final Timer timer, final String instructionId) {
         this.dataProvider = Preconditions.checkNotNull(dataProvider);
+        this.instructionId = Preconditions.checkNotNull(instructionId);
         this.notifs = Preconditions.checkNotNull(notifs);
         this.executor = Preconditions.checkNotNull(executor);
+        this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry);
         this.timer = Preconditions.checkNotNull(timer);
-        this.qid = KeyedInstanceIdentifier.builder(InstructionsQueue.class, instructionsQueueKey).build();
-        this.reg = rpcProviderRegistry.addRpcImplementation(ProgrammingService.class, this);
+        this.qid = KeyedInstanceIdentifier.builder(InstructionsQueue.class,  new InstructionsQueueKey(this.instructionId)).build();
+        this.sgi = ServiceGroupIdentifier.create("programming-"+ this.instructionId + "-service-group");
+        this.csspReg = cssp.registerClusterSingletonService(this);
+    }
+
+    @Override
+    public void instantiateServiceInstance() {
+        LOG.info("Instruction Queue service {} instantiated", this.sgi.getValue());
+
+        this.reg = this.rpcProviderRegistry.addRpcImplementation(ProgrammingService.class, this);
 
         final WriteTransaction t = this.dataProvider.newWriteOnlyTransaction();
         t.put(LogicalDatastoreType.OPERATIONAL, this.qid, new InstructionsQueueBuilder()
-            .setKey(instructionsQueueKey).setInstruction(Collections.emptyList()).build());
+            .setKey(new InstructionsQueueKey(this.instructionId)).setInstruction(Collections.emptyList()).build());
         Futures.addCallback(t.submit(), new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
@@ -161,6 +180,11 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS
         });
     }
 
+    @Override
+    public ServiceGroupIdentifier getIdentifier() {
+        return this.sgi;
+    }
+
     @Override
     public ListenableFuture<RpcResult<CancelInstructionOutput>> cancelInstruction(final CancelInstructionInput input) {
         return this.executor.submit(() -> realCancelInstruction(input));
@@ -325,6 +349,10 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS
         return ret;
     }
 
+    public String getInstructionID() {
+        return this.instructionId;
+    }
+
     private synchronized void timeoutInstruction(final InstructionId id) {
         final InstructionImpl i = this.insns.get(id);
         if (i == null) {
@@ -362,7 +390,9 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS
     }
 
     @Override
-    public synchronized void close() {
+    public ListenableFuture<Void> closeServiceInstance() {
+        LOG.info("Closing Instruction Queue service {}", this.sgi.getValue());
+
         this.reg.close();
         for (final InstructionImpl i : this.insns.values()) {
             i.tryCancel(null);
@@ -382,6 +412,18 @@ public final class ProgrammingServiceImpl implements AutoCloseable, InstructionS
                 LOG.error("Failed to shutdown Instruction Queue {}", ProgrammingServiceImpl.this.qid, t);
             }
         });
+        return future;
+    }
+
+    @Override
+    public synchronized void close() {
+        if (this.csspReg != null) {
+            try {
+                this.csspReg.close();
+            } catch (final Exception e) {
+                LOG.debug("Failed to close Instruction Scheduler service");
+            }
+        }
         if (this.serviceRegistration != null) {
             this.serviceRegistration.unregister();
             this.serviceRegistration = null;
index e883f181367313acbce0a006ce1906e439f52a5c..05706a98f3740eb8dd0ef38f2b10243609c39f25 100644 (file)
@@ -16,6 +16,8 @@
     <reference id="notificationService"
                interface="org.opendaylight.controller.sal.binding.api.NotificationProviderService" />
     <reference id="timer" interface="io.netty.util.Timer" odl:type="global-timer"/>
+    <reference id="clusterSingletonServiceProvider"
+               interface="org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider"/>
 
     <bean id="IntructionDeployerImpl" class="org.opendaylight.bgpcep.programming.impl.InstructionDeployedImpl"
           destroy-method="close">
@@ -23,6 +25,7 @@
         <argument ref="rpcRegistry"/>
         <argument ref="notificationService"/>
         <argument ref="timer"/>
+        <argument ref="clusterSingletonServiceProvider"/>
         <argument ref="blueprintBundleContext"/>
     </bean>
 
index 69571f370f9017aa874f956c6f286e971129b446..930b3c1bb8496d81502a9c88bb83f24528917136 100644 (file)
@@ -13,6 +13,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -38,6 +40,9 @@ import org.opendaylight.controller.md.sal.binding.test.AbstractDataBrokerTest;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
+import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CancelInstructionInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.CleanInstructionsInput;
@@ -60,19 +65,33 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 public class ProgrammingServiceImplTest extends AbstractDataBrokerTest {
 
     private static final int INSTRUCTION_DEADLINE_OFFSET_IN_SECONDS = 3;
-    private static final InstructionsQueueKey INSTRUCTIONS_QUEUE_KEY = new InstructionsQueueKey("test-instraction-queue");
+    private static final String INSTRUCTIONS_QUEUE_KEY = "test-instraction-queue";
     private final Timer timer = new HashedWheelTimer();
     private MockedExecutorWrapper mockedExecutorWrapper;
     private MockedNotificationServiceWrapper mockedNotificationServiceWrapper;
     private ProgrammingServiceImpl testedProgrammingService;
     @Mock
+    private ClusterSingletonServiceProvider cssp;
+    @Mock
+    private ClusterSingletonServiceRegistration singletonServiceRegistration;
+    @Mock
     private RpcProviderRegistry rpcRegistry;
     @Mock
     private RoutedRpcRegistration<ProgrammingService> registration;
+    private ClusterSingletonService singletonService;
 
     @Before
     public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);
+        doAnswer(invocationOnMock -> {
+            this.singletonService = (ClusterSingletonService) invocationOnMock.getArguments()[0];
+            return this.singletonServiceRegistration;
+        }).when(this.cssp).registerClusterSingletonService(any(ClusterSingletonService.class));
+
+        doAnswer(invocationOnMock -> {
+            this.singletonService.closeServiceInstance();
+            return null;
+        }).when(this.singletonServiceRegistration).close();
         doReturn(this.registration).when(this.rpcRegistry).addRpcImplementation(Mockito.any(),
             Mockito.any(ProgrammingService.class));
         doNothing().when(this.registration).close();
@@ -81,11 +100,14 @@ public class ProgrammingServiceImplTest extends AbstractDataBrokerTest {
 
         this.testedProgrammingService = new ProgrammingServiceImpl(getDataBroker(),
             this.mockedNotificationServiceWrapper.getMockedNotificationService(),
-            this.mockedExecutorWrapper.getMockedExecutor(), this.rpcRegistry, this.timer, INSTRUCTIONS_QUEUE_KEY);
+            this.mockedExecutorWrapper.getMockedExecutor(), this.rpcRegistry, this.cssp, this.timer,
+            INSTRUCTIONS_QUEUE_KEY);
+        this.singletonService.instantiateServiceInstance();
     }
 
     @After
     public void tearDown() throws Exception {
+        this.singletonService.closeServiceInstance();
         this.testedProgrammingService.close();
     }
 
@@ -350,7 +372,9 @@ public class ProgrammingServiceImplTest extends AbstractDataBrokerTest {
 
     private boolean assertInstructionExists(final InstructionId id) {
         try {
-            return getDataBroker().newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.builder(InstructionsQueue.class, INSTRUCTIONS_QUEUE_KEY).build().child(org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.Instruction.class,
+            return getDataBroker().newReadOnlyTransaction().read(LogicalDatastoreType.OPERATIONAL,
+                InstanceIdentifier.builder(InstructionsQueue.class, new InstructionsQueueKey(INSTRUCTIONS_QUEUE_KEY))
+                    .build().child(org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.instruction.queue.Instruction.class,
                     new InstructionKey(id))).get().isPresent();
         } catch (InterruptedException | ExecutionException e) {
             return false;
index 6c7de28eec52211af1d6ba2cdecc936ae8e2b665..a79c8e2468e22b4aca1ae4a568953effa8c675ed 100644 (file)
             <groupId>org.opendaylight.mdsal</groupId>
             <artifactId>yang-binding</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.opendaylight.mdsal</groupId>
+            <artifactId>mdsal-singleton-common-api</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.opendaylight.yangtools</groupId>
             <artifactId>yang-common</artifactId>
index b38c847b76f9b6569546dbf4a7fdc14e0f7bdf7f..81f3fec84d1fd4c9075b69578196d790e5c5c83a 100644 (file)
@@ -8,9 +8,12 @@
 package org.opendaylight.bgpcep.programming.spi;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import javax.annotation.Nonnull;
+import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.programming.rev150720.SubmitInstructionInput;
+import org.opendaylight.yangtools.concepts.Identifiable;
 
-public interface InstructionScheduler {
+public interface InstructionScheduler extends Identifiable<ServiceGroupIdentifier> {
     /**
      * Schedule a new instruction for execution. This method tries to enqueue an instruction. It will return a Future
      * which represents the scheduling progress. When the future becomes successful, the requestor is expected to start
@@ -18,8 +21,15 @@ public interface InstructionScheduler {
      *
      * @param input Instruction scheduling information
      * @return Scheduling future.
-     *
      * @throws SchedulerException if a failure to schedule the instruction occurs.
      */
     ListenableFuture<Instruction> scheduleInstruction(SubmitInstructionInput input) throws SchedulerException;
+
+    /**
+     * Returns InstructionID
+     *
+     * @return Instruction ID
+     */
+    @Nonnull
+    String getInstructionID();
 }