Commit 8ffb9fa6 by xczh

init repo ver 0.2

parents
# IDE
.idea
# cache
__pycache__
*.pyc
\ No newline at end of file
# XJTUCAS-pyClient
西安交通大学统一身份认证系统,Python客户端
[Maintainer]
- zhuxingchi [<zhuxingchi@tiaozhan.com>]
- furuoxuan [<furuoxuan@vip.qq.com>]
欢迎反馈任何的Bug或issue。
## Usage
```sh
# 获取源码
git clone https://git.tiaozhan.com/xczh/xjtucas-pyclient.git
cd xjtucas-pyclient
# 安装库依赖
pip install -r requirements.txt
# 将库文件复制到你的项目目录
cp src/xjtucas_pyclient ~/your_project
# 然后你就可以这样用啦!
# from xjtucas_pyclient import CASClient
#
```
具体API我们在`example`目录下给出了对应的例子,可以参考着来用~
> 要运行`example`目录下的demo,需要安装依赖`pip install -r example/requirements.txt`
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
a Demo HTTPServer Using xjtucas_pyclient
Author: xczh <zhuxingchi@tiaozhan.com>
Copyright (C) 2016 tiaozhan. All Rights Reserved.
"""
# 导入xjtucas_pyclient库
import sys
sys.path.append('../src/')
from xjtucas_pyclient import CASClient
# 我们使用tornado作为演示HTTPServer
import tornado.ioloop
import tornado.web
class CASHandler(tornado.web.RequestHandler):
''' 处理CAS登录/登出请求的类 '''
def get(self):
# 实例化一个CASClient,通常使用基础协议,版本1即可
# service_url为登录完毕之后回调url,本例使用同一个请求处理类完成重定向登录和验证工作
cas = CASClient(
version = '1',
service_url = 'http://localhost/cas?extra_param1=p1',
)
action = self.get_argument('action', default='login')
if action == 'login':
# 登录逻辑
# 判断GET参数中是否包含ticket参数,有则进行ST验证工作,无则重定向登录
ticket = self.get_argument('ticket', default=None)
if ticket:
# 验证ST是否有效,有效返回用户NetID,无效返回None
# 注意本方法返回一个3维元组,在版本1中后两维全为None,可以忽略
# WARNING: 这是一个同步方法,会阻塞IO,因而不能用于异步框架,本例仅仅用于说明用法,不能用于tornado生产环境!
# TODO: 异步(asyncio)验证支持
netid = cas.verify_ticket(ticket)[0]
if netid is not None:
# 此时登录工作已经完成,输出提示页面
text = '''
<html>
<body>
<h2>xjtucas_pyclient Version: %s</h2>
<p>Your NetID: %s</p>
</body>
</html>
''' % (CASClient.getVersion(), netid)
return self.write(text)
# ST无效,重定向进行登录
return self.redirect(cas.get_login_url())
elif action == 'logout':
# 登出逻辑
return self.redirect(cas.get_logout_url())
else:
return self.send_error('unsupported action: %s' % action)
def make_app():
return tornado.web.Application([
(r"/cas", CASHandler),
])
if __name__ == '__main__':
app = make_app()
# 请使用 http://localhost 进行测试,此回环地址已被网络中心授权用于开发测试
app.listen(80)
tornado.ioloop.IOLoop.current().start()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Package: xjtucas_pyclient
Author: xczh <zhuxingchi@tiaozhan.com>
Copyright (C) 2016 tiaozhan. All Rights Reserved.
"""
__VERSION__ = '0.2'
from .cas_client_v1 import CASClientV1
from .cas_client_v2 import CASClientV2
from .cas_client_v3 import CASClientV3
from .cas_client_with_saml_v1 import CASClientWithSAMLV1
class CASClient(object):
def __new__(self, *args, **kwargs):
version = kwargs.pop('version')
if version in (1, '1'):
return CASClientV1(*args, **kwargs)
elif version in (2, '2'):
return CASClientV2(*args, **kwargs)
elif version in (3, '3'):
return CASClientV3(*args, **kwargs)
elif version == 'CAS_2_SAML_1_0':
return CASClientWithSAMLV1(*args, **kwargs)
raise ValueError('Unsupported CAS_VERSION %r' % version)
@staticmethod
def getVersion():
return __VERSION__
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Package: xjtucas_pyclient
Author: xczh <zhuxingchi@tiaozhan.com>
Copyright (C) 2016 tiaozhan. All Rights Reserved.
"""
from six.moves.urllib import parse as urllib_parse
import requests
from .cas_error import CASError
class CASClientBase(object):
logout_redirect_param_name = 'service'
def __init__(self, service_url=None, server_url='https://cas.xjtu.edu.cn',
extra_login_params=None, renew=False,
username_attribute=None):
self.service_url = service_url
self.server_url = server_url
self.extra_login_params = extra_login_params or {}
self.renew = renew
self.username_attribute = username_attribute
pass
def verify_ticket(self, ticket):
"""must return a triple"""
raise NotImplementedError()
def get_login_url(self):
"""Generates CAS login URL"""
params = {'service': self.service_url}
if self.renew:
params.update({'renew': 'true'})
params.update(self.extra_login_params)
url = urllib_parse.urljoin(self.server_url, 'login')
query = urllib_parse.urlencode(params)
return url + '?' + query
def get_logout_url(self, redirect_url=None):
"""Generates CAS logout URL"""
url = urllib_parse.urljoin(self.server_url, 'logout')
if redirect_url:
params = {self.logout_redirect_param_name: redirect_url}
url += '?' + urllib_parse.urlencode(params)
return url
def get_proxy_url(self, pgt):
"""Returns proxy url, given the proxy granting ticket"""
params = urllib_parse.urlencode({'pgt': pgt, 'targetService': self.service_url})
return "%s/proxy?%s" % (self.server_url, params)
def get_proxy_ticket(self, pgt):
"""Returns proxy ticket given the proxy granting ticket"""
response = requests.get(self.get_proxy_url(pgt))
try:
from lxml import etree
except ImportError:
import xml.etree.ElementTree as etree
if response.status_code == 200:
root = etree.fromstring(response.content)
tickets = root.xpath(
"//cas:proxyTicket",
namespaces={"cas": "http://www.yale.edu/tp/cas"}
)
if len(tickets) == 1:
return tickets[0].text
errors = root.xpath(
"//cas:authenticationFailure",
namespaces={"cas": "http://www.yale.edu/tp/cas"}
)
if len(errors) == 1:
raise CASError(errors[0].attrib['code'], errors[0].text)
raise CASError("Bad http code %s" % response.status_code)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Package: xjtucas_pyclient
Author: xczh <zhuxingchi@tiaozhan.com>
Copyright (C) 2016 tiaozhan. All Rights Reserved.
"""
from six.moves.urllib import parse as urllib_parse
import requests
from .cas_client_base import CASClientBase
class CASClientV1(CASClientBase):
"""CAS Client Version 1"""
logout_redirect_param_name = 'url'
def verify_ticket(self, ticket):
"""Verifies CAS 1.0 authentication ticket.
Returns username on success and None on failure.
"""
params = [('ticket', ticket), ('service', self.service_url)]
url = (urllib_parse.urljoin(self.server_url, 'validate') + '?' +
urllib_parse.urlencode(params))
page = requests.get(url, stream=True)
try:
page_iterator = page.iter_lines(chunk_size=8192)
verified = next(page_iterator).strip().decode()
if verified == 'yes':
return next(page_iterator).strip().decode(), None, None
else:
return None, None, None
finally:
page.close()
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Package: xjtucas_pyclient
Author: xczh <zhuxingchi@tiaozhan.com>
Copyright (C) 2016 tiaozhan. All Rights Reserved.
"""
from six.moves.urllib import parse as urllib_parse
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
import requests
from .cas_client_base import CASClientBase
class CASClientV2(CASClientBase):
"""CAS Client Version 2"""
url_suffix = 'serviceValidate'
logout_redirect_param_name = 'url'
def __init__(self, proxy_callback=None, *args, **kwargs):
"""proxy_callback is for V2 and V3 so V3 is subclass of V2"""
self.proxy_callback = proxy_callback
super(CASClientV2, self).__init__(*args, **kwargs)
def verify_ticket(self, ticket):
"""Verifies CAS 2.0+/3.0+ XML-based authentication ticket and returns extended attributes"""
response = self.get_verification_response(ticket)
return self.verify_response(response)
def get_verification_response(self, ticket):
params = {
'ticket': ticket,
'service': self.service_url
}
if self.proxy_callback:
params.update({'pgtUrl': self.proxy_callback})
base_url = urllib_parse.urljoin(self.server_url, self.url_suffix)
page = requests.get(base_url, params=params)
try:
return page.content
finally:
page.close()
@classmethod
def parse_attributes_xml_element(cls, element):
attributes = dict()
for attribute in element:
tag = attribute.tag.split("}").pop()
if tag in attributes:
if isinstance(attributes[tag], list):
attributes[tag].append(attribute.text)
else:
attributes[tag] = [attributes[tag]]
attributes[tag].append(attribute.text)
else:
if tag == 'attraStyle':
pass
else:
attributes[tag] = attribute.text
return attributes
@classmethod
def verify_response(cls, response):
user, attributes, pgtiou = cls.parse_response_xml(response)
if len(attributes) == 0:
attributes = None
return user, attributes, pgtiou
@classmethod
def parse_response_xml(cls, response):
user = None
attributes = {}
pgtiou = None
tree = ElementTree.fromstring(response)
if tree[0].tag.endswith('authenticationSuccess'):
for element in tree[0]:
if element.tag.endswith('user'):
user = element.text
elif element.tag.endswith('proxyGrantingTicket'):
pgtiou = element.text
elif element.tag.endswith('attributes'):
attributes = cls.parse_attributes_xml_element(element)
return user, attributes, pgtiou
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Package: xjtucas_pyclient
Author: xczh <zhuxingchi@tiaozhan.com>
Copyright (C) 2016 tiaozhan. All Rights Reserved.
"""
from .cas_client_v2 import CASClientV2
from .single_logout_mixin import SingleLogoutMixin
class CASClientV3(CASClientV2, SingleLogoutMixin):
"""CAS Client Version 3"""
url_suffix = 'serviceValidate'
logout_redirect_param_name = 'service'
@classmethod
def parse_attributes_xml_element(cls, element):
attributes = dict()
for attribute in element:
tag = attribute.tag.split("}").pop()
if tag in attributes:
if isinstance(attributes[tag], list):
attributes[tag].append(attribute.text)
else:
attributes[tag] = [attributes[tag]]
attributes[tag].append(attribute.text)
else:
attributes[tag] = attribute.text
return attributes
@classmethod
def verify_response(cls, response):
return cls.parse_response_xml(response)
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Package: xjtucas_pyclient
Author: xczh <zhuxingchi@tiaozhan.com>
Copyright (C) 2016 tiaozhan. All Rights Reserved.
"""
from six.moves.urllib import parse as urllib_parse
try:
from xml.etree import ElementTree
except ImportError:
from elementtree import ElementTree
from uuid import uuid4
import datetime
import requests
from .cas_client_v2 import CASClientV2
from .single_logout_mixin import SingleLogoutMixin
SAML_1_0_NS = 'urn:oasis:names:tc:SAML:1.0:'
SAML_1_0_PROTOCOL_NS = '{' + SAML_1_0_NS + 'protocol' + '}'
SAML_1_0_ASSERTION_NS = '{' + SAML_1_0_NS + 'assertion' + '}'
SAML_ASSERTION_TEMPLATE = """<?xml version="1.0" encoding="UTF-8"?>
<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/">
<SOAP-ENV:Header/>
<SOAP-ENV:Body>
<samlp:Request xmlns:samlp="urn:oasis:names:tc:SAML:1.0:protocol"
MajorVersion="1"
MinorVersion="1"
RequestID="{request_id}"
IssueInstant="{timestamp}">
<samlp:AssertionArtifact>{ticket}</samlp:AssertionArtifact></samlp:Request>
</SOAP-ENV:Body>
</SOAP-ENV:Envelope>"""
class CASClientWithSAMLV1(CASClientV2, SingleLogoutMixin):
"""CASClient 3.0+ with SAML"""
def verify_ticket(self, ticket, **kwargs):
"""Verifies CAS 3.0+ XML-based authentication ticket and returns extended attributes.
@date: 2011-11-30
@author: Carlos Gonzalez Vila <carlewis@gmail.com>
Returns username and attributes on success and None,None on failure.
"""
page = self.fetch_saml_validation(ticket)
try:
user = None
attributes = {}
response = page.content
tree = ElementTree.fromstring(response)
# Find the authentication status
success = tree.find('.//' + SAML_1_0_PROTOCOL_NS + 'StatusCode')
if success is not None and success.attrib['Value'].endswith(':Success'):
# User is validated
name_identifier = tree.find('.//' + SAML_1_0_ASSERTION_NS + 'NameIdentifier')
if name_identifier is not None:
user = name_identifier.text
attrs = tree.findall('.//' + SAML_1_0_ASSERTION_NS + 'Attribute')
for at in attrs:
if self.username_attribute in list(at.attrib.values()):
user = at.find(SAML_1_0_ASSERTION_NS + 'AttributeValue').text
attributes['uid'] = user
values = at.findall(SAML_1_0_ASSERTION_NS + 'AttributeValue')
if len(values) > 1:
values_array = []
for v in values:
values_array.append(v.text)
attributes[at.attrib['AttributeName']] = values_array
else:
attributes[at.attrib['AttributeName']] = values[0].text
return user, attributes, None
finally:
page.close()
def fetch_saml_validation(self, ticket):
# We do the SAML validation
headers = {
'soapaction': 'http://www.oasis-open.org/committees/security',
'cache-control': 'no-cache',
'pragma': 'no-cache',
'accept': 'text/xml',
'connection': 'keep-alive',
'content-type': 'text/xml; charset=utf-8',
}
params = {'TARGET': self.service_url}
saml_validate_url = urllib_parse.urljoin(
self.server_url, 'samlValidate',
)
return requests.post(
saml_validate_url,
self.get_saml_assertion(ticket),
params=params,
headers=headers)
@classmethod
def get_saml_assertion(cls, ticket):
"""
http://www.jasig.org/cas/protocol#samlvalidate-cas-3.0
SAML request values:
RequestID [REQUIRED]:
unique identifier for the request
IssueInstant [REQUIRED]:
timestamp of the request
samlp:AssertionArtifact [REQUIRED]:
the valid CAS Service Ticket obtained as a response parameter at login.
"""
# RequestID [REQUIRED] - unique identifier for the request
request_id = uuid4()
# e.g. 2014-06-02T09:21:03.071189
timestamp = datetime.datetime.now().isoformat()
return SAML_ASSERTION_TEMPLATE.format(
request_id=request_id,
timestamp=timestamp,
ticket=ticket,
).encode('utf8')
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Package: xjtucas_pyclient
Author: xczh <zhuxingchi@tiaozhan.com>
Copyright (C) 2016 tiaozhan. All Rights Reserved.
"""
class CASError(ValueError):
pass
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Package: xjtucas_pyclient
Author: xczh <zhuxingchi@tiaozhan.com>
Copyright (C) 2016 tiaozhan. All Rights Reserved.
"""
from lxml import etree
class SingleLogoutMixin(object):
@classmethod
def get_saml_slos(cls, logout_request):
"""returns saml logout ticket info"""
try:
root = etree.fromstring(logout_request)
return root.xpath(
"//samlp:SessionIndex",
namespaces={'samlp': "urn:oasis:names:tc:SAML:2.0:protocol"})
except etree.XMLSyntaxError:
pass
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment