Generalised Status Objects

DBUS demo

#!/usr/bin/python
 
import sys
import os
import subprocess
import time
import signal
 
import dbus
import gobject
import glib
import dbus.glib
import dbus.mainloop.glib
import dbus.service
 
# D-Bus interface name under which TestService's methods are exported.
INTERFACE = "test.DBUSInterface"
# Object path at which the TestService instance is registered on the bus.
OBJECT_PATH = "/test/DBUSInterface"
# Well-known bus name claimed by the service and looked up by the client.
SERVICE = "test.DBUSInterface.service"
# Command-line argument that makes this script run as the service.
START_COMMAND = "start"
 
class TestService(dbus.service.Object):
    """Demo D-Bus object exported at OBJECT_PATH on the INTERFACE interface.

    The methods exist to show how values travel over the bus, both with and
    without explicit D-Bus signatures.
    """

    def __init__(self, connection, mainloop):
        """Export the object on *connection* and remember *mainloop*.

        connection -- bus connection the object is registered on.
        mainloop   -- main loop object; Quit() calls its quit() method.
        """
        dbus.service.Object.__init__(self, connection, OBJECT_PATH)
        self.mainloop = mainloop

    @dbus.service.method(INTERFACE)
    def TestMethod1(self, arg):
        """Return ``arg/2``; no D-Bus signature is declared for this method."""
        halved = arg/2
        return halved

    @dbus.service.method(INTERFACE, in_signature='d', out_signature='s')
    def TestMethod2(self, arg):
        """Return ``arg/2`` as a string; takes a double, returns a string."""
        halved = arg/2
        return str(halved)

    @dbus.service.method(INTERFACE)
    def PySVNTest(self, path):
        """Return a pickled SVNStatus for the first status entry of *path*.

        The pysvn/rabbitvcs modules are imported inside the method rather
        than at module level, so they are only needed when it is called.
        """
        import pysvn
        import rabbitvcs.util._locale
        rabbitvcs.util._locale.initialize_locale()
        client = pysvn.Client()
        statuses = client.status(unicode(path))
        import rabbitvcs.vcs.status
        # Only the first entry of the status list is wrapped and returned.
        wrapped = rabbitvcs.vcs.status.SVNStatus(statuses[0])
        import cPickle
        return cPickle.dumps(wrapped)

    @dbus.service.method(INTERFACE)
    def PID(self):
        """Return the process ID of the process running this service."""
        return os.getpid()

    @dbus.service.method(INTERFACE)
    def Quit(self):
        r""" Stop the service's main loop and return the service's PID.

        You can call this from the command line with:

        dbus-send --print-reply \
        --dest=test.DBUSInterface.service \
        /test/DBUSInterface \
        test.DBUSInterface.Quit

        If calling this programmatically, then you can do "os.waitpid(pid, 0)"
        on the returned PID to prevent a zombie process.
        """
        print("Quitting main loop...")
        self.mainloop.quit()
        service_pid = self.PID()
        return service_pid
 
 
def StartService():
    """Run the demo service on the session bus until its main loop stops.

    Claims the SERVICE bus name, exports a TestService object and runs a
    gobject main loop. A KeyboardInterrupt raised while the loop is running
    exits the process with status 1; otherwise "Ended service" is printed
    once the loop returns.
    """
    # Make the GLib main loop integration the default for D-Bus connections.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    # Deliberately bound to a local although never read again.
    # NOTE(review): presumably the BusName object must stay referenced for
    # the name to remain claimed -- confirm against the dbus-python docs.
    service_name = dbus.service.BusName(SERVICE, bus)

    loop = gobject.MainLoop()

    # The exported object is handed the loop so that its Quit() can stop it.
    TestService(bus, loop)

    try:
        loop.run()
    except KeyboardInterrupt:
        sys.exit(1)
    else:
        print("Ended service")
 
def RunTests(testpath=u"/home/jason/Software/svntest/svntest"):
    """Exercise the demo service's methods, then ask the service to quit.

    testpath -- path passed to the service's PySVNTest method. The default
                is the location the demo was originally written against;
                pass another path to run the demo elsewhere.

    Looks up the service object on the session bus, prints the result (and
    Python type) of each test call, and always finishes by calling Quit()
    on the service, even when one of the test calls raised.
    """

    import cPickle

    session_bus = dbus.SessionBus()
    service = session_bus.get_object(SERVICE,
                                     OBJECT_PATH)

    try:

        print("Service claims its PID is: %s" % service.PID())

        arg1 = 1

        test1 = service.TestMethod1(arg1)

        print("Result of TestMethod1(%s) = %s (%s)" % (arg1, test1, type(test1)))

        test2 = service.TestMethod2(arg1)

        print("Result of TestMethod2(%s) = %s (%s)" % (arg1, test2, type(test2)))

        # NOTE(security): unpickling runs arbitrary code if the payload is
        # hostile. Here the payload is whatever the object registered under
        # SERVICE returns; do not reuse this pattern with an untrusted peer.
        test3 = cPickle.loads(str(service.PySVNTest(testpath)))

        print("Result of PySVNTest(%s) = %s (%s)" % (testpath, test3, type(test3)))

    finally:

        pid = 0

        try:
            print("Quitting service...")
            pid = service.Quit()
        except dbus.exceptions.DBusException:
            # Ignore it, it will necessarily happen when we kill the service
            pass
        else:
            try:
                # Quit() returns the service's PID; waiting on it reaps the
                # process when it is a child of this one.
                if pid:
                    os.waitpid(pid, 0)
            except OSError:
                # This occurs if the process is already gone.
                pass

    print("Done in TestService()")
 
 
# Script entry point. With START_COMMAND as the first argument the script
# runs as the service; otherwise it spawns itself as the service in a child
# process and runs the client-side tests against it.
if __name__ == "__main__":

    args = sys.argv

    if len(args) > 1 and args[1] == START_COMMAND:
        StartService()
    else:
        # The child's stdout is captured into a pipe that is never read
        # here, so the service's own messages are not shown.
        service_proc = subprocess.Popen(
            [sys.executable, __file__, START_COMMAND],
            stdout=subprocess.PIPE)

        # Crude wait for the service to appear on the bus.
        time.sleep(1)

        print("Service PID should be: %s" % service_proc.pid)

        try:
            RunTests()
        finally:
            # RunTests() may already have stopped and reaped the child via
            # os.waitpid(). Signalling a PID that is no longer ours could hit
            # an unrelated process that reused the number, so only send the
            # signal while the child is not known to have exited.
            if service_proc.poll() is None:
                print("Killing service forcefully...")

                try:
                    os.kill(service_proc.pid, signal.SIGINT)
                except OSError:
                    # The process went away between the check and the kill.
                    pass

    print("Completed DBUS demo")