diff --git a/libagent/ssh/certificate.py b/libagent/ssh/certificate.py new file mode 100644 index 00000000..c8c1e850 --- /dev/null +++ b/libagent/ssh/certificate.py @@ -0,0 +1,74 @@ +"""Utilities for signing ssh-certificates.""" +import io + +from . import formats, util + + +def _parse_stringlist(i): + res = [] + length, = util.recv(i, '>L') + while length >= 4: + size, = util.recv(i, '>L') + length -= 4 + size = min(size, length) + if size == 0: + continue + res.append(util.recv(i, size).decode('utf8')) + length -= size + return res + + +def parse(blob): + """Parses a data blob to a to-be-signed ssh-certificate.""" + # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys + res = {} + i = io.BytesIO(blob) + firstString = util.read_frame(i) + if firstString.endswith(b'cert-v01@openssh.com'): + res['isCertificate'] = True + _certificate_key_type = firstString + _nonce = util.read_frame(i) + if (_certificate_key_type.startswith(b'ssh-rsa')): + _pub_key = {} + _pub_key['e'] = util.read_frame(i) + _pub_key['n'] = util.read_frame(i) + elif (_certificate_key_type.startswith(b'ssh-dsa')): + _pub_key = {} + _pub_key['p'] = util.read_frame(i) + _pub_key['q'] = util.read_frame(i) + _pub_key['g'] = util.read_frame(i) + _pub_key['y'] = util.read_frame(i) + elif (_certificate_key_type.startswith(b'ecdsa-sha2-nistp')): + _curve = util.read_frame(i) + _pub_key = util.read_frame(i) + elif (_certificate_key_type.startswith(b'ssh-ed25519')): + _pub_key = util.read_frame(i) + else: + raise ValueError('unknown certificate key type: '+_certificate_key_type.decode('utf8')) + _serial_number, = util.recv(i, '>Q') + res['certificate_type'], = util.recv(i, '>L') + _key_id_ = util.read_frame(i) + res['principals'] = _parse_stringlist(i) + res['principals'] = ', '.join(res['principals']) + _valid_after, = util.recv(i, '>Q') + _valid_before, = util.recv(i, '>Q') + _critical_options = _parse_stringlist(i) + _extensions = _parse_stringlist(i) + _reserved = util.read_frame(i) + _signature_key = util.read_frame(i) + assert not i.read() + return res + res['isCertificate'] = False + i.close() + return res + + +def format(certificate): + """ + Formats list properties to comma seperated strings and + the signature key to human readable string. + """ + certificate['principals'] = ', '.join(certificate['principals']) + certificate['critical_options'] = ', '.join(certificate['critical_options']) + certificate['extensions'] = ', '.join(certificate['extensions']) + certificate['signature_key'] = formats.parse_pubkey(certificate['signature_key']) diff --git a/libagent/ssh/client.py b/libagent/ssh/client.py index aa3b47cc..2a4d6aa9 100644 --- a/libagent/ssh/client.py +++ b/libagent/ssh/client.py @@ -6,7 +6,7 @@ import io import logging -from . import formats, util +from . import certificate, formats, util log = logging.getLogger(__name__) @@ -36,6 +36,15 @@ def sign_ssh_challenge(self, blob, identity): if msg['sshsig']: log.info('please confirm "%s" signature for "%s" using %s...', msg['namespace'], identity.to_string(), self.device) + elif msg['sshcertsign']: + entity = 'unknown' + if msg['certificate_type'] == 1: + entity = 'user' + elif msg['certificate_type'] == 2: + entity = 'host' + log.info('please confirm signing public key for %s "%s" with "%s" using %s...', + entity, msg['principals'], identity.to_string(), + self.device) else: log.debug('%s: user %r via %r (%r)', msg['conn'], msg['user'], msg['auth'], msg['key_type']) @@ -59,22 +68,31 @@ def parse_ssh_blob(data): i = io.BytesIO(data[6:]) # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.sshsig res['sshsig'] = True + res['sshcertsign'] = False res['namespace'] = util.read_frame(i) res['reserved'] = util.read_frame(i) res['hashalg'] = util.read_frame(i) res['message'] = util.read_frame(i) else: - i = io.BytesIO(data) res['sshsig'] = False - res['nonce'] = util.read_frame(i) - i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108) - res['user'] = util.read_frame(i) - res['conn'] = util.read_frame(i) - res['auth'] = util.read_frame(i) - i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056) - res['key_type'] = util.read_frame(i) - public_key = util.read_frame(i) - res['public_key'] = formats.parse_pubkey(public_key) + _certificate = certificate.parse(data) + if _certificate['isCertificate']: + certificate.format(_certificate) + _certificate['sshsig'] = res['sshsig'] + _certificate['sshcertsign'] = True + return _certificate + else: + res['sshcertsign'] = False + i = io.BytesIO(data) + res['nonce'] = util.read_frame(i) + i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108) + res['user'] = util.read_frame(i) + res['conn'] = util.read_frame(i) + res['auth'] = util.read_frame(i) + i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056) + res['key_type'] = util.read_frame(i) + public_key = util.read_frame(i) + res['public_key'] = formats.parse_pubkey(public_key) unparsed = i.read() if unparsed: diff --git a/libagent/ssh/tests/test_client.py b/libagent/ssh/tests/test_client.py index 9c982244..d510dce4 100644 --- a/libagent/ssh/tests/test_client.py +++ b/libagent/ssh/tests/test_client.py @@ -101,6 +101,7 @@ def test_parse_ssh_challenge(): 'fingerprint': '47:a3:26:af:0b:5d:a2:c3:91:ed:26:36:94:be:3a:d5', 'type': b'ssh-ed25519'}, 'sshsig': False, + 'sshcertsign': False, 'user': b'git', } @@ -122,4 +123,5 @@ def test_parse_ssh_signature(): 'namespace': b'file', 'reserved': b'', 'sshsig': True, + 'sshcertsign': False, }