rest_dbus: implement client subscription API
Import gobject for running the d-bus event loop, and gevent for
a greenlets support.
Resolves openbmc/openbmc#2318.
Change-Id: Ieaf2e6badcc22159f7470e3fdf949b358fc3e3fe
Signed-off-by: Deepak Kodihalli <dkodihal@in.ibm.com>
diff --git a/module/obmc/wsgi/apps/rest_dbus.py b/module/obmc/wsgi/apps/rest_dbus.py
index 1ad359c..323e7da 100644
--- a/module/obmc/wsgi/apps/rest_dbus.py
+++ b/module/obmc/wsgi/apps/rest_dbus.py
@@ -29,6 +29,16 @@
import crypt
import tempfile
import re
+have_wsock = True
+try:
+ from geventwebsocket import WebSocketError
+except ImportError:
+ have_wsock = False
+if have_wsock:
+ from dbus.mainloop.glib import DBusGMainLoop
+ DBusGMainLoop(set_as_default=True)
+ import gobject
+ import gevent
DBUS_UNKNOWN_INTERFACE = 'org.freedesktop.UnknownInterface'
DBUS_UNKNOWN_INTERFACE_ERROR = 'org.freedesktop.DBus.Error.UnknownInterface'
@@ -719,6 +729,81 @@
pass
+class EventNotifier:
+ keyNames = {}
+ keyNames['event'] = 'event'
+ keyNames['path'] = 'path'
+ keyNames['intfMap'] = 'interfaces'
+ keyNames['propMap'] = 'properties'
+ keyNames['intf'] = 'interface'
+
+ def __init__(self, wsock, filters):
+ self.wsock = wsock
+ self.paths = filters.get("paths", [])
+ self.interfaces = filters.get("interfaces", [])
+ if not self.paths:
+ self.paths.append(None)
+ bus = dbus.SystemBus()
+ # Add a signal receiver for every path the client is interested in
+ for path in self.paths:
+ bus.add_signal_receiver(
+ self.interfaces_added_handler,
+ dbus_interface=dbus.BUS_DAEMON_IFACE + '.ObjectManager',
+ signal_name='InterfacesAdded',
+ path=path)
+ bus.add_signal_receiver(
+ self.properties_changed_handler,
+ dbus_interface=dbus.PROPERTIES_IFACE,
+ signal_name='PropertiesChanged',
+ path=path,
+ path_keyword='path')
+ loop = gobject.MainLoop()
+ # gobject's mainloop.run() will block the entire process, so the gevent
+ # scheduler and hence greenlets won't execute. The while-loop below
+ # works around this limitation by using gevent's sleep, instead of
+ # calling loop.run()
+ gcontext = loop.get_context()
+ while loop is not None:
+ try:
+ if gcontext.pending():
+ gcontext.iteration()
+ else:
+ # gevent.sleep puts only the current greenlet to sleep,
+ # not the entire process.
+ gevent.sleep(5)
+ except WebSocketError:
+ break
+
+ def interfaces_added_handler(self, path, iprops, **kw):
+ ''' If the client is interested in these changes, respond to the
+ client. This handles d-bus interface additions.'''
+ if (not self.interfaces) or \
+ (not set(iprops).isdisjoint(self.interfaces)):
+ response = {}
+ response[self.keyNames['event']] = "InterfacesAdded"
+ response[self.keyNames['path']] = path
+ response[self.keyNames['intfMap']] = iprops
+ try:
+ self.wsock.send(json.dumps(response))
+ except WebSocketError:
+ return
+
+ def properties_changed_handler(self, interface, new, old, **kw):
+ ''' If the client is interested in these changes, respond to the
+ client. This handles d-bus property changes. '''
+ if (not self.interfaces) or (interface in self.interfaces):
+ path = str(kw['path'])
+ response = {}
+ response[self.keyNames['event']] = "PropertiesChanged"
+ response[self.keyNames['path']] = path
+ response[self.keyNames['intf']] = interface
+ response[self.keyNames['propMap']] = new
+ try:
+ self.wsock.send(json.dumps(response))
+ except WebSocketError:
+ return
+
+
class EventHandler(RouteHandler):
''' Handles the /subscribe route, for clients to be able
to subscribe to BMC events. '''
@@ -740,7 +825,9 @@
wsock = request.environ.get('wsgi.websocket')
if not wsock:
abort(400, 'Expected WebSocket request.')
- wsock.send("Connected")
+ filters = wsock.receive()
+ filters = json.loads(filters)
+ notifier = EventNotifier(wsock, filters)
class ImagePutHandler(RouteHandler):