#!/usr/bin/python
"""Round-trip demo of a D-Bus service and a client talking to it.

Run without arguments, the script spawns itself with the ``start`` argument
to host ``TestService`` on the session bus, calls a few methods on it, and
then shuts the service down again.

The code targets Python 2: it relies on ``unicode`` and ``cPickle``.
"""

import sys
import os
import subprocess
import time
import signal

import dbus
import gobject
import glib
import dbus.glib
import dbus.mainloop.glib
import dbus.service

INTERFACE = "test.DBUSInterface"
OBJECT_PATH = "/test/DBUSInterface"
SERVICE = "test.DBUSInterface.service"

# Command-line argument that makes the script act as the service process.
START_COMMAND = "start"


class TestService(dbus.service.Object):
    """D-Bus object exported at OBJECT_PATH with a handful of test methods."""

    def __init__(self, connection, mainloop):
        """Export the object on ``connection`` and remember ``mainloop``.

        ``mainloop`` is the loop that ``Quit`` stops.
        """
        dbus.service.Object.__init__(self, connection, OBJECT_PATH)
        self.mainloop = mainloop

    @dbus.service.method(INTERFACE)
    def TestMethod1(self, arg):
        """Return ``arg / 2``; no explicit D-Bus signature is declared."""
        return arg/2

    @dbus.service.method(INTERFACE, 'd', 's')
    def TestMethod2(self, arg):
        """Return ``arg / 2`` as a string.

        Declared with input signature ``d`` (double) and output signature
        ``s`` (string).
        """
        return str(arg/2)

    @dbus.service.method(INTERFACE)
    def PySVNTest(self, path):
        """Return the pickled status of the first pysvn entry for ``path``.

        The first item of ``pysvn.Client().status(path)`` is wrapped in a
        ``rabbitvcs.vcs.status.SVNStatus`` and serialised with ``cPickle``.
        """
        # Imported lazily so the rest of the demo works without these
        # packages being importable.
        import pysvn
        import rabbitvcs.util._locale
        rabbitvcs.util._locale.initialize_locale()

        client = pysvn.Client()
        statuses = client.status(unicode(path))

        import rabbitvcs.vcs.status
        status = rabbitvcs.vcs.status.SVNStatus(statuses[0])

        import cPickle
        return cPickle.dumps(status)

    @dbus.service.method(INTERFACE)
    def PID(self):
        """Return the process ID of the process hosting this object."""
        return os.getpid()

    @dbus.service.method(INTERFACE)
    def Quit(self):
        r""" Quits the service, performing any necessary cleanup operations.

        You can call this from the command line with:

        dbus-send --print-reply \
            --dest=test.DBUSInterface.service \
            /test/DBUSInterface \
            test.DBUSInterface.Quit

        If calling this programmatically, then you can do
        "os.waitpid(pid, 0)" on the returned PID to prevent a zombie
        process.
        """
        print("Quitting main loop...")
        self.mainloop.quit()
        return self.PID()


def StartService():
    """Claim SERVICE on the session bus and serve until the loop is stopped.

    Exits the process with status 1 if the loop is interrupted by
    ``KeyboardInterrupt``.
    """
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    session_bus = dbus.SessionBus()
    # Held in a local for the lifetime of the loop; presumably this keeps the
    # bus name claimed while the service runs.
    service_name = dbus.service.BusName(SERVICE, session_bus)

    mainloop = gobject.MainLoop()
    TestService(session_bus, mainloop)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        sys.exit(1)

    print("Ended service")


def RunTests(testpath=u"/home/jason/Software/svntest/svntest"):
    """Call each test method on the running service, then ask it to quit.

    ``testpath`` is the path handed to ``PySVNTest``; the default is the
    location the demo was originally written against.

    The service is always asked to quit, even if one of the calls fails.
    """
    import cPickle

    session_bus = dbus.SessionBus()
    service = session_bus.get_object(SERVICE, OBJECT_PATH)

    try:
        print("Service claims its PID is: %s" % service.PID())

        arg1 = 1

        test1 = service.TestMethod1(arg1)
        print("Result of TestMethod1(%s) = %s (%s)"
              % (arg1, test1, type(test1)))

        test2 = service.TestMethod2(arg1)
        print("Result of TestMethod2(%s) = %s (%s)"
              % (arg1, test2, type(test2)))

        # NOTE(review): unpickling executes arbitrary code if the payload is
        # hostile. This is only reasonable because the data comes from the
        # service this script started itself.
        test3 = cPickle.loads(str(service.PySVNTest(testpath)))
        print("Result of PySVNTest(%s) = %s (%s)"
              % (testpath, test3, type(test3)))
    finally:
        pid = 0
        try:
            print("Quitting service...")
            pid = service.Quit()
        except dbus.exceptions.DBusException:
            # Ignore it, it will necessarily happen when we kill the service
            pass
        else:
            try:
                if pid:
                    os.waitpid(pid, 0)
            except OSError:
                # This occurs if the process is already gone.
                pass

    print("Done in TestService()")


def _wait_for_service(timeout=5.0, interval=0.1):
    """Poll the session bus until SERVICE has an owner; return success."""
    bus = dbus.SessionBus()
    deadline = time.time() + timeout
    while True:
        if bus.name_has_owner(SERVICE):
            return True
        if time.time() >= deadline:
            return False
        time.sleep(interval)


def _stop_service(service_proc, grace=2.0, interval=0.1):
    """Interrupt the child if it is still running, then release its pipe."""
    # Only signal a child that is still alive: once it has been reaped its
    # PID may be reused by an unrelated process.
    if service_proc.poll() is None:
        print("Killing service forcefully...")
        try:
            os.kill(service_proc.pid, signal.SIGINT)
        except OSError:
            # The process is already gone.
            pass

        # Bounded wait so a child that ignores the signal cannot hang us.
        deadline = time.time() + grace
        while service_proc.poll() is None and time.time() < deadline:
            time.sleep(interval)

    if service_proc.stdout is not None:
        service_proc.stdout.close()


if __name__ == "__main__":
    args = sys.argv

    if len(args) > 1 and args[1] == START_COMMAND:
        StartService()
    else:
        service_proc = subprocess.Popen(
            [sys.executable, __file__, START_COMMAND],
            stdout=subprocess.PIPE)

        try:
            # Wait for the child to claim its bus name instead of sleeping
            # for a fixed interval and hoping it is up.
            if not _wait_for_service():
                print("Service did not appear on the session bus in time")

            print("Service PID should be: %s" % service_proc.pid)

            RunTests()
        finally:
            _stop_service(service_proc)

        print("Completed DBUS demo")