2 * Copyright (C) 2014 EBay Software Foundation
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 * Authors : Ashwin Raveendran
10 package org.opendaylight.ovsdb.lib;
12 import java.util.HashMap;
13 import java.util.Iterator;
14 import java.util.List;
16 import java.util.Queue;
17 import java.util.UUID;
18 import java.util.concurrent.ExecutorService;
20 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
21 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
22 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
23 import org.opendaylight.ovsdb.lib.message.TableUpdate;
24 import org.opendaylight.ovsdb.lib.message.TableUpdates;
25 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
26 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
27 import org.opendaylight.ovsdb.lib.notation.Row;
28 import org.opendaylight.ovsdb.lib.operations.Operation;
29 import org.opendaylight.ovsdb.lib.operations.OperationResult;
30 import org.opendaylight.ovsdb.lib.operations.TransactionBuilder;
31 import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
32 import org.opendaylight.ovsdb.lib.schema.GenericTableSchema;
33 import org.opendaylight.ovsdb.lib.schema.TableSchema;
34 import org.opendaylight.ovsdb.lib.schema.typed.TypedTable;
35 import org.opendaylight.ovsdb.lib.schema.typed.TyperUtils;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import com.fasterxml.jackson.databind.JsonNode;
40 import com.fasterxml.jackson.databind.node.ObjectNode;
41 import com.google.common.base.Function;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.collect.Lists;
44 import com.google.common.collect.Maps;
45 import com.google.common.util.concurrent.FutureCallback;
46 import com.google.common.util.concurrent.Futures;
47 import com.google.common.util.concurrent.ListenableFuture;
48 import com.google.common.util.concurrent.SettableFuture;
/**
 * {@link OvsDBClient} implementation that talks to the database through an {@code OvsdbRPC}
 * handle, keeps a map of database schemas by name and a table of monitor callbacks keyed by
 * monitor-handle id.
 */
