Skip to content

Commit

Permalink
Merge PKIInstance.nssdb_import_cert() into cert_import()
Browse files Browse the repository at this point in the history
  • Loading branch information
edewata committed Nov 2, 2023
1 parent 781d1c8 commit b2b2a54
Showing 1 changed file with 22 additions and 45 deletions.
67 changes: 22 additions & 45 deletions base/server/python/pki/server/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,93 +711,70 @@ def csr_file(self, cert_id):
"""Compute name of CSR under instance cert folder."""
return os.path.join(self.cert_folder, cert_id + '.csr')

def nssdb_import_cert(self, cert_id, cert_file=None):
def cert_import(self, cert_id, cert_file=None):
"""
Add cert from cert_file to NSS db with appropriate trust flags
Import cert from cert_file into NSS db with appropriate trust
:param cert_id: Cert ID
:type cert_id: str
:param cert_file: Cert file to be imported into NSS db
:type cert_file: str
:return: New cert data loaded into nssdb
:rtype: dict
:raises pki.server.PKIServerException
:return: None
:rtype: None
"""

# If cert_file is not provided, load the cert from /etc/pki/certs/<cert_id>.crt
if not cert_file:
cert_file = self.cert_file(cert_id)

logger.debug('Importing cert %s from %s', cert_id, cert_file)

if not os.path.isfile(cert_file):
raise pki.server.PKIServerException('File does not exist: %s' % cert_file)

subsystem_name, cert_tag = pki.server.PKIServer.split_cert_id(cert_id)

if not subsystem_name:
subsystem_name = self.get_subsystems()[0].name

logger.debug('- subsystem: %s', subsystem_name)
logger.debug('- cert tag: %s', cert_tag)

subsystem = self.get_subsystem(subsystem_name)

# audit and CA signing cert require special flags set in NSSDB
trust_attributes = None
if subsystem_name == 'ca' and cert_tag == 'signing':
if cert_id == 'ca_signing':
trust_attributes = 'CT,C,C'
elif cert_tag == 'audit_signing':
trust_attributes = ',,P'

logger.debug('- trust flags: %s', trust_attributes)

nssdb = self.open_nssdb()

try:
# If cert_file is not provided, load the cert from /etc/pki/certs/<cert_id>.crt
if not cert_file:
cert_file = self.cert_file(cert_id)

if not os.path.isfile(cert_file):
raise pki.server.PKIServerException('%s does not exist.' % cert_file)

cert = subsystem.get_subsystem_cert(cert_tag)

logger.debug('Checking existing %s certificate in NSS database'
' for subsystem: %s, instance: %s',
cert_tag, subsystem_name, self.name)
logger.debug('Checking existing %s cert', cert_id)

if nssdb.get_cert(
nickname=cert['nickname'],
token=cert['token']):
raise pki.server.PKIServerException(
'Certificate already exists: %s in subsystem %s' % (cert_tag, self.name))
'Certificate already exists: %s' % cert_id)

logger.debug('Importing new %s certificate into NSS database'
' for subsys %s, instance %s',
cert_tag, subsystem_name, self.name)
logger.debug('Importing %s cert', cert_id)

nssdb.add_cert(
nickname=cert['nickname'],
token=cert['token'],
cert_file=cert_file,
trust_attributes=trust_attributes)

logger.info('Updating CS.cfg with the new certificate')
data = nssdb.get_cert(
nickname=cert['nickname'],
token=cert['token'],
output_format='base64')

# Store the cert data retrieved from NSS db
cert['data'] = data

return cert

finally:
nssdb.close()

def cert_import(self, cert_id, cert_file=None):
"""
Import cert from cert_file into NSS db with appropriate trust
:param cert_id: Cert ID
:type cert_id: str
:param cert_file: Cert file to be imported into NSS db
:type cert_file: str
:return: None
:rtype: None
"""
self.nssdb_import_cert(cert_id, cert_file)

def cert_create(
self, cert_id=None,
username=None, password=None,
Expand Down

0 comments on commit b2b2a54

Please sign in to comment.