summaryrefslogtreecommitdiffstats
path: root/qemu/roms/u-boot/tools/patman/cros_subprocess.py
diff options
context:
space:
mode:
Diffstat (limited to 'qemu/roms/u-boot/tools/patman/cros_subprocess.py')
-rw-r--r--qemu/roms/u-boot/tools/patman/cros_subprocess.py397
1 files changed, 0 insertions, 397 deletions
diff --git a/qemu/roms/u-boot/tools/patman/cros_subprocess.py b/qemu/roms/u-boot/tools/patman/cros_subprocess.py
deleted file mode 100644
index 0fc4a06b5..000000000
--- a/qemu/roms/u-boot/tools/patman/cros_subprocess.py
+++ /dev/null
@@ -1,397 +0,0 @@
-# Copyright (c) 2012 The Chromium OS Authors.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-#
-# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
-# Licensed to PSF under a Contributor Agreement.
-# See http://www.python.org/2.4/license for licensing details.
-
-"""Subprocress execution
-
-This module holds a subclass of subprocess.Popen with our own required
-features, mainly that we get access to the subprocess output while it
-is running rather than just at the end. This makes it easiler to show
-progress information and filter output in real time.
-"""
-
-import errno
-import os
-import pty
-import select
-import subprocess
-import sys
-import unittest
-
-
-# Import these here so the caller does not need to import subprocess also.
-PIPE = subprocess.PIPE
-STDOUT = subprocess.STDOUT
-PIPE_PTY = -3 # Pipe output through a pty
-stay_alive = True
-
-
-class Popen(subprocess.Popen):
- """Like subprocess.Popen with ptys and incremental output
-
- This class deals with running a child process and filtering its output on
- both stdout and stderr while it is running. We do this so we can monitor
- progress, and possibly relay the output to the user if requested.
-
- The class is similar to subprocess.Popen, the equivalent is something like:
-
- Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-
- But this class has many fewer features, and two enhancement:
-
- 1. Rather than getting the output data only at the end, this class sends it
- to a provided operation as it arrives.
- 2. We use pseudo terminals so that the child will hopefully flush its output
- to us as soon as it is produced, rather than waiting for the end of a
- line.
-
- Use CommunicateFilter() to handle output from the subprocess.
-
- """
-
- def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
- shell=False, cwd=None, env=None, **kwargs):
- """Cut-down constructor
-
- Args:
- args: Program and arguments for subprocess to execute.
- stdin: See subprocess.Popen()
- stdout: See subprocess.Popen(), except that we support the sentinel
- value of cros_subprocess.PIPE_PTY.
- stderr: See subprocess.Popen(), except that we support the sentinel
- value of cros_subprocess.PIPE_PTY.
- shell: See subprocess.Popen()
- cwd: Working directory to change to for subprocess, or None if none.
- env: Environment to use for this subprocess, or None to inherit parent.
- kwargs: No other arguments are supported at the moment. Passing other
- arguments will cause a ValueError to be raised.
- """
- stdout_pty = None
- stderr_pty = None
-
- if stdout == PIPE_PTY:
- stdout_pty = pty.openpty()
- stdout = os.fdopen(stdout_pty[1])
- if stderr == PIPE_PTY:
- stderr_pty = pty.openpty()
- stderr = os.fdopen(stderr_pty[1])
-
- super(Popen, self).__init__(args, stdin=stdin,
- stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
- **kwargs)
-
- # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
- # We want to use the master half on our end from now on. Setting this here
- # does make some assumptions about the implementation of subprocess, but
- # those assumptions are pretty minor.
-
- # Note that if stderr is STDOUT, then self.stderr will be set to None by
- # this constructor.
- if stdout_pty is not None:
- self.stdout = os.fdopen(stdout_pty[0])
- if stderr_pty is not None:
- self.stderr = os.fdopen(stderr_pty[0])
-
- # Insist that unit tests exist for other arguments we don't support.
- if kwargs:
- raise ValueError("Unit tests do not test extra args - please add tests")
-
- def CommunicateFilter(self, output):
- """Interact with process: Read data from stdout and stderr.
-
- This method runs until end-of-file is reached, then waits for the
- subprocess to terminate.
-
- The output function is sent all output from the subprocess and must be
- defined like this:
-
- def Output([self,] stream, data)
- Args:
- stream: the stream the output was received on, which will be
- sys.stdout or sys.stderr.
- data: a string containing the data
-
- Note: The data read is buffered in memory, so do not use this
- method if the data size is large or unlimited.
-
- Args:
- output: Function to call with each fragment of output.
-
- Returns:
- A tuple (stdout, stderr, combined) which is the data received on
- stdout, stderr and the combined data (interleaved stdout and stderr).
-
- Note that the interleaved output will only be sensible if you have
- set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
- the timing of the output in the subprocess. If a subprocess flips
- between stdout and stderr quickly in succession, by the time we come to
- read the output from each we may see several lines in each, and will read
- all the stdout lines, then all the stderr lines. So the interleaving
- may not be correct. In this case you might want to pass
- stderr=cros_subprocess.STDOUT to the constructor.
-
- This feature is still useful for subprocesses where stderr is
- rarely used and indicates an error.
-
- Note also that if you set stderr to STDOUT, then stderr will be empty
- and the combined output will just be the same as stdout.
- """
-
- read_set = []
- write_set = []
- stdout = None # Return
- stderr = None # Return
-
- if self.stdin:
- # Flush stdio buffer. This might block, if the user has
- # been writing to .stdin in an uncontrolled fashion.
- self.stdin.flush()
- if input:
- write_set.append(self.stdin)
- else:
- self.stdin.close()
- if self.stdout:
- read_set.append(self.stdout)
- stdout = []
- if self.stderr and self.stderr != self.stdout:
- read_set.append(self.stderr)
- stderr = []
- combined = []
-
- input_offset = 0
- while read_set or write_set:
- try:
- rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
- except select.error, e:
- if e.args[0] == errno.EINTR:
- continue
- raise
-
- if not stay_alive:
- self.terminate()
-
- if self.stdin in wlist:
- # When select has indicated that the file is writable,
- # we can write up to PIPE_BUF bytes without risk
- # blocking. POSIX defines PIPE_BUF >= 512
- chunk = input[input_offset : input_offset + 512]
- bytes_written = os.write(self.stdin.fileno(), chunk)
- input_offset += bytes_written
- if input_offset >= len(input):
- self.stdin.close()
- write_set.remove(self.stdin)
-
- if self.stdout in rlist:
- data = ""
- # We will get an error on read if the pty is closed
- try:
- data = os.read(self.stdout.fileno(), 1024)
- except OSError:
- pass
- if data == "":
- self.stdout.close()
- read_set.remove(self.stdout)
- else:
- stdout.append(data)
- combined.append(data)
- if output:
- output(sys.stdout, data)
- if self.stderr in rlist:
- data = ""
- # We will get an error on read if the pty is closed
- try:
- data = os.read(self.stderr.fileno(), 1024)
- except OSError:
- pass
- if data == "":
- self.stderr.close()
- read_set.remove(self.stderr)
- else:
- stderr.append(data)
- combined.append(data)
- if output:
- output(sys.stderr, data)
-
- # All data exchanged. Translate lists into strings.
- if stdout is not None:
- stdout = ''.join(stdout)
- else:
- stdout = ''
- if stderr is not None:
- stderr = ''.join(stderr)
- else:
- stderr = ''
- combined = ''.join(combined)
-
- # Translate newlines, if requested. We cannot let the file
- # object do the translation: It is based on stdio, which is
- # impossible to combine with select (unless forcing no
- # buffering).
- if self.universal_newlines and hasattr(file, 'newlines'):
- if stdout:
- stdout = self._translate_newlines(stdout)
- if stderr:
- stderr = self._translate_newlines(stderr)
-
- self.wait()
- return (stdout, stderr, combined)
-
-
-# Just being a unittest.TestCase gives us 14 public methods. Unless we
-# disable this, we can only have 6 tests in a TestCase. That's not enough.
-#
-# pylint: disable=R0904
-
-class TestSubprocess(unittest.TestCase):
- """Our simple unit test for this module"""
-
- class MyOperation:
- """Provides a operation that we can pass to Popen"""
- def __init__(self, input_to_send=None):
- """Constructor to set up the operation and possible input.
-
- Args:
- input_to_send: a text string to send when we first get input. We will
- add \r\n to the string.
- """
- self.stdout_data = ''
- self.stderr_data = ''
- self.combined_data = ''
- self.stdin_pipe = None
- self._input_to_send = input_to_send
- if input_to_send:
- pipe = os.pipe()
- self.stdin_read_pipe = pipe[0]
- self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
-
- def Output(self, stream, data):
- """Output handler for Popen. Stores the data for later comparison"""
- if stream == sys.stdout:
- self.stdout_data += data
- if stream == sys.stderr:
- self.stderr_data += data
- self.combined_data += data
-
- # Output the input string if we have one.
- if self._input_to_send:
- self._stdin_write_pipe.write(self._input_to_send + '\r\n')
- self._stdin_write_pipe.flush()
-
- def _BasicCheck(self, plist, oper):
- """Basic checks that the output looks sane."""
- self.assertEqual(plist[0], oper.stdout_data)
- self.assertEqual(plist[1], oper.stderr_data)
- self.assertEqual(plist[2], oper.combined_data)
-
- # The total length of stdout and stderr should equal the combined length
- self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
-
- def test_simple(self):
- """Simple redirection: Get process list"""
- oper = TestSubprocess.MyOperation()
- plist = Popen(['ps']).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
-
- def test_stderr(self):
- """Check stdout and stderr"""
- oper = TestSubprocess.MyOperation()
- cmd = 'echo fred >/dev/stderr && false || echo bad'
- plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
- self.assertEqual(plist [0], 'bad\r\n')
- self.assertEqual(plist [1], 'fred\r\n')
-
- def test_shell(self):
- """Check with and without shell works"""
- oper = TestSubprocess.MyOperation()
- cmd = 'echo test >/dev/stderr'
- self.assertRaises(OSError, Popen, [cmd], shell=False)
- plist = Popen([cmd], shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
- self.assertEqual(len(plist [0]), 0)
- self.assertEqual(plist [1], 'test\r\n')
-
- def test_list_args(self):
- """Check with and without shell works using list arguments"""
- oper = TestSubprocess.MyOperation()
- cmd = ['echo', 'test', '>/dev/stderr']
- plist = Popen(cmd, shell=False).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
- self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
- self.assertEqual(len(plist [1]), 0)
-
- oper = TestSubprocess.MyOperation()
-
- # this should be interpreted as 'echo' with the other args dropped
- cmd = ['echo', 'test', '>/dev/stderr']
- plist = Popen(cmd, shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
- self.assertEqual(plist [0], '\r\n')
-
- def test_cwd(self):
- """Check we can change directory"""
- for shell in (False, True):
- oper = TestSubprocess.MyOperation()
- plist = Popen('pwd', shell=shell, cwd='/tmp').CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
- self.assertEqual(plist [0], '/tmp\r\n')
-
- def test_env(self):
- """Check we can change environment"""
- for add in (False, True):
- oper = TestSubprocess.MyOperation()
- env = os.environ
- if add:
- env ['FRED'] = 'fred'
- cmd = 'echo $FRED'
- plist = Popen(cmd, shell=True, env=env).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
- self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
-
- def test_extra_args(self):
- """Check we can't add extra arguments"""
- self.assertRaises(ValueError, Popen, 'true', close_fds=False)
-
- def test_basic_input(self):
- """Check that incremental input works
-
- We set up a subprocess which will prompt for name. When we see this prompt
- we send the name as input to the process. It should then print the name
- properly to stdout.
- """
- oper = TestSubprocess.MyOperation('Flash')
- prompt = 'What is your name?: '
- cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
- plist = Popen([cmd], stdin=oper.stdin_read_pipe,
- shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
- self.assertEqual(len(plist [1]), 0)
- self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
-
- def test_isatty(self):
- """Check that ptys appear as terminals to the subprocess"""
- oper = TestSubprocess.MyOperation()
- cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
- 'else echo "not %d" >&%d; fi;')
- both_cmds = ''
- for fd in (1, 2):
- both_cmds += cmd % (fd, fd, fd, fd, fd)
- plist = Popen(both_cmds, shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
- self.assertEqual(plist [0], 'terminal 1\r\n')
- self.assertEqual(plist [1], 'terminal 2\r\n')
-
- # Now try with PIPE and make sure it is not a terminal
- oper = TestSubprocess.MyOperation()
- plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- shell=True).CommunicateFilter(oper.Output)
- self._BasicCheck(plist, oper)
- self.assertEqual(plist [0], 'not 1\n')
- self.assertEqual(plist [1], 'not 2\n')
-
-if __name__ == '__main__':
- unittest.main()