/*
 * Copyright (C) 2014 EBay Software Foundation
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 *
 * Authors : Ashwin Raveendran
 */
10 package org.opendaylight.ovsdb.lib;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.opendaylight.ovsdb.lib.jsonrpc.Params;
import org.opendaylight.ovsdb.lib.message.MonitorRequest;
import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
import org.opendaylight.ovsdb.lib.message.TableUpdate;
import org.opendaylight.ovsdb.lib.message.TableUpdates;
import org.opendaylight.ovsdb.lib.message.TransactBuilder;
import org.opendaylight.ovsdb.lib.message.UpdateNotification;
import org.opendaylight.ovsdb.lib.operations.Operation;
import org.opendaylight.ovsdb.lib.operations.OperationResult;
import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
import org.opendaylight.ovsdb.lib.schema.TableSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
46 public class OvsDBClientImpl implements OvsDBClient {
// Class-wide SLF4J logger.
48 protected static final Logger logger = LoggerFactory.getLogger(OvsDBClientImpl.class);
// Executor handed in through the two-argument constructor.
49 private ExecutorService executorService;
// NOTE(review): the request methods of this class call an `rpc` member, but its
// declaration is not visible in this chunk - confirm it exists in the full file.
// Database schemas keyed by database name (getSchema stores entries under the database name).
51 private Map<String, DatabaseSchema> schema = Maps.newHashMap();
// Monitor registrations keyed by monitor handle id (filled in by registerCallback).
52 private HashMap<String, CallbackContext> monitorCallbacks = Maps.newHashMap();
// NOTE(review): no assignment to this queue is visible in this chunk; presumably it is
// what getExceptions() hands out - confirm where it is initialised.
53 private Queue<Throwable> exceptions;
// Listener registered with the RPC layer; stays null until setupUpdateListener() installs it.
54 private OvsdbRPC.Callback rpcCallback;
/**
 * Creates a client that talks to the database through the given RPC proxy.
 *
 * @param rpc             RPC proxy used by the request methods of this client
 * @param executorService executor retained in the {@code executorService} field
 */
public OvsDBClientImpl(OvsdbRPC rpc, ExecutorService executorService) {
    // Keep the proxy: transact(), monitor(), getDatabases() and the schema
    // loader all dereference this.rpc.
    this.rpc = rpc;
    this.executorService = executorService;
}
64 void setupUpdateListener() {
65 if (rpcCallback == null) {
66 OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
68 public void update(Object node, UpdateNotification upadateNotification) {
69 Object key = upadateNotification.getContext();
70 CallbackContext callbackContext = monitorCallbacks.get(key);
71 MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
72 if (monitorCallBack == null) {
74 logger.info("callback received with context {}, but no known handler. Ignoring!", key);
77 _transformingCallback(upadateNotification.getUpdates(), monitorCallBack, callbackContext.schema);
81 public void locked(Object node, List<String> ids) {
86 public void stolen(Object node, List<String> ids) {
90 this.rpcCallback = temp;
91 rpc.registerCallback(temp);
96 protected void _transformingCallback(JsonNode tableUpdatesJson, MonitorCallBack monitorCallBack, DatabaseSchema dbSchema) {
97 //todo(ashwin): we should move all the JSON parsing logic to a utility class
98 if (tableUpdatesJson instanceof ObjectNode) {
99 Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
100 ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
101 for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();){
102 Map.Entry<String, JsonNode> entry = itr.next();
104 DatabaseSchema databaseSchema = this.schema.get(dbSchema.getName());
105 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
106 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
109 TableUpdates updates = new TableUpdates(tableUpdateMap);
110 monitorCallBack.update(updates);
115 public ListenableFuture<List<OperationResult>> transact(List<Operation> operations) {
117 //todo, we may not need transactionbuilder if we can have JSON objects
118 TransactBuilder builder = new TransactBuilder();
119 for (Operation o : operations) {
120 builder.addOperation(o);
123 return rpc.transact(builder);
127 public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
128 List<MonitorRequest<E>> monitorRequest,
129 final MonitorCallBack callback) {
131 final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
132 new Function<MonitorRequest<E>, String>() {
134 public String apply(MonitorRequest<E> input) {
135 return input.getTableName();
139 final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
140 registerCallback(monitorHandle, callback, dbSchema);
142 ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
144 public List<Object> params() {
145 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
148 Futures.addCallback(monitor, new FutureCallback<JsonNode>() {
150 public void onSuccess(JsonNode result) {
151 _transformingCallback(result, callback, dbSchema);
155 public void onFailure(Throwable t) {
156 callback.exception(t);
160 return monitorHandle;
163 private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
164 this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
165 setupUpdateListener();
169 public void cancelMonitor(MonitorHandle handler) {
170 throw new UnsupportedOperationException("not yet implemented");
174 public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
175 throw new UnsupportedOperationException("not yet implemented");
179 public ListenableFuture<Boolean> steal(String lockId) {
180 throw new UnsupportedOperationException("not yet implemented");
184 public ListenableFuture<Boolean> unLock(String lockId) {
185 throw new UnsupportedOperationException("not yet implemented");
189 public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
190 throw new UnsupportedOperationException("not yet implemented");
194 public void stopEchoService() {
195 throw new UnsupportedOperationException("not yet implemented");
199 public TransactionBuilder transactBuilder() {
200 return new TransactionBuilder(this);
// Readiness check: loops while the timeout argument is positive and tests
// whether the schema cache holds at least one entry.
// NOTE(review): the rest of the loop body and the return statements are not
// visible in this chunk - confirm the unit of `timeout`, how it is decremented
// and what is returned on expiry against the full file.
204 public boolean isReady(int timeout) throws InterruptedException {
205 while (timeout > 0) {
// A non-empty cache means at least one database schema has been stored.
206 if (!schema.isEmpty()) {
216 public ListenableFuture<List<String>> getDatabases() {
217 return rpc.list_dbs();
// Returns a future for the schema of `database`, either fetched from the device
// or taken from the local schema cache.
// NOTE(review): several lines of this method are not visible in this chunk;
// the comments below describe only the statements that are.
221 public ListenableFuture<DatabaseSchema> getSchema(final String database, final boolean cacheResult) {
// Look in the local cache first.
223 DatabaseSchema databaseSchema = schema.get(database);
// Go to the device when nothing is cached, and also whenever cacheResult is true.
// NOTE(review): as written, a schema that is already cached is fetched again on
// every call with cacheResult == true - confirm that this is intended.
225 if (databaseSchema == null || cacheResult) {
226 return Futures.transform(
// Fetch a single-element list containing just this database.
227 getSchemaFromDevice(Lists.newArrayList(database)),
228 new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
230 public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
231 if (result.containsKey(database)) {
232 DatabaseSchema s = result.get(database);
// Store the fetched schema in the client-wide cache.
// NOTE(review): whether this store is conditional is not visible here.
234 OvsDBClientImpl.this.schema.put(database, s);
// Hand back the value read from the cache as an already-completed future.
245 return Futures.immediateFuture(databaseSchema);
249 private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
250 Map<String, DatabaseSchema> schema = Maps.newHashMap();
251 SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
252 _populateSchema(dbNames, schema, future);
256 private void _populateSchema(final List<String> dbNames,
257 final Map<String, DatabaseSchema> schema,
258 final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
260 if (dbNames == null || dbNames.isEmpty()) {
264 Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
265 new com.google.common.base.Function<JsonNode, Void>() {
267 public Void apply(JsonNode jsonNode) {
269 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
270 if (schema.size() > 1 && !sfuture.isCancelled()) {
271 _populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
272 } else if (schema.size() == 1) {
275 } catch (Throwable e) {
276 sfuture.setException(e);
// Setter that accepts an RPC proxy.
// NOTE(review): the body is not visible in this chunk; presumably it assigns the
// `rpc` member used by the request methods - confirm against the full file.
283 public void setRpc(OvsdbRPC rpc) {
// Accessor returning a queue of throwables.
// NOTE(review): the body is not visible in this chunk; presumably it returns the
// `exceptions` field, which has no visible initialiser - confirm it cannot be null.
287 public Queue<Throwable> getExceptions() {
291 static class CallbackContext {
292 MonitorCallBack monitorCallBack;
293 DatabaseSchema schema;
295 CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
296 this.monitorCallBack = monitorCallBack;
297 this.schema = schema;