f1e9effcb17e197643f427938848025fbacf2dd2
[openflowplugin.git] / applications / reconciliation-framework / src / main / java / org / opendaylight / openflowplugin / applications / reconciliation / impl / ReconciliationManagerImpl.java
1 /*
2  * Copyright (c) 2017 Ericsson India Global Services Pvt Ltd. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowplugin.applications.reconciliation.impl;
9
10 import com.google.common.collect.ImmutableMap;
11 import com.google.common.util.concurrent.Futures;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import java.util.ArrayList;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Map.Entry;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20 import java.util.concurrent.ConcurrentSkipListMap;
21 import java.util.concurrent.atomic.AtomicReference;
22 import java.util.stream.Collectors;
23 import javax.annotation.PreDestroy;
24 import javax.inject.Inject;
25 import javax.inject.Singleton;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
28 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeException;
29 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
30 import org.opendaylight.openflowplugin.api.openflow.mastership.ReconciliationFrameworkEvent;
31 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
32 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
33 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationNotificationListener;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
35 import org.opendaylight.yangtools.concepts.Registration;
36 import org.osgi.service.component.annotations.Activate;
37 import org.osgi.service.component.annotations.Component;
38 import org.osgi.service.component.annotations.Deactivate;
39 import org.osgi.service.component.annotations.Reference;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 @Singleton
44 @Component(service = ReconciliationManager.class, immediate = true)
45 public final class ReconciliationManagerImpl
46         implements ReconciliationManager, ReconciliationFrameworkEvent, AutoCloseable {
47     private static final Logger LOG = LoggerFactory.getLogger(ReconciliationManagerImpl.class);
48
49     private final AtomicReference<ResultState> decidedResultState = new AtomicReference<>(ResultState.DONOTHING);
50     private final ConcurrentMap<Integer, List<ReconciliationNotificationListener>> registeredServices =
51             new ConcurrentSkipListMap<>();
52     private final ConcurrentMap<DeviceInfo, ListenableFuture<ResultState>> futureMap = new ConcurrentHashMap<>();
53     private final ConcurrentMap<ResultState, Integer> resultStateMap = new ConcurrentHashMap<>();
54     private final Registration reg;
55
56     @Inject
57     @Activate
58     public ReconciliationManagerImpl(@Reference final MastershipChangeServiceManager mastershipChangeServiceManager)
59             throws MastershipChangeException {
60         reg = mastershipChangeServiceManager.reconciliationFrameworkRegistration(this);
61         LOG.info("ReconciliationManager started");
62     }
63
64     @PreDestroy
65     @Deactivate
66     @Override
67     public void close() {
68         reg.close();
69         LOG.info("ReconciliationManager stopped");
70     }
71
72     @Override
73     public NotificationRegistration registerService(final ReconciliationNotificationListener reconciliationTask) {
74         LOG.debug("Registered service {} with priority {} and intent {}", reconciliationTask.getName(),
75                   reconciliationTask.getPriority(), reconciliationTask.getResultState());
76         registeredServices.computeIfAbsent(reconciliationTask.getPriority(), services -> new ArrayList<>())
77                 .add(reconciliationTask);
78         final var registration = new ReconciliationServiceDelegate(() -> {
79             LOG.debug("Service un-registered from Reconciliation framework {}", reconciliationTask.getName());
80             registeredServices.computeIfPresent(reconciliationTask.getPriority(), (priority, services) -> services)
81                     .remove(reconciliationTask);
82             decideResultState(reconciliationTask.getResultState());
83         });
84         decideResultState(reconciliationTask.getResultState());
85         return registration;
86     }
87
88     private void decideResultState(final ResultState resultState) {
89         resultStateMap.compute(resultState, (unused, count) -> count == null ? 1 : count + 1);
90
91         // FIXME: can we do this more efficiently?
92         Entry<ResultState, Integer> maxEntry = null;
93         for (var entry : resultStateMap.entrySet()) {
94             if (maxEntry == null || entry.getValue() > maxEntry.getValue()) {
95                 maxEntry = entry;
96             }
97         }
98         decidedResultState.set(maxEntry.getKey());
99     }
100
101     @Override
102     public Map<Integer, List<ReconciliationNotificationListener>> getRegisteredServices() {
103         return ImmutableMap.copyOf(registeredServices);
104     }
105
106     @Override
107     public ListenableFuture<ResultState> onDevicePrepared(@NonNull final DeviceInfo node) {
108         LOG.debug("Triggering reconciliation for node : {}", node.getNodeId());
109         return futureMap.computeIfAbsent(node, value -> reconcileNode(node));
110     }
111
112     @Override
113     public ListenableFuture<Void> onDeviceDisconnected(@NonNull final DeviceInfo node) {
114         LOG.info("Stopping reconciliation for node {}", node.getNodeId());
115         return futureMap.containsKey(node) ? cancelNodeReconciliation(node) : Futures.immediateVoidFuture();
116     }
117
118     private ListenableFuture<ResultState> reconcileNode(final DeviceInfo node) {
119         var lastFuture = Futures.<ResultState>immediateFuture(null);
120         for (var services : registeredServices.values()) {
121             lastFuture = reconcileServices(lastFuture, services, node);
122         }
123         return lastFuture;
124     }
125
126     private ListenableFuture<ResultState> reconcileServices(final ListenableFuture<ResultState> prevFuture,
127             final List<ReconciliationNotificationListener> servicesForPriority, final DeviceInfo node) {
128         return Futures.transformAsync(prevFuture,
129             prevResult -> Futures.transform(Futures.allAsList(servicesForPriority.stream()
130                 .map(service -> service.startReconciliation(node)).collect(Collectors.toList())),
131                 results -> decidedResultState.get(),
132                 MoreExecutors.directExecutor()),
133             MoreExecutors.directExecutor());
134     }
135
136     private ListenableFuture<Void> cancelNodeReconciliation(final DeviceInfo node) {
137         var lastFuture = Futures.immediateVoidFuture();
138         futureMap.get(node).cancel(true);
139         futureMap.remove(node);
140         for (var services : registeredServices.values()) {
141             lastFuture = cancelServiceReconciliation(lastFuture, services, node);
142         }
143         return lastFuture;
144     }
145
146     private static ListenableFuture<Void> cancelServiceReconciliation(final ListenableFuture<Void> prevFuture,
147             final List<ReconciliationNotificationListener> servicesForPriority, final DeviceInfo node) {
148         return Futures.transformAsync(prevFuture,
149             prevResult -> Futures.transform(Futures.allAsList(servicesForPriority.stream()
150                 .map(service -> service.endReconciliation(node))
151                 .collect(Collectors.toList())),
152                 results -> null,
153                 MoreExecutors.directExecutor()),
154             MoreExecutors.directExecutor());
155     }
156 }