Replace os.system() with subprocess.check_call()

Non-zero return codes should raise an exception, not pass silently.
pull/920/head
Alex Willmer 2 years ago
parent 1287d58a54
commit 0417d4d73a

@ -2,6 +2,7 @@
# Run tests/ansible/all.yml under Ansible and Ansible-Mitogen
import os
import subprocess
import sys
import ci_lib
@ -24,29 +25,30 @@ with ci_lib.Fold('job_setup'):
# NOTE: sshpass v1.06 causes errors so pegging to 1.05 -> "msg": "Error when changing password","out": "passwd: DS error: eDSAuthFailed\n",
# there's a checksum error with "brew install http://git.io/sshpass.rb" though, so installing manually
if not ci_lib.exists_in_path('sshpass'):
os.system("curl -O -L https://sourceforge.net/projects/sshpass/files/sshpass/1.05/sshpass-1.05.tar.gz && \
subprocess.check_call(
"curl -O -L https://sourceforge.net/projects/sshpass/files/sshpass/1.05/sshpass-1.05.tar.gz && \
tar xvf sshpass-1.05.tar.gz && \
cd sshpass-1.05 && \
./configure && \
sudo make install")
sudo make install",
shell=True,
)
with ci_lib.Fold('machine_prep'):
# generate a new ssh key for localhost ssh
if not os.path.exists(os.path.expanduser("~/.ssh/id_rsa")):
os.system("ssh-keygen -P '' -m pem -f ~/.ssh/id_rsa")
os.system("cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys")
subprocess.check_call("ssh-keygen -P '' -m pem -f ~/.ssh/id_rsa", shell=True)
subprocess.check_call("cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys", shell=True)
os.chmod(os.path.expanduser('~/.ssh'), int('0700', 8))
os.chmod(os.path.expanduser('~/.ssh/authorized_keys'), int('0600', 8))
# also generate it for the sudo user
if os.system("sudo [ -f /var/root/.ssh/id_rsa ]") != 0:
os.system("sudo ssh-keygen -P '' -m pem -f /var/root/.ssh/id_rsa")
os.system("sudo cat /var/root/.ssh/id_rsa.pub | sudo tee -a /var/root/.ssh/authorized_keys")
os.chmod(os.path.expanduser('~/.ssh'), int('0700', 8))
os.chmod(os.path.expanduser('~/.ssh/authorized_keys'), int('0600', 8))
# run chmod through sudo since it's owned by root
os.system('sudo chmod 700 /var/root/.ssh')
os.system('sudo chmod 600 /var/root/.ssh/authorized_keys')
if os.system("sudo [ -f ~root/.ssh/id_rsa ]") != 0:
subprocess.check_call("sudo ssh-keygen -P '' -m pem -f ~root/.ssh/id_rsa", shell=True)
subprocess.check_call("sudo cat ~root/.ssh/id_rsa.pub | sudo tee -a ~root/.ssh/authorized_keys", shell=True)
subprocess.check_call('sudo chmod 700 ~root/.ssh', shell=True)
subprocess.check_call('sudo chmod 600 ~root/.ssh/authorized_keys', shell=True)
if os.path.expanduser('~mitogen__user1') == '~mitogen__user1':
os.chdir(IMAGE_PREP_DIR)

@ -201,7 +201,7 @@ nested.py:
print('Connect local%d via %s' % (x, context))
context = router.local(via=context, name='local%d' % x)
context.call(os.system, 'pstree -s python -s mitogen')
context.call(subprocess.check_call, ['pstree', '-s', 'python', '-s', 'mitogen'])
Output:

@ -101,7 +101,7 @@ to your network topology**.
container='billing0',
)
internal_box.call(os.system, './run-nightly-billing.py')
internal_box.call(subprocess.check_call, ['./run-nightly-billing.py'])
The multiplexer also ensures the remote process is terminated if your Python
program crashes, communication is lost, or the application code running in the
@ -250,7 +250,7 @@ After:
"""
Install our application.
"""
os.system('tar zxvf app.tar.gz')
subprocess.check_call(['tar', 'zxvf', 'app.tar.gz'])
context.call(install_app)
@ -258,7 +258,7 @@ Or even:
.. code-block:: python
context.call(os.system, 'tar zxvf app.tar.gz')
context.call(subprocess.check_call, ['tar', 'zxvf', 'app.tar.gz'])
Exceptions raised by function calls are propagated back to the parent program,
and timeouts can be configured to ensure failed calls do not block progress of

@ -8,14 +8,14 @@ Usage:
Where:
<hostname> Hostname to install to.
"""
import os
import subprocess
import sys
import mitogen
def install_app():
os.system('tar zxvf my_app.tar.gz')
subprocess.check_call(['tar', 'zxvf', 'my_app.tar.gz'])
@mitogen.main()

@ -374,9 +374,15 @@ class TestCase(unittest.TestCase):
mitogen.core.Latch._on_fork()
if get_fd_count() != self._fd_count_before:
if sys.platform == 'linux':
os.system('lsof +E -w -p %i | grep -vw mem' % (os.getpid(),))
subprocess.check_call(
'lsof +E -w -p %i | grep -vw mem' % (os.getpid(),),
shell=True,
)
else:
os.system('lsof -w -p %i | grep -vw mem' % (os.getpid(),))
subprocess.check_call(
'lsof -w -p %i | grep -vw mem' % (os.getpid(),),
shell=True,
)
assert 0, "%s leaked FDs. Count before: %s, after: %s" % (
self, self._fd_count_before, get_fd_count(),
)
@ -410,12 +416,18 @@ class TestCase(unittest.TestCase):
return
print('Leaked children of unit test process:')
os.system('ps -o "user,pid,%%cpu,%%mem,vsz,rss,tty,stat,start,time,command" -ww -p %s'
% (','.join(str(p.pid) for p in children_leaked),))
subprocess.check_call(
['ps', '-o', 'user,pid,%cpu,%mem,vsz,rss,tty,stat,start,time,command', '-ww', '-p',
','.join(str(p.pid) for p in children_leaked),
],
)
if self._children_before:
print('Pre-existing children of unit test process:')
os.system('ps -o "user,pid,%%cpu,%%mem,vsz,rss,tty,stat,start,time,command" -ww -p %s'
% (','.join(str(p.pid) for p in self._children_before),))
subprocess.check_call(
['ps', '-o', 'user,pid,%cpu,%mem,vsz,rss,tty,stat,start,time,command', '-ww', '-p',
','.join(str(p.pid) for p in self._children_before),
],
)
assert 0, "%s leaked still-running subprocesses." % (self,)
def tearDown(self):

Loading…
Cancel
Save