diff --git a/tests/keys.gpg b/tests/keys.gpg new file mode 100644 index 0000000..7632338 --- /dev/null +++ b/tests/keys.gpg @@ -0,0 +1,61 @@ +-----BEGIN PGP PRIVATE KEY BLOCK----- +Comment: THESE PRIVATE KEYS ARE FOR TESTING, DO NOT IMPORT THEM!! + +lQHYBFW5puEBBACb4DlfxmTUNMNwNf/36yY53+fKQsjmi1Vq2xmCMs88TRBiFmol +5tJHhVPIBJQfkGw9JnRO93aZQj7f9xHexVtQqrKcl/r38OkMEDCHS4zUoRfuWMTx +++X25I9sPNY2H1pMw0dF+0he+/0Z8tTQpUVRYQXwQBEY+6VkjVH98xwxdwARAQAB +AAP/QtXbM/ZIwHaZQDFfPimtG86mP+Lv6m5e4zDr2Jg5pIT0m+I5hGPa0QDZgh94 +dapCxtuIrl1MFH3DoNt65ZagxqP0T3hsrUx+6NHIBHc2gbLZL14H9Es2GJSE2LcH +wm7Oh5aazRhyZrt22SBF3rRiltUZOYgZTFvhisUkFUN5aS0CAMJ8+Qz/hhMwgAWX +DebLqklh/ojhIAuPPxknV9SVllrsanHePIJjEpuiprBeTFIUCsAR0DV2ctPDdJMi +lTZJFTMCAM0s8syUHFxzTWqbv94ezhJbKm3uFZI1g2j8Upgj5UJM0ddP+km0gdvZ +kpMeqc1FPk4M96YJ4mwjA7L48LvHqq0CAKhGRXfMWJh6YGe4jne7QGfsUVuSCsgv +oCByo0TB8cAumkjGAgWTQYOM/Vjre0+Dx9o1IjJTbsW57kvZsHXjJgGhRLQoWmV5 +cGxlIFRlc3QgMSA8dGVzdDFAemV5cGxlLmV4YW1wbGUuY29tPoi4BBMBAgAiBQJV +uabhAhsDBgsJCAcDAgYVCAIJCgsEFgIDAQIeAQIXgAAKCRDWUTwE4kwfg9tOA/4i +aNsBT5poWba+3O1ChrbURq017cAPvNQ9+xHpBLFriZxGK09K8U1KOoiz9h8BYNtl +UulKP2svzTXXWSldcvrmZ+UT61OE62xJOAYrD+KfkjLyG9XxFOpQSPlOODAh+SPe +RCsVGpzGwbJ+78hVIJtbrNHwNcA5X6Bf3JZ2XEWjKp0B2ARVuabhAQQA738PIUUm +aRYk/PUOu+dECKpytfAPIRVunWdyoDQM3TtdQqv44d4p4xWu9D99ILYUiouZWfy3 +8AA09H8BjfjTadNvzupKxOKkhu5O34Zq3IActd9yMy27/QcfFimXE4ymvFhMZSJy +PGoHo9T1Qa9sIo7ljk9gQMEQGU228ROCIY8AEQEAAQAD/A8BtKZ+iT4bc5zgFBjF +EHfEimSJEsGdcK1vPnj4WfgA0MKtOO6aN6CxiqFmWwZSMm5N+gFv+uyQbsEFNkk5 +lrGTVK0ngnj3sy9kGaOmt3HRnV8dtHGZsdVj2Cj75tc+Ah937cOjUolrw2i8GB3D +SBMg+fnRHYNcJRaLdksBOQaFAgD2stdreAzkucSyHwa1mNP3dW+yhO1/gB+yeZ6k +yLSlGRoyOKdvZnX8n7+y6lnYqo2b+MAqF6uM+5R+z0jkR5BVAgD4hrNYkcr2tQwH +MiNKQM9FSHC3yTB57z5CMMbXwrw8t+bINZ5mM2iGOlIR1+uV7rjCmtc92y1lrM3W +LXrksv5TAgCfD3Q0r3nbGhnqLjNg4cyBvY/7tOpDcpS08DQvvnH/NksR2ODHdmRp +ceO6zglGHpTR3xya6qxtfq9iA+QLHJSUpdGInwQYAQIACQUCVbmm4QIbDAAKCRDW +UTwE4kwfgxM5A/9ntQB0RMw9Kc7eisMlRbQ1htLFPJqU0Wx7Wb1KtUwSXAMQJV3M +ybHeK0R3Yp5OTGmwdnmIQ2/oINuqpg6UTceuEcaIwwhJQp1iep8J9+jcpsXwvOlR +xno+9LuI4pXx4AG7pTEYLHhyRgh84vS+ZyjmQK/Fzms4optK0IXR3lOkopUB2ARV +uab8AQQAwtrin0PqSMBJqnqRbpTA2K4oZBP2niJ+yOjGkOBxgrL7WHhn7dc0+63Q +Eq4MlCAZMeqRuNSztvDkqwwiflarhtDNVWBD/o/ASlglQFMRslexLxMkZx5gSUcP +klLD4EnNX8rNnEvdEdzegS9SPd5k3K9GOls4VbiwFzodIQS+qYEAEQEAAQAD/jkI +lWWVnOkvc0B1gMTzwGCL1WG5oClYInEPBTPZpg/h8ITMNWtd3vG9xdX54M+od4dv +R7jodTPaXawdMKl3F9wq2STw6ssFE5s2WY7YFq6xcjUyerDFY5F2jHqmiDqShCYw +L+h6edD1oReCkE60yAIZrfeW7bunq00YT0v35kkDAgDeRHQL4V+PrhpeUBT0O0tK +lB5MnmI2U+75VJ8ahDN4IU0N9MPXAzOLUgKqa5zNRVUfGM5I71yCaxCDIEudRcJz +AgDgbWeVoopKMLRnY5HonAFsbaugoWaZ4Q/arfSgpT8K2RW3RiZNYm1WF56Bl/lQ +E+MeEpFCkkkt9LrB5iYzroM7AfwJOC7VsKqGPm5p6Tsf4FyX2SN7YpLamnIYEVoT +lo2IWNguODCBwc9yCOuXSBXSjgZ52a+UfdT656W7j7LF+gujniK0KFpleXBsZSBU +ZXN0IDIgPHRlc3QyQHpleXBsZS5leGFtcGxlLmNvbT6IuAQTAQIAIgUCVbmm/AIb +AwYLCQgHAwIGFQgCCQoLBBYCAwECHgECF4AACgkQBCLxxZf7FodxSQP/VH3xaElu +G4NlUTsTSSiBqi0C1CCuVzrKNSU8QwfxkEdEv8DR+TMU4K3ZvcEMVFaKFCdps+nC +J8v6irJRHrr29EPujOqZVC/lijvHDfkOm+xSQhXjwv40oqXeypM/N9pxB7sP5Gep +e1fhyjcW8h3A0kOQQDW1JHnrAN0wcIV1SJydAdgEVbmm/AEEAO7All/20nkJl1c5 +h2pVO24w1yRy6S1NMDKv6hD6qitbwwE2Hu8H9gl2FvitaZ/ZELKMItL7z/KxrKid +WIeNKK4ky/qBdq/CAbtoHYHgeUAwUt/dVo2FB4MIecjsFh7wt6+jNNYvy8/4sCII +5VH73zvHOyIZQJ8nzXMu145neH/jABEBAAEAA/4p6Tvch7cF0VW0VaB8XY7rtn4l +41gkgC7MTw4vQdl6eAbA4S/H9SVPHuBEciifC1s/hJMeZ17nMyJkjQ576R8xL+8J +PjqzcSF+DR5u1kqpD+2gh0PhI2+8aa+9yyshbSfn/bqyTa5sa1MhZcyLVLBGCo4+ +0jY2++24Cbdbcw5FkQIA9uH7QXo+08NUdmR2FiRXQSsI6K5o/h12r0EBe14X1swP +8mFpCcWtEBXMP2IfM69wsELs/hXwvIDy92xxiL1yEQIA95G9th/DO+e/hxGOFTis +1xuipKxn915MAKeeFjAIEu/cC/SXrNVpu1GQylN0Fj61Ud1aEWLXZ4igfqOHiafe +swH/SVDhXiGB9kppo4fkfh3XhastBDmasyo8TUTuk34MlGMkvjU6D05iJRzRBGZw +fR3d5rhJGUnpoSApZvJbezuUA5bhiJ8EGAECAAkFAlW5pvwCGwwACgkQBCLxxZf7 +FoeJkQP/RuIUMAG772pfvdYH0L5s97U8On52lu6XyU7BIDnO3GU864DlX8K0KILU +5MF4G+SulNv8Kci6gK2tszgruOFI8NGAkBZrDgDCLcEH1k7o0S/jYKxvkAfHz91O +UjI8Ynl8Yri1XVdG3OP6Bm2x90j/v0D15vgxvYGXAHLBYO84rto= +=+1Yx +-----END PGP PRIVATE KEY BLOCK----- diff --git a/tests/test_zeyple.py b/tests/test_zeyple.py index a436494..d97f8c1 100644 --- a/tests/test_zeyple.py +++ b/tests/test_zeyple.py @@ -1,74 +1,108 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +# BBB: Python 2.7 support +from __future__ import unicode_literals + import unittest from mock import Mock import os +import subprocess import shutil import six +from six.moves.configparser import ConfigParser +import tempfile from textwrap import dedent from zeyple import zeyple -LINUS_ID = '79BE3E4300411886' - -def is_encrypted(string): - return string.startswith(six.b('-----BEGIN PGP MESSAGE-----')) +KEYS_FNAME = os.path.join(os.path.dirname(__file__), 'keys.gpg') +TEST1_ID = 'D6513C04E24C1F83' +TEST1_EMAIL = 'test1@zeyple.example.com' +TEST2_ID = '0422F1C597FB1687' +TEST2_EMAIL = 'test2@zeyple.example.com' class ZeypleTest(unittest.TestCase): def setUp(self): - shutil.copyfile('tests/zeyple.conf', 'zeyple.conf') - os.system("gpg --recv-keys %s 2> /dev/null" % LINUS_ID) - self.zeyple = zeyple.Zeyple() - self.zeyple._send_message = Mock() # don't try to send emails + self.tmpdir = tempfile.mkdtemp() - def tearDown(self): - os.remove('zeyple.conf') + self.conffile = os.path.join(self.tmpdir, 'zeyple.conf') + self.homedir = os.path.join(self.tmpdir, 'gpg') + self.logfile = os.path.join(self.tmpdir, 'zeyple.log') + + config = ConfigParser() + + config.add_section('zeyple') + config.set('zeyple', 'log_file', self.logfile) + config.set('zeyple', 'add_header', 'true') + + config.add_section('gpg') + config.set('gpg', 'home', self.homedir) + + config.add_section('relay') + config.set('relay', 'host', 'example.net') + config.set('relay', 'port', '2525') - def test_config(self): - """Parses the configuration file properly""" + with open(self.conffile, 'w') as fp: + config.write(fp) - log_file = self.zeyple._config.get('zeyple', 'log_file') - assert log_file == '/tmp/zeyple.log' + os.mkdir(self.homedir, 0o700) + subprocess.check_call( + ['gpg', '--homedir', self.homedir, '--import', KEYS_FNAME], + stderr=open('/dev/null'), + ) + + self.zeyple = zeyple.Zeyple(self.conffile) + self.zeyple._send_message = Mock() # don't try to send emails + + def tearDown(self): + shutil.rmtree(self.tmpdir) + + def decrypt(self, data): + gpg = subprocess.Popen( + ['gpg', '--homedir', self.homedir, '--decrypt'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + return gpg.communicate(data)[0] def test_user_key(self): """Returns the right ID for the given email address""" assert self.zeyple._user_key('non_existant@example.org') is None - user_key = self.zeyple._user_key('torvalds@linux-foundation.org') - assert user_key == LINUS_ID + user_key = self.zeyple._user_key(TEST1_EMAIL) + assert user_key == TEST1_ID def test_encrypt_with_plain_text(self): """Encrypts plain text""" + content = 'The key is under the carpet.'.encode('ascii') + encrypted = self.zeyple.encrypt(content, [TEST1_ID]) + assert self.decrypt(encrypted) == content - encrypted = self.zeyple._encrypt( - 'The key is under the carpet.', [LINUS_ID] - ) - assert is_encrypted(encrypted) - - def test_encrypt_with_unicode(self): - """Encrypts Unicode text""" - - encrypted = self.zeyple._encrypt('héhé', [LINUS_ID]) - assert is_encrypted(encrypted) + def test_encrypt_binary_data(self): + """Encrypt binary data. (Simulate encrypting non ascii characters""" + content = b'\xff\x80' + encrypted = self.zeyple.encrypt(content, [TEST1_ID]) + assert self.decrypt(encrypted) == content def test_process_message_with_simple_message(self): """Encrypts simple messages""" + content = "test" emails = self.zeyple.process_message(dedent("""\ Received: by example.org (Postfix, from userid 0) id DD3B67981178; Thu, 6 Sep 2012 23:35:37 +0000 (UTC) - To: torvalds@linux-foundation.org - Subject: Hello with Unicode héüøœ©ßð®å¥¹²æ¿áßö«ç + To: """ + TEST1_EMAIL + """ + Subject: Hello Message-Id: <20120906233537.DD3B67981178@example.org> Date: Thu, 6 Sep 2012 23:35:37 +0000 (UTC) From: root@example.org (root) - test ðßïð - """), ["torvalds@linux-foundation.org"]) + """ + content).encode('ascii'), [TEST1_EMAIL]) assert emails[0]['X-Zeyple'] is not None - assert is_encrypted(emails[0].get_payload().encode('utf-8')) + payload = emails[0].get_payload().encode('ascii') + assert self.decrypt(payload) == content.encode('ascii') def test_process_message_with_multipart_message(self): """Ignores multipart messages""" @@ -78,7 +112,7 @@ def test_process_message_with_multipart_message(self): Received: by example.org (Postfix, from userid 0) id CE9876C78258; Sat, 8 Sep 2012 13:00:18 +0000 (UTC) Date: Sat, 08 Sep 2012 13:00:18 +0000 - To: torvalds@linux-foundation.org + To: """ + TEST1_EMAIL + """ Subject: test User-Agent: Heirloom mailx 12.4 7/29/08 MIME-Version: 1.0 @@ -106,10 +140,31 @@ def test_process_message_with_multipart_message(self): Yy90ZXN0JyB3d3ctZGF0YQo= --=_504b4162.Gyt30puFsMOHWjpCATT1XRbWoYI1iR/sT4UX78zEEMJbxu+h-- - """), ["torvalds@linux-foundation.org"]) + """).encode('ascii'), [TEST1_EMAIL]) assert emails[0]['X-Zeyple'] is not None - assert emails[0].is_multipart() - for part in emails[0].walk(): - assert not is_encrypted(part.as_string().encode('utf-8')) + assert not emails[0].is_multipart() # GPG encrypt the multipart + assert self.decrypt(emails[0].get_payload().encode('ascii')) + + def test_process_message_with_multiple_recipients(self): + """Encrypt a message with multiple recipients""" + + content = "Content" + + emails = self.zeyple.process_message(dedent("""\ + Received: by example.org (Postfix, from userid 0) + id DD3B67981178; Thu, 6 Sep 2012 23:35:37 +0000 (UTC) + To: """ + ', '.join([TEST1_EMAIL, TEST2_EMAIL]) + """ + Subject: Hello + Message-Id: <20120906233537.DD3B67981178@example.org> + Date: Thu, 6 Sep 2012 23:35:37 +0000 (UTC) + From: root@example.org (root) + + """ + content).encode('ascii'), [TEST1_EMAIL, TEST2_EMAIL]) + + assert len(emails) == 2 # It had two recipients + for m in emails: + assert m['X-Zeyple'] is not None + payload = self.decrypt(m.get_payload().encode('ascii')) + assert payload == six.b(content) diff --git a/tests/zeyple.conf b/tests/zeyple.conf deleted file mode 100644 index 59ea273..0000000 --- a/tests/zeyple.conf +++ /dev/null @@ -1,11 +0,0 @@ -[zeyple] -log_file = /tmp/zeyple.log -add_header = true - -[gpg] -home = ~/.gnupg - -[relay] -host = localhost -port = 10026 - diff --git a/zeyple/zeyple.py b/zeyple/zeyple.py index 9dd4cdf..e304dd6 100644 --- a/zeyple/zeyple.py +++ b/zeyple/zeyple.py @@ -5,14 +5,40 @@ import os import logging import email +import email.mime.multipart +import email.mime.application +import email.encoders import smtplib import gpgme +import copy from io import BytesIO try: from configparser import SafeConfigParser # Python 3 except ImportError: from ConfigParser import SafeConfigParser # Python 2 +# Boiler plate to avoid dependency from six +# BBB: Python 2.7 support +PY3K = sys.version_info > (3, 0) +binary_string = bytes if PY3K else str +if PY3K: + message_from_binary = email.message_from_bytes +else: + message_from_binary = email.message_from_string + + +def as_binary_string(email): + if PY3K: + return email.as_bytes() + else: + return email.as_string() + + +def get_binary_payload(email): + payload = email.get_payload() + if not isinstance(payload, binary_string): + payload = payload.encode() + return payload __title__ = 'Zeyple' __version__ = '1.0.0' @@ -24,24 +50,40 @@ class Zeyple: """Zeyple Encrypts Your Precious Log Emails""" - def __init__(self): - self._load_configuration() + def __init__(self, config_fname='zeyple.conf'): + self.config = self.load_configuration(config_fname) - log_file = self._config.get('zeyple', 'log_file') + log_file = self.config.get('zeyple', 'log_file') logging.basicConfig( filename=log_file, level=logging.DEBUG, format='%(asctime)s %(process)s %(levelname)s %(message)s' ) logging.info("Zeyple ready to encrypt outgoing emails") - # tells gpgme.Context() where are the keys - os.environ['GNUPGHOME'] = self._config.get('gpg', 'home') + @property + def gpg(self): + protocol = gpgme.PROTOCOL_OpenPGP + + if self.config.has_option('gpg', 'executable'): + executable = self.config.get('gpg', 'executable') + else: + executable = None # Default value - def process_message(self, message, recipients): + home_dir = self.config.get('gpg', 'home') + + ctx = gpgme.Context() + ctx.set_engine_info(protocol, executable, home_dir) + ctx.armor = True + + return ctx + + def process_message(self, message_data, recipients): """Encrypts the message with recipient keys""" + assert isinstance(message_data, binary_string) - message = email.message_from_string(message) - logging.info("Processing outgoing message %s", message['Message-id']) + in_message = message_from_binary(message_data) + logging.info( + "Processing outgoing message %s", in_message['Message-id']) if not recipients: logging.warn("Cannot find any recipients, ignoring") @@ -53,25 +95,74 @@ def process_message(self, message, recipients): key_id = self._user_key(recipient) logging.info("Key ID: %s", key_id) if key_id: - if message.is_multipart(): - logging.warn("Message is multipart, ignoring") + if in_message.is_multipart(): + mime = email.mime.multipart.MIMEMultipart( + 'mixed', + None, # The boundary will be autogenerated + in_message.get_payload(), + ) + payload = as_binary_string(mime) + else: + payload = get_binary_payload(in_message) + + encrypted_payload = self.encrypt(payload, [key_id]) + + version = self.get_version_part() + encrypted = self.get_encrypted_part(encrypted_payload) + + out_message = copy.copy(in_message) + out_message.preamble = "This is an OpenPGP/MIME encrypted " \ + "message (RFC 4880 and 3156)" + + if 'Content-Type' not in out_message: + out_message['Content-Type'] = 'multipart/encrypted' else: - payload = self._encrypt(message.get_payload(), [key_id]) + out_message.replace_header( + 'Content-Type', + 'multipart/encrypted', + ) + out_message.set_param('protocol', 'application/pgp-encrypted') + out_message.set_payload([version, encrypted]) - # replace message body with encrypted payload - message.set_payload(payload) else: logging.warn("No keys found, message will be sent unencrypted") - self._add_zeyple_header(message) - self._send_message(message, recipient) - sent_messages.append(message) + self._add_zeyple_header(out_message) + self._send_message(out_message, recipient) + sent_messages.append(out_message) return sent_messages + def get_version_part(self): + ret = email.mime.application.MIMEApplication( + 'Version: 1\n', + 'pgp-encrypted', + email.encoders.encode_noop, + ) + ret.add_header( + 'Content-Description', + "PGP/MIME version identification", + ) + return ret + + def get_encrypted_part(self, payload): + ret = email.mime.application.MIMEApplication( + payload, + 'octet-stream', + email.encoders.encode_noop, + name="encrypted.asc", + ) + ret.add_header('Content-Description', "OpenPGP encrypted message") + ret.add_header( + 'Content-Disposition', + 'inline', + filename='encrypted.asc', + ) + return ret + def _add_zeyple_header(self, message): - if self._config.has_option('zeyple', 'add_header') and \ - self._config.getboolean('zeyple', 'add_header'): + if self.config.has_option('zeyple', 'add_header') and \ + self.config.getboolean('zeyple', 'add_header'): message.add_header( 'X-Zeyple', "processed by {0} v{1}".format(__title__, __version__) @@ -81,27 +172,30 @@ def _send_message(self, message, recipient): """Sends the given message through the SMTP relay""" logging.info("Sending message %s", message['Message-id']) - smtp = smtplib.SMTP(self._config.get('relay', 'host'), - self._config.get('relay', 'port')) + smtp = smtplib.SMTP(self.config.get('relay', 'host'), + self.config.get('relay', 'port')) smtp.sendmail(message['From'], recipient, message.as_string()) smtp.quit() logging.info("Message %s sent", message['Message-id']) - def _load_configuration(self, filename='zeyple.conf'): + def load_configuration(self, filename): """Reads and parses the config file""" - self._config = SafeConfigParser() - self._config.read(['/etc/' + filename, filename]) - if not self._config.sections(): + config = SafeConfigParser() + config.read([ + os.path.join('/etc/', filename), + filename, + ]) + if not config.sections(): raise IOError('Cannot open config file.') + return config def _user_key(self, email): """Returns the GPG key for the given email address""" logging.info("Trying to encrypt for %s", email) - gpg = gpgme.Context() - keys = [key for key in gpg.keylist(email)] + keys = [key for key in self.gpg.keylist(email)] if keys: key = keys.pop() # NOTE: looks like keys[0] is the master key @@ -110,32 +204,29 @@ def _user_key(self, email): return None - def _encrypt(self, message, key_ids): + def encrypt(self, message, key_ids): """Encrypts the message with the given keys""" + assert isinstance(message, binary_string) - try: - message = message.decode('utf-8', 'replace') - except AttributeError: - pass - - message = message.encode('utf-8') plaintext = BytesIO(message) ciphertext = BytesIO() - gpg = gpgme.Context() - gpg.armor = True + self.gpg.armor = True - recipient = [gpg.get_key(key_id) for key_id in key_ids] + recipient = [self.gpg.get_key(key_id) for key_id in key_ids] - gpg.encrypt(recipient, gpgme.ENCRYPT_ALWAYS_TRUST, - plaintext, ciphertext) + self.gpg.encrypt(recipient, gpgme.ENCRYPT_ALWAYS_TRUST, + plaintext, ciphertext) return ciphertext.getvalue() if __name__ == '__main__': recipients = sys.argv[1:] - message = sys.stdin.read() + + # BBB: Python 2.7 support + binary_stdin = sys.stdin.buffer if PY3K else sys.stdin + message = binary_stdin.read() zeyple = Zeyple() zeyple.process_message(message, recipients)