hadapi: Add beginnings of API wrapper
Handles users and projects for now.
This commit is contained in:
parent
5b7197d46c
commit
a07a2755be
0
hadsh/hadapi/__init__.py
Normal file
0
hadsh/hadapi/__init__.py
Normal file
312
hadsh/hadapi/hadapi.py
Normal file
312
hadsh/hadapi/hadapi.py
Normal file
@ -0,0 +1,312 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
from tornado.httpclient import AsyncHTTPClient
|
||||
from tornado.ioloop import IOLoop
|
||||
from tornado.gen import coroutine, Return
|
||||
from enum import Enum
|
||||
|
||||
try:
|
||||
from urllib import parse as urlparse
|
||||
except ImportError:
|
||||
import urlparse
|
||||
|
||||
from cgi import parse_header
|
||||
|
||||
|
||||
class UserSortBy(Enum):
|
||||
influence='influence'
|
||||
newest='newest'
|
||||
followers='followers'
|
||||
projects='projects'
|
||||
skulls='skulls'
|
||||
|
||||
|
||||
class ProjectSortBy(Enum):
|
||||
skulls='skulls'
|
||||
newest='newest'
|
||||
views='views'
|
||||
comments='comments'
|
||||
followers='followers'
|
||||
updated='updated'
|
||||
|
||||
|
||||
class HackadayAPI(object):
|
||||
"""
|
||||
Core Hackaday.io API handler.
|
||||
"""
|
||||
|
||||
HAD_API_URI='https://api.hackaday.io/v1'
|
||||
HAD_AUTH_URI='https://hackaday.io/authorize'
|
||||
HAD_TOKEN_URI='https://auth.hackaday.io/access_token'\
|
||||
'?client_id=%(CLIENT_ID)s'\
|
||||
'&client_secret=%(CLIENT_SECRET)s'\
|
||||
'&code=%(CODE)s'\
|
||||
'&grant_type=authorization_code'
|
||||
|
||||
def __init__(self, client_id, client_secret, api_key,
|
||||
api_uri=HAD_API_URI, auth_uri=HAD_AUTH_URI,
|
||||
token_uri=HAD_TOKEN_URI, client=None, log=None,
|
||||
io_loop=None):
|
||||
|
||||
if log is None:
|
||||
log = logging.getLogger(self.__class__.__module__)
|
||||
|
||||
if io_loop is None:
|
||||
io_loop = IOLoop.current()
|
||||
|
||||
if client is None:
|
||||
client = AsyncHTTPClient()
|
||||
|
||||
self._client = client
|
||||
self._io_loop = io_loop
|
||||
self._log = log
|
||||
self._client_id = client_id
|
||||
self._client_secret = client_secret
|
||||
self._api_key = api_key
|
||||
self._api_uri = api_uri
|
||||
self._auth_uri = auth_uri
|
||||
self._token_uri = token_uri
|
||||
|
||||
def _decode(self, response, default_encoding='UTF-8'):
|
||||
"""
|
||||
Decode a given reponse body.
|
||||
"""
|
||||
# Ideally, encoding should be in the content type
|
||||
(ct, ctopts) = parse_header(response.headers['Content-Type'])
|
||||
encoding = ctopts.get('charset', default_encoding)
|
||||
|
||||
# Return the decoded payload along with the content-type.
|
||||
return (ct, ctopts, response.body.decode(encoding))
|
||||
|
||||
@coroutine
|
||||
def _api_call(self, uri, query=None, token=None, api_key=True, **kwargs):
|
||||
headers = kwargs.setdefault('headers', {})
|
||||
headers.setdefault('Accept', 'application/json')
|
||||
if token is not None:
|
||||
headers['Authorization'] = 'token %s' % token
|
||||
|
||||
if query is None:
|
||||
query = {}
|
||||
|
||||
if api_key:
|
||||
query.setdefault('api_key', self._api_key)
|
||||
|
||||
encode_kv = lambda k, v : '%s=%s' % (k, quote_plus(str(v)))
|
||||
def encode_item(item):
|
||||
(key, value) = item
|
||||
if isinstance(value, list):
|
||||
return '&'.join(map(lambda v : encode_kv(key, v), value))
|
||||
else:
|
||||
return encode_kv(key, value)
|
||||
|
||||
uri += '?%s' % '&'.join(map(encode_item, query.items()))
|
||||
|
||||
if not uri.startswith('http'):
|
||||
uri = self._api_uri + uri
|
||||
|
||||
response = yield self._client.fetch(uri, **kwargs)
|
||||
(ct, ctopts, body) = self._decode(response)
|
||||
if ct.lower() != 'application/json':
|
||||
raise ValueError('Server returned unrecognised type %s' % ct)
|
||||
raise Return(json.loads(body)
|
||||
|
||||
# oAuth endpoints
|
||||
|
||||
def get_token(self, code):
|
||||
"""
|
||||
Fetch the token for API queries from the authorization code given.
|
||||
"""
|
||||
# Determine where to retrieve the access token from
|
||||
post_uri = self._token_uri % dict(
|
||||
CLIENT_ID=quote_plus(self._client_id),
|
||||
CLIENT_SECRET=quote_plus(self._client_secret),
|
||||
CODE=quote_plus(code)
|
||||
)
|
||||
|
||||
return self._api_call(
|
||||
post_uri, method='POST', body=b'', api_key=False)
|
||||
|
||||
# Pagination options
|
||||
|
||||
def _page_query_opts(self, page, perpage):
|
||||
query = {}
|
||||
if page is not None:
|
||||
query['page'] = int(page)
|
||||
if perpage is not None:
|
||||
query['perpage'] = int(perpage)
|
||||
|
||||
return query
|
||||
|
||||
# User API endpoints
|
||||
|
||||
def get_current_user(self, token):
|
||||
"""
|
||||
Fetch the current user's profile information.
|
||||
"""
|
||||
return self._api_call('/me', token=token)
|
||||
|
||||
def _user_query_opts(self, sortby, page, perpage):
|
||||
query = _page_query_opts(page, perpage)
|
||||
sortby = UserSortBy(sortby)
|
||||
query['sortby'] = sortby.value
|
||||
return query
|
||||
|
||||
def get_users(self, sortby=UserSortBy.influence,
|
||||
ids=None, page=None, perpage=None):
|
||||
"""
|
||||
Retrieve a list of all users
|
||||
"""
|
||||
query = self._user_query_opts(sortby, page, perpage)
|
||||
|
||||
if ids is None:
|
||||
return self._api_call('/users', query=query)
|
||||
elif isinstance(ids, slice):
|
||||
query['ids'] = '%d,%d' % (slice.start, slice.stop)
|
||||
return self._api_call('/users/range', query=query)
|
||||
else:
|
||||
ids = list(ids)
|
||||
if len(ids) > 50:
|
||||
raise ValueError('Too many IDs')
|
||||
query['ids'] = ','.join(ids)
|
||||
return self._api_call('/users/batch', query=query)
|
||||
|
||||
def search_users(self, screen_name=None, location=None, tag=None,
|
||||
sortby=UserSortBy.influence, page=None, perpage=None):
|
||||
query = self._user_query_opts(sortby, page, perpage)
|
||||
|
||||
for (arg, val) in ( ('screen_name', screen_name),
|
||||
('location', location),
|
||||
('tag', tag) ):
|
||||
if val is not None:
|
||||
query[arg] = str(val)
|
||||
return self._api_call('/users/search', query=query)
|
||||
|
||||
def get_user(self, user_id):
|
||||
return self._api_call('/users/%d' % user_id)
|
||||
|
||||
def get_user_followers(self, user_id,
|
||||
sortby=UserSortBy.influence, page=None, perpage=None):
|
||||
query = self._user_query_opts(sortby, page, perpage)
|
||||
return self._api_call('/users/%d/followers' % user_id, query=query)
|
||||
|
||||
def get_user_following(self, user_id,
|
||||
sortby=UserSortBy.influence, page=None, perpage=None):
|
||||
query = self._user_query_opts(sortby, page, perpage)
|
||||
return self._api_call('/users/%d/following' % user_id, query=query)
|
||||
|
||||
def get_user_projects(self, user_id,
|
||||
sortby=ProjectSortBy.skulls, page=None, perpage=None):
|
||||
query = self._project_query_opts(sortby, page, perpage)
|
||||
return self._api_call('/users/%d/projects' % user_id, query=query)
|
||||
|
||||
def get_user_skulls(self, user_id,
|
||||
sortby=UserSortBy.influence, page=None, perpage=None):
|
||||
query = self._user_query_opts(sortby, page, perpage)
|
||||
return self._api_call('/users/%d/skulls' % user_id, query=query)
|
||||
|
||||
def get_user_links(self, user_id, page=None, perpage=None):
|
||||
query = self._page_query_opts(page, perpage)
|
||||
return self._api_call('/users/%d/links' % user_id, query=query)
|
||||
|
||||
def get_user_tags(self, user_id, page=None, perpage=None):
|
||||
query = self._page_query_opts(page, perpage)
|
||||
return self._api_call('/users/%d/tags' % user_id, query=query)
|
||||
|
||||
def get_user_pages(self, user_id, page=None, perpage=None):
|
||||
query = self._page_query_opts(page, perpage)
|
||||
return self._api_call('/users/%d/pages' % user_id, query=query)
|
||||
|
||||
# Projects API
|
||||
|
||||
def _project_query_opts(self, sortby, page, perpage):
|
||||
query = _page_query_opts(page, perpage)
|
||||
sortby = ProjectSortBy(sortby)
|
||||
query['sortby'] = sortby.value
|
||||
return query
|
||||
|
||||
def get_projects(self, sortby=ProjectSortBy.influence,
|
||||
ids=None, page=None, perpage=None):
|
||||
"""
|
||||
Retrieve a list of all projects
|
||||
"""
|
||||
query = self._project_query_opts(sortby, page, perpage)
|
||||
|
||||
if ids is None:
|
||||
return self._api_call('/projects', query=query)
|
||||
elif isinstance(ids, slice):
|
||||
query['ids'] = '%d,%d' % (slice.start, slice.stop)
|
||||
return self._api_call('/projects/range', query=query)
|
||||
else:
|
||||
ids = list(ids)
|
||||
if len(ids) > 50:
|
||||
raise ValueError('Too many IDs')
|
||||
query['ids'] = ','.join(ids)
|
||||
return self._api_call('/projects/batch', query=query)
|
||||
|
||||
def search_projects(self, term,
|
||||
sortby=ProjectSortBy.influence, page=None, perpage=None):
|
||||
query = self._project_query_opts(sortby, page, perpage)
|
||||
query['search_term'] = str(term)
|
||||
return self._api_call('/projects/search', query=query)
|
||||
|
||||
def get_project(self, project_id):
|
||||
return self._api_call('/projects/%d' % project_id)
|
||||
|
||||
def get_project_team(self, project_id,
|
||||
sortby=UserSortBy.influence, page=None, perpage=None):
|
||||
query = self._user_query_opts(sortby, page, perpage)
|
||||
return self._api_call('/projects/%d/team' % project_id, query=query)
|
||||
|
||||
def get_project_followers(self, project_id,
|
||||
sortby=UserSortBy.influence, page=None, perpage=None):
|
||||
query = self._user_query_opts(sortby, page, perpage)
|
||||
return self._api_call('/projects/%d/followers' % project_id,
|
||||
query=query)
|
||||
|
||||
def get_project_skulls(self, project_id,
|
||||
sortby=UserSortBy.influence, page=None, perpage=None):
|
||||
query = self._user_query_opts(sortby, page, perpage)
|
||||
return self._api_call('/projects/%d/skulls' % project_id,
|
||||
query=query)
|
||||
|
||||
def get_project_comments(self, project_id,
|
||||
sortby=UserSortBy.influence, page=None, perpage=None):
|
||||
query = self._user_query_opts(sortby, page, perpage)
|
||||
return self._api_call('/projects/%d/comments' % project_id,
|
||||
query=query)
|
||||
|
||||
def get_project_links(self, project_id, page=None, perpage=None):
|
||||
query = self._page_query_opts(page, perpage)
|
||||
return self._api_call('/projects/%d/links' % project_id,
|
||||
query=query)
|
||||
|
||||
def get_project_images(self, project_id, page=None, perpage=None):
|
||||
query = self._page_query_opts(page, perpage)
|
||||
return self._api_call('/projects/%d/images' % project_id,
|
||||
query=query)
|
||||
|
||||
def get_project_components(self, project_id, page=None, perpage=None):
|
||||
query = self._page_query_opts(page, perpage)
|
||||
return self._api_call('/projects/%d/components' % project_id,
|
||||
query=query)
|
||||
|
||||
def get_project_tags(self, project_id, page=None, perpage=None):
|
||||
query = self._page_query_opts(page, perpage)
|
||||
return self._api_call('/projects/%d/tags' % project_id, query=query)
|
||||
|
||||
def get_project_logs(self, project_id, page=None, perpage=None):
|
||||
query = self._page_query_opts(page, perpage)
|
||||
return self._api_call('/projects/%d/logs' % project_id, query=query)
|
||||
|
||||
def get_project_instructions(self, project_id, page=None, perpage=None):
|
||||
query = self._page_query_opts(page, perpage)
|
||||
return self._api_call('/projects/%d/instructions' % project_id,
|
||||
query=query)
|
||||
|
||||
def get_project_details(self, project_id, page=None, perpage=None):
|
||||
query = self._page_query_opts(page, perpage)
|
||||
return self._api_call('/projects/%d/details' % project_id,
|
||||
query=query)
|
Reference in New Issue
Block a user