kallithea-hooks/hooks/canpush.py

94 lines
2.9 KiB
Python
Executable File

from __future__ import with_statement
import sys
import os
#from rhodecode.bin.base import json, api_call
from kallithea.lib.compat import json
def api_call(apikey, apihost, method=None, **kw):
import random
import urllib2
import pprint
"""
Api_call wrapper for RhodeCode.
:param apikey:
:param apihost:
:param format: formatting, pretty means prints and pprint of json
json returns unparsed json
:param method:
:returns: json response from server
"""
def _build_data(random_id):
"""
Builds API data with given random ID
:param random_id:
"""
return {
"id": random_id,
"api_key": apikey,
"method": method,
"args": kw
}
if not method:
raise Exception('please specify method name !')
id_ = random.randrange(1, 9999)
req = urllib2.Request('%s/_admin/api' % apihost,
data=json.dumps(_build_data(id_)),
headers={'content-type': 'text/plain'})
ret = urllib2.urlopen(req)
raw_json = ret.read()
json_data = json.loads(raw_json)
id_ret = json_data['id']
if id_ret == id_:
return json_data
else:
_formatted_json = pprint.pformat(json_data)
raise Exception('something went wrong. '
'ID mismatch got %s, expected %s | %s' % (
id_ret, id_, _formatted_json))
def main(argv=None):
#print ','.join(os.environ)
#print os.environ['RC_SCM_DATA']
rc_scm_data = json.loads(os.environ['KALLITHEA_EXTRAS'])
username = rc_scm_data['username']
repoid = rc_scm_data['repository']
apihost = rc_scm_data['server_url']
apikey = 'CHANGEME'
method = 'get_repo'
margs = {'repoid': repoid}
#print 'Calling method %s(%s) => %s' % (method, margs, apihost)
json_resp = api_call(apikey, apihost, method, **margs)
if json_resp['error']:
json_data = json_resp['error']
else:
json_data = json_resp['result']
#print 'Server response \n%s' % (json.dumps(json_data, indent=4, sort_keys=True))
#print 'members:', json_data['members']
# if we are the owner, allow pushing
#if json_data['owner'] == username:
# return 0
# if we, or the default user, have write or admin permissions, allow pushing
for member in json_data['members']:
#if (member['username'] == 'default' or member['username'] == username) and (member['permission'] == 'repository.write' or member['permission'] == 'repository.admin'):
if (member['name'] == 'default' or member['name'] == username) and (member['permission'] == 'repository.write' or member['permission'] == 'repository.admin'):
return 0
# otherwise deny pushing
print 'You do not have the proper permissions to write to this repository! Ask the owner to add you.'
return 1
if __name__ == '__main__':
sys.exit(main(sys.argv))