aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/http/server.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/http/server.py')
-rw-r--r--Lib/http/server.py390
1 files changed, 29 insertions, 361 deletions
diff --git a/Lib/http/server.py b/Lib/http/server.py
index 64f766f9bc2..ef10d185932 100644
--- a/Lib/http/server.py
+++ b/Lib/http/server.py
@@ -1,29 +1,10 @@
"""HTTP server classes.
Note: BaseHTTPRequestHandler doesn't implement any HTTP request; see
-SimpleHTTPRequestHandler for simple implementations of GET, HEAD and POST,
-and (deprecated) CGIHTTPRequestHandler for CGI scripts.
+SimpleHTTPRequestHandler for simple implementations of GET, HEAD and POST.
It does, however, optionally implement HTTP/1.1 persistent connections.
-Notes on CGIHTTPRequestHandler
-------------------------------
-
-This class is deprecated. It implements GET and POST requests to cgi-bin scripts.
-
-If the os.fork() function is not present (Windows), subprocess.Popen() is used,
-with slightly altered but never documented semantics. Use from a threaded
-process is likely to trigger a warning at os.fork() time.
-
-In all cases, the implementation is intentionally naive -- all
-requests are executed synchronously.
-
-SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
--- it may execute arbitrary Python code or external programs.
-
-Note that status code 200 is sent prior to execution of a CGI script, so
-scripts cannot send other status codes such as 302 (redirect).
-
XXX To do:
- log requests even later (to capture byte count)
@@ -86,10 +67,8 @@ __all__ = [
"HTTPServer", "ThreadingHTTPServer",
"HTTPSServer", "ThreadingHTTPSServer",
"BaseHTTPRequestHandler", "SimpleHTTPRequestHandler",
- "CGIHTTPRequestHandler",
]
-import copy
import datetime
import email.utils
import html
@@ -99,7 +78,6 @@ import itertools
import mimetypes
import os
import posixpath
-import select
import shutil
import socket
import socketserver
@@ -750,7 +728,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
f = None
if os.path.isdir(path):
parts = urllib.parse.urlsplit(self.path)
- if not parts.path.endswith('/'):
+ if not parts.path.endswith(('/', '%2f', '%2F')):
# redirect browser - doing basically what apache does
self.send_response(HTTPStatus.MOVED_PERMANENTLY)
new_parts = (parts[0], parts[1], parts[2] + '/',
@@ -840,11 +818,14 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
return None
list.sort(key=lambda a: a.lower())
r = []
+ displaypath = self.path
+ displaypath = displaypath.split('#', 1)[0]
+ displaypath = displaypath.split('?', 1)[0]
try:
- displaypath = urllib.parse.unquote(self.path,
+ displaypath = urllib.parse.unquote(displaypath,
errors='surrogatepass')
except UnicodeDecodeError:
- displaypath = urllib.parse.unquote(self.path)
+ displaypath = urllib.parse.unquote(displaypath)
displaypath = html.escape(displaypath, quote=False)
enc = sys.getfilesystemencoding()
title = f'Directory listing for {displaypath}'
@@ -890,14 +871,14 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
"""
# abandon query parameters
- path = path.split('?',1)[0]
- path = path.split('#',1)[0]
+ path = path.split('#', 1)[0]
+ path = path.split('?', 1)[0]
# Don't forget explicit trailing slash when normalizing. Issue17324
- trailing_slash = path.rstrip().endswith('/')
try:
path = urllib.parse.unquote(path, errors='surrogatepass')
except UnicodeDecodeError:
path = urllib.parse.unquote(path)
+ trailing_slash = path.endswith('/')
path = posixpath.normpath(path)
words = path.split('/')
words = filter(None, words)
@@ -953,56 +934,6 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
return 'application/octet-stream'
-# Utilities for CGIHTTPRequestHandler
-
-def _url_collapse_path(path):
- """
- Given a URL path, remove extra '/'s and '.' path elements and collapse
- any '..' references and returns a collapsed path.
-
- Implements something akin to RFC-2396 5.2 step 6 to parse relative paths.
- The utility of this function is limited to is_cgi method and helps
- preventing some security attacks.
-
- Returns: The reconstituted URL, which will always start with a '/'.
-
- Raises: IndexError if too many '..' occur within the path.
-
- """
- # Query component should not be involved.
- path, _, query = path.partition('?')
- path = urllib.parse.unquote(path)
-
- # Similar to os.path.split(os.path.normpath(path)) but specific to URL
- # path semantics rather than local operating system semantics.
- path_parts = path.split('/')
- head_parts = []
- for part in path_parts[:-1]:
- if part == '..':
- head_parts.pop() # IndexError if more '..' than prior parts
- elif part and part != '.':
- head_parts.append( part )
- if path_parts:
- tail_part = path_parts.pop()
- if tail_part:
- if tail_part == '..':
- head_parts.pop()
- tail_part = ''
- elif tail_part == '.':
- tail_part = ''
- else:
- tail_part = ''
-
- if query:
- tail_part = '?'.join((tail_part, query))
-
- splitpath = ('/' + '/'.join(head_parts), tail_part)
- collapsed_path = "/".join(splitpath)
-
- return collapsed_path
-
-
-
nobody = None
def nobody_uid():
@@ -1026,274 +957,6 @@ def executable(path):
return os.access(path, os.X_OK)
-class CGIHTTPRequestHandler(SimpleHTTPRequestHandler):
-
- """Complete HTTP server with GET, HEAD and POST commands.
-
- GET and HEAD also support running CGI scripts.
-
- The POST command is *only* implemented for CGI scripts.
-
- """
-
- def __init__(self, *args, **kwargs):
- import warnings
- warnings._deprecated("http.server.CGIHTTPRequestHandler",
- remove=(3, 15))
- super().__init__(*args, **kwargs)
-
- # Determine platform specifics
- have_fork = hasattr(os, 'fork')
-
- # Make rfile unbuffered -- we need to read one line and then pass
- # the rest to a subprocess, so we can't use buffered input.
- rbufsize = 0
-
- def do_POST(self):
- """Serve a POST request.
-
- This is only implemented for CGI scripts.
-
- """
-
- if self.is_cgi():
- self.run_cgi()
- else:
- self.send_error(
- HTTPStatus.NOT_IMPLEMENTED,
- "Can only POST to CGI scripts")
-
- def send_head(self):
- """Version of send_head that support CGI scripts"""
- if self.is_cgi():
- return self.run_cgi()
- else:
- return SimpleHTTPRequestHandler.send_head(self)
-
- def is_cgi(self):
- """Test whether self.path corresponds to a CGI script.
-
- Returns True and updates the cgi_info attribute to the tuple
- (dir, rest) if self.path requires running a CGI script.
- Returns False otherwise.
-
- If any exception is raised, the caller should assume that
- self.path was rejected as invalid and act accordingly.
-
- The default implementation tests whether the normalized url
- path begins with one of the strings in self.cgi_directories
- (and the next character is a '/' or the end of the string).
-
- """
- collapsed_path = _url_collapse_path(self.path)
- dir_sep = collapsed_path.find('/', 1)
- while dir_sep > 0 and not collapsed_path[:dir_sep] in self.cgi_directories:
- dir_sep = collapsed_path.find('/', dir_sep+1)
- if dir_sep > 0:
- head, tail = collapsed_path[:dir_sep], collapsed_path[dir_sep+1:]
- self.cgi_info = head, tail
- return True
- return False
-
-
- cgi_directories = ['/cgi-bin', '/htbin']
-
- def is_executable(self, path):
- """Test whether argument path is an executable file."""
- return executable(path)
-
- def is_python(self, path):
- """Test whether argument path is a Python script."""
- head, tail = os.path.splitext(path)
- return tail.lower() in (".py", ".pyw")
-
- def run_cgi(self):
- """Execute a CGI script."""
- dir, rest = self.cgi_info
- path = dir + '/' + rest
- i = path.find('/', len(dir)+1)
- while i >= 0:
- nextdir = path[:i]
- nextrest = path[i+1:]
-
- scriptdir = self.translate_path(nextdir)
- if os.path.isdir(scriptdir):
- dir, rest = nextdir, nextrest
- i = path.find('/', len(dir)+1)
- else:
- break
-
- # find an explicit query string, if present.
- rest, _, query = rest.partition('?')
-
- # dissect the part after the directory name into a script name &
- # a possible additional path, to be stored in PATH_INFO.
- i = rest.find('/')
- if i >= 0:
- script, rest = rest[:i], rest[i:]
- else:
- script, rest = rest, ''
-
- scriptname = dir + '/' + script
- scriptfile = self.translate_path(scriptname)
- if not os.path.exists(scriptfile):
- self.send_error(
- HTTPStatus.NOT_FOUND,
- "No such CGI script (%r)" % scriptname)
- return
- if not os.path.isfile(scriptfile):
- self.send_error(
- HTTPStatus.FORBIDDEN,
- "CGI script is not a plain file (%r)" % scriptname)
- return
- ispy = self.is_python(scriptname)
- if self.have_fork or not ispy:
- if not self.is_executable(scriptfile):
- self.send_error(
- HTTPStatus.FORBIDDEN,
- "CGI script is not executable (%r)" % scriptname)
- return
-
- # Reference: https://www6.uniovi.es/~antonio/ncsa_httpd/cgi/env.html
- # XXX Much of the following could be prepared ahead of time!
- env = copy.deepcopy(os.environ)
- env['SERVER_SOFTWARE'] = self.version_string()
- env['SERVER_NAME'] = self.server.server_name
- env['GATEWAY_INTERFACE'] = 'CGI/1.1'
- env['SERVER_PROTOCOL'] = self.protocol_version
- env['SERVER_PORT'] = str(self.server.server_port)
- env['REQUEST_METHOD'] = self.command
- uqrest = urllib.parse.unquote(rest)
- env['PATH_INFO'] = uqrest
- env['PATH_TRANSLATED'] = self.translate_path(uqrest)
- env['SCRIPT_NAME'] = scriptname
- env['QUERY_STRING'] = query
- env['REMOTE_ADDR'] = self.client_address[0]
- authorization = self.headers.get("authorization")
- if authorization:
- authorization = authorization.split()
- if len(authorization) == 2:
- import base64, binascii
- env['AUTH_TYPE'] = authorization[0]
- if authorization[0].lower() == "basic":
- try:
- authorization = authorization[1].encode('ascii')
- authorization = base64.decodebytes(authorization).\
- decode('ascii')
- except (binascii.Error, UnicodeError):
- pass
- else:
- authorization = authorization.split(':')
- if len(authorization) == 2:
- env['REMOTE_USER'] = authorization[0]
- # XXX REMOTE_IDENT
- if self.headers.get('content-type') is None:
- env['CONTENT_TYPE'] = self.headers.get_content_type()
- else:
- env['CONTENT_TYPE'] = self.headers['content-type']
- length = self.headers.get('content-length')
- if length:
- env['CONTENT_LENGTH'] = length
- referer = self.headers.get('referer')
- if referer:
- env['HTTP_REFERER'] = referer
- accept = self.headers.get_all('accept', ())
- env['HTTP_ACCEPT'] = ','.join(accept)
- ua = self.headers.get('user-agent')
- if ua:
- env['HTTP_USER_AGENT'] = ua
- co = filter(None, self.headers.get_all('cookie', []))
- cookie_str = ', '.join(co)
- if cookie_str:
- env['HTTP_COOKIE'] = cookie_str
- # XXX Other HTTP_* headers
- # Since we're setting the env in the parent, provide empty
- # values to override previously set values
- for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
- 'HTTP_USER_AGENT', 'HTTP_COOKIE', 'HTTP_REFERER'):
- env.setdefault(k, "")
-
- self.send_response(HTTPStatus.OK, "Script output follows")
- self.flush_headers()
-
- decoded_query = query.replace('+', ' ')
-
- if self.have_fork:
- # Unix -- fork as we should
- args = [script]
- if '=' not in decoded_query:
- args.append(decoded_query)
- nobody = nobody_uid()
- self.wfile.flush() # Always flush before forking
- pid = os.fork()
- if pid != 0:
- # Parent
- pid, sts = os.waitpid(pid, 0)
- # throw away additional data [see bug #427345]
- while select.select([self.rfile], [], [], 0)[0]:
- if not self.rfile.read(1):
- break
- exitcode = os.waitstatus_to_exitcode(sts)
- if exitcode:
- self.log_error(f"CGI script exit code {exitcode}")
- return
- # Child
- try:
- try:
- os.setuid(nobody)
- except OSError:
- pass
- os.dup2(self.rfile.fileno(), 0)
- os.dup2(self.wfile.fileno(), 1)
- os.execve(scriptfile, args, env)
- except:
- self.server.handle_error(self.request, self.client_address)
- os._exit(127)
-
- else:
- # Non-Unix -- use subprocess
- import subprocess
- cmdline = [scriptfile]
- if self.is_python(scriptfile):
- interp = sys.executable
- if interp.lower().endswith("w.exe"):
- # On Windows, use python.exe, not pythonw.exe
- interp = interp[:-5] + interp[-4:]
- cmdline = [interp, '-u'] + cmdline
- if '=' not in query:
- cmdline.append(query)
- self.log_message("command: %s", subprocess.list2cmdline(cmdline))
- try:
- nbytes = int(length)
- except (TypeError, ValueError):
- nbytes = 0
- p = subprocess.Popen(cmdline,
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env = env
- )
- if self.command.lower() == "post" and nbytes > 0:
- data = self.rfile.read(nbytes)
- else:
- data = None
- # throw away additional data [see bug #427345]
- while select.select([self.rfile._sock], [], [], 0)[0]:
- if not self.rfile._sock.recv(1):
- break
- stdout, stderr = p.communicate(data)
- self.wfile.write(stdout)
- if stderr:
- self.log_error('%s', stderr)
- p.stderr.close()
- p.stdout.close()
- status = p.returncode
- if status:
- self.log_error("CGI script exit status %#x", status)
- else:
- self.log_message("CGI script exited OK")
-
-
def _get_best_family(*address):
infos = socket.getaddrinfo(
*address,
@@ -1317,8 +980,8 @@ def test(HandlerClass=BaseHTTPRequestHandler,
HandlerClass.protocol_version = protocol
if tls_cert:
- server = ThreadingHTTPSServer(addr, HandlerClass, certfile=tls_cert,
- keyfile=tls_key, password=tls_password)
+ server = ServerClass(addr, HandlerClass, certfile=tls_cert,
+ keyfile=tls_key, password=tls_password)
else:
server = ServerClass(addr, HandlerClass)
@@ -1336,13 +999,12 @@ def test(HandlerClass=BaseHTTPRequestHandler,
print("\nKeyboard interrupt received, exiting.")
sys.exit(0)
-if __name__ == '__main__':
+
+def _main(args=None):
import argparse
import contextlib
parser = argparse.ArgumentParser(color=True)
- parser.add_argument('--cgi', action='store_true',
- help='run as CGI server')
parser.add_argument('-b', '--bind', metavar='ADDRESS',
help='bind to this address '
'(default: all interfaces)')
@@ -1362,7 +1024,7 @@ if __name__ == '__main__':
parser.add_argument('port', default=8000, type=int, nargs='?',
help='bind to this port '
'(default: %(default)s)')
- args = parser.parse_args()
+ args = parser.parse_args(args)
if not args.tls_cert and args.tls_key:
parser.error("--tls-key requires --tls-cert to be set")
@@ -1378,13 +1040,8 @@ if __name__ == '__main__':
except OSError as e:
parser.error(f"Failed to read TLS password file: {e}")
- if args.cgi:
- handler_class = CGIHTTPRequestHandler
- else:
- handler_class = SimpleHTTPRequestHandler
-
# ensure dual-stack is not disabled; ref #38907
- class DualStackServer(ThreadingHTTPServer):
+ class DualStackServerMixin:
def server_bind(self):
# suppress exception when protocol is IPv4
@@ -1397,9 +1054,16 @@ if __name__ == '__main__':
self.RequestHandlerClass(request, client_address, self,
directory=args.directory)
+ class HTTPDualStackServer(DualStackServerMixin, ThreadingHTTPServer):
+ pass
+ class HTTPSDualStackServer(DualStackServerMixin, ThreadingHTTPSServer):
+ pass
+
+ ServerClass = HTTPSDualStackServer if args.tls_cert else HTTPDualStackServer
+
test(
- HandlerClass=handler_class,
- ServerClass=DualStackServer,
+ HandlerClass=SimpleHTTPRequestHandler,
+ ServerClass=ServerClass,
port=args.port,
bind=args.bind,
protocol=args.protocol,
@@ -1407,3 +1071,7 @@ if __name__ == '__main__':
tls_key=args.tls_key,
tls_password=tls_key_password,
)
+
+
+if __name__ == '__main__':
+ _main()