Generalised Status Objects
DBUS demo
#!/usr/bin/python
import sys
import os
import subprocess
import time
import signal
import dbus
import gobject
import glib
import dbus.glib
import dbus.mainloop.glib
import dbus.service
INTERFACE = "test.DBUSInterface"
OBJECT_PATH = "/test/DBUSInterface"
SERVICE = "test.DBUSInterface.service"
START_COMMAND = "start"
class TestService(dbus.service.Object):
def __init__(self, connection, mainloop):
dbus.service.Object.__init__(self, connection, OBJECT_PATH)
self.mainloop = mainloop
@dbus.service.method(INTERFACE)
def TestMethod1(self, arg):
return arg/2
@dbus.service.method(INTERFACE, 'd', 's')
def TestMethod2(self, arg):
return str(arg/2)
@dbus.service.method(INTERFACE)
def PySVNTest(self, path):
import pysvn
import rabbitvcs.util._locale
rabbitvcs.util._locale.initialize_locale()
cl = pysvn.Client()
st = cl.status(unicode(path))
import rabbitvcs.vcs.status
st2 = rabbitvcs.vcs.status.SVNStatus(st[0])
import cPickle
st2_as_s = cPickle.dumps(st2)
return st2_as_s
@dbus.service.method(INTERFACE)
def PID(self):
return os.getpid()
@dbus.service.method(INTERFACE)
def Quit(self):
""" Quits the service, performing any necessary cleanup operations.
You can call this from the command line with:
dbus-send --print-reply \
--dest=test.DBUSInterface.service \
/test/DBUSInterface \
test.DBUSInterface.Quit
If calling this programmatically, then you can do "os.waitpid(pid, 0)"
on the returned PID to prevent a zombie process.
"""
print "Quitting main loop..."
self.mainloop.quit()
return self.PID()
def StartService():
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
session_bus = dbus.SessionBus()
service_name = dbus.service.BusName(SERVICE, session_bus)
mainloop = gobject.MainLoop()
TestService(session_bus, mainloop)
try:
mainloop.run()
except KeyboardInterrupt:
sys.exit(1)
print "Ended service"
def RunTests():
import cPickle
session_bus = dbus.SessionBus()
service = session_bus.get_object(SERVICE,
OBJECT_PATH)
try:
print "Service claims its PID is: %s" % service.PID()
arg1 = 1
test1 = service.TestMethod1(arg1)
print "Result of TestMethod1(%s) = %s (%s)" % (arg1, test1, type(test1))
test2 = service.TestMethod2(arg1)
print "Result of TestMethod2(%s) = %s (%s)" % (arg1, test2, type(test2))
testpath = u"/home/jason/Software/svntest/svntest"
test3 = cPickle.loads(str(service.PySVNTest(testpath)))
print "Result of PySVNTest(%s) = %s (%s)" % (testpath, test3, type(test3))
finally:
pid = 0
try:
print "Quitting service..."
pid = service.Quit()
except dbus.exceptions.DBusException:
# Ignore it, it will necessarily happen when we kill the service
pass
else:
try:
if pid: os.waitpid(pid, 0)
except OSError:
# This occurs if the process is already gone.
pass
print "Done in TestService()"
if __name__ == "__main__":
args = sys.argv
if(len(args) > 1) and args[1] == START_COMMAND:
StartService()
else:
service_proc = subprocess.Popen(
[sys.executable, __file__, START_COMMAND],
stdout=subprocess.PIPE)
time.sleep(1)
print "Service PID should be: %s" % service_proc.pid
try:
RunTests()
finally:
print "Killing service forcefully..."
try:
os.kill(service_proc.pid, signal.SIGINT)
except OSError:
pass
print "Completed DBUS demo"