| Server IP : 127.0.1.1 / Your IP : 216.73.216.152 Web Server : Apache/2.4.52 (Ubuntu) System : Linux bahcrestlinepropertiesllc 5.15.0-113-generic #123-Ubuntu SMP Mon Jun 10 08:16:17 UTC 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.33 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /proc/2792866/cwd/usr/lib/python3/dist-packages/requests_toolbelt/auth/ |
Upload File : |
# -*- coding: utf-8 -*-
"""The module containing HTTPProxyDigestAuth."""
import re
from requests import cookies, utils
from . import _digest_auth_compat as auth
class HTTPProxyDigestAuth(auth.HTTPDigestAuth):
    """HTTP digest authentication against a proxy.

    Registers a response hook that answers an HTTP 407 digest challenge by
    re-sending the request with a ``Proxy-Authorization`` header.

    :param stale_rejects: The number of times the proxy rejected our
        credentials with ``stale=true``, meaning the client may simply retry
        the request with a new encrypted response, without reprompting the
        user for a new username and password, i.e., retry
        ``build_digest_header``.
    :type stale_rejects: int
    """

    # Matches the leading "Digest " scheme token of a challenge header.
    _pat = re.compile(r'digest ', flags=re.IGNORECASE)

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent class and reset the counter."""
        super(HTTPProxyDigestAuth, self).__init__(*args, **kwargs)
        self.stale_rejects = 0

        self.init_per_thread_state()

    @property
    def stale_rejects(self):
        """Return the stale-reject counter.

        The counter lives on ``self._thread_local`` when that attribute
        exists and on the instance itself (``_stale_rejects``) otherwise.
        """
        thread_local = getattr(self, '_thread_local', None)
        if thread_local is None:
            return self._stale_rejects
        # NOTE(review): ``_thread_local`` is presumably a ``threading.local``
        # supplied by the parent class. The value assigned in ``__init__``
        # would then exist only on the constructing thread, so default to 0
        # instead of raising AttributeError on any other thread.
        return getattr(thread_local, 'stale_rejects', 0)

    @stale_rejects.setter
    def stale_rejects(self, value):
        """Store the counter in the same place the getter reads it from."""
        thread_local = getattr(self, '_thread_local', None)
        if thread_local is None:
            self._stale_rejects = value
        else:
            thread_local.stale_rejects = value

    def init_per_thread_state(self):
        """Call the parent's ``init_per_thread_state`` if it has one."""
        try:
            super(HTTPProxyDigestAuth, self).init_per_thread_state()
        except AttributeError:
            # If we're not on requests 2.8.0+ this method does not exist
            pass

    def handle_407(self, r, **kwargs):
        """Handle HTTP 407 only once, otherwise give up.

        :param r: current response
        :param kwargs: passed through unchanged to ``r.connection.send``
        :returns: the response to the re-sent request (with ``r`` appended
            to its history), or ``r`` itself when it is not a 407, when
            ``stale_rejects`` has reached 2, or when the challenge does not
            start with the digest scheme
        :raises IOError: if the 407 response has no ``proxy-authenticate``
            header, or if credentials were already sent and the proxy
            answered with ``stale=false``
        """
        # give up authenticate
        if r.status_code != 407 or self.stale_rejects >= 2:
            return r

        s_auth = r.headers.get("proxy-authenticate")
        if s_auth is None:
            raise IOError(
                "proxy server violated RFC 7235:"
                "407 response MUST contain header proxy-authenticate")
        if not self._pat.match(s_auth):
            # Not a digest challenge; leave the response untouched.
            return r

        # Strip the scheme token and parse the remaining challenge fields.
        self.chal = utils.parse_dict_header(
            self._pat.sub('', s_auth, count=1))

        # if we present the user/passwd and still get rejected
        # http://tools.ietf.org/html/rfc2617#section-3.2.1
        if ('Proxy-Authorization' in r.request.headers and
                'stale' in self.chal):
            stale = self.chal['stale'].lower()
            if stale == 'true':  # try again
                self.stale_rejects += 1
            elif stale == 'false':  # wrong user/passwd
                raise IOError("User or password is invalid")

        # Consume content and release the original connection
        # to allow our new request to reuse the same one.
        r.content
        r.close()

        # Re-send a copy of the request, carrying over any cookies set by
        # the 407 response and adding the digest credentials.
        prep = r.request.copy()
        cookies.extract_cookies_to_jar(prep._cookies, r.request, r.raw)
        prep.prepare_cookies(prep._cookies)

        prep.headers['Proxy-Authorization'] = self.build_digest_header(
            prep.method, prep.url)
        _r = r.connection.send(prep, **kwargs)
        _r.history.append(r)
        _r.request = prep

        return _r

    def __call__(self, r):
        """Prepare request ``r`` for proxy digest authentication.

        Adds a ``Proxy-Authorization`` header when a nonce is already known
        and registers :meth:`handle_407` as a ``response`` hook.

        :param r: the request being prepared
        :returns: the same request object ``r``
        """
        self.init_per_thread_state()
        # if we have nonce, then just use it, otherwise server will tell us
        if self.last_nonce:
            r.headers['Proxy-Authorization'] = self.build_digest_header(
                r.method, r.url
            )
        r.register_hook('response', self.handle_407)
        return r