Do not use Builders in InMemoryDataStoreTest
[mdsal.git] / singleton-service / mdsal-singleton-dom-impl / src / main / java / org / opendaylight / mdsal / singleton / dom / impl / EOSClusterSingletonServiceProvider.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.singleton.dom.impl;
9
10 import static com.google.common.base.Verify.verify;
11 import static com.google.common.base.Verify.verifyNotNull;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.annotations.VisibleForTesting;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import java.util.ArrayList;
19 import java.util.List;
20 import java.util.concurrent.ConcurrentHashMap;
21 import java.util.concurrent.ConcurrentMap;
22 import java.util.concurrent.ExecutionException;
23 import javax.annotation.PreDestroy;
24 import javax.inject.Inject;
25 import javax.inject.Singleton;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
28 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipStateChange;
29 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
30 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
31 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
32 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService;
33 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
34 import org.opendaylight.yangtools.concepts.Registration;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
36 import org.osgi.service.component.annotations.Activate;
37 import org.osgi.service.component.annotations.Component;
38 import org.osgi.service.component.annotations.Deactivate;
39 import org.osgi.service.component.annotations.Reference;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 /**
44  * Implementation of {@link ClusterSingletonServiceProvider} working on top a {@link DOMEntityOwnershipService}.
45  */
46 @Singleton
47 @Component(service = ClusterSingletonServiceProvider.class)
48 public final class EOSClusterSingletonServiceProvider
49         implements ClusterSingletonServiceProvider, DOMEntityOwnershipListener, AutoCloseable {
50     private static final Logger LOG = LoggerFactory.getLogger(EOSClusterSingletonServiceProvider.class);
51
52     @VisibleForTesting
53     static final @NonNull String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
54     @VisibleForTesting
55     static final @NonNull String CLOSE_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
56
57     private final ConcurrentMap<String, ServiceGroup> serviceGroupMap = new ConcurrentHashMap<>();
58     private final DOMEntityOwnershipService entityOwnershipService;
59
60     /* EOS Entity Listeners Registration */
61     private Registration serviceEntityListenerReg;
62     private Registration asyncCloseEntityListenerReg;
63
64     @Inject
65     @Activate
66     public EOSClusterSingletonServiceProvider(@Reference final DOMEntityOwnershipService entityOwnershipService) {
67         this.entityOwnershipService = requireNonNull(entityOwnershipService);
68         serviceEntityListenerReg = entityOwnershipService.registerListener(SERVICE_ENTITY_TYPE, this);
69         asyncCloseEntityListenerReg = entityOwnershipService.registerListener(CLOSE_SERVICE_ENTITY_TYPE, this);
70         LOG.info("Cluster Singleton Service started");
71     }
72
73     @PreDestroy
74     @Deactivate
75     @Override
76     public synchronized void close() throws ExecutionException, InterruptedException {
77         if (serviceEntityListenerReg == null) {
78             // Idempotent
79             return;
80         }
81
82         LOG.info("Cluster Singleton Service stopping");
83         serviceEntityListenerReg.close();
84         serviceEntityListenerReg = null;
85
86         final var future = Futures.allAsList(serviceGroupMap.values().stream()
87             .map(ServiceGroup::closeClusterSingletonGroup)
88             .toList());
89         try {
90             LOG.debug("Waiting for service groups to stop");
91             future.get();
92         } finally {
93             asyncCloseEntityListenerReg.close();
94             asyncCloseEntityListenerReg = null;
95             serviceGroupMap.clear();
96         }
97
98         LOG.info("Cluster Singleton Service stopped");
99     }
100
101     @Override
102     public synchronized Registration registerClusterSingletonService(final ClusterSingletonService service) {
103         LOG.debug("Call registrationService {} method for ClusterSingletonService Provider {}", service, this);
104
105         final var serviceIdentifier = service.getIdentifier().value();
106         final ServiceGroup serviceGroup;
107         final var existing = serviceGroupMap.get(serviceIdentifier);
108         if (existing == null) {
109             serviceGroup = createGroup(serviceIdentifier, new ArrayList<>(1));
110             serviceGroupMap.put(serviceIdentifier, serviceGroup);
111
112             try {
113                 initializeOrRemoveGroup(serviceGroup);
114             } catch (CandidateAlreadyRegisteredException e) {
115                 throw new IllegalArgumentException("Service group already registered", e);
116             }
117         } else {
118             serviceGroup = existing;
119         }
120
121         final var reg = new ServiceRegistration(service) {
122             @Override
123             protected void removeRegistration() {
124                 // We need to bounce the unregistration through a ordered lock in order not to deal with asynchronous
125                 // shutdown of the group and user registering it again.
126                 EOSClusterSingletonServiceProvider.this.removeRegistration(serviceIdentifier, this);
127             }
128         };
129
130         serviceGroup.registerService(reg);
131         return reg;
132     }
133
134     private ServiceGroup createGroup(final String serviceIdentifier, final List<ServiceRegistration> services) {
135         return new ActiveServiceGroup(serviceIdentifier, entityOwnershipService,
136             createEntity(SERVICE_ENTITY_TYPE, serviceIdentifier),
137             createEntity(CLOSE_SERVICE_ENTITY_TYPE, serviceIdentifier), services);
138     }
139
140     private void initializeOrRemoveGroup(final ServiceGroup group) throws CandidateAlreadyRegisteredException {
141         try {
142             group.initialize();
143         } catch (CandidateAlreadyRegisteredException e) {
144             serviceGroupMap.remove(group.getIdentifier(), group);
145             throw e;
146         }
147     }
148
149     private void removeRegistration(final String serviceIdentifier, final ServiceRegistration reg) {
150         final PlaceholderServiceGroup placeHolder;
151         final ListenableFuture<?> future;
152         synchronized (this) {
153             final var lookup = verifyNotNull(serviceGroupMap.get(serviceIdentifier));
154             future = lookup.unregisterService(reg);
155             if (future == null) {
156                 return;
157             }
158
159             // Close the group and replace it with a placeholder
160             LOG.debug("Closing service group {}", serviceIdentifier);
161             placeHolder = new PlaceholderServiceGroup(lookup, future);
162
163             final var identifier = reg.getInstance().getIdentifier().value();
164             verify(serviceGroupMap.replace(identifier, lookup, placeHolder));
165             LOG.debug("Replaced group {} with {}", serviceIdentifier, placeHolder);
166
167             lookup.closeClusterSingletonGroup();
168         }
169
170         future.addListener(() -> finishShutdown(placeHolder), MoreExecutors.directExecutor());
171     }
172
173     private synchronized void finishShutdown(final PlaceholderServiceGroup placeHolder) {
174         final var identifier = placeHolder.getIdentifier();
175         LOG.debug("Service group {} closed", identifier);
176
177         final var services = placeHolder.getServices();
178         if (services.isEmpty()) {
179             // No services, we are all done
180             if (serviceGroupMap.remove(identifier, placeHolder)) {
181                 LOG.debug("Service group {} removed", placeHolder);
182             } else {
183                 LOG.debug("Service group {} superseded by {}", placeHolder, serviceGroupMap.get(identifier));
184             }
185             return;
186         }
187
188         // Placeholder is being retired, we are reusing its services as the seed for the group.
189         final var group = createGroup(identifier, services);
190         verify(serviceGroupMap.replace(identifier, placeHolder, group));
191         placeHolder.setSuccessor(group);
192         LOG.debug("Service group upgraded from {} to {}", placeHolder, group);
193
194         try {
195             initializeOrRemoveGroup(group);
196         } catch (CandidateAlreadyRegisteredException e) {
197             LOG.error("Failed to register delayed group {}, it will remain inoperational", identifier, e);
198         }
199     }
200
201     @Override
202     public void ownershipChanged(final DOMEntity entity, final EntityOwnershipStateChange change,
203             final boolean inJeopardy) {
204         LOG.debug("Ownership change for ClusterSingletonService Provider on {} {} inJeopardy={}", entity, change,
205             inJeopardy);
206
207         final var serviceIdentifier = getServiceIdentifierFromEntity(entity);
208         final var serviceHolder = serviceGroupMap.get(serviceIdentifier);
209         if (serviceHolder != null) {
210             serviceHolder.ownershipChanged(entity, change, inJeopardy);
211         } else {
212             LOG.debug("ClusterSingletonServiceGroup was not found for serviceIdentifier {}", serviceIdentifier);
213         }
214     }
215
216     @VisibleForTesting
217     static String getServiceIdentifierFromEntity(final DOMEntity entity) {
218         final var yii = entity.getIdentifier();
219         final var niiwp = (NodeIdentifierWithPredicates) yii.getLastPathArgument();
220         return niiwp.values().iterator().next().toString();
221     }
222
223     /**
224      * Creates an extended {@link DOMEntity} instance.
225      *
226      * @param entityType the type of the entity
227      * @param entityIdentifier the identifier of the entity
228      * @return instance of Entity extended GenericEntity type
229      */
230     @VisibleForTesting
231     static DOMEntity createEntity(final String entityType, final String entityIdentifier) {
232         return new DOMEntity(entityType, entityIdentifier);
233     }
234 }