# Source code for debianmemberportfolio.model.keyringanalyzer

# -*- python -*-
# -*- coding: utf-8 -*-
#
# Debian Member Portfolio Service application key ring analyzer tool
#
# Copyright © 2009-2015 Jan Dittberner <jan@dittberner.info>
#
# This file is part of the Debian Member Portfolio Service.
#
# Debian Member Portfolio Service is free software: you can redistribute it
# and/or modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# Debian Member Portfolio Service is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero
# General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
"""
This is a tool that analyzes GPG and PGP keyrings and stores the
retrieved data in a file database. The tool was inspired by Debian
qa's carnivore.
"""

import dbm
import pkg_resources
import glob
import configparser
import os
import os.path
import logging
import subprocess
import sys
import email.utils


# Module-wide configuration parser. It is created empty here; when this
# module is run as a script it is filled from the packaged 'portfolio.ini'
# resource before the keyrings are processed.
CONFIG = configparser.ConfigParser()


def _get_keyrings():
    """
    Return the sorted list of keyring files found in the keyring
    directory.

    The directory is read from the ``keyring.dir`` option of the
    ``DEFAULT`` section of CONFIG (with ``~`` expanded). Files matching
    ``*.gpg`` or ``*.pgp`` directly inside that directory are returned.
    """
    keyringdir = os.path.expanduser(CONFIG.get('DEFAULT', 'keyring.dir'))
    logging.debug("keyring dir is %s", keyringdir)
    found = []
    for pattern in ('*.gpg', '*.pgp'):
        found.extend(glob.glob(os.path.join(keyringdir, pattern)))
    return sorted(found)


def _parse_uid(uid):
    """
    Parse a uid of the form 'Real Name <email@example.com>' into email
    and realname parts.
    """

    # First try with the Python library, but it doesn't always catch everything
    (name, mail) = email.utils.parseaddr(uid)
    if (not name) and (not mail):
        logging.warning("malformed uid %s", uid)
    if (not name) or (not mail):
        logging.debug("strange uid %s: '%s' - <%s>", uid, name, mail)
        # Try and do better than the python library
        if not '@' in mail:
            uid = uid.strip()
            # First, strip comment
            s = uid.find('(')
            e = uid.find(')')
            if s >= 0 and e >= 0:
                uid = uid[:s] + uid[e + 1:]
            s = uid.find('<')
            e = uid.find('>')
            mail = None
            if s >= 0 and e >= 0:
                mail = uid[s + 1:e]
                uid = uid[:s] + uid[e + 1:]
            uid = uid.strip()
            if not mail and uid.find('@') >= 0:
                mail, uid = uid, mail

            name = uid
            logging.debug("corrected: '%s' - <%s>", name, mail)
    return (name, mail)

# Accumulated analysis results: maps a lookup key string (for example
# 'fpr:email:<address>') to the list of distinct values collected for it.
resultdict = {}


def _get_canonical(key):
    """
    Make sure resultdict has an entry for key and return key unchanged.

    A key that is not yet known gets an empty list as its value.
    """
    resultdict.setdefault(key, [])
    return key


def _add_to_result(key, newvalue):
    """
    Record newvalue under key in resultdict.

    Values are kept in insertion order and a value that is already
    present for the key is not added a second time.
    """
    logging.debug("adding %s: %s", key, newvalue)
    values = resultdict[_get_canonical(key)]
    if newvalue in values:
        return
    values.append(newvalue)


def _handle_mail(mail, fpr):
    """
    Record the relations between a mail address and a key fingerprint.

    For addresses ending in '@debian.org' the part before that suffix is
    taken as login name and the login/email and login/fingerprint
    relations are recorded as well.
    """
    suffix = '@debian.org'
    entries = []
    if mail.endswith(suffix):
        login = mail[:-len(suffix)]
        entries.append(('login:email:%s' % mail, login))
        entries.append(('login:fpr:%s' % fpr, login))
        entries.append(('fpr:login:%s' % login, fpr))
    entries.append(('fpr:email:%s' % mail, fpr))
    entries.append(('email:fpr:%s' % fpr, mail))
    for (key, value) in entries:
        _add_to_result(key, value)


