From e5f5a2ab23ace19c6de39512f76f8fed5f5ad912 Mon Sep 17 00:00:00 2001 From: Guido Guenther Date: Thu, 23 Aug 2007 15:47:20 +0200 Subject: Imported upstream version 0.6.21 --- mini-dinstall | 1483 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1483 insertions(+) create mode 100755 mini-dinstall (limited to 'mini-dinstall') diff --git a/mini-dinstall b/mini-dinstall new file mode 100755 index 0000000..e8386a2 --- /dev/null +++ b/mini-dinstall @@ -0,0 +1,1483 @@ +#!/usr/bin/python +# -*- mode: python; coding: utf-8 -*- +# Miniature version of "dinstall", for installing .changes into an +# archive +# Copyright © 2002,2003 Colin Walters + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import os, sys, re, glob, getopt, time, traceback, gzip, getpass, socket +import signal, threading, select, Queue, SocketServer +import logging, logging.handlers +#logging.basicConfig() +import apt_pkg +apt_pkg.init() +from ConfigParser import * + +from minidinstall.ChangeFile import * +from minidinstall.Dnotify import * +from minidinstall.DebianSigVerifier import * +from minidinstall.GPGSigVerifier import * +from minidinstall.version import * +from minidinstall import misc + +debchanges_re = re.compile('([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.changes$') +debpackage_re = re.compile('([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.u?deb$') +debsrc_dsc_re = re.compile('([-a-z0-9+.]+)_(.+?)\.dsc$') +debsrc_diff_re = re.compile('([-a-z0-9+.]+)_(.+?)\.diff\.gz$') +debsrc_orig_re = re.compile('([-a-z0-9+.]+)_(.+?)\.orig\.tar\.gz$') +debsrc_native_re = re.compile('([-a-z0-9+.]+)_(.+?)\.tar\.gz$') + +native_version_re = re.compile('\s*.*-'); + +toplevel_directory = None +tmp_new_suffix = '.dinstall-new' +tmp_old_suffix = '.dinstall-old' +dinstall_subdir = 'mini-dinstall' +incoming_subdir = 'incoming' +socket_name = 'master' +logfile_name = 'mini-dinstall.log' +configfile_names = ['/etc/mini-dinstall.conf', '~/.mini-dinstall.conf'] +use_dnotify = 0 +mail_on_success = 1 +default_poll_time = 30 +default_max_retry_time = 2 * 24 * 60 * 60 +default_mail_log_level = logging.ERROR +trigger_reindex = 1 +mail_log_flush_level = logging.ERROR +mail_log_flush_count = 10 +mail_to = getpass.getuser() +mail_server = 'localhost' +incoming_permissions = 0750 + +default_architectures = ["all", "i386"] +default_distributions = ("unstable",) + +distributions = {} +scantime = 60 + +def usage(ecode, ver_only=None): + print "mini-dinstall", pkg_version + if ver_only: + sys.exit(ecode) + print "Copyright (C) 2002 Colin Walters " + print "Licensed under the GNU GPL." + print "Usage: mini-dinstall [OPTIONS...] [DIRECTORY]" + print "Options:" + print " -v, --verbose\t\tDisplay extra information" + print " -q, --quiet\t\tDisplay less information" + print " -c, --config=FILE\tParse configuration info from FILE" + print " -d, --debug\t\tOutput information to stdout as well as log" + print " --no-log\t\tDon't write information to log file" + print " -n, --no-act\t\tDon't actually perform changes" + print " -b, --batch\t\tDon't daemonize; run once, then exit" + print " -r, --run\t\tProcess queue immediately" + print " -k, --kill\t\tKill the running mini-dinstall" + print " --help\t\tWhat you're looking at" + print " --version\t\tPrint the software version and exit" + sys.exit(ecode) + +try: + opts, args = getopt.getopt(sys.argv[1:], 'vqc:dnbrk', + ['verbose', 'quiet', 'config=', 'debug', 'no-log', + 'no-act', 'batch', 'run', 'kill', 'help', 'version', ]) +except getopt.GetoptError, e: + sys.stderr.write("Error reading arguments: %s\n" % e) + usage(1) +for (key, val) in opts: + if key == '--help': + usage(0) + elif key == '--version': + usage(0, ver_only=1) +if len(args) > 1: + sys.stderr.write("Unknown arguments: %s\n" % args[1:]) + usage(1) + +# don't propagate exceptions that happen while logging +logging.raiseExceptions = 0 + +logger = logging.getLogger("mini-dinstall") + +loglevel = logging.WARN +no_act = 0 +debug_mode = 0 +run_mode = 0 +kill_mode = 0 +no_log = 0 +batch_mode = 0 +custom_config_files = 0 +for key, val in opts: + if key in ('-v', '--verbose'): + if loglevel == logging.INFO: + loglevel = logging.DEBUG + elif loglevel == logging.WARN: + loglevel = logging.INFO + elif key in ('-q', '--quiet'): + if loglevel == logging.WARN: + loglevel = logging.ERROR + elif loglevel == logging.WARN: + loglevel = logging.CRITICAL + elif key in ('-c', '--config'): + if not custom_config_files: + custom_config_files = 1 + configfile_names = [] + configfile_names.append(os.path.abspath(os.path.expanduser(val))) + elif key in ('-n', '--no-act'): + no_act = 1 + elif key in ('-d', '--debug'): + debug_mode = 1 + elif key in ('--no-log',): + no_log = 1 + elif key in ('-b', '--batch'): + batch_mode = 1 + elif key in ('-r', '--run'): + run_mode = 1 + elif key in ('-k', '--kill'): + kill_mode = 1 + +def do_mkdir(name): + if os.access(name, os.X_OK): + return + try: + logger.info('Creating directory "%s"' % (name)) + except: + pass + if not no_act: + os.mkdir(name) + +def do_rename(source, target): + try: + logger.debug('Renaming "%s" to "%s"' % (source, target)) + except: + pass + if not no_act: + os.rename(source, target) + +def do_chmod(name, mode): + try: + logger.info('Changing mode of "%s" to %o' % (name, mode)) + except: + pass + if not no_act: + os.chmod(name, mode) + +logger.setLevel(logging.DEBUG) +stderr_handler = logging.StreamHandler(strm=sys.stderr) +stderr_handler.setLevel(loglevel) +logger.addHandler(stderr_handler) +stderr_handler.setLevel(loglevel) +stderr_handler.setFormatter(logging.Formatter(fmt="%(name)s [%(thread)d] %(levelname)s: %(message)s")) + +configp = ConfigParser() +configfile_names = map(lambda x: os.path.abspath(os.path.expanduser(x)), configfile_names) +logger.debug("Reading config files: %s" % (configfile_names,)) +configp.read(configfile_names) + +class SubjectSpecifyingLoggingSMTPHandler(logging.handlers.SMTPHandler): + def __init__(self, subject, *args, **kwargs): + self._subject = subject + apply(logging.handlers.SMTPHandler.__init__, [self] + list(args) + ['dummy'], kwargs) + + def getSubject(self, record): + return re.sub('%l', record.levelname, self._subject) + +if not (configp.has_option('DEFAULT', 'mail_log_level') and configp.get('DEFAULT', 'mail_log_level') == 'NONE'): + if configp.has_option('DEFAULT', 'mail_log_level'): + mail_log_level = logging.__dict__[configp.get('DEFAULT', 'mail_log_level')] + else: + mail_log_level = default_mail_log_level + if configp.has_option('DEFAULT', 'mail_to'): + mail_to = configp.get('DEFAULT', 'mail_to') + if configp.has_option('DEFAULT', 'mail_server'): + mail_server = configp.get('DEFAULT', 'mail_server') + if configp.has_option('DEFAULT', 'mail_log_flush_count'): + mail_log_flush_count = configp.getint('DEFAULT', 'mail_log_flush_count') + if configp.has_option('DEFAULT', 'mail_log_flush_level'): + mail_log_flush_level = logging.__dict__[configp.get('DEFAULT', 'mail_log_flush_level')] + mail_smtp_handler = SubjectSpecifyingLoggingSMTPHandler('mini-dinstall log notice (%l)', mail_server, 'Mini-Dinstall <%s@%s>' % (getpass.getuser(),socket.gethostname()), [mail_to]) + mail_handler = logging.handlers.MemoryHandler(mail_log_flush_count, flushLevel=mail_log_flush_level, target=mail_smtp_handler) + + mail_handler.setLevel(mail_log_level) + logger.addHandler(mail_handler) + +if configp.has_option('DEFAULT', 'archivedir'): + toplevel_directory = os.path.expanduser(configp.get('DEFAULT', 'archivedir')) +elif len(args) > 0: + toplevel_directory = args[0] +else: + logger.error("No archivedir specified on command line or in config files.") + sys.exit(1) + +if configp.has_option('DEFAULT', 'incoming_permissions'): + incoming_permissions = int(configp.get('DEFAULT', 'incoming_permissions'), 8) + +do_mkdir(toplevel_directory) +dinstall_subdir = os.path.join(toplevel_directory, dinstall_subdir) +do_mkdir(dinstall_subdir) + +lockfilename = os.path.join(dinstall_subdir, 'mini-dinstall.lock') + +def process_exists(pid): + try: + os.kill(pid, 0) + except OSError, e: + return 0 + return 1 + +if os.access(lockfilename, os.R_OK): + pid = int(open(lockfilename).read()) + if not process_exists(pid): + if run_mode: + logger.error("No process running at %d; use mini-dinstall -k to remove lockfile") + sys.exit(1) + logger.warn("No process running at %d, removing lockfile" % (pid,)) + os.unlink(lockfilename) + if kill_mode: + sys.exit(0) + +if not os.path.isabs(socket_name): + socket_name = os.path.join(dinstall_subdir, socket_name) + +if run_mode or kill_mode: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + logger.debug('Connecting...') + sock.connect(socket_name) + if run_mode: + logger.debug('Sending RUN command') + sock.send('RUN\n') + else: + logger.debug('Sending DIE command') + sock.send('DIE\n') + logger.debug('Reading response') + response = sock.recv(8192) + print response + sys.exit(0) + +if configp.has_option('DEFAULT', 'logfile'): + logfile_name = configp.get('DEFAULT', 'logfile') + +if not no_log: + if not os.path.isabs(logfile_name): + logfile_name = os.path.join(dinstall_subdir, logfile_name) + logger.debug("Adding log file: %s" % (logfile_name,)) + filehandler = logging.FileHandler(logfile_name) + if loglevel == logging.WARN: + filehandler.setLevel(logging.INFO) + else: + filehandler.setLevel(logging.DEBUG) + logger.addHandler(filehandler) + filehandler.setFormatter(logging.Formatter(fmt="%(asctime)s %(name)s [%(thread)d] %(levelname)s: %(message)s", datefmt="%b %d %H:%M:%S")) + +logger.info('Booting mini-dinstall ' + pkg_version) + +class DinstallException(Exception): + def __init__(self, value): + self._value = value + def __str__(self): + return `self._value` + +if not configp.has_option('DEFAULT', 'archive_style'): + logger.critical("You must set the default archive_style option (since version 0.4.0)") + logging.shutdown() + sys.exit(1) + +default_verify_sigs = os.access('/usr/share/keyrings/debian-keyring.gpg', os.R_OK) +default_extra_keyrings = [] +default_keyrings = None + +if configp.has_option('DEFAULT', 'architectures'): + default_architectures = string.split(configp.get('DEFAULT', 'architectures'), ', ') +if configp.has_option('DEFAULT', 'verify_sigs'): + default_verify_sigs = configp.getboolean('DEFAULT', 'verify_sigs') +if configp.has_option('DEFAULT', 'trigger_reindex'): + default_trigger_reindex = configp.getboolean('DEFAULT', 'trigger_reindex') +if configp.has_option('DEFAULT', 'poll_time'): + default_poll_time = configp.getint('DEFAULT', 'poll_time') +if configp.has_option('DEFAULT', 'max_retry_time'): + default_max_retry_time = configp.getint('DEFAULT', 'max_retry_time') +if configp.has_option('DEFAULT', 'extra_keyrings'): + default_extra_keyrings = re.split(', ?', configp.get('DEFAULT', 'extra_keyrings')) +if configp.has_option('DEFAULT', 'keyrings'): + default_keyrings = re.split(', ?', configp.get('DEFAULT', 'keyrings')) +if configp.has_option('DEFAULT', 'use_dnotify'): + use_dnotify = configp.getboolean('DEFAULT', 'use_dnotify') + +sects = configp.sections() +if not len(sects) == 0: + for sect in sects: + distributions[sect] = {} + if configp.has_option(sect, "architectures"): + distributions[sect]["arches"] = string.split(configp.get(sect, "architectures"), ', ') + else: + distributions[sect]["arches"] = default_architectures +else: + for dist in default_distributions: + distributions[dist] = {"arches": default_architectures} + +class DistOptionHandler: + def __init__(self, distributions, configp): + self._configp = configp + self._distributions = distributions + self._optionmap = {} + self._optionmap['poll_time'] = ['int', default_poll_time] + # two days + self._optionmap['max_retry_time'] = ['int', default_max_retry_time] + self._optionmap['post_install_script'] = ['str', None] + self._optionmap['pre_install_script'] = ['str', None] + self._optionmap['dynamic_reindex'] = ['bool', 1] + self._optionmap['chown_changes_files'] = ['bool', 1] + self._optionmap['keep_old'] = ['bool', None] + self._optionmap['mail_on_success'] = ['bool', 1] + self._optionmap['archive_style'] = ['str', None] + # Release file stuff + self._optionmap['generate_release'] = ['bool', 0] + self._optionmap['release_origin'] = ['str', getpass.getuser()] + self._optionmap['release_label'] = ['str', self._optionmap['release_origin'][1]] + self._optionmap['release_suite'] = ['str', None] + self._optionmap['release_codename'] = ['str', None] + self._optionmap['release_description'] = ['str', None] + self._optionmap['release_signscript'] = ['str', None] + + def get_option_map(self, dist): + ret = self._distributions[dist] + for key in self._optionmap.keys(): + type = self._optionmap[key][0] + ret[key] = self._optionmap[key][1] + if self._configp.has_option ('DEFAULT', key): + ret[key] = self.get_option (type, 'DEFAULT', key) + if self._configp.has_option (dist, key): + ret[key] = self.get_option (type, dist, key) + return ret + + def get_option (self, type, dist, key): + if type == 'int': + return self._configp.getint(dist, key) + elif type == 'str': + return self._configp.get(dist, key) + elif type == 'bool': + return self._configp.getboolean(dist, key) + + assert(None) + + +distoptionhandler = DistOptionHandler(distributions, configp) + +for dist in distributions.keys(): + distributions[dist] = distoptionhandler.get_option_map(dist) + if not distributions[dist]['archive_style'] in ('simple-subdir', 'flat'): + raise DinstallException("Unknown archive style \"%s\"" % (distributions[dist]['archive_style'],)) + +logger.debug("Distributions: %s" % (distributions,)) + +# class DinstallTransaction: +# def __init__(self, dir): +# self._dir = dir + +# def start(self, pkgname): +# self._pkgname = pkgname +# self._transfilename = os.path.join(dir, pkgname + ".transaction") + +# def _log_op(self, type, state, str): +# tmpfile = self._transfilename + ".tmp" +# if (os.access(self._transfilename), os.R_OK): +# shutil.copyFile(self._transfilename, tmpfile) +# f = open(tmpfile, 'w') +# f.write('%s %s' % (type, str) ) +# f.close() + +# def _start_op(self, type, str): +# self._log_op(type, 'start', str) + +# def _stop_op(self, type, str): +# self._log_op(type, 'stop', str) + +# def renameFile(self, source, dst): +# self._start_op('rename', + + +# def _sync(): +# os.system("sync") +os.chdir(toplevel_directory) +do_mkdir(dinstall_subdir) +rejectdir = os.path.join(dinstall_subdir, 'REJECT') +incoming_subdir = os.path.join(dinstall_subdir, incoming_subdir) +do_mkdir(rejectdir) +do_mkdir(incoming_subdir) +do_chmod(incoming_subdir, incoming_permissions) + +## IPC stuff +# Used by all threads to determine whether or not they should exit +die_event = threading.Event() + +# These global variables are used in IncomingDir::daemonize +# I couldn't figure out any way to pass state to a BaseRequestHandler. +reprocess_needed = threading.Event() +reprocess_finished = threading.Event() + +reprocess_lock = threading.Lock() +class IncomingDirRequestHandler(SocketServer.StreamRequestHandler, SocketServer.BaseRequestHandler): + def handle(self): + logger.debug('Got request from %s' % (self.client_address,)) + req = self.rfile.readline() + if req == 'RUN\n': + logger.debug('Doing RUN command') + reprocess_lock.acquire() + reprocess_needed.set() + logger.debug('Waiting on reprocessing') + reprocess_finished.wait() + reprocess_finished.clear() + reprocess_lock.release() + self.wfile.write('200 Reprocessing complete\n') + elif req == 'DIE\n': + logger.debug('Doing DIE command') + self.wfile.write('200 Beginning shutdown\n') + die_event.set() + else: + logger.debug('Got unknown command %s' % (req,)) + self.wfile.write('500 Unknown request\n') + +class ExceptionThrowingThreadedUnixStreamServer(SocketServer.ThreadingUnixStreamServer): + def handle_error(self, request, client_address): + self._logger.exception("Unhandled exception during request processing; shutting down") + die_event.set() + +class IncomingDir(threading.Thread): + def __init__(self, dir, archivemap, logger, trigger_reindex=1, poll_time=30, max_retry_time=172800, batch_mode=0, verify_sigs=0): + threading.Thread.__init__(self, name="incoming") + self._dir = dir + self._archivemap = archivemap + self._logger = logger + self._trigger_reindex = trigger_reindex + self._poll_time = poll_time + self._batch_mode = batch_mode + self._verify_sigs = verify_sigs + self._max_retry_time = max_retry_time + self._last_failed_targets = {} + self._eventqueue = Queue.Queue() + self._done_event = threading.Event() + # ensure we always have some reprocess queue + self._reprocess_queue = {} + + def run(self): + self._logger.info('Created new installer thread (%s)' % (self.getName(),)) + self._logger.info('Entering batch mode...') + initial_reprocess_queue = [] + initial_fucked_list = [] + try: + for (changefilename, changefile) in self._get_changefiles(): + if self._changefile_ready(changefilename, changefile): + try: + self._install_changefile(changefilename, changefile, 0) + except Exception: + logger.exception("Unable to install \"%s\"; adding to screwed list" % (changefilename,)) + initial_fucked_list.append(changefilename) + else: + self._logger.warn('Skipping "%s"; upload incomplete' % (changefilename,)) + initial_reprocess_queue.append(changefilename) + if not self._batch_mode: + self._daemonize(initial_reprocess_queue, initial_fucked_list) + self._done_event.set() + self._logger.info('All packages in incoming dir installed; exiting') + except Exception, e: + self._logger.exception("Unhandled exception; shutting down") + die_event.set() + self._done_event.set() + return 0 + + def _abspath(self, *args): + return os.path.abspath(apply(os.path.join, [self._dir] + list(args))) + + def _get_changefiles(self): + ret = [] + globpath = self._abspath("*.changes") + self._logger.debug("glob: " + globpath) + changefilenames = glob.glob(globpath) + for changefilename in changefilenames: + if not self._reprocess_queue.has_key(changefilename): + self._logger.info('Examining "%s"' % (changefilename,)) + changefile = ChangeFile() + try: + changefile.load_from_file(changefilename) + except ChangeFileException: + self._logger.debug("Unable to parse \"%s\", skipping" % (changefilename,)) + continue + ret.append((changefilename, changefile)) + else: + self._logger.debug('Skipping "%s" during new scan because it is in the reprocess queue.' % (changefilename,)) + return ret + + def _changefile_ready(self, changefilename, changefile): + try: + dist = changefile['distribution'] + except KeyError, e: + self._logger.warn("Unable to read distribution field for \"%s\"; data: %s" % (changefilename, changefile,)) + return 0 + try: + changefile.verify(self._abspath('')) + except ChangeFileException: + return 0 + return 1 + + def _install_changefile(self, changefilename, changefile, doing_reprocess): + dist = changefile['distribution'] + if not dist in self._archivemap.keys(): + raise DinstallException('Unknown distribution "%s" in \"%s\"' % (dist, changefilename,)) + logger.debug('Installing %s in archive %s' % (changefilename, self._archivemap[dist][1].getName())) + self._archivemap[dist][0].install(changefilename, changefile, self._verify_sigs) + if self._trigger_reindex: + if doing_reprocess: + logger.debug('Waiting on archive %s to reprocess' % (self._archivemap[dist][1].getName())) + self._archivemap[dist][1].wait_reprocess() + else: + logger.debug('Notifying archive %s of change' % (self._archivemap[dist][1].getName())) + self._archivemap[dist][1].notify() + logger.debug('Finished processing %s' % (changefilename)) + + def _reject_changefile(self, changefilename, changefile, e): + dist = changefile['distribution'] + if not dist in self._archivemap: + raise DinstallException('Unknown distribution "%s" in \"%s\"' % (dist, changefilename,)) + self._archivemap[dist][0].reject(changefilename, changefile, e) + + def _daemon_server_isready(self): + (inready, outready, exready) = select.select([self._server.fileno()], [], [], 0) + return len(inready) > 0 + + def _daemon_event_ispending(self): + return die_event.isSet() or reprocess_needed.isSet() or self._daemon_server_isready() or (not self._eventqueue.empty()) + + def _daemon_reprocess_pending(self): + curtime = time.time() + for changefilename in self._reprocess_queue.keys(): + (starttime, nexttime, delay) = self._reprocess_queue[changefilename] + if curtime >= nexttime: + return 1 + return 0 + + def _daemonize(self, init_reprocess_queue, init_fucked_list): + self._logger.info('Entering daemon mode...') + self._dnotify = DirectoryNotifierFactory().create([self._dir], use_dnotify=use_dnotify, poll_time=self._poll_time, cancel_event=die_event) + self._async_dnotify = DirectoryNotifierAsyncWrapper(self._dnotify, self._eventqueue, logger=self._logger, name="Incoming watcher") + self._async_dnotify.start() + try: + os.unlink(socket_name) + except OSError, e: + pass + self._server = ExceptionThrowingThreadedUnixStreamServer(socket_name, IncomingDirRequestHandler) + self._server.allow_reuse_address = 1 + retry_time = 30 + self._reprocess_queue = {} + fucked = init_fucked_list + doing_reprocess = 0 + # Initialize the reprocessing queue + for changefilename in init_reprocess_queue: + curtime = time.time() + self._reprocess_queue[changefilename] = [curtime, curtime, retry_time] + + # The main daemon loop + while 1: + # Wait until we have something to do + while not (self._daemon_event_ispending() or self._daemon_reprocess_pending()): + time.sleep(0.5) + + self._logger.debug('Checking for pending server requests') + if self._daemon_server_isready(): + self._logger.debug('Handling one request') + self._server.handle_request() + + self._logger.debug('Checking for DIE event') + if die_event.isSet(): + self._logger.debug('DIE event caught') + break + + self._logger.debug('Scanning for changes') + # do we have anything to reprocess? + for changefilename in self._reprocess_queue.keys(): + (starttime, nexttime, delay) = self._reprocess_queue[changefilename] + curtime = time.time() + try: + changefile = ChangeFile() + changefile.load_from_file(changefilename) + except (ChangeFileException,IOError), e: + if not os.path.exists(changefilename): + self._logger.info('Changefile "%s" got removed' % (changefilename,)) + else: + self._logger.exception("Unable to load change file \"%s\"" % (changefilename,)) + self._logger.warn("Marking \"%s\" as screwed" % (changefilename,)) + fucked.append(changefilename) + del self._reprocess_queue[changefilename] + continue + if (curtime - starttime) > self._max_retry_time: + # We've tried too many times; reject it. + self._reject_changefile(changefilename, changefile, DinstallException("Couldn't install \"%s\" in %d seconds" % (changefilename, self._max_retry_time))) + elif curtime >= nexttime: + if self._changefile_ready(changefilename, changefile): + # Let's do it! + self._logger.debug('Preparing to install "%s"' % (changefilename,)) + try: + self._install_changefile(changefilename, changefile, doing_reprocess) + self._logger.debug('Removing "%s" from incoming queue after successful install.' % (changefilename,)) + del self._reprocess_queue[changefilename] + except Exception, e: + logger.exception("Unable to install \"%s\"; adding to screwed list" % (changefilename,)) + fucked.append(changefilename) + else: + delay *= 2 + if delay > 60 * 60: + delay = 60 * 60 + self._logger.info('Upload "%s" isn\'t complete; marking for retry in %d seconds' % (changefilename, delay)) + self._reprocess_queue[changefilename][1:3] = [time.time() + delay, delay] + # done reprocessing; now scan for changed dirs. + relname = None + self._logger.debug('Checking dnotify event queue') + if not self._eventqueue.empty(): + relname = os.path.basename(os.path.abspath(self._eventqueue.get())) + self._logger.debug('Got %s from dnotify' % (relname,)) + if relname is None: + if (not doing_reprocess) and reprocess_needed.isSet(): + self._logger.info('Got reprocessing event') + reprocess_needed.clear() + doing_reprocess = 1 + if relname is None and (not doing_reprocess): + self._logger.debug('No events to process') + continue + + for (changefilename, changefile) in self._get_changefiles(): + if changefilename in fucked: + self._logger.warn("Skipping screwed changefile \"%s\"" % (changefilename,)) + continue + # Have we tried this changefile before? + if not self._reprocess_queue.has_key(changefilename): + self._logger.debug('New change file "%s"' % (changefilename,)) + if self._changefile_ready(changefilename, changefile): + try: + self._install_changefile(changefilename, changefile, doing_reprocess) + except Exception, e: + logger.exception("Unable to install \"%s\"; adding to screwed list" % (changefilename,)) + fucked.append(changefilename) + else: + curtime = time.time() + self._logger.info('Upload "%s" isn\'t complete; marking for retry in %d seconds' % (changefilename, retry_time)) + self._reprocess_queue[changefilename] = [curtime, curtime + retry_time, retry_time] + if doing_reprocess: + doing_reprocess = 0 + self._logger.info('Reprocessing complete') + reprocess_finished.set() + + def wait(self): + self._done_event.wait() + +def parse_versions(fullversion): + debianversion = re.sub('^[0-9]+:', '', fullversion) + upstreamver = re.sub('-[^-]*$', '', debianversion) + + return (upstreamver, debianversion) + +class ArchiveDir: + def __init__(self, dir, logger, configdict, batch_mode=0, keyrings=None, extra_keyrings=None): + self._dir = dir + self._name = os.path.basename(os.path.abspath(dir)) + self._logger = logger + for key in configdict.keys(): + self._logger.debug("Setting \"%s\" => \"%s\" in archive \"%s\"" % ('_'+key, configdict[key], self._name)) + self.__dict__['_' + key] = configdict[key] + do_mkdir(dir) + self._batch_mode = batch_mode + self._keyrings = keyrings + if not extra_keyrings is None : + self._extra_keyrings = extra_keyrings + else: + self._extra_keyrings = [] + if self._mail_on_success: + self._success_logger = logging.Logger("mini-dinstall." + self._name) + self._success_logger.setLevel(logging.DEBUG) + handler = SubjectSpecifyingLoggingSMTPHandler('mini-dinstall success notice', mail_server, 'Mini-Dinstall <%s@%s>' % (getpass.getuser(),socket.gethostname()), [mail_to]) + handler.setLevel(logging.DEBUG) + self._success_logger.addHandler(handler) + self._clean_targets = [] + +# self._filerefmap = {} +# self._changefiles = [] + + def _abspath(self, *args): + return os.path.abspath(apply(os.path.join, [self._dir] + list(args))) + + def _relpath(self, *args): + return apply(os.path.join, [self._name] + list(args)) + + def install(self, changefilename, changefile, verify_sigs): + retval = 0 + try: + retval = self._install_run_scripts(changefilename, changefile, verify_sigs) + except Exception: + self._logger.exception("Unhandled exception during installation") + if not retval: + self._logger.info('Failed to install "%s"' % (changefilename,)) + + def reject(self, changefilename, changefile, reason): + self._reject_changefile(changefilename, changefile, reason) + + def _install_run_scripts(self, changefilename, changefile, verify_sigs): + self._logger.info('Preparing to install \"%s\" in archive %s' % (changefilename, self._name,)) + sourcename = changefile['source'] + version = changefile['version'] + if verify_sigs: + self._logger.info('Verifying signature on "%s"' % (changefilename,)) + try: + if self._keyrings: + verifier = DebianSigVerifier(keyrings=map(os.path.expanduser, self._keyrings), extra_keyrings=self._extra_keyrings) + else: + verifier = DebianSigVerifier(extra_keyrings=self._extra_keyrings) + output = verifier.verify(changefilename) + logger.debug(output) + logger.info('Good signature on "%s"' % (changefilename,)) + except GPGSigVerificationFailure, e: + msg = "Failed to verify signature on \"%s\": %s\n" % (changefilename, e) + msg += string.join(e.getOutput(), '') + logger.error(msg) + self._reject_changefile(changefilename, changefile, e) + return 0 + else: + self._logger.debug('Skipping signature verification on "%s"' % (changefilename,)) + if self._pre_install_script: + try: + self._logger.debug("Running pre-installation script: " + self._pre_install_script) + if self._run_script(os.path.abspath(changefilename), self._pre_install_script): + return 0 + except: + self._logger.exception("failure while running pre-installation script") + return 0 + try: + self._install_changefile_internal(changefilename, changefile) + except Exception, e: + self._logger.exception('Failed to process "%s"' % (changefilename,)) + self._reject_changefile(changefilename, changefile, e) + return 0 + if self._chown_changes_files: + do_chmod(changefilename, 0600) + target = os.path.join(self._dir, os.path.basename(changefilename)) + # the final step + do_rename(changefilename, target) + self._logger.info('Successfully installed %s %s to %s' % (sourcename, version, self._name)) + if self._mail_on_success: + self._success_logger.info('Successfully installed %s %s to %s' % (sourcename, version, self._name)) + + if self._post_install_script: + try: + self._logger.debug("Running post-installation script: " + self._post_install_script) + self._run_script(target, self._post_install_script) + except: + self._logger.exception("failure while running post-installation script") + return 0 + return 1 + + def _install_changefile_internal(self, changefilename, changefile): + sourcename = changefile['source'] + version = changefile['version'] + incomingdir = os.path.dirname(changefilename) + newfiles = [] + is_native = not native_version_re.match(version) + if is_native: + (ignored, newdebianver) = parse_versions(version) + else: + (newupstreamver, newdebianver) = parse_versions(version) + is_sourceful = 0 + for file in map(lambda x: x[4], changefile.getFiles()): + match = debpackage_re.search(file) + if match: + arch = match.group(3) + if not arch in self._arches: + raise DinstallException("Unknown architecture: %s" % (arch)) + target = self._arch_target(arch, file) + newfiles.append((os.path.join(incomingdir, file), target, match.group(1), arch)) + continue + match = debsrc_diff_re.search(file) + if match: + is_sourceful = 1 + target = self._source_target(file) + newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source')) + continue + match = debsrc_orig_re.search(file) + if match: + is_sourceful = 1 + target = self._source_target(file) + newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source')) + continue + match = debsrc_native_re.search(file) + if match: + is_sourceful = 1 + target = self._source_target(file) + newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source')) + continue + match = debsrc_dsc_re.search(file) or debsrc_orig_re.search(file) + if match: + is_sourceful = 1 + target = self._source_target(file) + newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source')) + continue + + all_arches = {} + for arch in map(lambda x: x[3], newfiles): + all_arches[arch] = 1 + completed = [] + oldfiles = [] + if not self._keep_old: + found_old_bins = 0 + for (oldversion, oldarch) in map(lambda x: x[1:], self._get_package_versions()): + if not all_arches.has_key(oldarch) and apt_pkg.VersionCompare(oldversion, version) < 0: + found_old_bins = 1 + for (pkgname, arch) in map(lambda x: x[2:], newfiles): + if arch == 'source' and found_old_bins: + continue + self._logger.debug('Scanning for old files') + for file in self._read_arch_dir(arch): + match = debpackage_re.search(file) + if not match: + continue + oldpkgname = match.group(1) + oldarch = match.group(3) + file = self._arch_target(arch, file) + if not file in map(lambda x: x[0], oldfiles): + target = file + tmp_old_suffix + if oldpkgname == pkgname and oldarch == arch: + oldfiles.append((file, target)) + self._logger.debug('Scanning "%s" for old files' % (self._abspath('source'))) + for file in self._read_source_dir(): + file = self._source_target(file) + if not file in map(lambda x: x[0], oldfiles): + target = file + tmp_old_suffix + match = debchanges_re.search(file) + if not match and is_sourceful: + match = debsrc_dsc_re.search(file) or debsrc_diff_re.search(file) + if match and match.group(1) == sourcename: + oldfiles.append((file, target)) + continue + # We skip the rest of this if it wasn't a + # sourceful upload; really all we do if it isn't + # is clean out old .changes files. + if not is_sourceful: + continue + match = debsrc_orig_re.search(file) + if match and match.group(1) == sourcename: + if not is_native: + (oldupstreamver, olddebianver) = parse_versions(match.group(2)) + if apt_pkg.VersionCompare(oldupstreamver, newupstreamver) < 0: + self._logger.debug('old upstream tarball "%s" version %s < %s, tagging for deletion' % (file, oldupstreamver, newupstreamver)) + oldfiles.append((file, target)) + continue + else: + self._logger.debug('keeping upstream tarball "%s" version %s' % (file, oldupstreamver)) + continue + else: + self._logger.debug('old native tarball "%s", tagging for deletion' % (file,)) + oldfiles.append((file, target)) + continue + match = debsrc_native_re.search(file) + if match and match.group(1) in map(lambda x: x[2], newfiles): + oldfiles.append((file, target)) + continue + + self._clean_targets = map(lambda x: x[1], oldfiles) + allrenames = oldfiles + map(lambda x: x[:2], newfiles) + try: + while not allrenames == []: + (oldname, newname) = allrenames[0] + do_rename(oldname, newname) + completed.append(allrenames[0]) + allrenames = allrenames[1:] + except OSError, e: + logger.exception("Failed to do rename (%s); attempting rollback" % (e.strerror,)) + try: + self._logger.error(traceback.format_tb(sys.exc_traceback)) + except: + pass + # Unwind to previous state + for (newname, oldname) in completed: + do_rename(oldname, newname) + raise + self._clean_targets = [] + # remove old files + self.clean() + + def _run_script(self, changefilename, script): + if script: + script = os.path.expanduser(script) + cmd = '%s %s' % (script, changefilename) + self._logger.info('Running \"%s\"' % (cmd,)) + if not no_act: + if not os.access(script, os.X_OK): + self._logger.error("Can't execute script \"%s\"" % (script,)) + return 1 + pid = os.fork() + if pid == 0: + os.execlp(script, script, changefilename) + sys.exit(1) + (pid, status) = os.waitpid(pid, 0) + if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)): + self._logger.error("script \"%s\" exited with error code %d" % (cmd, os.WEXITSTATUS(status))) + return 1 + return 0 + + def _reject_changefile(self, changefilename, changefile, exception): + sourcename = changefile['source'] + version = changefile['version'] + incomingdir = os.path.dirname(changefilename) + try: + f = open(os.path.join(rejectdir, "%s_%s.reason" % (sourcename, version)), 'w') + if type(exception) == type('string'): + f.write(exception) + else: + traceback.print_exception(Exception, exception, None, None, f) + f.close() + for file in map(lambda x: x[4], changefile.getFiles()): + if os.access(os.path.join(incomingdir, file), os.R_OK): + file = os.path.join(incomingdir, file) + else: + file = self._abspath(file) + target = os.path.join(rejectdir, os.path.basename(file)) + do_rename(file, target) + self._logger.info('Rejecting "%s": %s' % (changefilename, `exception`)) + except Exception: + self._logger.error("Unhandled exception while rejecting %s; archive may be in inconsistent state" % (changefilename,)) + raise + + def clean(self): + self._logger.debug('Removing old files') + for file in self._clean_targets: + self._logger.debug('Deleting "%s"' % (file,)) + if not no_act: + os.unlink(file) + +class SimpleSubdirArchiveDir(ArchiveDir): + def __init__(self, *args, **kwargs): + apply(ArchiveDir.__init__, [self] + list(args), kwargs) + for arch in list(self._arches) + ['source']: + target = os.path.join(self._dir, arch) + do_mkdir(target) + + def _read_source_dir(self): + return os.listdir(self._abspath('source')) + + def _read_arch_dir(self, arch): + return os.listdir(self._abspath(arch)) + + def _arch_target(self, arch, file): + return self._abspath(arch, file) + + def _source_target(self, file): + return self._arch_target('source', file) + + def _get_package_versions(self): + ret = [] + for arch in self._arches: + for file in self._read_arch_dir(arch): + match = debpackage_re.search(file) + if match: + ret.append((match.group(1), match.group(2), match.group(3))) + return ret + + +class FlatArchiveDir(ArchiveDir): + def _read_source_dir(self): + return os.listdir(self._dir) + + def _read_arch_dir(self, arch): + return os.listdir(self._dir) + + def _arch_target(self, arch, file): + return self._abspath(file) + + def _source_target(self, file): + return self._arch_target('source', file) + + def _get_package_versions(self): + ret = [] + for file in self._abspath(''): + match = debpackage_re.search(file) + if match: + ret.append((match.group(1), match.group(2), match.group(3))) + return ret + +class ArchiveDirIndexer(threading.Thread): + def __init__(self, dir, logger, configdict, use_dnotify=0, batch_mode=1): + self._dir = dir + self._name = os.path.basename(os.path.abspath(dir)) + threading.Thread.__init__(self, name=self._name) + self._logger = logger + self._eventqueue = Queue.Queue() + for key in configdict.keys(): + self._logger.debug("Setting \"%s\" => \"%s\" in archive \"%s\"" % ('_'+key, configdict[key], self._name)) + self.__dict__['_' + key] = configdict[key] + do_mkdir(dir) + self._use_dnotify = use_dnotify + self._batch_mode = batch_mode + self._done_event = threading.Event() + + def _abspath(self, *args): + return os.path.abspath(apply(os.path.join, [self._dir] + list(args))) + + def _relpath(self, *args): + return apply(os.path.join, [self._name] + list(args)) + + def _make_indexfile(self, dir, type, name): + cmdline = ['apt-ftparchive', type, dir] + self._logger.debug("Running: " + string.join(cmdline, ' ')) + if no_act: + return + (infd, outfd) = os.pipe() + pid = os.fork() + if pid == 0: + os.chdir(self._dir) + os.chdir('..') + os.close(infd) + misc.dup2(outfd, 1) + os.execvp('apt-ftparchive', cmdline) + os.exit(1) + os.close(outfd) + stdout = os.fdopen(infd) + packagesfile = open(os.path.join(dir, name), 'w') + zpackagesfile = gzip.GzipFile(os.path.join(dir, name + '.gz'), 'w') + buf = stdout.read(8192) + while buf != '': + packagesfile.write(buf) + zpackagesfile.write(buf) + buf = stdout.read(8192) + stdout.close() + (pid, status) = os.waitpid(pid, 0) + if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)): + raise DinstallException("apt-ftparchive exited with status code %d" % (status,)) + packagesfile.close() + zpackagesfile.close() + + def _make_packagesfile(self, dir): + self._make_indexfile(dir, 'packages', 'Packages') + + def _make_sourcesfile(self, dir): + self._make_indexfile(dir, 'sources', 'Sources') + + def _make_releasefile(self): + targetname = self._abspath('Release') + if not self._generate_release: + if os.access(targetname, os.R_OK): + self._logger.info("Release generation disabled, removing existing Release file") + try: + os.unlink(targetname) + except OSError, e: + pass + return + tmpname = targetname + tmp_new_suffix + release_needed = 0 + uncompr_indexfiles = self._get_all_indexfiles() + indexfiles = [] + comprexts = ['.gz'] + for index in uncompr_indexfiles: + indexfiles = indexfiles + [index] + for ext in comprexts: + indexfiles = indexfiles + [index + ext] + if os.access(targetname, os.R_OK): + release_mtime = os.stat(targetname)[stat.ST_MTIME] + for file in indexfiles: + if release_needed: + break + if os.stat(self._abspath(file))[stat.ST_MTIME] > release_mtime: + release_needed = 1 + else: + release_needed = 1 + + if not release_needed: + self._logger.info("Skipping Release generation") + return + self._logger.info("Generating Release...") + if no_act: + self._logger.info("Release generation complete") + return + f = open(tmpname, 'w') + f.write('Origin: ' + self._release_origin + '\n') + f.write('Label: ' + self._release_label + '\n') + suite = self._release_suite + if not suite: + suite = self._name + f.write('Suite: ' + suite + '\n') + codename = self._release_codename + if not codename: + codename = suite + f.write('Codename: ' + codename + '\n') + f.write('Date: ' + time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()) + '\n') + f.write('Architectures: ' + string.join(self._arches, ' ') + '\n') + if self._release_description: + f.write('Description: ' + self._release_description + '\n') + f.write('MD5Sum:\n') + for file in indexfiles: + absfile = self._abspath(file) + md5sum = self._get_file_sum('md5', absfile) + size = os.stat(absfile)[stat.ST_SIZE] + f.write(' %s% 16d %s\n' % (md5sum, size, file)) + f.write('SHA1:\n') + for file in indexfiles: + absfile = self._abspath(file) + shasum = self._get_file_sum('sha1', absfile) + size = os.stat(absfile)[stat.ST_SIZE] + f.write(' %s% 16d %s\n' % (shasum, size, file)) + f.close() + if self._sign_releasefile(tmpname): + os.rename(tmpname, targetname) + self._logger.info("Release generation complete") + + def _sign_releasefile(self, name): + if self._release_signscript: + try: + self._logger.debug("Running Release signing script: " + self._release_signscript) + if self._run_script(name, self._release_signscript, dir=self._abspath()): + return None + except: + self._logger.exception("failure while running Release signature script") + return None + return 1 + + # Copied from ArchiveDir + def _run_script(self, changefilename, script, dir=None): + if script: + script = os.path.expanduser(script) + cmd = '%s %s' % (script, changefilename) + self._logger.info('Running \"%s\"' % (cmd,)) + if not no_act: + if not os.access(script, os.X_OK): + self._logger.error("Can't execute script \"%s\"" % (script,)) + return 1 + pid = os.fork() + if pid == 0: + if dir: + os.chdir(dir) + os.execlp(script, script, changefilename) + sys.exit(1) + (pid, status) = os.waitpid(pid, 0) + if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)): + self._logger.error("script \"%s\" exited with error code %d" % (cmd, os.WEXITSTATUS(status))) + return 1 + return 0 + + # Hacked up from ChangeFile.py; FIXME: merge the two + def _get_file_sum(self, type, filename): + if os.access('/usr/bin/%ssum' % (type,), os.X_OK): + cmd = '/usr/bin/%ssum %s' % (type, filename,) + self._logger.debug("Running: %s" % (cmd,)) + child = popen2.Popen3(cmd, 1) + child.tochild.close() + erroutput = child.childerr.read() + child.childerr.close() + if erroutput != '': + child.fromchild.close() + raise DinstallException("%ssum returned error output \"%s\"" % (type, erroutput,)) + (sum, filename) = string.split(child.fromchild.read(), None, 1) + child.fromchild.close() + status = child.wait() + if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)): + if os.WIFEXITED(status): + msg = "%ssum exited with error code %d" % (type, os.WEXITSTATUS(status),) + elif os.WIFSTOPPED(status): + msg = "%ssum stopped unexpectedly with signal %d" % (type, os.WSTOPSIG(status),) + elif os.WIFSIGNALED(status): + msg = "%ssum died with signal %d" % (type, os.WTERMSIG(status),) + raise DinstallException(msg) + return sum.strip() + if type == 'md5': + import md5 + f = open(filename) + md5sum = md5.new() + buf = f.read(8192) + while buf != '': + md5sum.update(buf) + buf = f.read(8192) + return md5sum.hexdigest() + elif type == 'sha1': + import sha + f = open(filename) + shasum = sha.new() + buf = f.read(8192) + while buf != '': + shasum.update(buf) + buf = f.read(8192) + return shasum.hexdigest() + else: + raise DinstallException('cannot compute hash of type %s; no builtin method or /usr/bin/%ssum', type, type) + + def _index_all(self, force=None): + self._index(self._arches + ['source'], force) + + def run(self): + self._logger.info('Created new thread (%s) for archive indexer %s' % (self.getName(), self._name,)) + self._logger.info('Entering batch mode...') + try: + self._index_all(1) + self._make_releasefile() + if not self._batch_mode: + # never returns + self._daemonize() + self._done_event.set() + except Exception, e: + self._logger.exception("Unhandled exception; shutting down") + die_event.set() + self._done_event.set() + self._logger.info('Thread \"%s\" exiting' % (self.getName(),)) + + def _daemon_event_ispending(self): + return die_event.isSet() or (not self._eventqueue.empty()) + def _daemonize(self): + self._logger.info('Entering daemon mode...') + if self._dynamic_reindex: + self._dnotify = DirectoryNotifierFactory().create(self._get_dnotify_dirs(), use_dnotify=self._use_dnotify, poll_time=self._poll_time, cancel_event=die_event) + + self._async_dnotify = DirectoryNotifierAsyncWrapper(self._dnotify, self._eventqueue, logger=self._logger, name=self._name + " Indexer") + self._async_dnotify.start() + + # The main daemon loop + while 1: + + # Wait until we have a pending event + while not self._daemon_event_ispending(): + time.sleep(1) + + if die_event.isSet(): + break + + self._logger.debug('Reading from event queue') + setevent = None + dir = None + obj = self._eventqueue.get() + if type(obj) == type(''): + self._logger.debug('got dir change') + dir = obj + elif type(obj) == type(None): + self._logger.debug('got general event') + setevent = None + elif obj.__class__ == threading.Event().__class__: + self._logger.debug('got wait_reprocess event') + setevent = obj + else: + self._logger.error("unknown object %s in event queue" % (obj,)) + assert None + + # This is to protect against both lots of activity, and to + # prevent race conditions, so we can rely on timestamps. + time.sleep(1) + if not self._reindex_needed(): + if setevent: + self._logger.debug('setting wait_reprocess event') + setevent.set() + continue + if dir is None: + self._logger.debug('Got general change') + self._index_all(1) + else: + self._logger.debug('Got change in %s' % (dir,)) + self._index([os.path.basename(os.path.abspath(dir))]) + self._make_releasefile() + if setevent: + self._logger.debug('setting wait_reprocess event') + setevent.set() + + def _reindex_needed(self): + reindex_needed = 0 + if os.access(self._abspath('Release.gpg'), os.R_OK): + gpg_mtime = os.stat(self._abspath('Release.gpg'))[stat.ST_MTIME] + for dir in self._get_dnotify_dirs(): + dir_mtime = os.stat(self._abspath(dir))[stat.ST_MTIME] + if dir_mtime > gpg_mtime: + reindex_needed = 1 + else: + reindex_needed = 1 + return reindex_needed + + def _index(self, arches, force=None): + self._index_impl(arches, force=force) + + def wait_reprocess(self): + e = threading.Event() + self._eventqueue.put(e) + self._logger.debug('waiting on reprocess') + while not (e.isSet() or die_event.isSet()): + time.sleep(0.5) + self._logger.debug('done waiting on reprocess') + + def wait(self): + self._done_event.wait() + + def notify(self): + self._eventqueue.put(None) + +class SimpleSubdirArchiveDirIndexer(ArchiveDirIndexer): + def __init__(self, *args, **kwargs): + apply(ArchiveDirIndexer.__init__, [self] + list(args), kwargs) + for arch in list(self._arches) + ['source']: + target = os.path.join(self._dir, arch) + do_mkdir(target) + + def _index_impl(self, arches, force=None): + for arch in arches: + dirmtime = os.stat(self._relpath(arch))[stat.ST_MTIME] + if arch != 'source': + pkgsfile = self._relpath(arch, 'Packages') + if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: + self._logger.info('Generating Packages file for %s...' % (arch,)) + self._make_packagesfile(self._relpath(arch)) + self._logger.info('Packages generation complete') + else: + self._logger.info('Skipping generation of Packages file for %s' % (arch,)) + + else: + pkgsfile = self._relpath(arch, 'Sources') + if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: + self._logger.info('Generating Sources file for %s...' % (arch,)) + self._make_sourcesfile(self._relpath('source')) + self._logger.info('Sources generation complete') + else: + self._logger.info('Skipping generation of Sources file for %s' % (arch,)) + + def _in_archdir(self, *args): + return apply(lambda x,self=self: self._abspath(x), args) + + def _get_dnotify_dirs(self): + return map(lambda x, self=self: self._abspath(x), self._arches + ['source']) + + def _get_all_indexfiles(self): + return map(lambda arch: os.path.join(arch, 'Packages'), self._arches) + ['source/Sources'] + +class FlatArchiveDirIndexer(ArchiveDirIndexer): + def __init__(self, *args, **kwargs): + apply(ArchiveDirIndexer.__init__, [self] + list(args), kwargs) + + def _index_impl(self, arches, force=None): + pkgsfile = self._abspath('Packages') + dirmtime = os.stat(self._relpath())[stat.ST_MTIME] + if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: + self._logger.info('Generating Packages file...') + self._make_packagesfile(self._relpath()) + self._logger.info('Packages generation complete') + else: + self._logger.info('Skipping generation of Packages file') + pkgsfile = self._abspath('Sources') + if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: + self._logger.info('Generating Sources file...') + self._make_sourcesfile(self._relpath()) + self._logger.info('Sources generation complete') + else: + self._logger.info('Skipping generation of Sources file') + + def _in_archdir(self, *args): + return apply(lambda x,self=self: self._abspath(x), args[1:]) + + def _get_dnotify_dirs(self): + return [self._dir] + + def _get_all_indexfiles(self): + return ['Packages', 'Sources'] + +if os.access(lockfilename, os.R_OK): + logger.critical("lockfile \"%s\" exists (pid %s): is another mini-dinstall running?" % (lockfilename, open(lockfilename).read(10))) + logging.shutdown() + sys.exit(1) +logger.debug('Creating lock file: ' + lockfilename) +if not no_act: + lockfile = open(lockfilename, 'w') + lockfile.close() + +if not batch_mode: + # daemonize + logger.debug("Daemonizing...") + if os.fork() == 0: + os.setsid() + if os.fork() != 0: + sys.exit(0) + else: + sys.exit(0) + sys.stdin.close() + sys.stdout.close() + sys.stderr.close() + os.close(0) + os.close(1) + os.close(2) + # unix file descriptor allocation ensures that the followin are fd 0,1,2 + sys.stdin = open("/dev/null") + sys.stdout = open("/dev/null") + sys.stderr = open("/dev/null") + logger.debug("Finished daemonizing (pid %s)" % (os.getpid(),)) + +lockfile = open(lockfilename, 'w') +lockfile.write("%s" % (os.getpid(),)) +lockfile.close() + +if not (debug_mode or batch_mode): + # Don't log to stderr past this point + logger.removeHandler(stderr_handler) + +archivemap = {} +# Instantiaate archive classes for installing files +for dist in distributions.keys(): + if distributions[dist]['archive_style'] == 'simple-subdir': + newclass = SimpleSubdirArchiveDir + else: + newclass = FlatArchiveDir + archivemap[dist] = [newclass(dist, logger, distributions[dist], batch_mode=batch_mode, keyrings=default_keyrings, extra_keyrings=default_extra_keyrings), None] + +# Create archive indexing threads, but don't start them yet +for dist in distributions.keys(): + targetdir = os.path.join(toplevel_directory, dist) + logger.info('Initializing archive indexer %s' % (dist,)) + if distributions[dist]['archive_style'] == 'simple-subdir': + newclass = SimpleSubdirArchiveDirIndexer + else: + newclass = FlatArchiveDirIndexer + archive = newclass(targetdir, logger, distributions[dist], use_dnotify=use_dnotify, batch_mode=batch_mode) + archivemap[dist][1] = archive + +# Now: kick off the incoming processor +logger.info('Initializing incoming processor') +incoming = IncomingDir(incoming_subdir, archivemap, logger, trigger_reindex=trigger_reindex, poll_time=default_poll_time, max_retry_time=default_max_retry_time, batch_mode=batch_mode, verify_sigs=default_verify_sigs) +logger.debug('Starting incoming processor') +incoming.start() +if batch_mode: + logger.debug('Waiting for incoming processor to finish') + incoming.wait() + +# Once we've installed everything, start the indexing threads +for dist in distributions.keys(): + archive = archivemap[dist][1] + logger.debug('Starting archive %s' % (archive.getName(),)) + archive.start() + +# Wait for all the indexing threads to finish; none of these ever +# return if we're in daemon mode +if batch_mode: + for dist in distributions.keys(): + archive = archivemap[dist][1] + logger.debug('Waiting for archive %s to finish' % (archive.getName(),)) + archive.wait() +else: + logger.debug("Waiting for die event") + die_event.wait() + logger.info('Die event caught; waiting for incoming processor to finish') + incoming.wait() + for dist in distributions.keys(): + archive = archivemap[dist][1] + logger.info('Die event caught; waiting for archive %s to finish' % (archive.getName(),)) + archive.wait() + +#logging.shutdown() +logger.debug('Removing lock file: ' + lockfilename) +os.unlink(lockfilename) +logger.info("main thread exiting...") +sys.exit(0) + +# vim:ts=4:sw=4:et: -- cgit v1.2.3