Unity 8
process_helpers.py
1 # -*- Mode: Python; coding: utf-8; indent-tabs-mode: nil; tab-width: 4 -*-
2 #
3 # Unity Autopilot Utilities
4 # Copyright (C) 2013, 2014, 2015 Canonical
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #
19 
20 import logging
21 import subprocess
22 import dbus
23 
24 from autopilot.exceptions import ProcessSearchError
25 from autopilot.introspection import get_proxy_object_for_existing_process
26 
27 from unity8.shell import emulators
28 from unity8.shell.emulators import main_window as main_window_emulator
29 
30 logger = logging.getLogger(__name__)
31 
32 
33 class JobError(Exception):
34  pass
35 
36 
37 class CannotAccessUnity(Exception):
38  pass
39 
40 
41 def unlock_unity(unity_proxy_obj=None):
42  """Helper function that attempts to unlock the unity greeter.
43 
44  If unity_proxy_object is None create a proxy object by querying for the
45  running unity process.
46  Otherwise re-use the passed proxy object.
47 
48  :raises RuntimeError: if the greeter attempts and fails to be unlocked.
49 
50  :raises RuntimeWarning: when the greeter cannot be found because it is
51  already unlocked.
52  :raises CannotAccessUnity: if unity is not introspectable or cannot be
53  found on dbus.
54  :raises CannotAccessUnity: if unity's upstart status is not "start" or the
55  upstart job cannot be found at all.
56 
57  """
58  if unity_proxy_obj is None:
59  try:
60  pid = _get_unity_pid()
61  unity = _get_unity_proxy_object(pid)
62  main_window = unity.select_single(main_window_emulator.QQuickView)
63  except ProcessSearchError as e:
64  raise CannotAccessUnity(
65  "Cannot introspect unity, make sure that it has been started "
66  "with testability. Perhaps use the function "
67  "'restart_unity_with_testability' this module provides."
68  "(%s)"
69  % str(e)
70  )
71  else:
72  main_window = unity_proxy_obj.select_single(
73  main_window_emulator.QQuickView)
74 
75  greeter = main_window.get_greeter()
76  if greeter.created is False:
77  raise RuntimeWarning("Greeter appears to be already unlocked.")
78 
79  bus = dbus.SessionBus()
80  dbus_proxy = bus.get_object("com.canonical.UnityGreeter", "/")
81  try:
82  dbus_proxy.HideGreeter()
83  except dbus.DBusException:
84  logger.info("Failed to unlock greeter")
85  raise
86  else:
87  greeter.created.wait_for(False)
88  logger.info("Greeter unlocked, continuing.")
89 
90 
91 def lock_unity(unity_proxy_obj=None):
92  """Helper function that attempts to lock unity greeter.
93 
94  If unity_proxy_object is None create a proxy object by querying for the
95  running unity process.
96  Otherwise re-use the passed proxy object.
97 
98  :raises RuntimeError: if the greeter attempts and fails to be locked.
99 
100  :raises RuntimeWarning: when the greeter is found because it is
101  already locked.
102  :raises CannotAccessUnity: if unity is not introspectable or cannot be
103  found on dbus.
104  :raises CannotAccessUnity: if unity's upstart status is not "start" or the
105  upstart job cannot be found at all.
106 
107  """
108  if unity_proxy_obj is None:
109  try:
110  pid = _get_unity_pid()
111  unity = _get_unity_proxy_object(pid)
112  main_window = unity.select_single(main_window_emulator.QQuickView)
113  except ProcessSearchError as e:
114  raise CannotAccessUnity(
115  "Cannot introspect unity, make sure that it has been started "
116  "with testability. Perhaps use the function "
117  "'restart_unity_with_testability' this module provides."
118  "(%s)"
119  % str(e)
120  )
121  else:
122  main_window = unity_proxy_obj.select_single(
123  main_window_emulator.QQuickView)
124 
125  greeter = main_window.get_greeter()
126  if greeter.created is True:
127  raise RuntimeWarning("Greeter appears to be already locked.")
128 
129  bus = dbus.SessionBus()
130  dbus_proxy = bus.get_object("com.canonical.UnityGreeter", "/")
131  try:
132  dbus_proxy.ShowGreeter()
133  except dbus.DBusException:
134  logger.info("Failed to lock greeter")
135  raise
136  else:
137  greeter.created.wait_for(True)
138  logger.info("Greeter locked, continuing.")
139 
140 
141 def restart_unity_with_testability(*args):
142  """Restarts (or starts) unity with testability enabled.
143 
144  Passes *args arguments to the launched process.
145 
146  """
147  args += ("QT_LOAD_TESTABILITY=1",)
148  return restart_unity(*args)
149 
150 
151 def restart_unity(*args):
152  """Restarts (or starts) unity8 using the provided arguments.
153 
154  Passes *args arguments to the launched process.
155 
156  :raises subprocess.CalledProcessError: if unable to stop or start the
157  unity8 upstart job.
158 
159  """
160  status = _get_unity_status()
161  if "start/" in status:
162  stop_job('unity8')
163 
164  pid = start_job('unity8', *args)
165  return _get_unity_proxy_object(pid)
166 
167 
168 def start_job(name, *args):
169  """Start a job.
170 
171  :param str name: The name of the job.
172  :param args: The arguments to be used when starting the job.
173  :return: The process id of the started job.
174  :raises CalledProcessError: if the job failed to start.
175 
176  """
177  logger.info('Starting job {} with arguments {}.'.format(name, args))
178  command = ['/sbin/initctl', 'start', name] + list(args)
179  try:
180  output = subprocess.check_output(
181  command,
182  stderr=subprocess.STDOUT,
183  universal_newlines=True,
184  )
185  logger.info(output)
186  pid = get_job_pid(name)
187  except subprocess.CalledProcessError as e:
188  e.args += ('Failed to start {}: {}.'.format(name, e.output),)
189  raise
190  else:
191  return pid
192 
193 
194 def get_job_pid(name):
195  """Return the process id of a running job.
196 
197  :param str name: The name of the job.
198  :raises JobError: if the job is not running.
199 
200  """
201  status = get_job_status(name)
202  if "start/" not in status:
203  raise JobError('{} is not in the running state.'.format(name))
204  return int(status.split()[-1])
205 
206 
207 def get_job_status(name):
208  """Return the status of a job.
209 
210  :param str name: The name of the job.
211  :raises JobError: if it's not possible to get the status of the job.
212 
213  """
214  try:
215  return subprocess.check_output([
216  '/sbin/initctl',
217  'status',
218  name
219  ], universal_newlines=True)
220  except subprocess.CalledProcessError as error:
221  raise JobError(
222  "Unable to get {}'s status: {}".format(name, error)
223  )
224 
225 
226 def stop_job(name):
227  """Stop a job.
228 
229  :param str name: The name of the job.
230  :raises CalledProcessError: if the job failed to stop.
231 
232  """
233  logger.info('Stoping job {}.'.format(name))
234  command = ['/sbin/initctl', 'stop', name]
235  try:
236  output = subprocess.check_output(
237  command,
238  stderr=subprocess.STDOUT,
239  universal_newlines=True,
240  )
241  logger.info(output)
242  except subprocess.CalledProcessError as e:
243  e.args += ('Failed to stop {}: {}.'.format(name, e.output),)
244  raise
245 
246 
247 def is_job_running(name):
248  """Return True if the job is running. Otherwise, False.
249 
250  :param str name: The name of the job.
251  :raises JobError: if it's not possible to get the status of the job.
252 
253  """
254  return 'start/' in get_job_status(name)
255 
256 
257 def _get_unity_status():
258  try:
259  return get_job_status('unity8')
260  except JobError as error:
261  raise CannotAccessUnity(str(error))
262 
263 
264 def _get_unity_pid():
265  try:
266  return get_job_pid('unity8')
267  except JobError as error:
268  raise CannotAccessUnity(str(error))
269 
270 
271 def _get_unity_proxy_object(pid):
272  return get_proxy_object_for_existing_process(
273  pid=pid,
274  emulator_base=emulators.UnityEmulatorBase,
275  )