24 from autopilot.exceptions
import ProcessSearchError
25 from autopilot.introspection
import get_proxy_object_for_existing_process
30 logger = logging.getLogger(__name__)
33 class JobError(Exception):
37 class CannotAccessUnity(Exception):
41 def unlock_unity(unity_proxy_obj=None):
42 """Helper function that attempts to unlock the unity greeter.
44 If unity_proxy_object is None create a proxy object by querying for the
45 running unity process.
46 Otherwise re-use the passed proxy object.
48 :raises RuntimeError: if the greeter attempts and fails to be unlocked.
50 :raises RuntimeWarning: when the greeter cannot be found because it is
52 :raises CannotAccessUnity: if unity is not introspectable or cannot be
54 :raises CannotAccessUnity: if unity's upstart status is not "start" or the
55 upstart job cannot be found at all.
58 if unity_proxy_obj
is None:
60 pid = _get_unity_pid()
61 unity = _get_unity_proxy_object(pid)
62 main_window = unity.select_single(main_window_emulator.QQuickView)
63 except ProcessSearchError
as e:
64 raise CannotAccessUnity(
65 "Cannot introspect unity, make sure that it has been started "
66 "with testability. Perhaps use the function "
67 "'restart_unity_with_testability' this module provides."
72 main_window = unity_proxy_obj.select_single(
73 main_window_emulator.QQuickView)
75 greeter = main_window.get_greeter()
76 if greeter.created
is False:
77 raise RuntimeWarning(
"Greeter appears to be already unlocked.")
79 bus = dbus.SessionBus()
80 dbus_proxy = bus.get_object(
"com.canonical.UnityGreeter",
"/")
82 dbus_proxy.HideGreeter()
83 except dbus.DBusException:
84 logger.info(
"Failed to unlock greeter")
87 greeter.created.wait_for(
False)
88 logger.info(
"Greeter unlocked, continuing.")
91 def lock_unity(unity_proxy_obj=None):
92 """Helper function that attempts to lock unity greeter.
94 If unity_proxy_object is None create a proxy object by querying for the
95 running unity process.
96 Otherwise re-use the passed proxy object.
98 :raises RuntimeError: if the greeter attempts and fails to be locked.
100 :raises RuntimeWarning: when the greeter is found because it is
102 :raises CannotAccessUnity: if unity is not introspectable or cannot be
104 :raises CannotAccessUnity: if unity's upstart status is not "start" or the
105 upstart job cannot be found at all.
108 if unity_proxy_obj
is None:
110 pid = _get_unity_pid()
111 unity = _get_unity_proxy_object(pid)
112 main_window = unity.select_single(main_window_emulator.QQuickView)
113 except ProcessSearchError
as e:
114 raise CannotAccessUnity(
115 "Cannot introspect unity, make sure that it has been started "
116 "with testability. Perhaps use the function "
117 "'restart_unity_with_testability' this module provides."
122 main_window = unity_proxy_obj.select_single(
123 main_window_emulator.QQuickView)
125 greeter = main_window.get_greeter()
126 if greeter.created
is True:
127 raise RuntimeWarning(
"Greeter appears to be already locked.")
129 bus = dbus.SessionBus()
130 dbus_proxy = bus.get_object(
"com.canonical.UnityGreeter",
"/")
132 dbus_proxy.ShowGreeter()
133 except dbus.DBusException:
134 logger.info(
"Failed to lock greeter")
137 greeter.created.wait_for(
True)
138 logger.info(
"Greeter locked, continuing.")
141 def restart_unity_with_testability(*args):
142 """Restarts (or starts) unity with testability enabled.
144 Passes *args arguments to the launched process.
147 args += (
"QT_LOAD_TESTABILITY=1",)
148 return restart_unity(*args)
151 def restart_unity(*args):
152 """Restarts (or starts) unity8 using the provided arguments.
154 Passes *args arguments to the launched process.
156 :raises subprocess.CalledProcessError: if unable to stop or start the
160 status = _get_unity_status()
161 if "start/" in status:
164 pid = start_job(
'unity8', *args)
165 return _get_unity_proxy_object(pid)
168 def start_job(name, *args):
171 :param str name: The name of the job.
172 :param args: The arguments to be used when starting the job.
173 :return: The process id of the started job.
174 :raises CalledProcessError: if the job failed to start.
177 logger.info(
'Starting job {} with arguments {}.'.format(name, args))
178 command = [
'/sbin/initctl',
'start', name] + list(args)
180 output = subprocess.check_output(
182 stderr=subprocess.STDOUT,
183 universal_newlines=
True,
186 pid = get_job_pid(name)
187 except subprocess.CalledProcessError
as e:
188 e.args += (
'Failed to start {}: {}.'.format(name, e.output),)
194 def get_job_pid(name):
195 """Return the process id of a running job.
197 :param str name: The name of the job.
198 :raises JobError: if the job is not running.
201 status = get_job_status(name)
202 if "start/" not in status:
203 raise JobError(
'{} is not in the running state.'.format(name))
204 return int(status.split()[-1])
207 def get_job_status(name):
208 """Return the status of a job.
210 :param str name: The name of the job.
211 :raises JobError: if it's not possible to get the status of the job.
215 return subprocess.check_output([
219 ], universal_newlines=
True)
220 except subprocess.CalledProcessError
as error:
222 "Unable to get {}'s status: {}".format(name, error)
229 :param str name: The name of the job.
230 :raises CalledProcessError: if the job failed to stop.
233 logger.info(
'Stoping job {}.'.format(name))
234 command = [
'/sbin/initctl',
'stop', name]
236 output = subprocess.check_output(
238 stderr=subprocess.STDOUT,
239 universal_newlines=
True,
242 except subprocess.CalledProcessError
as e:
243 e.args += (
'Failed to stop {}: {}.'.format(name, e.output),)
247 def is_job_running(name):
248 """Return True if the job is running. Otherwise, False.
250 :param str name: The name of the job.
251 :raises JobError: if it's not possible to get the status of the job.
254 return 'start/' in get_job_status(name)
257 def _get_unity_status():
259 return get_job_status(
'unity8')
260 except JobError
as error:
261 raise CannotAccessUnity(str(error))
264 def _get_unity_pid():
266 return get_job_pid(
'unity8')
267 except JobError
as error:
268 raise CannotAccessUnity(str(error))
271 def _get_unity_proxy_object(pid):
272 return get_proxy_object_for_existing_process(
274 emulator_base=emulators.UnityEmulatorBase,