def _handle_uid(uid, fpr):
    """
    Record name and mail information of a uid for the key fingerprint fpr.

    The uid is split into real name and mail address with _parse_uid.
    The given fingerprint is always returned unchanged.
    """
    if not uid:
        # nothing to record for an empty uid
        return fpr

    (realname, mail) = _parse_uid(uid)
    if mail:
        _handle_mail(mail, fpr)
    if realname:
        _add_to_result('name:fpr:%s' % fpr, realname)
        if mail:
            _add_to_result('name:email:%s' % mail, realname)
    return fpr


def process_gpg_list_keys_line(line, fpr):
    """
    Process a line of gpg --list-keys --with-colons output.

    line is one decoded output line, fpr is the fingerprint that is
    current before this line. The return value is the fingerprint that
    is current after the line:

    * a 'pub' line starts a new key and resets the fingerprint to None
    * a 'fpr' line returns the fingerprint found in its tenth field
    * a 'uid' line is recorded for the current fingerprint unless its
      second field is 'r'
    * every other line leaves the fingerprint unchanged
    """
    items = line.split(':')
    record_type = items[0]
    if record_type == 'pub':
        return None
    if record_type == 'fpr':
        return items[9].strip()
    if record_type == 'uid':
        if items[1] == 'r':
            # uids flagged 'r' are skipped
            return fpr
        return _handle_uid(items[9].strip(), fpr)
    return fpr


def process_keyrings():
    """
    Process the keyrings and store the extracted data in a dbm file.

    gpg is run once for every keyring returned by _get_keyrings and its
    colon separated key listing is fed line by line to
    process_gpg_list_keys_line. A non-zero gpg exit status is logged as
    an error but does not stop processing. Afterwards every entry of
    resultdict is written to the 'keyringcache' dbm database located
    next to this module, with the values of an entry joined by ':'.
    """
    for keyring in _get_keyrings():
        logging.debug("get data from %s", keyring)
        command = [
            "gpg",
            "--no-options", "--no-default-keyring",
            "--homedir", os.path.expanduser(
                CONFIG.get('DEFAULT', 'gnupghome')),
            "--no-expensive-trust-checks",
            "--keyring", keyring, "--list-keys",
            "--with-colons", "--fixed-list-mode", "--with-fingerprint",
            "--with-fingerprint"]
        # The context manager makes sure the pipe is closed and the child
        # is reaped even if processing a line raises.
        with subprocess.Popen(command, stdout=subprocess.PIPE) as proc:
            fpr = None
            for line in proc.stdout:
                try:
                    line = line.decode('utf8')
                except UnicodeDecodeError:
                    # iso8859-1 can decode any byte sequence
                    line = line.decode('iso8859-1')
                fpr = process_gpg_list_keys_line(line, fpr)
            retcode = proc.wait()
        if retcode != 0:
            logging.error("subprocess ended with return code %d", retcode)
    db = dbm.open(
        pkg_resources.resource_filename(__name__, 'keyringcache'), 'c')
    try:
        for key in resultdict:
            db[key] = ":".join(resultdict[key])
    finally:
        # close the database even if writing an entry fails
        db.close()


if __name__ == '__main__':
    # Script entry point: log warnings and above to stderr, load the
    # packaged configuration, make sure the GnuPG home directory exists
    # (created with mode 0o700 if missing) and analyze the keyrings.
    logging.basicConfig(stream=sys.stderr, level=logging.WARNING)
    CONFIG.read_string(pkg_resources.resource_string(
        __name__, 'portfolio.ini').decode('utf8'))
    gpghome = os.path.expanduser(CONFIG.get('DEFAULT', 'gnupghome'))
    if not os.path.isdir(gpghome):
        os.makedirs(gpghome, 0o700)
    process_keyrings()