Source code for rtkit.authenticators

"""Connect to an RT server using various authentication techniques

* The :py:class:`~rtkit.authenticators.AbstractAuthenticator` contains the base methods
* And the current implementations are:
    * :py:class:`~rtkit.authenticators.BasicAuthenticator`
    * :py:class:`~rtkit.authenticators.CookieAuthenticator`
    * :py:class:`~rtkit.authenticators.QueryStringAuthenticator`
    * :py:class:`~rtkit.authenticators.KerberosAuthenticator`

.. seealso::

    :py:mod:`rtkit.resource` for usage


"""
import os
# for compatibility with Python 3.x
try:
    from urllib.parse import urlencode
except ImportError:
    from urllib import urlencode
try:
    import urllib.request as urllib2
except ImportError:
    import urllib2
try:
    import http.cookiejar as cookielib
except ImportError:
    import cookielib
try:
    from urllib.parse import urlsplit, parse_qs, urlunsplit
except ImportError:
    from urlparse import urlsplit, parse_qs, urlunsplit

__all__ = [
    'BasicAuthenticator',
    'CookieAuthenticator',
    'QueryStringAuthenticator',
    'KerberosAuthenticator',
]
if os.environ.get('__GEN_DOCS__', None):
    __all__.insert(0, "AbstractAuthenticator")


[docs]class AbstractAuthenticator(object): """Abstract Base Authenticator""" def __init__(self, username, password, url, *handlers): """ :param username: The RT Login :param password: Plain Text Password :param url: the url ? :param *handlers: todo """ self.opener = urllib2.build_opener(*handlers) self.username = username self.password = password self.url = url self._logged = True
[docs] def login(self): """Login to server, unless already logged in""" if self._logged: return self._login() self._logged = True
def _login(self): raise NotImplementedError
[docs] def open(self, request): """Open connection to server""" self.login() return self.opener.open(request)
[docs]class BasicAuthenticator(AbstractAuthenticator): """Basic Authenticator .. doctest:: from rtkit.resource import RTResource from rtkit.authenticators import BasicAuthenticator from rtkit.errors import RTResourceError from rtkit import set_logging import logging set_logging('debug') logger = logging.getLogger('rtkit') resource = RTResource('http://<HOST>/REST/1.0/', '<USER>', '<PWD>', BasicAuthenticator) """ def __init__(self, username, password, url): passman = urllib2.HTTPPasswordMgrWithDefaultRealm() passman.add_password(None, url, username, password) super(BasicAuthenticator, self).__init__( username, password, url, urllib2.HTTPBasicAuthHandler(passman) )
[docs]class CookieAuthenticator(AbstractAuthenticator): """Authenticate against server using a cookie .. doctest:: from rtkit.resource import RTResource from rtkit.authenticators import CookieAuthenticator from rtkit.errors import RTResourceError from rtkit import set_logging import logging set_logging('debug') logger = logging.getLogger('rtkit') resource = RTResource('http://<HOST>/REST/1.0/', '<USER>', '<PWD>', CookieAuthenticator) """ def __init__(self, username, password, url): super(CookieAuthenticator, self).__init__( username, password, url, urllib2.HTTPCookieProcessor(cookielib.LWPCookieJar()) ) self._logged = False def _login(self): data = {'user': self.username, 'pass': self.password} self.opener.open( urllib2.Request(self.url, urlencode(data)) )
[docs]class QueryStringAuthenticator(AbstractAuthenticator): """Authenticate against server using a querystring .. doctest:: from rtkit.resource import RTResource from rtkit.authenticators import QueryStringAuthenticator from rtkit.errors import RTResourceError from rtkit import set_logging import logging set_logging('debug') logger = logging.getLogger('rtkit') resource = RTResource('http://<HOST>/REST/1.0/', '<USER>', '<PWD>', QueryStringAuthenticator) """ def __init__(self, username, password, url): super(QueryStringAuthenticator, self).__init__(username, password, url, QueryStringAuthHandler(username, password))
class QueryStringAuthHandler(urllib2.BaseHandler): def __init__(self, username, password): self.username = username self.password = password def default_open(self, request): scheme, netloc, path, query_string, fragment = urlsplit(request.get_full_url()) query_params = parse_qs(query_string) query_params['user'] = self.username query_params['pass'] = self.password request = urllib2.Request( url=urlunsplit((scheme, netloc, path, urlencode(query_params, doseq=True), fragment)), data=request.data, headers=request.headers ) return urllib2.urlopen(request)
[docs]class KerberosAuthenticator(AbstractAuthenticator): """Authenticate using Kerberos .. warning:: * Requires the urllib2_kerberos * http://pypi.python.org/pypi/urllib2_kerberos/ * sudo easy_install urllib2_kerberos .. doctest:: from rtkit.resource import RTResource from rtkit.authenticators import KerberosAuthenticator from rtkit.errors import RTResourceError from rtkit import set_logging import logging set_logging('debug') logger = logging.getLogger('rtkit') resource = RTResource(url, None, None, KerberosAuthenticator) """ def __init__(self, username, password, url): try: from urllib2_kerberos import HTTPKerberosAuthHandler except ImportError: raise ImportError('You need urllib2_kerberos, try: pip install urllib2_kerberos') super(KerberosAuthenticator, self).__init__( username, password, url, HTTPKerberosAuthHandler() )