xref: /openbsd-src/gnu/llvm/lldb/third_party/Python/module/pexpect-4.6/pexpect/fdpexpect.py (revision 061da546b983eb767bad15e67af1174fb0bcf31c)
'''This is like pexpect, but it will work with any file descriptor that you
pass it. You are responsible for opening and closing the file descriptor.
This allows you to use Pexpect with sockets and named pipes (FIFOs).

PEXPECT LICENSE

    This license is approved by the OSI and FSF as GPL-compatible.
        http://opensource.org/licenses/isc-license.txt

    Copyright (c) 2012, Noah Spurrier <noah@noah.org>
    PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
    PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
    COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

'''
23*061da546Spatrick
24*061da546Spatrickfrom .spawnbase import SpawnBase
25*061da546Spatrickfrom .exceptions import ExceptionPexpect, TIMEOUT
26*061da546Spatrickfrom .utils import select_ignore_interrupts, poll_ignore_interrupts
27*061da546Spatrickimport os
28*061da546Spatrick
29*061da546Spatrick__all__ = ['fdspawn']
30*061da546Spatrick
class fdspawn(SpawnBase):
    '''This is like pexpect.spawn but allows you to supply your own open file
    descriptor. For example, you could use it to read through a file looking
    for patterns, or to control a modem or serial device.

    The caller opens the descriptor; :meth:`close` closes it and marks this
    object as closed. '''

    def __init__(self, fd, args=None, timeout=30, maxread=2000, searchwindowsize=None,
                 logfile=None, encoding=None, codec_errors='strict', use_poll=False):
        '''This takes a file descriptor (an int) or an object that supports the
        fileno() method (returning an int). All Python file-like objects
        support fileno().

        :param fd: An int file descriptor, or an object with ``fileno()``.
        :param args: Accepted for signature compatibility; it is not stored
            (``self.args`` is always set to None).
        :param timeout, maxread, searchwindowsize, logfile, encoding,
            codec_errors: Forwarded unchanged to ``SpawnBase.__init__``.
        :param bool use_poll: When true, :meth:`read_nonblocking` waits with
            ``poll_ignore_interrupts`` instead of ``select_ignore_interrupts``.
        :raises ExceptionPexpect: If *fd* is not an int (after resolving
            ``fileno()``), or if ``os.fstat`` rejects it with OSError.
        '''

        if not isinstance(fd, int) and hasattr(fd, 'fileno'):
            fd = fd.fileno()

        # bool is a subclass of int but is never a meaningful descriptor;
        # keep rejecting it, as the exact-type comparison used here did.
        if isinstance(fd, bool) or not isinstance(fd, int):
            raise ExceptionPexpect('The fd argument is not an int. If this is a command string then maybe you want to use pexpect.spawn.')

        try:  # make sure fd is a valid file descriptor
            os.fstat(fd)
        except OSError:
            raise ExceptionPexpect('The fd argument is not a valid file descriptor.')

        self.args = None
        self.command = None
        SpawnBase.__init__(self, timeout, maxread, searchwindowsize, logfile,
                           encoding=encoding, codec_errors=codec_errors)
        self.child_fd = fd
        self.own_fd = False
        self.closed = False
        self.name = '<file descriptor %d>' % fd
        self.use_poll = use_poll

    def close(self):
        """Close the file descriptor.

        Calling this method a second time does nothing, but if the file
        descriptor was closed elsewhere, :class:`OSError` will be raised.
        """
        # -1 is the marker this method stores once the descriptor is closed.
        if self.child_fd == -1:
            return

        self.flush()
        os.close(self.child_fd)
        self.child_fd = -1
        self.closed = True

    def isalive(self):
        '''This checks if the file descriptor is still valid. If :func:`os.fstat`
        does not raise an exception then we assume it is alive.

        :return: False once :meth:`close` has run or when ``os.fstat`` fails
            with OSError; True otherwise.
        '''

        if self.child_fd == -1:
            return False
        try:
            os.fstat(self.child_fd)
        except OSError:
            # Only a failed fstat means "not alive". KeyboardInterrupt and
            # SystemExit must propagate instead of being reported as False.
            return False
        return True

    def terminate(self, force=False):  # pragma: no cover
        '''Deprecated and invalid. Just raises an exception.

        :raises ExceptionPexpect: Always.
        '''
        raise ExceptionPexpect('This method is not valid for file descriptors.')

    # These four methods are left around for backwards compatibility, but not
    # documented as part of fdpexpect. You're encouraged to use os.write
    # directly.
    def send(self, s):
        "Write to fd, return number of bytes written"
        s = self._coerce_send_string(s)
        self._log(s, 'send')

        # final=False: the encoder is asked to encode without finalising.
        b = self._encoder.encode(s, final=False)
        return os.write(self.child_fd, b)

    def sendline(self, s):
        "Write to fd with trailing newline, return number of bytes written"
        # Coerce first so the concatenation with self.linesep uses the same
        # string type that send() will work with.
        s = self._coerce_send_string(s)
        return self.send(s + self.linesep)

    def write(self, s):
        "Write to fd, return None"
        self.send(s)

    def writelines(self, sequence):
        "Call self.write() for each item in sequence"
        for s in sequence:
            self.write(s)

    def read_nonblocking(self, size=1, timeout=-1):
        """
        Read from the file descriptor and return the result as a string.

        The read_nonblocking method of :class:`SpawnBase` assumes that a call
        to os.read will not block (timeout parameter is ignored). This is not
        the case for POSIX file-like objects such as sockets and serial ports.

        On POSIX systems this method first waits for the descriptor to become
        readable, using poll when ``use_poll`` was set and select otherwise.
        On other systems the timeout is not applied here and the read is
        delegated directly.

        :param int size: Read at most *size* bytes.
        :param int timeout: Wait timeout seconds for file descriptor to be
            ready to read. When -1 (default), use self.timeout. When 0, poll.
        :return: String containing the bytes read
        :raises TIMEOUT: If the descriptor is not reported readable by the
            wait call (POSIX only).
        """
        if os.name == 'posix':
            if timeout == -1:
                timeout = self.timeout
            rlist = [self.child_fd]
            wlist = []
            xlist = []
            if self.use_poll:
                rlist = poll_ignore_interrupts(rlist, timeout)
            else:
                rlist, wlist, xlist = select_ignore_interrupts(
                    rlist, wlist, xlist, timeout
                )
            # The wait helpers hand back the descriptors that are ready; if
            # ours is missing, nothing became readable.
            if self.child_fd not in rlist:
                raise TIMEOUT('Timeout exceeded.')
        return super(fdspawn, self).read_nonblocking(size)
149