51 public class OvsDBClientImpl implements OvsDBClient {
// Logger shared by this class (and visible to subclasses).
53 protected static final Logger logger = LoggerFactory.getLogger(OvsDBClientImpl.class);
// Executor handed in through the constructor.
54 private ExecutorService executorService;
// Database name -> schema. Written by getSchema; read by isReady, getDatabaseSchema and
// _transformingCallback.
56 private Map<String, DatabaseSchema> schema = Maps.newHashMap();
// Monitor handle id -> callback context; entries are added by registerCallback.
57 private HashMap<String, CallbackContext> monitorCallbacks = Maps.newHashMap();
// Exposed through getExceptions(). NOTE(review): no initialisation is visible in this listing.
58 private Queue<Throwable> exceptions;
// Listener installed by setupUpdateListener; stays null until the first installation.
59 private OvsdbRPC.Callback rpcCallback;
/**
 * Creates a client that keeps the supplied executor.
 *
 * NOTE(review): a statement between the signature and the executor assignment is not visible
 * in this listing; presumably it stores {@code rpc} - confirm against the full file.
 *
 * @param rpc             RPC handle for the OVSDB connection
 * @param executorService executor stored on this instance
 */
61 public OvsDBClientImpl(OvsdbRPC rpc, ExecutorService executorService) {
63 this.executorService = executorService;
69 void setupUpdateListener() {
70 if (rpcCallback == null) {
71 OvsdbRPC.Callback temp = new OvsdbRPC.Callback() {
73 public void update(Object node, UpdateNotification upadateNotification) {
74 Object key = upadateNotification.getContext();
75 CallbackContext callbackContext = monitorCallbacks.get(key);
76 MonitorCallBack monitorCallBack = callbackContext.monitorCallBack;
77 if (monitorCallBack == null) {
79 logger.info("callback received with context {}, but no known handler. Ignoring!", key);
82 _transformingCallback(upadateNotification.getUpdates(), monitorCallBack, callbackContext.schema);
86 public void locked(Object node, List<String> ids) {
91 public void stolen(Object node, List<String> ids) {
95 this.rpcCallback = temp;
96 rpc.registerCallback(temp);
101 protected void _transformingCallback(JsonNode tableUpdatesJson, MonitorCallBack monitorCallBack, DatabaseSchema dbSchema) {
102 //todo(ashwin): we should move all the JSON parsing logic to a utility class
103 if (tableUpdatesJson instanceof ObjectNode) {
104 Map<String, TableUpdate> tableUpdateMap = Maps.newHashMap();
105 ObjectNode updatesJson = (ObjectNode) tableUpdatesJson;
106 for (Iterator<Map.Entry<String,JsonNode>> itr = updatesJson.fields(); itr.hasNext();){
107 Map.Entry<String, JsonNode> entry = itr.next();
109 DatabaseSchema databaseSchema = this.schema.get(dbSchema.getName());
110 TableSchema table = databaseSchema.table(entry.getKey(), TableSchema.class);
111 tableUpdateMap.put(entry.getKey(), table.updatesFromJson(entry.getValue()));
114 TableUpdates updates = new TableUpdates(tableUpdateMap);
115 monitorCallBack.update(updates);
120 public ListenableFuture<List<OperationResult>> transact(List<Operation> operations) {
122 //todo, we may not need transactionbuilder if we can have JSON objects
123 TransactBuilder builder = new TransactBuilder();
124 for (Operation o : operations) {
125 builder.addOperation(o);
128 return rpc.transact(builder);
132 public <E extends TableSchema<E>> MonitorHandle monitor(final DatabaseSchema dbSchema,
133 List<MonitorRequest<E>> monitorRequest,
134 final MonitorCallBack callback) {
136 final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
137 new Function<MonitorRequest<E>, String>() {
139 public String apply(MonitorRequest<E> input) {
140 return input.getTableName();
144 final MonitorHandle monitorHandle = new MonitorHandle(UUID.randomUUID().toString());
145 registerCallback(monitorHandle, callback, dbSchema);
147 ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
149 public List<Object> params() {
150 return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
153 Futures.addCallback(monitor, new FutureCallback<JsonNode>() {
155 public void onSuccess(JsonNode result) {
156 _transformingCallback(result, callback, dbSchema);
160 public void onFailure(Throwable t) {
161 callback.exception(t);
165 return monitorHandle;
168 private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
169 this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
170 setupUpdateListener();
174 public void cancelMonitor(MonitorHandle handler) {
175 throw new UnsupportedOperationException("not yet implemented");
179 public void lock(String lockId, LockAquisitionCallback lockedCallBack, LockStolenCallback stolenCallback) {
180 throw new UnsupportedOperationException("not yet implemented");
184 public ListenableFuture<Boolean> steal(String lockId) {
185 throw new UnsupportedOperationException("not yet implemented");
189 public ListenableFuture<Boolean> unLock(String lockId) {
190 throw new UnsupportedOperationException("not yet implemented");
194 public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
195 throw new UnsupportedOperationException("not yet implemented");
199 public void stopEchoService() {
200 throw new UnsupportedOperationException("not yet implemented");
204 public TransactionBuilder transactBuilder() {
205 return new TransactionBuilder(this);
/**
 * Checks, while {@code timeout} is positive, whether at least one database schema has been stored.
 *
 * NOTE(review): the rest of the loop (what happens on success, how the loop waits and how
 * {@code timeout} is decreased) and the final return are not visible in this listing, so the
 * unit of {@code timeout} cannot be confirmed from here.
 *
 * @param timeout loop bound; the loop is skipped entirely when it is not positive
 * @throws InterruptedException declared by the signature; the throwing call is not visible here
 */
209 public boolean isReady(int timeout) throws InterruptedException {
210 while (timeout > 0) {
// A non-empty schema map is the readiness signal tested on each pass.
211 if (!schema.isEmpty()) {
221 public ListenableFuture<List<String>> getDatabases() {
222 return rpc.list_dbs();
/**
 * Returns the schema of {@code database}, either from the local schema map or by fetching it
 * through {@code getSchemaFromDevice}.
 *
 * NOTE(review): several lines of this method are not visible in this listing (including the
 * end of the transform function and its closing arguments), so the exact role of
 * {@code cacheResult} inside the function cannot be confirmed from here.
 *
 * @param database    name of the database whose schema is wanted
 * @param cacheResult when true, a fetch is performed even if a schema is already stored
 * @return future holding the schema
 */
226 public ListenableFuture<DatabaseSchema> getSchema(final String database, final boolean cacheResult) {
228 DatabaseSchema databaseSchema = schema.get(database);
// Fetch when nothing is stored for this database, and also whenever cacheResult is true.
230 if (databaseSchema == null || cacheResult) {
231 return Futures.transform(
232 getSchemaFromDevice(Lists.newArrayList(database)),
233 new Function<Map<String, DatabaseSchema>, DatabaseSchema>() {
235 public DatabaseSchema apply(Map<String, DatabaseSchema> result) {
236 if (result.containsKey(database)) {
237 DatabaseSchema s = result.get(database);
238 s.populateInternallyGeneratedColumns();
// Stores the fetched schema in the client's schema map.
// NOTE(review): a line just before this one is missing from the listing; this store may be conditional.
240 OvsDBClientImpl.this.schema.put(database, s);
// Stored schema is returned as an already-completed future.
251 return Futures.immediateFuture(databaseSchema);
/**
 * Starts fetching the schemas of {@code dbNames} into a fresh map, paired with a new
 * {@link SettableFuture} that {@code _populateSchema} is given to complete.
 *
 * NOTE(review): the return statement is not visible in this listing; presumably the
 * future created here is returned - confirm.
 *
 * @param dbNames names of the databases to fetch
 */
255 private ListenableFuture<Map<String, DatabaseSchema>> getSchemaFromDevice(final List<String> dbNames) {
256 Map<String, DatabaseSchema> schema = Maps.newHashMap();
257 SettableFuture<Map<String, DatabaseSchema>> future = SettableFuture.create();
258 _populateSchema(dbNames, schema, future);
/**
 * Requests the schema of the first name in {@code dbNames}, parses the reply into
 * {@code schema}, and may recurse on the remaining names.
 *
 * NOTE(review): some statements of this method are not visible in this listing (the body of
 * the empty-list branch, the body of the {@code else if} branch and the end of the function),
 * so where {@code sfuture} is completed successfully cannot be confirmed from here.
 *
 * @param dbNames names still to be fetched; only the first is requested by this call
 * @param schema  accumulator mapping database name to parsed schema
 * @param sfuture future that receives any exception raised while handling the reply
 */
262 private void _populateSchema(final List<String> dbNames,
263 final Map<String, DatabaseSchema> schema,
264 final SettableFuture<Map<String, DatabaseSchema>> sfuture) {
// Nothing (left) to fetch.
266 if (dbNames == null || dbNames.isEmpty()) {
// Only the head of the list is requested per call.
270 Futures.transform(rpc.get_schema(Lists.newArrayList(dbNames.get(0))),
271 new com.google.common.base.Function<JsonNode, Void>() {
273 public Void apply(JsonNode jsonNode) {
275 schema.put(dbNames.get(0), DatabaseSchema.fromJson(dbNames.get(0), jsonNode));
// NOTE(review): both tests below look at how many schemas have been collected, not at how many
// names remain. When the accumulator starts empty (as in getSchemaFromDevice) its size is 1
// right after the first put, so the recursive branch looks unreachable for the first database.
// Confirm the intended behaviour for requests that name more than one database.
276 if (schema.size() > 1 && !sfuture.isCancelled()) {
277 _populateSchema(dbNames.subList(1, dbNames.size()), schema, sfuture);
278 } else if (schema.size() == 1) {
// Any Throwable raised above is handed to the future instead of propagating.
281 } catch (Throwable e) {
282 sfuture.setException(e);
/**
 * Setter taking an RPC handle.
 *
 * NOTE(review): the body is not visible in this listing; presumably it stores {@code rpc}
 * on this instance - confirm.
 *
 * @param rpc RPC handle
 */
289 public void setRpc(OvsdbRPC rpc) {
/**
 * Accessor for a queue of throwables.
 *
 * NOTE(review): the body is not visible in this listing; presumably it returns the
 * {@code exceptions} field - confirm.
 */
293 public Queue<Throwable> getExceptions() {
297 static class CallbackContext {
298 MonitorCallBack monitorCallBack;
299 DatabaseSchema schema;
301 CallbackContext(MonitorCallBack monitorCallBack, DatabaseSchema schema) {
302 this.monitorCallBack = monitorCallBack;
303 this.schema = schema;
308 public DatabaseSchema getDatabaseSchema (String dbName) {
309 return schema.get(dbName);
313 * This method finds the DatabaseSchema that matches a given Typed Table Class.
314 * With the introduction of TypedTable and TypedColumn annotations, it is possible to express
315 * the Database Name, Table Name & the Database Versions within which the Table is defined and maintained.
317 * @param klazz Typed Class that represents a Table
318 * @return DatabaseSchema that matches a Typed Table Class
320 private <T> DatabaseSchema getDatabaseSchemaForTypedTable (Class <T> klazz) {
321 TypedTable typedTable = klazz.getAnnotation(TypedTable.class);
322 if (typedTable != null) {
323 return this.getDatabaseSchema(typedTable.database());
329 * User friendly convenient method that make use of TyperUtils.getTypedRowWrapper to create a Typed Row Proxy
330 * given the Typed Table Class
332 * @param klazz Typed Interface
333 * @return Proxy wrapper for the actual raw Row class.
336 public <T> T createTypedRowWrapper(Class<T> klazz) {
337 DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
338 return this.createTypedRowWrapper(dbSchema, klazz);
342 * User friendly convenient method that make use of getTypedRowWrapper to create a Typed Row Proxy given
343 * DatabaseSchema and Typed Table Class.
345 * @param dbSchema Database Schema of interest
346 * @param klazz Typed Interface
347 * @return Proxy wrapper for the actual raw Row class.
350 public <T> T createTypedRowWrapper(DatabaseSchema dbSchema, Class<T> klazz) {
351 return TyperUtils.getTypedRowWrapper(dbSchema, klazz, new Row<GenericTableSchema>());
355 * User friendly convenient method to get a Typed Row Proxy given a Typed Table Class and the Row to be wrapped.
357 * @param klazz Typed Interface
358 * @param row The actual Row that the wrapper is operating on. It can be null if the caller is just interested in getting ColumnSchema.
359 * @return Proxy wrapper for the actual raw Row class.
362 public <T> T getTypedRowWrapper(final Class<T> klazz, final Row<GenericTableSchema> row) {
363 DatabaseSchema dbSchema = getDatabaseSchemaForTypedTable(klazz);
364 return TyperUtils.getTypedRowWrapper(dbSchema, klazz, row);