Skip to content

Commit

Permalink
Convert PKISubsystem.setup_temp_renewal() into get_cert_ski()
Browse files Browse the repository at this point in the history
The PKISubsystem.setup_temp_renewal() has been simplified and
converted into get_cert_ski() which takes a base64-encoded cert
and returns its SKI.
  • Loading branch information
edewata committed Nov 2, 2023
1 parent b2b2a54 commit e2dca04
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 41 deletions.
2 changes: 1 addition & 1 deletion base/server/python/pki/server/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ def cert_create(
logger.info('Trying to create a new temp cert for %s.', cert_id)

# Create Temp Cert and write it to new_cert_file
subsystem.temp_cert_create(nssdb, tmpdir, cert_tag, serial, new_cert_file)
subsystem.temp_cert_create(nssdb, cert_tag, serial, new_cert_file)

logger.info('Temp cert for %s is available at %s.', cert_id, new_cert_file)

Expand Down
77 changes: 37 additions & 40 deletions base/server/python/pki/server/subsystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -933,64 +933,52 @@ def set_startup_test_criticality(self, critical, test=None):
target_tests[testID] = critical
self.set_startup_tests(target_tests)

def setup_temp_renewal(self, tmpdir):
def get_cert_ski(self, cert_data):
"""
Retrieve CA signing cert info and Subject Key Identifier (SKI aka AKI)
Get the Subject Key Identifier of a certificate
:param tmpdir: Path to temp dir to write CA signing cert file
:type tmpdir: str
:return: (ca_signing_cert, aki)
:param cert_data: Base64-encoded cert data
:type cert_data: str
:return: ski
"""

ca_cert_file = os.path.join(tmpdir, 'ca_certificate.crt')
pem_cert = pki.nssdb.convert_cert(cert_data, 'base64', 'pem')

logger.debug('Extracting SKI from CA cert')
# TODO: Support remote CA.

# Retrieve Subject Key Identifier from CA cert
ca_signing_cert = self.instance.get_subsystem('ca').get_subsystem_cert('signing')

ca_cert_data = ca_signing_cert.get('data')
if ca_cert_data is None:
raise pki.server.PKIServerException(
'Unable to find certificate data for CA signing certificate.')

logger.debug('Retrieved CA cert details: %s', ca_cert_data)
tmpdir = tempfile.mkdtemp()
try:
cert_file = os.path.join(tmpdir, 'cert.crt')
with open(cert_file, 'w', encoding='utf-8') as f:
f.write(pem_cert)

ca_cert = pki.nssdb.convert_cert(ca_cert_data, 'base64', 'pem')
with open(ca_cert_file, 'w', encoding='utf-8') as f:
f.write(ca_cert)
logger.info('CA cert written to %s', ca_cert_file)
cmd = [
'openssl',
'x509',
'-in', cert_file,
'-noout',
'-text'
]

ca_cert_retrieve_cmd = [
'openssl',
'x509',
'-in', ca_cert_file,
'-noout',
'-text'
]
logger.debug('Command: %s', ' '.join(cmd))
cert_info = subprocess.check_output(cmd).decode('utf-8')

logger.debug('Command: %s', ' '.join(ca_cert_retrieve_cmd))
ca_cert_details = subprocess.check_output(ca_cert_retrieve_cmd).decode('utf-8')
finally:
shutil.rmtree(tmpdir)

aki = re.search(r'Subject Key Identifier.*\n.*?(.*?)\n', ca_cert_details).group(1)
ski = re.search(r'Subject Key Identifier.*\n.*?(.*?)\n', cert_info).group(1)

# Add 0x to represent this as a Hex
aki = '0x' + aki.strip().replace(':', '')
logger.info('AKI: %s', aki)
ski = '0x' + ski.strip().replace(':', '')
logger.info('SKI: %s', ski)

return ca_signing_cert, aki
return ski

def temp_cert_create(self, nssdb, tmpdir, cert_tag, serial, new_cert_file):
def temp_cert_create(self, nssdb, cert_tag, serial, new_cert_file):
"""
Generates temp cert with validity of 3 months by default
**Note**: Currently, supports only *sslserver* cert
:param nssdb: NSS db instance
:type nssdb: NSSDatabase
:param tmpdir: Path to temp dir to write cert's csr and ca's cert file
:type tmpdir: str
:param cert_tag: Cert for which temp cert needs to be created
:type cert_tag: str
:param serial: Serial number to be assigned to new cert
Expand All @@ -1006,7 +994,16 @@ def temp_cert_create(self, nssdb, tmpdir, cert_tag, serial, new_cert_file):
raise pki.server.PKIServerException(
'Temp cert for %s is not supported yet.' % cert_tag)

ca_signing_cert, aki = self.setup_temp_renewal(tmpdir=tmpdir)
ca_signing_cert = self.instance.get_subsystem('ca').get_subsystem_cert('signing')
# TODO: Support remote CA.

ca_cert_data = ca_signing_cert.get('data')
logger.debug('CA signing cert: %s', ca_cert_data)

if ca_cert_data is None:
raise pki.server.PKIServerException('Missing CA signing certificate')

aki = self.get_cert_ski(ca_cert_data)

csr_file = self.instance.csr_file(cert_tag)
logger.debug('Reusing existing CSR in %s', csr_file)
Expand Down

0 comments on commit e2dca04

Please sign in to comment.