'''
This module provides the class PIDClientCredentials,
which handles the credentials for Handle Server
interaction and for the Search Servlet.
Author: Merret Buurman (DKRZ), 2015-2016
'''
import json
import os
import logging
import pyhandle
from pyhandle.handleexceptions import CredentialsFormatError, HandleSyntaxError
import pyhandle.utilhandle as utilhandle
import pyhandle.util as util
# Module-level logger named after this module. A handler obtained from
# util.NullHandler is attached at import time -- presumably a no-op handler
# so the library emits nothing unless the application configures logging
# (NOTE(review): confirm against pyhandle.util).
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(util.NullHandler())
class PIDClientCredentials(object):
    '''
    Provides authentication information to access a Handle server, either by
    specifying username and password or by providing a json file containing
    the relevant information.

    Instances are validated on construction: handle syntax of the username
    and handle owner, existence of certificate/key files, and presence of
    the arguments required by the selected client ('db', 'rest' or 'batch').
    '''

    @staticmethod
    def load_from_JSON(json_filename):
        '''
        Create a new instance of a PIDClientCredentials with information read
        from a local JSON file.

        :param json_filename: The path to the json credentials file. The json
            file should have the following format:

            .. code:: json

                {
                    "handle_server_url": "https://url.to.your.handle.server",
                    "username": "index:prefix/suffix",
                    "password": "ZZZZZZZ",
                    "prefix": "prefix_to_use_for_writing_handles",
                    "handleowner": "username_to_own_handles"
                }

            Any additional key-value-pairs are stored in the instance as
            config.
        :raises: :exc:`~pyhandle.handleexceptions.CredentialsFormatError`
            if the file is not valid JSON or its top level is not an object.
        :raises: :exc:`~pyhandle.handleexceptions.HandleSyntaxError`
        :return: An instance.
        '''
        try:
            # Context manager so the file handle is closed even if reading
            # or parsing fails.
            with open(json_filename, 'r') as credentials_file:
                jsonfilecontent = json.loads(credentials_file.read())
        except ValueError as exc:
            raise CredentialsFormatError(msg="Invalid JSON syntax: "+str(exc))

        # The content is expanded as keyword arguments below, which only
        # works for a JSON object; report anything else as a format error
        # instead of letting a bare TypeError escape.
        if not isinstance(jsonfilecontent, dict):
            msg = ('Invalid credentials file: expected a JSON object at the '
                   'top level, got %s.' % type(jsonfilecontent).__name__)
            raise CredentialsFormatError(msg=msg)

        instance = PIDClientCredentials(credentials_filename=json_filename,
                                        **jsonfilecontent)
        return instance

    def __init__(self, **args):
        r'''
        Initialize client credentials instance.

        The constructor checks if enough arguments are passed to
        authenticate at a handle server or search servlet. For this,
        the following parameters are checked. Depending on the
        chosen authentication method, only a subset of them are
        required.

        All other parameters passed are stored and can be retrieved
        using 'get_config()'. If a credentials objects is used to
        initialize the client, these key-value pairs are passed on
        to the client constructor.

        :param client: Name of the client to the HS. The values 'rest',
            'db' and 'batch' trigger client-specific argument checks; a
            missing or empty value is rejected.
        :param handle_server_url: Optional. The URL of the Handle System
            server to read from. This class stores None when it is omitted;
            the default 'https://hdl.handle.net' is not applied here
            (presumably by the client -- to be confirmed).
        :param username: Optional. This must be a handle value reference in
            the format "index:prefix/suffix". The method will throw an
            exception upon bad syntax. The existence or validity
            of the password in the handle is not checked at this moment.
        :param password: Optional. This is the password stored as secret key
            in the actual Handle value the username points to.
        :param handleowner: Optional. The username that will be given admin
            permissions over every newly created handle. By default, it is
            '200:0.NA/xyz' (where xyz is the prefix of the handle being
            created).
        :param private_key: Optional. The path to a file containing the
            private key that will be used for authentication in write mode.
            If this is specified, a certificate needs to be specified too.
        :param certificate_only: Optional. The path to a file containing the
            client certificate that will be used for authentication in write
            mode. If this is specified, a private key needs to be specified
            too.
        :param certificate_and_key: Optional. The path to a file containing
            both certificate and private key, used for authentication in
            write mode.
        :param prefix: Prefix. This is not used by the library, but may be
            retrieved by the user.
        :param credentials_filename: This is the file location of the
            credentials file, if read from JSON. It is used to find the
            certificate/key files, if any.
        :param \**args: Any other key-value pairs are stored and can be
            accessed using 'get_config()'.
        :raises: :exc:`~pyhandle.handleexceptions.HandleSyntaxError`
        :raises: :exc:`~pyhandle.handleexceptions.CredentialsFormatError`
            if the client is missing or empty, a referenced file cannot be
            found, or the selected client lacks required arguments.
        '''
        util.log_instantiation(LOGGER, 'PIDClientCredentials', args,
                               ['password', 'reverselookup_password'])

        # Possible arguments:
        useful_args = [
            'client',
            'handle_server_url',
            'username',
            'password',
            'private_key',
            'certificate_only',
            'certificate_and_key',
            'prefix',
            'handleowner',
            'reverselookup_password',
            'reverselookup_username',
            'reverselookup_baseuri',
            'credentials_filename',
            'db_host',
            'db_user',
            'db_password',
            'db_name',
            'passphrase',
        ]
        util.add_missing_optional_args_with_value_none(args, useful_args)

        # Store args
        self.__all_args = args

        # Args that the constructor understands:
        self.__client = args['client']
        self.__handle_server_url = args['handle_server_url']
        self.__username = args['username']
        self.__password = args['password']
        self.__prefix = args['prefix']
        self.__handleowner = args['handleowner']
        self.__private_key = args['private_key']
        self.__certificate_only = args['certificate_only']
        self.__certificate_and_key = args['certificate_and_key']
        self.__reverselookup_password = args['reverselookup_password']
        self.__reverselookup_username = args['reverselookup_username']
        self.__reverselookup_baseuri = args['reverselookup_baseuri']
        self.__credentials_filename = args['credentials_filename']
        self.__db_host = args['db_host']
        self.__db_user = args['db_user']
        self.__db_password = args['db_password']
        self.__db_name = args['db_name']
        self.__passphrase = args['passphrase']

        # Tri-state flag: None = reverse lookup credentials not checked yet,
        # True/False = result of the check. Initialised here so that it
        # exists for every client type, not only for 'rest'.
        self.__reverselookup = None

        # All the other args collected as "additional config":
        self.__additional_config = self.__collect_additional_arguments(
            args, useful_args)

        # Some checks:
        self.__check_handle_syntax()
        self.__check_file_existence()

        if not self.__check_client_existence():
            msg = 'Client not provided or empty'
            raise CredentialsFormatError(msg=msg)

        # Client names other than the three below pass without further
        # argument checks.
        if self.__client == 'db':
            self.__check_if_enough_args_for_hs_auth_db(args)
        elif self.__client == 'rest':
            # Order matters: the reverse lookup check sets the flag that
            # the handle server check consults.
            self.__check_if_enough_args_for_revlookup_auth(args)
            self.__check_if_enough_args_for_hs_auth()
        elif self.__client == 'batch':
            if self.__private_key:
                self.__check_if_enough_args_for_hs_auth_batch_pubkey(args)
            else:
                self.__check_if_enough_args_for_hs_auth_batch_seckey(args)

    def __check_client_existence(self):
        '''Return True if a non-empty client was given, False otherwise.'''
        return bool(self.__client)

    @staticmethod
    def __find_empty_args(args, required):
        '''Return the list of names in ``required`` whose value in ``args`` is falsy.'''
        return [name for name in required if not args[name]]

    @staticmethod
    def __raise_if_empty_args(empty_args):
        '''Raise CredentialsFormatError naming ``empty_args`` if the list is non-empty.'''
        if empty_args:
            # empty_args is a list, so %s renders its repr, e.g.
            # "(['username']) are missing or empty".
            msg = '(%s) are missing or empty' % empty_args
            raise CredentialsFormatError(msg=msg)

    def __check_if_enough_args_for_hs_auth_batch_seckey(self, args):
        '''
        Check that username and password are given for batch mode with
        secret key authentication.

        :param args: The constructor's argument dict.
        :raises: :exc:`~pyhandle.handleexceptions.CredentialsFormatError`
        '''
        empty_args = self.__find_empty_args(args, ['username', 'password'])
        self.__raise_if_empty_args(empty_args)

    def __check_if_enough_args_for_hs_auth_batch_pubkey(self, args):
        '''
        Check that username and private key are given for batch mode with
        public key authentication.

        A passphrase of None is accepted (no passphrase); any other falsy
        passphrase, such as an empty string, is reported as empty.

        :param args: The constructor's argument dict.
        :raises: :exc:`~pyhandle.handleexceptions.CredentialsFormatError`
        '''
        empty_args = self.__find_empty_args(args, ['username', 'private_key'])
        if self.__passphrase is not None and not self.__passphrase:
            empty_args.append('passphrase')
        self.__raise_if_empty_args(empty_args)

    def __check_if_enough_args_for_hs_auth_db(self, args):
        '''
        Check that all database connection arguments are given.

        :param args: The constructor's argument dict.
        :raises: :exc:`~pyhandle.handleexceptions.CredentialsFormatError`
        '''
        db_args = ['db_host', 'db_user', 'db_password', 'db_name']
        self.__raise_if_empty_args(self.__find_empty_args(args, db_args))

    def __collect_additional_arguments(self, args, used_args):
        '''
        Collect the entries of ``args`` whose key is not in ``used_args``.

        :param args: Dict of all arguments.
        :param used_args: Names of the arguments the constructor understands.
        :return: A dict of the remaining key-value pairs, or None if there
            are none.
        '''
        additional_config = {
            argname: value
            for argname, value in args.items()
            if argname not in used_args
        }
        return additional_config or None

    def __check_if_enough_args_for_revlookup_auth(self, args):
        '''
        Determine whether reverse lookup (searching) is possible.

        The dedicated reverse lookup arguments are preferred; the handle
        server username, password and URL serve as fallbacks. If all three
        values are available they are stored and the reverse lookup flag is
        set to True, otherwise the flag is set to False.

        :param args: The constructor's argument dict.
        '''
        user = args['reverselookup_username'] or args['username']
        pw = args['reverselookup_password'] or args['password']
        url = args['reverselookup_baseuri'] or args['handle_server_url']

        if not (user and pw and url):
            self.__reverselookup = False
            return

        self.__reverselookup = True
        self.__reverselookup_username = user
        self.__reverselookup_password = pw
        self.__reverselookup_baseuri = url
        LOGGER.debug('Sufficient information given for reverselookup.')

    def __check_handle_syntax(self):
        '''
        Check the syntax of handle owner and username, if given.

        Delegates to ``utilhandle.check_handle_syntax_with_index``.
        '''
        if self.__handleowner:
            pyhandle.utilhandle.check_handle_syntax_with_index(self.__handleowner)
        if self.__username:
            pyhandle.utilhandle.check_handle_syntax_with_index(self.__username)

    def __check_file_existence(self):
        '''
        Resolve the paths of the given certificate/key files and verify
        that the files exist.

        :raises: :exc:`~pyhandle.handleexceptions.CredentialsFormatError`
        '''
        # The labels are part of the emitted error message and are kept
        # verbatim (including the historical spelling).
        if self.__certificate_only:
            self.__certificate_only = self.__resolve_credentials_file(
                self.__certificate_only, 'certficate file')
        if self.__certificate_and_key:
            self.__certificate_and_key = self.__resolve_credentials_file(
                self.__certificate_and_key, 'certficate and key file')
        if self.__private_key:
            self.__private_key = self.__resolve_credentials_file(
                self.__private_key, 'private key file')

    def __resolve_credentials_file(self, path, label):
        '''Return the checked path, converting a ValueError into a CredentialsFormatError prefixed with ``label``.'''
        try:
            return self.__get_path_and_check_file_existence(path)
        except ValueError as e:
            msg = '(' + label + '): ' + str(e)
            raise CredentialsFormatError(msg=msg)

    def __get_path_and_check_file_existence(self, path):
        '''
        Resolve ``path`` via ``util.get_absolute_path`` (passing the
        credentials file location) and make sure it is an existing file.

        :param path: The path as given by the user.
        :return: The path returned by ``util.get_absolute_path``.
        :raises ValueError: If the path is not valid or no file exists there.
        '''
        try:
            path = util.get_absolute_path(path, self.__credentials_filename)
        except ValueError:  # not a valid path
            thisdir = util.get_this_directory(self.__credentials_filename)
            msg = ('Please provide an absolute path or a path relative to '
                   'the location of the credentials file\'s location (%s), '
                   'starting with %s.' % (thisdir, os.path.curdir))
            raise ValueError(msg)

        if not os.path.isfile(path):  # file does not exist
            msg = 'The file was not found at the specified path: '+path
            raise ValueError(msg)
        return path

    def __check_if_enough_args_for_hs_auth(self):
        '''
        Check that at least one complete set of write credentials exists.

        Accepted sets are: all four database arguments, username plus
        password, certificate file plus private key file, or a combined
        certificate-and-key file. If none is complete but reverse lookup
        credentials are sufficient, this is only logged; otherwise an
        exception describing the gaps is raised.

        :raises: :exc:`~pyhandle.handleexceptions.CredentialsFormatError`
        '''
        has_write_credentials = (
            (self.__db_host and self.__db_user
             and self.__db_password and self.__db_name)
            or (self.__username and self.__password)
            or (self.__certificate_only and self.__private_key)
            or self.__certificate_and_key
        )
        if has_write_credentials:
            return

        if self.__reverselookup is True:
            msg = ('Insufficient credentials for writing to handle '
                   'server, but sufficient credentials for searching.')
            LOGGER.info(msg)
            return

        msg = ''
        if self.__username and not self.__password:
            msg += 'Username was provided, but no password. '
        elif self.__password and not self.__username:
            msg += 'Password was provided, but no username. '
        if self.__certificate_only and not self.__private_key:
            msg += 'A client certificate was provided, but no private key. '
        elif self.__private_key and not self.__certificate_only:
            msg += 'A private key was provided, but no client certificate. '
        if self.__reverselookup is None:
            msg += 'Reverse lookup credentials not checked yet.'
        elif self.__reverselookup is False:
            msg += 'Insufficient credentials for searching.'
        raise CredentialsFormatError(msg=msg)

    def get_all_args(self):
        '''Return the stored dict of all constructor arguments.'''
        return self.__all_args

    def get_client(self):
        '''Return the client name given at construction.'''
        return self.__client

    def get_username(self):
        '''Return the username, or None if not given.'''
        return self.__username

    def get_password(self):
        '''Return the password, or None if not given.'''
        return self.__password

    def get_server_URL(self):
        '''Return the handle server URL, or None if not given.'''
        return self.__handle_server_url

    def get_prefix(self):
        '''Return the prefix, or None if not given.'''
        return self.__prefix

    def get_handleowner(self):
        '''Return the handle owner, or None if not given.'''
        return self.__handleowner

    def get_config(self):
        '''Return the dict of additional key-value pairs, or None if there are none.'''
        return self.__additional_config

    def get_path_to_private_key(self):
        '''Return the resolved path to the private key file, or None if not given.'''
        return self.__private_key

    def get_path_to_file_certificate(self):
        '''Return the certificate-only path if set, else the certificate-and-key path.'''
        return self.__certificate_only or self.__certificate_and_key

    def get_path_to_file_certificate_only(self):
        '''Return the resolved path to the certificate-only file, or None if not given.'''
        return self.__certificate_only

    def get_path_to_file_certificate_and_key(self):
        '''Return the resolved path to the combined certificate-and-key file, or None if not given.'''
        return self.__certificate_and_key

    def get_reverselookup_username(self):
        '''Return the reverse lookup username (for 'rest', this may be the fallback username).'''
        return self.__reverselookup_username

    def get_reverselookup_password(self):
        '''Return the reverse lookup password (for 'rest', this may be the fallback password).'''
        return self.__reverselookup_password

    def get_reverselookup_baseuri(self):
        '''Return the reverse lookup base URI (for 'rest', this may be the fallback server URL).'''
        return self.__reverselookup_baseuri

    def get_db_host(self):
        '''Return the database host, or None if not given.'''
        return self.__db_host

    def get_db_user(self):
        '''Return the database user, or None if not given.'''
        return self.__db_user

    def get_db_password(self):
        '''Return the database password, or None if not given.'''
        return self.__db_password

    def get_db_name(self):
        '''Return the database name, or None if not given.'''
        return self.__db_name

    def get_key_passphrase(self):
        '''Return the private key passphrase, or None if not given.'''
        return self.__passphrase