#!/usr/bin/python # -*- mode: python; coding: utf-8 -*- # Miniature version of "dinstall", for installing .changes into an # archive # Copyright © 2002,2003 Colin Walters # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os, sys, re, glob, getopt, time, traceback, gzip, getpass, socket import shutil, signal, threading, select, Queue, SocketServer import logging, logging.handlers #logging.basicConfig() import apt_pkg apt_pkg.init() from ConfigParser import * from minidinstall.ChangeFile import * from minidinstall.Dnotify import * from minidinstall.DebianSigVerifier import * from minidinstall.GPGSigVerifier import * from minidinstall.version import * from minidinstall import misc debchanges_re = re.compile('([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.changes$') debpackage_re = re.compile('([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.u?deb$') debsrc_dsc_re = re.compile('([-a-z0-9+.]+)_(.+?)\.dsc$') debsrc_diff_re = re.compile('([-a-z0-9+.]+)_(.+?)\.diff\.gz$') debsrc_orig_re = re.compile('([-a-z0-9+.]+)_(.+?)\.orig\.tar\.gz$') debsrc_native_re = re.compile('([-a-z0-9+.]+)_(.+?)\.tar\.gz$') native_version_re = re.compile('\s*.*-'); toplevel_directory = None tmp_new_suffix = '.dinstall-new' tmp_old_suffix = '.dinstall-old' dinstall_subdir = 'mini-dinstall' incoming_subdir = 'incoming' socket_name = 'master' logfile_name = 'mini-dinstall.log' configfile_names = ['/etc/mini-dinstall.conf', '~/.mini-dinstall.conf'] use_dnotify = 0 mail_on_success = 1 default_poll_time = 30 default_max_retry_time = 2 * 24 * 60 * 60 default_mail_log_level = logging.ERROR trigger_reindex = 1 mail_log_flush_level = logging.ERROR mail_log_flush_count = 10 mail_to = getpass.getuser() mail_server = 'localhost' incoming_permissions = 0750 default_architectures = ["all", "i386"] default_distributions = ("unstable",) distributions = {} scantime = 60 def usage(ecode, ver_only=None): print "mini-dinstall", pkg_version if ver_only: sys.exit(ecode) print "Copyright (C) 2002 Colin Walters " print "Licensed under the GNU GPL." print "Usage: mini-dinstall [OPTIONS...] [DIRECTORY]" print "Options:" print " -v, --verbose\t\tDisplay extra information" print " -q, --quiet\t\tDisplay less information" print " -c, --config=FILE\tParse configuration info from FILE" print " -d, --debug\t\tOutput information to stdout as well as log" print " --no-log\t\tDon't write information to log file" print " -n, --no-act\t\tDon't actually perform changes" print " -b, --batch\t\tDon't daemonize; run once, then exit" print " -r, --run\t\tProcess queue immediately" print " -k, --kill\t\tKill the running mini-dinstall" print " --help\t\tWhat you're looking at" print " --version\t\tPrint the software version and exit" sys.exit(ecode) try: opts, args = getopt.getopt(sys.argv[1:], 'vqc:dnbrk', ['verbose', 'quiet', 'config=', 'debug', 'no-log', 'no-act', 'batch', 'run', 'kill', 'help', 'version', ]) except getopt.GetoptError, e: sys.stderr.write("Error reading arguments: %s\n" % e) usage(1) for (key, val) in opts: if key == '--help': usage(0) elif key == '--version': usage(0, ver_only=1) if len(args) > 1: sys.stderr.write("Unknown arguments: %s\n" % args[1:]) usage(1) # don't propagate exceptions that happen while logging logging.raiseExceptions = 0 logger = logging.getLogger("mini-dinstall") loglevel = logging.WARN no_act = 0 debug_mode = 0 run_mode = 0 kill_mode = 0 no_log = 0 batch_mode = 0 custom_config_files = 0 for key, val in opts: if key in ('-v', '--verbose'): if loglevel == logging.INFO: loglevel = logging.DEBUG elif loglevel == logging.WARN: loglevel = logging.INFO elif key in ('-q', '--quiet'): if loglevel == logging.WARN: loglevel = logging.ERROR elif loglevel == logging.WARN: loglevel = logging.CRITICAL elif key in ('-c', '--config'): if not custom_config_files: custom_config_files = 1 configfile_names = [] configfile_names.append(os.path.abspath(os.path.expanduser(val))) elif key in ('-n', '--no-act'): no_act = 1 elif key in ('-d', '--debug'): debug_mode = 1 elif key in ('--no-log',): no_log = 1 elif key in ('-b', '--batch'): batch_mode = 1 elif key in ('-r', '--run'): run_mode = 1 elif key in ('-k', '--kill'): kill_mode = 1 def do_mkdir(name): if os.access(name, os.X_OK): return try: logger.info('Creating directory "%s"' % (name)) except: pass if not no_act: os.mkdir(name) def do_rename(source, target): try: logger.debug('Renaming "%s" to "%s"' % (source, target)) except: pass if not no_act: os.rename(source, target) def do_chmod(name, mode): try: logger.info('Changing mode of "%s" to %o' % (name, mode)) except: pass if not no_act: os.chmod(name, mode) logger.setLevel(logging.DEBUG) stderr_handler = logging.StreamHandler(strm=sys.stderr) stderr_handler.setLevel(loglevel) logger.addHandler(stderr_handler) stderr_handler.setLevel(loglevel) stderr_handler.setFormatter(logging.Formatter(fmt="%(name)s [%(thread)d] %(levelname)s: %(message)s")) configp = ConfigParser() configfile_names = map(lambda x: os.path.abspath(os.path.expanduser(x)), configfile_names) logger.debug("Reading config files: %s" % (configfile_names,)) configp.read(configfile_names) class SubjectSpecifyingLoggingSMTPHandler(logging.handlers.SMTPHandler): def __init__(self, subject, *args, **kwargs): self._subject = subject apply(logging.handlers.SMTPHandler.__init__, [self] + list(args) + ['dummy'], kwargs) def getSubject(self, record): return re.sub('%l', record.levelname, self._subject) if not (configp.has_option('DEFAULT', 'mail_log_level') and configp.get('DEFAULT', 'mail_log_level') == 'NONE'): if configp.has_option('DEFAULT', 'mail_log_level'): mail_log_level = logging.__dict__[configp.get('DEFAULT', 'mail_log_level')] else: mail_log_level = default_mail_log_level if configp.has_option('DEFAULT', 'mail_to'): mail_to = configp.get('DEFAULT', 'mail_to') if configp.has_option('DEFAULT', 'mail_server'): mail_server = configp.get('DEFAULT', 'mail_server') if configp.has_option('DEFAULT', 'mail_log_flush_count'): mail_log_flush_count = configp.getint('DEFAULT', 'mail_log_flush_count') if configp.has_option('DEFAULT', 'mail_log_flush_level'): mail_log_flush_level = logging.__dict__[configp.get('DEFAULT', 'mail_log_flush_level')] mail_smtp_handler = SubjectSpecifyingLoggingSMTPHandler('mini-dinstall log notice (%l)', mail_server, 'Mini-Dinstall <%s@%s>' % (getpass.getuser(),socket.gethostname()), [mail_to]) mail_handler = logging.handlers.MemoryHandler(mail_log_flush_count, flushLevel=mail_log_flush_level, target=mail_smtp_handler) mail_handler.setLevel(mail_log_level) logger.addHandler(mail_handler) if configp.has_option('DEFAULT', 'archivedir'): toplevel_directory = os.path.expanduser(configp.get('DEFAULT', 'archivedir')) elif len(args) > 0: toplevel_directory = args[0] else: logger.error("No archivedir specified on command line or in config files.") sys.exit(1) if configp.has_option('DEFAULT', 'incoming_permissions'): incoming_permissions = int(configp.get('DEFAULT', 'incoming_permissions'), 8) do_mkdir(toplevel_directory) dinstall_subdir = os.path.join(toplevel_directory, dinstall_subdir) do_mkdir(dinstall_subdir) lockfilename = os.path.join(dinstall_subdir, 'mini-dinstall.lock') def process_exists(pid): try: os.kill(pid, 0) except OSError, e: return 0 return 1 if os.access(lockfilename, os.R_OK): pid = int(open(lockfilename).read()) if not process_exists(pid): if run_mode: logger.error("No process running at %d; use mini-dinstall -k to remove lockfile") sys.exit(1) logger.warn("No process running at %d, removing lockfile" % (pid,)) os.unlink(lockfilename) if kill_mode: sys.exit(0) if not os.path.isabs(socket_name): socket_name = os.path.join(dinstall_subdir, socket_name) if run_mode or kill_mode: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) logger.debug('Connecting...') sock.connect(socket_name) if run_mode: logger.debug('Sending RUN command') sock.send('RUN\n') else: logger.debug('Sending DIE command') sock.send('DIE\n') logger.debug('Reading response') response = sock.recv(8192) print response sys.exit(0) if configp.has_option('DEFAULT', 'logfile'): logfile_name = configp.get('DEFAULT', 'logfile') if not no_log: if not os.path.isabs(logfile_name): logfile_name = os.path.join(dinstall_subdir, logfile_name) logger.debug("Adding log file: %s" % (logfile_name,)) filehandler = logging.FileHandler(logfile_name) if loglevel == logging.WARN: filehandler.setLevel(logging.INFO) else: filehandler.setLevel(logging.DEBUG) logger.addHandler(filehandler) filehandler.setFormatter(logging.Formatter(fmt="%(asctime)s %(name)s [%(thread)d] %(levelname)s: %(message)s", datefmt="%b %d %H:%M:%S")) logger.info('Booting mini-dinstall ' + pkg_version) class DinstallException(Exception): def __init__(self, value): self._value = value def __str__(self): return `self._value` if not configp.has_option('DEFAULT', 'archive_style'): logger.critical("You must set the default archive_style option (since version 0.4.0)") logging.shutdown() sys.exit(1) default_verify_sigs = os.access('/usr/share/keyrings/debian-keyring.gpg', os.R_OK) default_extra_keyrings = [] default_keyrings = None if configp.has_option('DEFAULT', 'architectures'): default_architectures = string.split(configp.get('DEFAULT', 'architectures'), ', ') if configp.has_option('DEFAULT', 'verify_sigs'): default_verify_sigs = configp.getboolean('DEFAULT', 'verify_sigs') if configp.has_option('DEFAULT', 'trigger_reindex'): default_trigger_reindex = configp.getboolean('DEFAULT', 'trigger_reindex') if configp.has_option('DEFAULT', 'poll_time'): default_poll_time = configp.getint('DEFAULT', 'poll_time') if configp.has_option('DEFAULT', 'max_retry_time'): default_max_retry_time = configp.getint('DEFAULT', 'max_retry_time') if configp.has_option('DEFAULT', 'extra_keyrings'): default_extra_keyrings = re.split(', ?', configp.get('DEFAULT', 'extra_keyrings')) if configp.has_option('DEFAULT', 'keyrings'): default_keyrings = re.split(', ?', configp.get('DEFAULT', 'keyrings')) if configp.has_option('DEFAULT', 'use_dnotify'): use_dnotify = configp.getboolean('DEFAULT', 'use_dnotify') sects = configp.sections() if not len(sects) == 0: for sect in sects: distributions[sect] = {} if configp.has_option(sect, "architectures"): distributions[sect]["arches"] = string.split(configp.get(sect, "architectures"), ', ') else: distributions[sect]["arches"] = default_architectures else: for dist in default_distributions: distributions[dist] = {"arches": default_architectures} class DistOptionHandler: def __init__(self, distributions, configp): self._configp = configp self._distributions = distributions self._optionmap = {} self._optionmap['poll_time'] = ['int', default_poll_time] # two days self._optionmap['max_retry_time'] = ['int', default_max_retry_time] self._optionmap['post_install_script'] = ['str', None] self._optionmap['pre_install_script'] = ['str', None] self._optionmap['dynamic_reindex'] = ['bool', 1] self._optionmap['chown_changes_files'] = ['bool', 1] self._optionmap['keep_old'] = ['bool', None] self._optionmap['mail_on_success'] = ['bool', 1] self._optionmap['archive_style'] = ['str', None] # Release file stuff self._optionmap['generate_release'] = ['bool', 0] self._optionmap['release_origin'] = ['str', getpass.getuser()] self._optionmap['release_label'] = ['str', self._optionmap['release_origin'][1]] self._optionmap['release_suite'] = ['str', None] self._optionmap['release_codename'] = ['str', None] self._optionmap['release_description'] = ['str', None] self._optionmap['release_signscript'] = ['str', None] def get_option_map(self, dist): ret = self._distributions[dist] for key in self._optionmap.keys(): type = self._optionmap[key][0] ret[key] = self._optionmap[key][1] if self._configp.has_option ('DEFAULT', key): ret[key] = self.get_option (type, 'DEFAULT', key) if self._configp.has_option (dist, key): ret[key] = self.get_option (type, dist, key) return ret def get_option (self, type, dist, key): if type == 'int': return self._configp.getint(dist, key) elif type == 'str': return self._configp.get(dist, key) elif type == 'bool': return self._configp.getboolean(dist, key) assert(None) distoptionhandler = DistOptionHandler(distributions, configp) for dist in distributions.keys(): distributions[dist] = distoptionhandler.get_option_map(dist) if not distributions[dist]['archive_style'] in ('simple-subdir', 'flat'): raise DinstallException("Unknown archive style \"%s\"" % (distributions[dist]['archive_style'],)) logger.debug("Distributions: %s" % (distributions,)) # class DinstallTransaction: # def __init__(self, dir): # self._dir = dir # def start(self, pkgname): # self._pkgname = pkgname # self._transfilename = os.path.join(dir, pkgname + ".transaction") # def _log_op(self, type, state, str): # tmpfile = self._transfilename + ".tmp" # if (os.access(self._transfilename), os.R_OK): # shutil.copyFile(self._transfilename, tmpfile) # f = open(tmpfile, 'w') # f.write('%s %s' % (type, str) ) # f.close() # def _start_op(self, type, str): # self._log_op(type, 'start', str) # def _stop_op(self, type, str): # self._log_op(type, 'stop', str) # def renameFile(self, source, dst): # self._start_op('rename', # def _sync(): # os.system("sync") os.chdir(toplevel_directory) do_mkdir(dinstall_subdir) rejectdir = os.path.join(dinstall_subdir, 'REJECT') incoming_subdir = os.path.join(dinstall_subdir, incoming_subdir) do_mkdir(rejectdir) do_mkdir(incoming_subdir) do_chmod(incoming_subdir, incoming_permissions) ## IPC stuff # Used by all threads to determine whether or not they should exit die_event = threading.Event() # These global variables are used in IncomingDir::daemonize # I couldn't figure out any way to pass state to a BaseRequestHandler. reprocess_needed = threading.Event() reprocess_finished = threading.Event() reprocess_lock = threading.Lock() class IncomingDirRequestHandler(SocketServer.StreamRequestHandler, SocketServer.BaseRequestHandler): def handle(self): logger.debug('Got request from %s' % (self.client_address,)) req = self.rfile.readline() if req == 'RUN\n': logger.debug('Doing RUN command') reprocess_lock.acquire() reprocess_needed.set() logger.debug('Waiting on reprocessing') reprocess_finished.wait() reprocess_finished.clear() reprocess_lock.release() self.wfile.write('200 Reprocessing complete\n') elif req == 'DIE\n': logger.debug('Doing DIE command') self.wfile.write('200 Beginning shutdown\n') die_event.set() else: logger.debug('Got unknown command %s' % (req,)) self.wfile.write('500 Unknown request\n') class ExceptionThrowingThreadedUnixStreamServer(SocketServer.ThreadingUnixStreamServer): def handle_error(self, request, client_address): self._logger.exception("Unhandled exception during request processing; shutting down") die_event.set() class IncomingDir(threading.Thread): def __init__(self, dir, archivemap, logger, trigger_reindex=1, poll_time=30, max_retry_time=172800, batch_mode=0, verify_sigs=0): threading.Thread.__init__(self, name="incoming") self._dir = dir self._archivemap = archivemap self._logger = logger self._trigger_reindex = trigger_reindex self._poll_time = poll_time self._batch_mode = batch_mode self._verify_sigs = verify_sigs self._max_retry_time = max_retry_time self._last_failed_targets = {} self._eventqueue = Queue.Queue() self._done_event = threading.Event() # ensure we always have some reprocess queue self._reprocess_queue = {} def run(self): self._logger.info('Created new installer thread (%s)' % (self.getName(),)) self._logger.info('Entering batch mode...') initial_reprocess_queue = [] initial_fucked_list = [] try: for (changefilename, changefile) in self._get_changefiles(): if self._changefile_ready(changefilename, changefile): try: self._install_changefile(changefilename, changefile, 0) except Exception: logger.exception("Unable to install \"%s\"; adding to screwed list" % (changefilename,)) initial_fucked_list.append(changefilename) else: self._logger.warn('Skipping "%s"; upload incomplete' % (changefilename,)) initial_reprocess_queue.append(changefilename) if not self._batch_mode: self._daemonize(initial_reprocess_queue, initial_fucked_list) self._done_event.set() self._logger.info('All packages in incoming dir installed; exiting') except Exception, e: self._logger.exception("Unhandled exception; shutting down") die_event.set() self._done_event.set() return 0 def _abspath(self, *args): return os.path.abspath(apply(os.path.join, [self._dir] + list(args))) def _get_changefiles(self): ret = [] globpath = self._abspath("*.changes") self._logger.debug("glob: " + globpath) changefilenames = glob.glob(globpath) for changefilename in changefilenames: if not self._reprocess_queue.has_key(changefilename): self._logger.info('Examining "%s"' % (changefilename,)) changefile = ChangeFile() try: changefile.load_from_file(changefilename) except ChangeFileException: self._logger.debug("Unable to parse \"%s\", skipping" % (changefilename,)) continue ret.append((changefilename, changefile)) else: self._logger.debug('Skipping "%s" during new scan because it is in the reprocess queue.' % (changefilename,)) return ret def _changefile_ready(self, changefilename, changefile): try: dist = changefile['distribution'] except KeyError, e: self._logger.warn("Unable to read distribution field for \"%s\"; data: %s" % (changefilename, changefile,)) return 0 try: changefile.verify(self._abspath('')) except ChangeFileException: return 0 return 1 def _install_changefile(self, changefilename, changefile, doing_reprocess): dist = changefile['distribution'] if not dist in self._archivemap.keys(): raise DinstallException('Unknown distribution "%s" in \"%s\"' % (dist, changefilename,)) logger.debug('Installing %s in archive %s' % (changefilename, self._archivemap[dist][1].getName())) self._archivemap[dist][0].install(changefilename, changefile, self._verify_sigs) if self._trigger_reindex: if doing_reprocess: logger.debug('Waiting on archive %s to reprocess' % (self._archivemap[dist][1].getName())) self._archivemap[dist][1].wait_reprocess() else: logger.debug('Notifying archive %s of change' % (self._archivemap[dist][1].getName())) self._archivemap[dist][1].notify() logger.debug('Finished processing %s' % (changefilename)) def _reject_changefile(self, changefilename, changefile, e): dist = changefile['distribution'] if not dist in self._archivemap: raise DinstallException('Unknown distribution "%s" in \"%s\"' % (dist, changefilename,)) self._archivemap[dist][0].reject(changefilename, changefile, e) def _daemon_server_isready(self): (inready, outready, exready) = select.select([self._server.fileno()], [], [], 0) return len(inready) > 0 def _daemon_event_ispending(self): return die_event.isSet() or reprocess_needed.isSet() or self._daemon_server_isready() or (not self._eventqueue.empty()) def _daemon_reprocess_pending(self): curtime = time.time() for changefilename in self._reprocess_queue.keys(): (starttime, nexttime, delay) = self._reprocess_queue[changefilename] if curtime >= nexttime: return 1 return 0 def _daemonize(self, init_reprocess_queue, init_fucked_list): self._logger.info('Entering daemon mode...') self._dnotify = DirectoryNotifierFactory().create([self._dir], use_dnotify=use_dnotify, poll_time=self._poll_time, cancel_event=die_event) self._async_dnotify = DirectoryNotifierAsyncWrapper(self._dnotify, self._eventqueue, logger=self._logger, name="Incoming watcher") self._async_dnotify.start() try: os.unlink(socket_name) except OSError, e: pass self._server = ExceptionThrowingThreadedUnixStreamServer(socket_name, IncomingDirRequestHandler) self._server.allow_reuse_address = 1 retry_time = 30 self._reprocess_queue = {} fucked = init_fucked_list doing_reprocess = 0 # Initialize the reprocessing queue for changefilename in init_reprocess_queue: curtime = time.time() self._reprocess_queue[changefilename] = [curtime, curtime, retry_time] # The main daemon loop while 1: # Wait until we have something to do while not (self._daemon_event_ispending() or self._daemon_reprocess_pending()): time.sleep(0.5) self._logger.debug('Checking for pending server requests') if self._daemon_server_isready(): self._logger.debug('Handling one request') self._server.handle_request() self._logger.debug('Checking for DIE event') if die_event.isSet(): self._logger.debug('DIE event caught') break self._logger.debug('Scanning for changes') # do we have anything to reprocess? for changefilename in self._reprocess_queue.keys(): (starttime, nexttime, delay) = self._reprocess_queue[changefilename] curtime = time.time() try: changefile = ChangeFile() changefile.load_from_file(changefilename) except (ChangeFileException,IOError), e: if not os.path.exists(changefilename): self._logger.info('Changefile "%s" got removed' % (changefilename,)) else: self._logger.exception("Unable to load change file \"%s\"" % (changefilename,)) self._logger.warn("Marking \"%s\" as screwed" % (changefilename,)) fucked.append(changefilename) del self._reprocess_queue[changefilename] continue if (curtime - starttime) > self._max_retry_time: # We've tried too many times; reject it. self._reject_changefile(changefilename, changefile, DinstallException("Couldn't install \"%s\" in %d seconds" % (changefilename, self._max_retry_time))) elif curtime >= nexttime: if self._changefile_ready(changefilename, changefile): # Let's do it! self._logger.debug('Preparing to install "%s"' % (changefilename,)) try: self._install_changefile(changefilename, changefile, doing_reprocess) self._logger.debug('Removing "%s" from incoming queue after successful install.' % (changefilename,)) del self._reprocess_queue[changefilename] except Exception, e: logger.exception("Unable to install \"%s\"; adding to screwed list" % (changefilename,)) fucked.append(changefilename) else: delay *= 2 if delay > 60 * 60: delay = 60 * 60 self._logger.info('Upload "%s" isn\'t complete; marking for retry in %d seconds' % (changefilename, delay)) self._reprocess_queue[changefilename][1:3] = [time.time() + delay, delay] # done reprocessing; now scan for changed dirs. relname = None self._logger.debug('Checking dnotify event queue') if not self._eventqueue.empty(): relname = os.path.basename(os.path.abspath(self._eventqueue.get())) self._logger.debug('Got %s from dnotify' % (relname,)) if relname is None: if (not doing_reprocess) and reprocess_needed.isSet(): self._logger.info('Got reprocessing event') reprocess_needed.clear() doing_reprocess = 1 if relname is None and (not doing_reprocess): self._logger.debug('No events to process') continue for (changefilename, changefile) in self._get_changefiles(): if changefilename in fucked: self._logger.warn("Skipping screwed changefile \"%s\"" % (changefilename,)) continue # Have we tried this changefile before? if not self._reprocess_queue.has_key(changefilename): self._logger.debug('New change file "%s"' % (changefilename,)) if self._changefile_ready(changefilename, changefile): try: self._install_changefile(changefilename, changefile, doing_reprocess) except Exception, e: logger.exception("Unable to install \"%s\"; adding to screwed list" % (changefilename,)) fucked.append(changefilename) else: curtime = time.time() self._logger.info('Upload "%s" isn\'t complete; marking for retry in %d seconds' % (changefilename, retry_time)) self._reprocess_queue[changefilename] = [curtime, curtime + retry_time, retry_time] if doing_reprocess: doing_reprocess = 0 self._logger.info('Reprocessing complete') reprocess_finished.set() def wait(self): self._done_event.wait() def parse_versions(fullversion): debianversion = re.sub('^[0-9]+:', '', fullversion) upstreamver = re.sub('-[^-]*$', '', debianversion) return (upstreamver, debianversion) class ArchiveDir: def __init__(self, dir, logger, configdict, batch_mode=0, keyrings=None, extra_keyrings=None): self._dir = dir self._name = os.path.basename(os.path.abspath(dir)) self._logger = logger for key in configdict.keys(): self._logger.debug("Setting \"%s\" => \"%s\" in archive \"%s\"" % ('_'+key, configdict[key], self._name)) self.__dict__['_' + key] = configdict[key] do_mkdir(dir) self._batch_mode = batch_mode self._keyrings = keyrings if not extra_keyrings is None : self._extra_keyrings = extra_keyrings else: self._extra_keyrings = [] if self._mail_on_success: self._success_logger = logging.Logger("mini-dinstall." + self._name) self._success_logger.setLevel(logging.DEBUG) handler = SubjectSpecifyingLoggingSMTPHandler('mini-dinstall success notice', mail_server, 'Mini-Dinstall <%s@%s>' % (getpass.getuser(),socket.gethostname()), [mail_to]) handler.setLevel(logging.DEBUG) self._success_logger.addHandler(handler) self._clean_targets = [] # self._filerefmap = {} # self._changefiles = [] def _abspath(self, *args): return os.path.abspath(apply(os.path.join, [self._dir] + list(args))) def _relpath(self, *args): return apply(os.path.join, [self._name] + list(args)) def install(self, changefilename, changefile, verify_sigs): retval = 0 try: retval = self._install_run_scripts(changefilename, changefile, verify_sigs) except Exception: self._logger.exception("Unhandled exception during installation") if not retval: self._logger.info('Failed to install "%s"' % (changefilename,)) def reject(self, changefilename, changefile, reason): self._reject_changefile(changefilename, changefile, reason) def _install_run_scripts(self, changefilename, changefile, verify_sigs): self._logger.info('Preparing to install \"%s\" in archive %s' % (changefilename, self._name,)) sourcename = changefile['source'] version = changefile['version'] if verify_sigs: self._logger.info('Verifying signature on "%s"' % (changefilename,)) try: if self._keyrings: verifier = DebianSigVerifier(keyrings=map(os.path.expanduser, self._keyrings), extra_keyrings=self._extra_keyrings) else: verifier = DebianSigVerifier(extra_keyrings=self._extra_keyrings) output = verifier.verify(changefilename) logger.debug(output) logger.info('Good signature on "%s"' % (changefilename,)) except GPGSigVerificationFailure, e: msg = "Failed to verify signature on \"%s\": %s\n" % (changefilename, e) msg += string.join(e.getOutput(), '') logger.error(msg) self._reject_changefile(changefilename, changefile, e) return 0 else: self._logger.debug('Skipping signature verification on "%s"' % (changefilename,)) if self._pre_install_script: try: self._logger.debug("Running pre-installation script: " + self._pre_install_script) if self._run_script(os.path.abspath(changefilename), self._pre_install_script): return 0 except: self._logger.exception("failure while running pre-installation script") return 0 try: self._install_changefile_internal(changefilename, changefile) except Exception, e: self._logger.exception('Failed to process "%s"' % (changefilename,)) self._reject_changefile(changefilename, changefile, e) return 0 if self._chown_changes_files: do_chmod(changefilename, 0600) target = os.path.join(self._dir, os.path.basename(changefilename)) # the final step do_rename(changefilename, target) self._logger.info('Successfully installed %s %s to %s' % (sourcename, version, self._name)) if self._mail_on_success: self._success_logger.info('Successfully installed %s %s to %s' % (sourcename, version, self._name)) if self._post_install_script: try: self._logger.debug("Running post-installation script: " + self._post_install_script) self._run_script(target, self._post_install_script) except: self._logger.exception("failure while running post-installation script") return 0 return 1 def _install_changefile_internal(self, changefilename, changefile): sourcename = changefile['source'] version = changefile['version'] incomingdir = os.path.dirname(changefilename) newfiles = [] is_native = not native_version_re.match(version) if is_native: (ignored, newdebianver) = parse_versions(version) else: (newupstreamver, newdebianver) = parse_versions(version) is_sourceful = 0 for file in map(lambda x: x[4], changefile.getFiles()): match = debpackage_re.search(file) if match: arch = match.group(3) if not arch in self._arches: raise DinstallException("Unknown architecture: %s" % (arch)) target = self._arch_target(arch, file) newfiles.append((os.path.join(incomingdir, file), target, match.group(1), arch)) continue match = debsrc_diff_re.search(file) if match: is_sourceful = 1 target = self._source_target(file) newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source')) continue match = debsrc_orig_re.search(file) if match: is_sourceful = 1 target = self._source_target(file) newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source')) continue match = debsrc_native_re.search(file) if match: is_sourceful = 1 target = self._source_target(file) newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source')) continue match = debsrc_dsc_re.search(file) or debsrc_orig_re.search(file) if match: is_sourceful = 1 target = self._source_target(file) newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source')) continue all_arches = {} for arch in map(lambda x: x[3], newfiles): all_arches[arch] = 1 completed = [] oldfiles = [] if not self._keep_old: found_old_bins = 0 for (oldversion, oldarch) in map(lambda x: x[1:], self._get_package_versions()): if not all_arches.has_key(oldarch) and apt_pkg.VersionCompare(oldversion, version) < 0: found_old_bins = 1 for (pkgname, arch) in map(lambda x: x[2:], newfiles): if arch == 'source' and found_old_bins: continue self._logger.debug('Scanning for old files') for file in self._read_arch_dir(arch): match = debpackage_re.search(file) if not match: continue oldpkgname = match.group(1) oldarch = match.group(3) file = self._arch_target(arch, file) if not file in map(lambda x: x[0], oldfiles): target = file + tmp_old_suffix if oldpkgname == pkgname and oldarch == arch: oldfiles.append((file, target)) self._logger.debug('Scanning "%s" for old files' % (self._abspath('source'))) for file in self._read_source_dir(): file = self._source_target(file) if not file in map(lambda x: x[0], oldfiles): target = file + tmp_old_suffix match = debchanges_re.search(file) if not match and is_sourceful: match = debsrc_dsc_re.search(file) or debsrc_diff_re.search(file) if match and match.group(1) == sourcename: oldfiles.append((file, target)) continue # We skip the rest of this if it wasn't a # sourceful upload; really all we do if it isn't # is clean out old .changes files. if not is_sourceful: continue match = debsrc_orig_re.search(file) if match and match.group(1) == sourcename: if not is_native: (oldupstreamver, olddebianver) = parse_versions(match.group(2)) if apt_pkg.VersionCompare(oldupstreamver, newupstreamver) < 0: self._logger.debug('old upstream tarball "%s" version %s < %s, tagging for deletion' % (file, oldupstreamver, newupstreamver)) oldfiles.append((file, target)) continue else: self._logger.debug('keeping upstream tarball "%s" version %s' % (file, oldupstreamver)) continue else: self._logger.debug('old native tarball "%s", tagging for deletion' % (file,)) oldfiles.append((file, target)) continue match = debsrc_native_re.search(file) if match and match.group(1) in map(lambda x: x[2], newfiles): oldfiles.append((file, target)) continue self._clean_targets = map(lambda x: x[1], oldfiles) allrenames = oldfiles + map(lambda x: x[:2], newfiles) try: while not allrenames == []: (oldname, newname) = allrenames[0] do_rename(oldname, newname) completed.append(allrenames[0]) allrenames = allrenames[1:] except OSError, e: logger.exception("Failed to do rename (%s); attempting rollback" % (e.strerror,)) try: self._logger.error(traceback.format_tb(sys.exc_traceback)) except: pass # Unwind to previous state for (newname, oldname) in completed: do_rename(oldname, newname) raise self._clean_targets = [] # remove old files self.clean() def _run_script(self, changefilename, script): if script: script = os.path.expanduser(script) cmd = '%s %s' % (script, changefilename) self._logger.info('Running \"%s\"' % (cmd,)) if not no_act: if not os.access(script, os.X_OK): self._logger.error("Can't execute script \"%s\"" % (script,)) return 1 pid = os.fork() if pid == 0: os.execlp(script, script, changefilename) sys.exit(1) (pid, status) = os.waitpid(pid, 0) if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)): self._logger.error("script \"%s\" exited with error code %d" % (cmd, os.WEXITSTATUS(status))) return 1 return 0 def _reject_changefile(self, changefilename, changefile, exception): sourcename = changefile['source'] version = changefile['version'] incomingdir = os.path.dirname(changefilename) try: f = open(os.path.join(rejectdir, "%s_%s.reason" % (sourcename, version)), 'w') if type(exception) == type('string'): f.write(exception) else: traceback.print_exception(Exception, exception, None, None, f) f.close() for file in map(lambda x: x[4], changefile.getFiles()): if os.access(os.path.join(incomingdir, file), os.R_OK): file = os.path.join(incomingdir, file) else: file = self._abspath(file) target = os.path.join(rejectdir, os.path.basename(file)) do_rename(file, target) self._logger.info('Rejecting "%s": %s' % (changefilename, `exception`)) except Exception: self._logger.error("Unhandled exception while rejecting %s; archive may be in inconsistent state" % (changefilename,)) raise def clean(self): self._logger.debug('Removing old files') for file in self._clean_targets: self._logger.debug('Deleting "%s"' % (file,)) if not no_act: os.unlink(file) class SimpleSubdirArchiveDir(ArchiveDir): def __init__(self, *args, **kwargs): apply(ArchiveDir.__init__, [self] + list(args), kwargs) for arch in list(self._arches) + ['source']: target = os.path.join(self._dir, arch) do_mkdir(target) def _read_source_dir(self): return os.listdir(self._abspath('source')) def _read_arch_dir(self, arch): return os.listdir(self._abspath(arch)) def _arch_target(self, arch, file): return self._abspath(arch, file) def _source_target(self, file): return self._arch_target('source', file) def _get_package_versions(self): ret = [] for arch in self._arches: for file in self._read_arch_dir(arch): match = debpackage_re.search(file) if match: ret.append((match.group(1), match.group(2), match.group(3))) return ret class FlatArchiveDir(ArchiveDir): def _read_source_dir(self): return os.listdir(self._dir) def _read_arch_dir(self, arch): return os.listdir(self._dir) def _arch_target(self, arch, file): return self._abspath(file) def _source_target(self, file): return self._arch_target('source', file) def _get_package_versions(self): ret = [] for file in self._abspath(''): match = debpackage_re.search(file) if match: ret.append((match.group(1), match.group(2), match.group(3))) return ret class ArchiveDirIndexer(threading.Thread): def __init__(self, dir, logger, configdict, use_dnotify=0, batch_mode=1): self._dir = dir self._name = os.path.basename(os.path.abspath(dir)) threading.Thread.__init__(self, name=self._name) self._logger = logger self._eventqueue = Queue.Queue() for key in configdict.keys(): self._logger.debug("Setting \"%s\" => \"%s\" in archive \"%s\"" % ('_'+key, configdict[key], self._name)) self.__dict__['_' + key] = configdict[key] do_mkdir(dir) self._use_dnotify = use_dnotify self._batch_mode = batch_mode self._done_event = threading.Event() def _abspath(self, *args): return os.path.abspath(apply(os.path.join, [self._dir] + list(args))) def _relpath(self, *args): return apply(os.path.join, [self._name] + list(args)) def _make_indexfile(self, dir, type, name): cmdline = ['apt-ftparchive', type, dir] self._logger.debug("Running: " + string.join(cmdline, ' ')) if no_act: return (infd, outfd) = os.pipe() pid = os.fork() if pid == 0: os.chdir(self._dir) os.chdir('..') os.close(infd) misc.dup2(outfd, 1) os.execvp('apt-ftparchive', cmdline) os.exit(1) os.close(outfd) stdout = os.fdopen(infd) packagesfilename = os.path.join(dir, name) newpackagesfilename = packagesfilename + '.new' zpackagesfilename = packagesfilename + '.gz' newzpackagesfilename = newpackagesfilename + '.gz' newpackagesfile = open(newpackagesfilename, 'w') newzpackagesfile = gzip.GzipFile(newzpackagesfilename, 'w') buf = stdout.read(8192) while buf != '': newpackagesfile.write(buf) newzpackagesfile.write(buf) buf = stdout.read(8192) stdout.close() (pid, status) = os.waitpid(pid, 0) if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)): raise DinstallException("apt-ftparchive exited with status code %d" % (status,)) newpackagesfile.close() newzpackagesfile.close() shutil.move(newpackagesfilename, packagesfilename) shutil.move(newzpackagesfilename, zpackagesfilename) def _make_packagesfile(self, dir): self._make_indexfile(dir, 'packages', 'Packages') def _make_sourcesfile(self, dir): self._make_indexfile(dir, 'sources', 'Sources') def _make_releasefile(self): targetname = self._abspath('Release') if not self._generate_release: if os.access(targetname, os.R_OK): self._logger.info("Release generation disabled, removing existing Release file") try: os.unlink(targetname) except OSError, e: pass return tmpname = targetname + tmp_new_suffix release_needed = 0 uncompr_indexfiles = self._get_all_indexfiles() indexfiles = [] comprexts = ['.gz'] for index in uncompr_indexfiles: indexfiles = indexfiles + [index] for ext in comprexts: indexfiles = indexfiles + [index + ext] if os.access(targetname, os.R_OK): release_mtime = os.stat(targetname)[stat.ST_MTIME] for file in indexfiles: if release_needed: break if os.stat(self._abspath(file))[stat.ST_MTIME] > release_mtime: release_needed = 1 else: release_needed = 1 if not release_needed: self._logger.info("Skipping Release generation") return self._logger.info("Generating Release...") if no_act: self._logger.info("Release generation complete") return f = open(tmpname, 'w') f.write('Origin: ' + self._release_origin + '\n') f.write('Label: ' + self._release_label + '\n') suite = self._release_suite if not suite: suite = self._name f.write('Suite: ' + suite + '\n') codename = self._release_codename if not codename: codename = suite f.write('Codename: ' + codename + '\n') f.write('Date: ' + time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()) + '\n') f.write('Architectures: ' + string.join(self._arches, ' ') + '\n') if self._release_description: f.write('Description: ' + self._release_description + '\n') f.write('MD5Sum:\n') for file in indexfiles: absfile = self._abspath(file) md5sum = self._get_file_sum('md5', absfile) size = os.stat(absfile)[stat.ST_SIZE] f.write(' %s% 16d %s\n' % (md5sum, size, file)) f.write('SHA1:\n') for file in indexfiles: absfile = self._abspath(file) shasum = self._get_file_sum('sha1', absfile) size = os.stat(absfile)[stat.ST_SIZE] f.write(' %s% 16d %s\n' % (shasum, size, file)) f.close() if self._sign_releasefile(tmpname): os.rename(tmpname, targetname) self._logger.info("Release generation complete") def _sign_releasefile(self, name): if self._release_signscript: try: self._logger.debug("Running Release signing script: " + self._release_signscript) if self._run_script(name, self._release_signscript, dir=self._abspath()): return None except: self._logger.exception("failure while running Release signature script") return None return 1 # Copied from ArchiveDir def _run_script(self, changefilename, script, dir=None): if script: script = os.path.expanduser(script) cmd = '%s %s' % (script, changefilename) self._logger.info('Running \"%s\"' % (cmd,)) if not no_act: if not os.access(script, os.X_OK): self._logger.error("Can't execute script \"%s\"" % (script,)) return 1 pid = os.fork() if pid == 0: if dir: os.chdir(dir) os.execlp(script, script, changefilename) sys.exit(1) (pid, status) = os.waitpid(pid, 0) if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)): self._logger.error("script \"%s\" exited with error code %d" % (cmd, os.WEXITSTATUS(status))) return 1 return 0 # Hacked up from ChangeFile.py; FIXME: merge the two def _get_file_sum(self, type, filename): if os.access('/usr/bin/%ssum' % (type,), os.X_OK): cmd = '/usr/bin/%ssum %s' % (type, filename,) self._logger.debug("Running: %s" % (cmd,)) child = popen2.Popen3(cmd, 1) child.tochild.close() erroutput = child.childerr.read() child.childerr.close() if erroutput != '': child.fromchild.close() raise DinstallException("%ssum returned error output \"%s\"" % (type, erroutput,)) (sum, filename) = string.split(child.fromchild.read(), None, 1) child.fromchild.close() status = child.wait() if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)): if os.WIFEXITED(status): msg = "%ssum exited with error code %d" % (type, os.WEXITSTATUS(status),) elif os.WIFSTOPPED(status): msg = "%ssum stopped unexpectedly with signal %d" % (type, os.WSTOPSIG(status),) elif os.WIFSIGNALED(status): msg = "%ssum died with signal %d" % (type, os.WTERMSIG(status),) raise DinstallException(msg) return sum.strip() if type == 'md5': import md5 f = open(filename) md5sum = md5.new() buf = f.read(8192) while buf != '': md5sum.update(buf) buf = f.read(8192) return md5sum.hexdigest() elif type == 'sha1': import sha f = open(filename) shasum = sha.new() buf = f.read(8192) while buf != '': shasum.update(buf) buf = f.read(8192) return shasum.hexdigest() else: raise DinstallException('cannot compute hash of type %s; no builtin method or /usr/bin/%ssum', type, type) def _index_all(self, force=None): self._index(self._arches + ['source'], force) def run(self): self._logger.info('Created new thread (%s) for archive indexer %s' % (self.getName(), self._name,)) self._logger.info('Entering batch mode...') try: self._index_all(1) self._make_releasefile() if not self._batch_mode: # never returns self._daemonize() self._done_event.set() except Exception, e: self._logger.exception("Unhandled exception; shutting down") die_event.set() self._done_event.set() self._logger.info('Thread \"%s\" exiting' % (self.getName(),)) def _daemon_event_ispending(self): return die_event.isSet() or (not self._eventqueue.empty()) def _daemonize(self): self._logger.info('Entering daemon mode...') if self._dynamic_reindex: self._dnotify = DirectoryNotifierFactory().create(self._get_dnotify_dirs(), use_dnotify=self._use_dnotify, poll_time=self._poll_time, cancel_event=die_event) self._async_dnotify = DirectoryNotifierAsyncWrapper(self._dnotify, self._eventqueue, logger=self._logger, name=self._name + " Indexer") self._async_dnotify.start() # The main daemon loop while 1: # Wait until we have a pending event while not self._daemon_event_ispending(): time.sleep(1) if die_event.isSet(): break self._logger.debug('Reading from event queue') setevent = None dir = None obj = self._eventqueue.get() if type(obj) == type(''): self._logger.debug('got dir change') dir = obj elif type(obj) == type(None): self._logger.debug('got general event') setevent = None elif obj.__class__ == threading.Event().__class__: self._logger.debug('got wait_reprocess event') setevent = obj else: self._logger.error("unknown object %s in event queue" % (obj,)) assert None # This is to protect against both lots of activity, and to # prevent race conditions, so we can rely on timestamps. time.sleep(1) if not self._reindex_needed(): if setevent: self._logger.debug('setting wait_reprocess event') setevent.set() continue if dir is None: self._logger.debug('Got general change') self._index_all(1) else: self._logger.debug('Got change in %s' % (dir,)) self._index([os.path.basename(os.path.abspath(dir))]) self._make_releasefile() if setevent: self._logger.debug('setting wait_reprocess event') setevent.set() def _reindex_needed(self): reindex_needed = 0 if os.access(self._abspath('Release.gpg'), os.R_OK): gpg_mtime = os.stat(self._abspath('Release.gpg'))[stat.ST_MTIME] for dir in self._get_dnotify_dirs(): dir_mtime = os.stat(self._abspath(dir))[stat.ST_MTIME] if dir_mtime > gpg_mtime: reindex_needed = 1 else: reindex_needed = 1 return reindex_needed def _index(self, arches, force=None): self._index_impl(arches, force=force) def wait_reprocess(self): e = threading.Event() self._eventqueue.put(e) self._logger.debug('waiting on reprocess') while not (e.isSet() or die_event.isSet()): time.sleep(0.5) self._logger.debug('done waiting on reprocess') def wait(self): self._done_event.wait() def notify(self): self._eventqueue.put(None) class SimpleSubdirArchiveDirIndexer(ArchiveDirIndexer): def __init__(self, *args, **kwargs): apply(ArchiveDirIndexer.__init__, [self] + list(args), kwargs) for arch in list(self._arches) + ['source']: target = os.path.join(self._dir, arch) do_mkdir(target) def _index_impl(self, arches, force=None): for arch in arches: dirmtime = os.stat(self._relpath(arch))[stat.ST_MTIME] if arch != 'source': pkgsfile = self._relpath(arch, 'Packages') if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: self._logger.info('Generating Packages file for %s...' % (arch,)) self._make_packagesfile(self._relpath(arch)) self._logger.info('Packages generation complete') else: self._logger.info('Skipping generation of Packages file for %s' % (arch,)) else: pkgsfile = self._relpath(arch, 'Sources') if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: self._logger.info('Generating Sources file for %s...' % (arch,)) self._make_sourcesfile(self._relpath('source')) self._logger.info('Sources generation complete') else: self._logger.info('Skipping generation of Sources file for %s' % (arch,)) def _in_archdir(self, *args): return apply(lambda x,self=self: self._abspath(x), args) def _get_dnotify_dirs(self): return map(lambda x, self=self: self._abspath(x), self._arches + ['source']) def _get_all_indexfiles(self): return map(lambda arch: os.path.join(arch, 'Packages'), self._arches) + ['source/Sources'] class FlatArchiveDirIndexer(ArchiveDirIndexer): def __init__(self, *args, **kwargs): apply(ArchiveDirIndexer.__init__, [self] + list(args), kwargs) def _index_impl(self, arches, force=None): pkgsfile = self._abspath('Packages') dirmtime = os.stat(self._relpath())[stat.ST_MTIME] if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: self._logger.info('Generating Packages file...') self._make_packagesfile(self._relpath()) self._logger.info('Packages generation complete') else: self._logger.info('Skipping generation of Packages file') pkgsfile = self._abspath('Sources') if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]: self._logger.info('Generating Sources file...') self._make_sourcesfile(self._relpath()) self._logger.info('Sources generation complete') else: self._logger.info('Skipping generation of Sources file') def _in_archdir(self, *args): return apply(lambda x,self=self: self._abspath(x), args[1:]) def _get_dnotify_dirs(self): return [self._dir] def _get_all_indexfiles(self): return ['Packages', 'Sources'] if os.access(lockfilename, os.R_OK): logger.critical("lockfile \"%s\" exists (pid %s): is another mini-dinstall running?" % (lockfilename, open(lockfilename).read(10))) logging.shutdown() sys.exit(1) logger.debug('Creating lock file: ' + lockfilename) if not no_act: lockfile = open(lockfilename, 'w') lockfile.close() if not batch_mode: # daemonize logger.debug("Daemonizing...") if os.fork() == 0: os.setsid() if os.fork() != 0: sys.exit(0) else: sys.exit(0) sys.stdin.close() sys.stdout.close() sys.stderr.close() os.close(0) os.close(1) os.close(2) # unix file descriptor allocation ensures that the followin are fd 0,1,2 sys.stdin = open("/dev/null") sys.stdout = open("/dev/null") sys.stderr = open("/dev/null") logger.debug("Finished daemonizing (pid %s)" % (os.getpid(),)) lockfile = open(lockfilename, 'w') lockfile.write("%s" % (os.getpid(),)) lockfile.close() if not (debug_mode or batch_mode): # Don't log to stderr past this point logger.removeHandler(stderr_handler) archivemap = {} # Instantiaate archive classes for installing files for dist in distributions.keys(): if distributions[dist]['archive_style'] == 'simple-subdir': newclass = SimpleSubdirArchiveDir else: newclass = FlatArchiveDir archivemap[dist] = [newclass(dist, logger, distributions[dist], batch_mode=batch_mode, keyrings=default_keyrings, extra_keyrings=default_extra_keyrings), None] # Create archive indexing threads, but don't start them yet for dist in distributions.keys(): targetdir = os.path.join(toplevel_directory, dist) logger.info('Initializing archive indexer %s' % (dist,)) if distributions[dist]['archive_style'] == 'simple-subdir': newclass = SimpleSubdirArchiveDirIndexer else: newclass = FlatArchiveDirIndexer archive = newclass(targetdir, logger, distributions[dist], use_dnotify=use_dnotify, batch_mode=batch_mode) archivemap[dist][1] = archive # Now: kick off the incoming processor logger.info('Initializing incoming processor') incoming = IncomingDir(incoming_subdir, archivemap, logger, trigger_reindex=trigger_reindex, poll_time=default_poll_time, max_retry_time=default_max_retry_time, batch_mode=batch_mode, verify_sigs=default_verify_sigs) logger.debug('Starting incoming processor') incoming.start() if batch_mode: logger.debug('Waiting for incoming processor to finish') incoming.wait() # Once we've installed everything, start the indexing threads for dist in distributions.keys(): archive = archivemap[dist][1] logger.debug('Starting archive %s' % (archive.getName(),)) archive.start() # Wait for all the indexing threads to finish; none of these ever # return if we're in daemon mode if batch_mode: for dist in distributions.keys(): archive = archivemap[dist][1] logger.debug('Waiting for archive %s to finish' % (archive.getName(),)) archive.wait() else: logger.debug("Waiting for die event") die_event.wait() logger.info('Die event caught; waiting for incoming processor to finish') incoming.wait() for dist in distributions.keys(): archive = archivemap[dist][1] logger.info('Die event caught; waiting for archive %s to finish' % (archive.getName(),)) archive.wait() #logging.shutdown() logger.debug('Removing lock file: ' + lockfilename) os.unlink(lockfilename) logger.info("main thread exiting...") sys.exit(0) # vim:ts=4:sw=4:et: