Diffstat (limited to 'mini-dinstall')
-rwxr-xr-x  mini-dinstall  1483
1 file changed, 1483 insertions, 0 deletions
diff --git a/mini-dinstall b/mini-dinstall
new file mode 100755
index 0000000..e8386a2
--- /dev/null
+++ b/mini-dinstall
@@ -0,0 +1,1483 @@
+#!/usr/bin/python
+# -*- mode: python; coding: utf-8 -*-
+# Miniature version of "dinstall", for installing .changes into an
+# archive
+# Copyright © 2002,2003 Colin Walters <walters@gnu.org>
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+import os, sys, re, glob, getopt, time, traceback, gzip, getpass, socket
+import string, stat, popen2
+import signal, threading, select, Queue, SocketServer
+import logging, logging.handlers
+#logging.basicConfig()
+import apt_pkg
+apt_pkg.init()
+from ConfigParser import *
+
+from minidinstall.ChangeFile import *
+from minidinstall.Dnotify import *
+from minidinstall.DebianSigVerifier import *
+from minidinstall.GPGSigVerifier import *
+from minidinstall.version import *
+from minidinstall import misc
+
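+# Filename patterns for the files listed in a .changes manifest.  Group 1
+# captures the package/source name and group 2 the version; the .changes
+# and binary-package patterns also capture the architecture in group 3.
+# These group numbers are relied upon throughout the installer below.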
+debchanges_re = re.compile(r'([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.changes$')
+debpackage_re = re.compile(r'([-a-z0-9+.]+)_(.+?)_([-a-zA-Z0-9]+)\.u?deb$')
+debsrc_dsc_re = re.compile(r'([-a-z0-9+.]+)_(.+?)\.dsc$')
+debsrc_diff_re = re.compile(r'([-a-z0-9+.]+)_(.+?)\.diff\.gz$')
+debsrc_orig_re = re.compile(r'([-a-z0-9+.]+)_(.+?)\.orig\.tar\.gz$')
+debsrc_native_re = re.compile(r'([-a-z0-9+.]+)_(.+?)\.tar\.gz$')
+
+native_version_re = re.compile(r'\s*.*-')
+
+toplevel_directory = None
+tmp_new_suffix = '.dinstall-new'
+tmp_old_suffix = '.dinstall-old'
+dinstall_subdir = 'mini-dinstall'
+incoming_subdir = 'incoming'
+socket_name = 'master'
+logfile_name = 'mini-dinstall.log'
+configfile_names = ['/etc/mini-dinstall.conf', '~/.mini-dinstall.conf']
+use_dnotify = 0
+mail_on_success = 1
+default_poll_time = 30
+default_max_retry_time = 2 * 24 * 60 * 60
+default_mail_log_level = logging.ERROR
+trigger_reindex = 1
+mail_log_flush_level = logging.ERROR
+mail_log_flush_count = 10
+mail_to = getpass.getuser()
+mail_server = 'localhost'
+incoming_permissions = 0750
+
+default_architectures = ["all", "i386"]
+default_distributions = ("unstable",)
+
+distributions = {}
+scantime = 60
+
+def usage(ecode, ver_only=None):
+ print "mini-dinstall", pkg_version
+ if ver_only:
+ sys.exit(ecode)
+ print "Copyright (C) 2002 Colin Walters <walters@gnu.org>"
+ print "Licensed under the GNU GPL."
+ print "Usage: mini-dinstall [OPTIONS...] [DIRECTORY]"
+ print "Options:"
+ print " -v, --verbose\t\tDisplay extra information"
+ print " -q, --quiet\t\tDisplay less information"
+ print " -c, --config=FILE\tParse configuration info from FILE"
+ print " -d, --debug\t\tOutput information to stdout as well as log"
+ print " --no-log\t\tDon't write information to log file"
+ print " -n, --no-act\t\tDon't actually perform changes"
+ print " -b, --batch\t\tDon't daemonize; run once, then exit"
+ print " -r, --run\t\tProcess queue immediately"
+ print " -k, --kill\t\tKill the running mini-dinstall"
+ print " --help\t\tWhat you're looking at"
+ print " --version\t\tPrint the software version and exit"
+ sys.exit(ecode)
+
+try:
+ opts, args = getopt.getopt(sys.argv[1:], 'vqc:dnbrk',
+ ['verbose', 'quiet', 'config=', 'debug', 'no-log',
+ 'no-act', 'batch', 'run', 'kill', 'help', 'version', ])
+except getopt.GetoptError, e:
+ sys.stderr.write("Error reading arguments: %s\n" % e)
+ usage(1)
+for (key, val) in opts:
+ if key == '--help':
+ usage(0)
+ elif key == '--version':
+ usage(0, ver_only=1)
+if len(args) > 1:
+ sys.stderr.write("Unknown arguments: %s\n" % args[1:])
+ usage(1)
+
+# don't propagate exceptions that happen while logging
+logging.raiseExceptions = 0
+
+logger = logging.getLogger("mini-dinstall")
+
+loglevel = logging.WARN
+no_act = 0
+debug_mode = 0
+run_mode = 0
+kill_mode = 0
+no_log = 0
+batch_mode = 0
+custom_config_files = 0
+for key, val in opts:
+ if key in ('-v', '--verbose'):
+ if loglevel == logging.INFO:
+ loglevel = logging.DEBUG
+ elif loglevel == logging.WARN:
+ loglevel = logging.INFO
+ elif key in ('-q', '--quiet'):
+ if loglevel == logging.WARN:
+ loglevel = logging.ERROR
+        elif loglevel == logging.ERROR:
+ loglevel = logging.CRITICAL
+ elif key in ('-c', '--config'):
+ if not custom_config_files:
+ custom_config_files = 1
+ configfile_names = []
+ configfile_names.append(os.path.abspath(os.path.expanduser(val)))
+ elif key in ('-n', '--no-act'):
+ no_act = 1
+ elif key in ('-d', '--debug'):
+ debug_mode = 1
+ elif key in ('--no-log',):
+ no_log = 1
+ elif key in ('-b', '--batch'):
+ batch_mode = 1
+ elif key in ('-r', '--run'):
+ run_mode = 1
+ elif key in ('-k', '--kill'):
+ kill_mode = 1
+
+def do_mkdir(name):
+ if os.access(name, os.X_OK):
+ return
+ try:
+ logger.info('Creating directory "%s"' % (name))
+ except:
+ pass
+ if not no_act:
+ os.mkdir(name)
+
+def do_rename(source, target):
+ try:
+ logger.debug('Renaming "%s" to "%s"' % (source, target))
+ except:
+ pass
+ if not no_act:
+ os.rename(source, target)
+
+def do_chmod(name, mode):
+ try:
+ logger.info('Changing mode of "%s" to %o' % (name, mode))
+ except:
+ pass
+ if not no_act:
+ os.chmod(name, mode)
+
+logger.setLevel(logging.DEBUG)
+stderr_handler = logging.StreamHandler(strm=sys.stderr)
+stderr_handler.setLevel(loglevel)
+stderr_handler.setFormatter(logging.Formatter(fmt="%(name)s [%(thread)d] %(levelname)s: %(message)s"))
+logger.addHandler(stderr_handler)
+
+configp = ConfigParser()
+configfile_names = map(lambda x: os.path.abspath(os.path.expanduser(x)), configfile_names)
+logger.debug("Reading config files: %s" % (configfile_names,))
+configp.read(configfile_names)
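+# An illustrative (not authoritative) minimal configuration:
+#
+#   [DEFAULT]
+#   archivedir = ~/debian
+#   archive_style = flat
+#   architectures = all, i386
+#   verify_sigs = 0
+#
+#   [unstable]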
+
+class SubjectSpecifyingLoggingSMTPHandler(logging.handlers.SMTPHandler):
+ def __init__(self, subject, *args, **kwargs):
+ self._subject = subject
+ apply(logging.handlers.SMTPHandler.__init__, [self] + list(args) + ['dummy'], kwargs)
+
+ def getSubject(self, record):
+ return re.sub('%l', record.levelname, self._subject)
+
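+# Mail notifications: log records are buffered in a MemoryHandler and only
+# flushed to the SMTP handler once mail_log_flush_count records accumulate
+# or a record at mail_log_flush_level (or above) arrives.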
+if not (configp.has_option('DEFAULT', 'mail_log_level') and configp.get('DEFAULT', 'mail_log_level') == 'NONE'):
+ if configp.has_option('DEFAULT', 'mail_log_level'):
+ mail_log_level = logging.__dict__[configp.get('DEFAULT', 'mail_log_level')]
+ else:
+ mail_log_level = default_mail_log_level
+ if configp.has_option('DEFAULT', 'mail_to'):
+ mail_to = configp.get('DEFAULT', 'mail_to')
+ if configp.has_option('DEFAULT', 'mail_server'):
+ mail_server = configp.get('DEFAULT', 'mail_server')
+ if configp.has_option('DEFAULT', 'mail_log_flush_count'):
+ mail_log_flush_count = configp.getint('DEFAULT', 'mail_log_flush_count')
+ if configp.has_option('DEFAULT', 'mail_log_flush_level'):
+ mail_log_flush_level = logging.__dict__[configp.get('DEFAULT', 'mail_log_flush_level')]
+ mail_smtp_handler = SubjectSpecifyingLoggingSMTPHandler('mini-dinstall log notice (%l)', mail_server, 'Mini-Dinstall <%s@%s>' % (getpass.getuser(),socket.gethostname()), [mail_to])
+ mail_handler = logging.handlers.MemoryHandler(mail_log_flush_count, flushLevel=mail_log_flush_level, target=mail_smtp_handler)
+
+ mail_handler.setLevel(mail_log_level)
+ logger.addHandler(mail_handler)
+
+if configp.has_option('DEFAULT', 'archivedir'):
+ toplevel_directory = os.path.expanduser(configp.get('DEFAULT', 'archivedir'))
+elif len(args) > 0:
+ toplevel_directory = args[0]
+else:
+ logger.error("No archivedir specified on command line or in config files.")
+ sys.exit(1)
+
+if configp.has_option('DEFAULT', 'incoming_permissions'):
+ incoming_permissions = int(configp.get('DEFAULT', 'incoming_permissions'), 8)
+
+do_mkdir(toplevel_directory)
+dinstall_subdir = os.path.join(toplevel_directory, dinstall_subdir)
+do_mkdir(dinstall_subdir)
+
+lockfilename = os.path.join(dinstall_subdir, 'mini-dinstall.lock')
+
+def process_exists(pid):
+ try:
+ os.kill(pid, 0)
+ except OSError, e:
+ return 0
+ return 1
+
+if os.access(lockfilename, os.R_OK):
+ pid = int(open(lockfilename).read())
+ if not process_exists(pid):
+ if run_mode:
+ logger.error("No process running at %d; use mini-dinstall -k to remove lockfile")
+ sys.exit(1)
+ logger.warn("No process running at %d, removing lockfile" % (pid,))
+ os.unlink(lockfilename)
+ if kill_mode:
+ sys.exit(0)
+
+if not os.path.isabs(socket_name):
+ socket_name = os.path.join(dinstall_subdir, socket_name)
+
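+# In -r/--run or -k/--kill mode we act as a client: connect to the control
+# socket of the running daemon and send a one-line command instead of
+# starting a new instance.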
+if run_mode or kill_mode:
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ logger.debug('Connecting...')
+ sock.connect(socket_name)
+ if run_mode:
+ logger.debug('Sending RUN command')
+ sock.send('RUN\n')
+ else:
+ logger.debug('Sending DIE command')
+ sock.send('DIE\n')
+ logger.debug('Reading response')
+ response = sock.recv(8192)
+ print response
+ sys.exit(0)
+
+if configp.has_option('DEFAULT', 'logfile'):
+ logfile_name = configp.get('DEFAULT', 'logfile')
+
+if not no_log:
+ if not os.path.isabs(logfile_name):
+ logfile_name = os.path.join(dinstall_subdir, logfile_name)
+ logger.debug("Adding log file: %s" % (logfile_name,))
+ filehandler = logging.FileHandler(logfile_name)
+ if loglevel == logging.WARN:
+ filehandler.setLevel(logging.INFO)
+ else:
+ filehandler.setLevel(logging.DEBUG)
+ logger.addHandler(filehandler)
+ filehandler.setFormatter(logging.Formatter(fmt="%(asctime)s %(name)s [%(thread)d] %(levelname)s: %(message)s", datefmt="%b %d %H:%M:%S"))
+
+logger.info('Booting mini-dinstall ' + pkg_version)
+
+class DinstallException(Exception):
+ def __init__(self, value):
+ self._value = value
+ def __str__(self):
+ return `self._value`
+
+if not configp.has_option('DEFAULT', 'archive_style'):
+ logger.critical("You must set the default archive_style option (since version 0.4.0)")
+ logging.shutdown()
+ sys.exit(1)
+
+default_verify_sigs = os.access('/usr/share/keyrings/debian-keyring.gpg', os.R_OK)
+default_extra_keyrings = []
+default_keyrings = None
+
+if configp.has_option('DEFAULT', 'architectures'):
+ default_architectures = string.split(configp.get('DEFAULT', 'architectures'), ', ')
+if configp.has_option('DEFAULT', 'verify_sigs'):
+ default_verify_sigs = configp.getboolean('DEFAULT', 'verify_sigs')
+if configp.has_option('DEFAULT', 'trigger_reindex'):
+    trigger_reindex = configp.getboolean('DEFAULT', 'trigger_reindex')
+if configp.has_option('DEFAULT', 'poll_time'):
+ default_poll_time = configp.getint('DEFAULT', 'poll_time')
+if configp.has_option('DEFAULT', 'max_retry_time'):
+ default_max_retry_time = configp.getint('DEFAULT', 'max_retry_time')
+if configp.has_option('DEFAULT', 'extra_keyrings'):
+ default_extra_keyrings = re.split(', ?', configp.get('DEFAULT', 'extra_keyrings'))
+if configp.has_option('DEFAULT', 'keyrings'):
+ default_keyrings = re.split(', ?', configp.get('DEFAULT', 'keyrings'))
+if configp.has_option('DEFAULT', 'use_dnotify'):
+ use_dnotify = configp.getboolean('DEFAULT', 'use_dnotify')
+
+sects = configp.sections()
+if not len(sects) == 0:
+ for sect in sects:
+ distributions[sect] = {}
+ if configp.has_option(sect, "architectures"):
+ distributions[sect]["arches"] = string.split(configp.get(sect, "architectures"), ', ')
+ else:
+ distributions[sect]["arches"] = default_architectures
+else:
+ for dist in default_distributions:
+ distributions[dist] = {"arches": default_architectures}
+
+class DistOptionHandler:
+ def __init__(self, distributions, configp):
+ self._configp = configp
+ self._distributions = distributions
+ self._optionmap = {}
+ self._optionmap['poll_time'] = ['int', default_poll_time]
+ # two days
+ self._optionmap['max_retry_time'] = ['int', default_max_retry_time]
+ self._optionmap['post_install_script'] = ['str', None]
+ self._optionmap['pre_install_script'] = ['str', None]
+ self._optionmap['dynamic_reindex'] = ['bool', 1]
+ self._optionmap['chown_changes_files'] = ['bool', 1]
+ self._optionmap['keep_old'] = ['bool', None]
+ self._optionmap['mail_on_success'] = ['bool', 1]
+ self._optionmap['archive_style'] = ['str', None]
+ # Release file stuff
+ self._optionmap['generate_release'] = ['bool', 0]
+ self._optionmap['release_origin'] = ['str', getpass.getuser()]
+ self._optionmap['release_label'] = ['str', self._optionmap['release_origin'][1]]
+ self._optionmap['release_suite'] = ['str', None]
+ self._optionmap['release_codename'] = ['str', None]
+ self._optionmap['release_description'] = ['str', None]
+ self._optionmap['release_signscript'] = ['str', None]
+
+ def get_option_map(self, dist):
+ ret = self._distributions[dist]
+ for key in self._optionmap.keys():
+ type = self._optionmap[key][0]
+ ret[key] = self._optionmap[key][1]
+ if self._configp.has_option ('DEFAULT', key):
+ ret[key] = self.get_option (type, 'DEFAULT', key)
+ if self._configp.has_option (dist, key):
+ ret[key] = self.get_option (type, dist, key)
+ return ret
+
+ def get_option (self, type, dist, key):
+ if type == 'int':
+ return self._configp.getint(dist, key)
+ elif type == 'str':
+ return self._configp.get(dist, key)
+ elif type == 'bool':
+ return self._configp.getboolean(dist, key)
+
+ assert(None)
+
+
+distoptionhandler = DistOptionHandler(distributions, configp)
+
+for dist in distributions.keys():
+ distributions[dist] = distoptionhandler.get_option_map(dist)
+ if not distributions[dist]['archive_style'] in ('simple-subdir', 'flat'):
+ raise DinstallException("Unknown archive style \"%s\"" % (distributions[dist]['archive_style'],))
+
+logger.debug("Distributions: %s" % (distributions,))
+
+# class DinstallTransaction:
+# def __init__(self, dir):
+# self._dir = dir
+
+# def start(self, pkgname):
+# self._pkgname = pkgname
+# self._transfilename = os.path.join(dir, pkgname + ".transaction")
+
+# def _log_op(self, type, state, str):
+# tmpfile = self._transfilename + ".tmp"
+# if (os.access(self._transfilename), os.R_OK):
+# shutil.copyFile(self._transfilename, tmpfile)
+# f = open(tmpfile, 'w')
+# f.write('%s %s' % (type, str) )
+# f.close()
+
+# def _start_op(self, type, str):
+# self._log_op(type, 'start', str)
+
+# def _stop_op(self, type, str):
+# self._log_op(type, 'stop', str)
+
+# def renameFile(self, source, dst):
+# self._start_op('rename',
+
+
+# def _sync():
+# os.system("sync")
+os.chdir(toplevel_directory)
+do_mkdir(dinstall_subdir)
+rejectdir = os.path.join(dinstall_subdir, 'REJECT')
+incoming_subdir = os.path.join(dinstall_subdir, incoming_subdir)
+do_mkdir(rejectdir)
+do_mkdir(incoming_subdir)
+do_chmod(incoming_subdir, incoming_permissions)
+
+## IPC stuff
+# Used by all threads to determine whether or not they should exit
+die_event = threading.Event()
+
+# These global variables are used in IncomingDir::daemonize
+# I couldn't figure out any way to pass state to a BaseRequestHandler.
+reprocess_needed = threading.Event()
+reprocess_finished = threading.Event()
+
+reprocess_lock = threading.Lock()
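+# Control protocol: clients send a single line ("RUN\n" or "DIE\n") and
+# receive one "200 ..." or "500 ..." status line in reply.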
+class IncomingDirRequestHandler(SocketServer.StreamRequestHandler):
+ def handle(self):
+ logger.debug('Got request from %s' % (self.client_address,))
+ req = self.rfile.readline()
+ if req == 'RUN\n':
+ logger.debug('Doing RUN command')
+ reprocess_lock.acquire()
+ reprocess_needed.set()
+ logger.debug('Waiting on reprocessing')
+ reprocess_finished.wait()
+ reprocess_finished.clear()
+ reprocess_lock.release()
+ self.wfile.write('200 Reprocessing complete\n')
+ elif req == 'DIE\n':
+ logger.debug('Doing DIE command')
+ self.wfile.write('200 Beginning shutdown\n')
+ die_event.set()
+ else:
+ logger.debug('Got unknown command %s' % (req,))
+ self.wfile.write('500 Unknown request\n')
+
+class ExceptionThrowingThreadedUnixStreamServer(SocketServer.ThreadingUnixStreamServer):
+ def handle_error(self, request, client_address):
+        logger.exception("Unhandled exception during request processing; shutting down")
+ die_event.set()
+
+class IncomingDir(threading.Thread):
+ def __init__(self, dir, archivemap, logger, trigger_reindex=1, poll_time=30, max_retry_time=172800, batch_mode=0, verify_sigs=0):
+ threading.Thread.__init__(self, name="incoming")
+ self._dir = dir
+ self._archivemap = archivemap
+ self._logger = logger
+ self._trigger_reindex = trigger_reindex
+ self._poll_time = poll_time
+ self._batch_mode = batch_mode
+ self._verify_sigs = verify_sigs
+ self._max_retry_time = max_retry_time
+ self._last_failed_targets = {}
+ self._eventqueue = Queue.Queue()
+ self._done_event = threading.Event()
+ # ensure we always have some reprocess queue
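+        # maps changefilename -> [start_time, next_retry_time, retry_delay]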
+ self._reprocess_queue = {}
+
+ def run(self):
+ self._logger.info('Created new installer thread (%s)' % (self.getName(),))
+ self._logger.info('Entering batch mode...')
+ initial_reprocess_queue = []
+ initial_fucked_list = []
+ try:
+ for (changefilename, changefile) in self._get_changefiles():
+ if self._changefile_ready(changefilename, changefile):
+ try:
+ self._install_changefile(changefilename, changefile, 0)
+ except Exception:
+ logger.exception("Unable to install \"%s\"; adding to screwed list" % (changefilename,))
+ initial_fucked_list.append(changefilename)
+ else:
+ self._logger.warn('Skipping "%s"; upload incomplete' % (changefilename,))
+ initial_reprocess_queue.append(changefilename)
+ if not self._batch_mode:
+ self._daemonize(initial_reprocess_queue, initial_fucked_list)
+ self._done_event.set()
+ self._logger.info('All packages in incoming dir installed; exiting')
+ except Exception, e:
+ self._logger.exception("Unhandled exception; shutting down")
+ die_event.set()
+ self._done_event.set()
+ return 0
+
+ def _abspath(self, *args):
+ return os.path.abspath(apply(os.path.join, [self._dir] + list(args)))
+
+ def _get_changefiles(self):
+ ret = []
+ globpath = self._abspath("*.changes")
+ self._logger.debug("glob: " + globpath)
+ changefilenames = glob.glob(globpath)
+ for changefilename in changefilenames:
+ if not self._reprocess_queue.has_key(changefilename):
+ self._logger.info('Examining "%s"' % (changefilename,))
+ changefile = ChangeFile()
+ try:
+ changefile.load_from_file(changefilename)
+ except ChangeFileException:
+ self._logger.debug("Unable to parse \"%s\", skipping" % (changefilename,))
+ continue
+ ret.append((changefilename, changefile))
+ else:
+ self._logger.debug('Skipping "%s" during new scan because it is in the reprocess queue.' % (changefilename,))
+ return ret
+
+ def _changefile_ready(self, changefilename, changefile):
+ try:
+ dist = changefile['distribution']
+ except KeyError, e:
+ self._logger.warn("Unable to read distribution field for \"%s\"; data: %s" % (changefilename, changefile,))
+ return 0
+ try:
+ changefile.verify(self._abspath(''))
+ except ChangeFileException:
+ return 0
+ return 1
+
+ def _install_changefile(self, changefilename, changefile, doing_reprocess):
+ dist = changefile['distribution']
+ if not dist in self._archivemap.keys():
+ raise DinstallException('Unknown distribution "%s" in \"%s\"' % (dist, changefilename,))
+ logger.debug('Installing %s in archive %s' % (changefilename, self._archivemap[dist][1].getName()))
+ self._archivemap[dist][0].install(changefilename, changefile, self._verify_sigs)
+ if self._trigger_reindex:
+ if doing_reprocess:
+ logger.debug('Waiting on archive %s to reprocess' % (self._archivemap[dist][1].getName()))
+ self._archivemap[dist][1].wait_reprocess()
+ else:
+ logger.debug('Notifying archive %s of change' % (self._archivemap[dist][1].getName()))
+ self._archivemap[dist][1].notify()
+ logger.debug('Finished processing %s' % (changefilename))
+
+ def _reject_changefile(self, changefilename, changefile, e):
+ dist = changefile['distribution']
+ if not dist in self._archivemap:
+ raise DinstallException('Unknown distribution "%s" in \"%s\"' % (dist, changefilename,))
+ self._archivemap[dist][0].reject(changefilename, changefile, e)
+
+ def _daemon_server_isready(self):
+ (inready, outready, exready) = select.select([self._server.fileno()], [], [], 0)
+ return len(inready) > 0
+
+ def _daemon_event_ispending(self):
+ return die_event.isSet() or reprocess_needed.isSet() or self._daemon_server_isready() or (not self._eventqueue.empty())
+
+ def _daemon_reprocess_pending(self):
+ curtime = time.time()
+ for changefilename in self._reprocess_queue.keys():
+ (starttime, nexttime, delay) = self._reprocess_queue[changefilename]
+ if curtime >= nexttime:
+ return 1
+ return 0
+
+ def _daemonize(self, init_reprocess_queue, init_fucked_list):
+ self._logger.info('Entering daemon mode...')
+ self._dnotify = DirectoryNotifierFactory().create([self._dir], use_dnotify=use_dnotify, poll_time=self._poll_time, cancel_event=die_event)
+ self._async_dnotify = DirectoryNotifierAsyncWrapper(self._dnotify, self._eventqueue, logger=self._logger, name="Incoming watcher")
+ self._async_dnotify.start()
+ try:
+ os.unlink(socket_name)
+ except OSError, e:
+ pass
+ self._server = ExceptionThrowingThreadedUnixStreamServer(socket_name, IncomingDirRequestHandler)
+ self._server.allow_reuse_address = 1
+ retry_time = 30
+ self._reprocess_queue = {}
+ fucked = init_fucked_list
+ doing_reprocess = 0
+ # Initialize the reprocessing queue
+ for changefilename in init_reprocess_queue:
+ curtime = time.time()
+ self._reprocess_queue[changefilename] = [curtime, curtime, retry_time]
+
+ # The main daemon loop
+ while 1:
+ # Wait until we have something to do
+ while not (self._daemon_event_ispending() or self._daemon_reprocess_pending()):
+ time.sleep(0.5)
+
+ self._logger.debug('Checking for pending server requests')
+ if self._daemon_server_isready():
+ self._logger.debug('Handling one request')
+ self._server.handle_request()
+
+ self._logger.debug('Checking for DIE event')
+ if die_event.isSet():
+ self._logger.debug('DIE event caught')
+ break
+
+ self._logger.debug('Scanning for changes')
+ # do we have anything to reprocess?
+ for changefilename in self._reprocess_queue.keys():
+ (starttime, nexttime, delay) = self._reprocess_queue[changefilename]
+ curtime = time.time()
+ try:
+ changefile = ChangeFile()
+ changefile.load_from_file(changefilename)
+ except (ChangeFileException,IOError), e:
+ if not os.path.exists(changefilename):
+ self._logger.info('Changefile "%s" got removed' % (changefilename,))
+ else:
+ self._logger.exception("Unable to load change file \"%s\"" % (changefilename,))
+ self._logger.warn("Marking \"%s\" as screwed" % (changefilename,))
+ fucked.append(changefilename)
+ del self._reprocess_queue[changefilename]
+ continue
+                if (curtime - starttime) > self._max_retry_time:
+                    # We've tried too many times; reject it and drop it from
+                    # the queue so it isn't rejected again on the next pass.
+                    self._reject_changefile(changefilename, changefile, DinstallException("Couldn't install \"%s\" in %d seconds" % (changefilename, self._max_retry_time)))
+                    del self._reprocess_queue[changefilename]
+ elif curtime >= nexttime:
+ if self._changefile_ready(changefilename, changefile):
+ # Let's do it!
+ self._logger.debug('Preparing to install "%s"' % (changefilename,))
+ try:
+ self._install_changefile(changefilename, changefile, doing_reprocess)
+ self._logger.debug('Removing "%s" from incoming queue after successful install.' % (changefilename,))
+ del self._reprocess_queue[changefilename]
+ except Exception, e:
+ logger.exception("Unable to install \"%s\"; adding to screwed list" % (changefilename,))
+ fucked.append(changefilename)
+ else:
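+                        # exponential backoff: double the delay, capped at one hour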
+ delay *= 2
+ if delay > 60 * 60:
+ delay = 60 * 60
+ self._logger.info('Upload "%s" isn\'t complete; marking for retry in %d seconds' % (changefilename, delay))
+ self._reprocess_queue[changefilename][1:3] = [time.time() + delay, delay]
+ # done reprocessing; now scan for changed dirs.
+ relname = None
+ self._logger.debug('Checking dnotify event queue')
+ if not self._eventqueue.empty():
+ relname = os.path.basename(os.path.abspath(self._eventqueue.get()))
+ self._logger.debug('Got %s from dnotify' % (relname,))
+ if relname is None:
+ if (not doing_reprocess) and reprocess_needed.isSet():
+ self._logger.info('Got reprocessing event')
+ reprocess_needed.clear()
+ doing_reprocess = 1
+ if relname is None and (not doing_reprocess):
+ self._logger.debug('No events to process')
+ continue
+
+ for (changefilename, changefile) in self._get_changefiles():
+ if changefilename in fucked:
+ self._logger.warn("Skipping screwed changefile \"%s\"" % (changefilename,))
+ continue
+ # Have we tried this changefile before?
+ if not self._reprocess_queue.has_key(changefilename):
+ self._logger.debug('New change file "%s"' % (changefilename,))
+ if self._changefile_ready(changefilename, changefile):
+ try:
+ self._install_changefile(changefilename, changefile, doing_reprocess)
+ except Exception, e:
+ logger.exception("Unable to install \"%s\"; adding to screwed list" % (changefilename,))
+ fucked.append(changefilename)
+ else:
+ curtime = time.time()
+ self._logger.info('Upload "%s" isn\'t complete; marking for retry in %d seconds' % (changefilename, retry_time))
+ self._reprocess_queue[changefilename] = [curtime, curtime + retry_time, retry_time]
+ if doing_reprocess:
+ doing_reprocess = 0
+ self._logger.info('Reprocessing complete')
+ reprocess_finished.set()
+
+ def wait(self):
+ self._done_event.wait()
+
+def parse_versions(fullversion):
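+    # strip any epoch prefix ("1:2.3-4" -> "2.3-4"), then strip the Debian
+    # revision ("2.3-4" -> "2.3") to recover the upstream version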
+ debianversion = re.sub('^[0-9]+:', '', fullversion)
+ upstreamver = re.sub('-[^-]*$', '', debianversion)
+
+ return (upstreamver, debianversion)
+
+class ArchiveDir:
+ def __init__(self, dir, logger, configdict, batch_mode=0, keyrings=None, extra_keyrings=None):
+ self._dir = dir
+ self._name = os.path.basename(os.path.abspath(dir))
+ self._logger = logger
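+        # expose each per-archive config option as an instance attribute,
+        # e.g. configdict['keep_old'] becomes self._keep_old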
+ for key in configdict.keys():
+ self._logger.debug("Setting \"%s\" => \"%s\" in archive \"%s\"" % ('_'+key, configdict[key], self._name))
+ self.__dict__['_' + key] = configdict[key]
+ do_mkdir(dir)
+ self._batch_mode = batch_mode
+ self._keyrings = keyrings
+ if not extra_keyrings is None :
+ self._extra_keyrings = extra_keyrings
+ else:
+ self._extra_keyrings = []
+ if self._mail_on_success:
+ self._success_logger = logging.Logger("mini-dinstall." + self._name)
+ self._success_logger.setLevel(logging.DEBUG)
+ handler = SubjectSpecifyingLoggingSMTPHandler('mini-dinstall success notice', mail_server, 'Mini-Dinstall <%s@%s>' % (getpass.getuser(),socket.gethostname()), [mail_to])
+ handler.setLevel(logging.DEBUG)
+ self._success_logger.addHandler(handler)
+ self._clean_targets = []
+
+# self._filerefmap = {}
+# self._changefiles = []
+
+ def _abspath(self, *args):
+ return os.path.abspath(apply(os.path.join, [self._dir] + list(args)))
+
+ def _relpath(self, *args):
+ return apply(os.path.join, [self._name] + list(args))
+
+ def install(self, changefilename, changefile, verify_sigs):
+ retval = 0
+ try:
+ retval = self._install_run_scripts(changefilename, changefile, verify_sigs)
+ except Exception:
+ self._logger.exception("Unhandled exception during installation")
+ if not retval:
+ self._logger.info('Failed to install "%s"' % (changefilename,))
+
+ def reject(self, changefilename, changefile, reason):
+ self._reject_changefile(changefilename, changefile, reason)
+
+ def _install_run_scripts(self, changefilename, changefile, verify_sigs):
+ self._logger.info('Preparing to install \"%s\" in archive %s' % (changefilename, self._name,))
+ sourcename = changefile['source']
+ version = changefile['version']
+ if verify_sigs:
+ self._logger.info('Verifying signature on "%s"' % (changefilename,))
+ try:
+ if self._keyrings:
+ verifier = DebianSigVerifier(keyrings=map(os.path.expanduser, self._keyrings), extra_keyrings=self._extra_keyrings)
+ else:
+ verifier = DebianSigVerifier(extra_keyrings=self._extra_keyrings)
+ output = verifier.verify(changefilename)
+ logger.debug(output)
+ logger.info('Good signature on "%s"' % (changefilename,))
+ except GPGSigVerificationFailure, e:
+ msg = "Failed to verify signature on \"%s\": %s\n" % (changefilename, e)
+ msg += string.join(e.getOutput(), '')
+ logger.error(msg)
+ self._reject_changefile(changefilename, changefile, e)
+ return 0
+ else:
+ self._logger.debug('Skipping signature verification on "%s"' % (changefilename,))
+ if self._pre_install_script:
+ try:
+ self._logger.debug("Running pre-installation script: " + self._pre_install_script)
+ if self._run_script(os.path.abspath(changefilename), self._pre_install_script):
+ return 0
+ except:
+ self._logger.exception("failure while running pre-installation script")
+ return 0
+ try:
+ self._install_changefile_internal(changefilename, changefile)
+ except Exception, e:
+ self._logger.exception('Failed to process "%s"' % (changefilename,))
+ self._reject_changefile(changefilename, changefile, e)
+ return 0
+ if self._chown_changes_files:
+ do_chmod(changefilename, 0600)
+ target = os.path.join(self._dir, os.path.basename(changefilename))
+ # the final step
+ do_rename(changefilename, target)
+ self._logger.info('Successfully installed %s %s to %s' % (sourcename, version, self._name))
+ if self._mail_on_success:
+ self._success_logger.info('Successfully installed %s %s to %s' % (sourcename, version, self._name))
+
+ if self._post_install_script:
+ try:
+ self._logger.debug("Running post-installation script: " + self._post_install_script)
+ self._run_script(target, self._post_install_script)
+ except:
+ self._logger.exception("failure while running post-installation script")
+ return 0
+ return 1
+
+ def _install_changefile_internal(self, changefilename, changefile):
+ sourcename = changefile['source']
+ version = changefile['version']
+ incomingdir = os.path.dirname(changefilename)
+ newfiles = []
+ is_native = not native_version_re.match(version)
+ if is_native:
+ (ignored, newdebianver) = parse_versions(version)
+ else:
+ (newupstreamver, newdebianver) = parse_versions(version)
+ is_sourceful = 0
+ for file in map(lambda x: x[4], changefile.getFiles()):
+ match = debpackage_re.search(file)
+ if match:
+ arch = match.group(3)
+ if not arch in self._arches:
+ raise DinstallException("Unknown architecture: %s" % (arch))
+ target = self._arch_target(arch, file)
+ newfiles.append((os.path.join(incomingdir, file), target, match.group(1), arch))
+ continue
+ match = debsrc_diff_re.search(file)
+ if match:
+ is_sourceful = 1
+ target = self._source_target(file)
+ newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source'))
+ continue
+ match = debsrc_orig_re.search(file)
+ if match:
+ is_sourceful = 1
+ target = self._source_target(file)
+ newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source'))
+ continue
+ match = debsrc_native_re.search(file)
+ if match:
+ is_sourceful = 1
+ target = self._source_target(file)
+ newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source'))
+ continue
+ match = debsrc_dsc_re.search(file) or debsrc_orig_re.search(file)
+ if match:
+ is_sourceful = 1
+ target = self._source_target(file)
+ newfiles.append((os.path.join(incomingdir, file), target, match.group(1), 'source'))
+ continue
+
+ all_arches = {}
+ for arch in map(lambda x: x[3], newfiles):
+ all_arches[arch] = 1
+ completed = []
+ oldfiles = []
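+        # Unless keep_old is set, collect superseded files as
+        # (path, path + tmp_old_suffix) pairs: they are renamed aside in the
+        # same pass as the new-file renames and removed afterwards by clean().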
+ if not self._keep_old:
+ found_old_bins = 0
+ for (oldversion, oldarch) in map(lambda x: x[1:], self._get_package_versions()):
+ if not all_arches.has_key(oldarch) and apt_pkg.VersionCompare(oldversion, version) < 0:
+ found_old_bins = 1
+ for (pkgname, arch) in map(lambda x: x[2:], newfiles):
+ if arch == 'source' and found_old_bins:
+ continue
+ self._logger.debug('Scanning for old files')
+ for file in self._read_arch_dir(arch):
+ match = debpackage_re.search(file)
+ if not match:
+ continue
+ oldpkgname = match.group(1)
+ oldarch = match.group(3)
+ file = self._arch_target(arch, file)
+ if not file in map(lambda x: x[0], oldfiles):
+ target = file + tmp_old_suffix
+ if oldpkgname == pkgname and oldarch == arch:
+ oldfiles.append((file, target))
+ self._logger.debug('Scanning "%s" for old files' % (self._abspath('source')))
+ for file in self._read_source_dir():
+ file = self._source_target(file)
+ if not file in map(lambda x: x[0], oldfiles):
+ target = file + tmp_old_suffix
+ match = debchanges_re.search(file)
+ if not match and is_sourceful:
+ match = debsrc_dsc_re.search(file) or debsrc_diff_re.search(file)
+ if match and match.group(1) == sourcename:
+ oldfiles.append((file, target))
+ continue
+ # We skip the rest of this if it wasn't a
+ # sourceful upload; really all we do if it isn't
+ # is clean out old .changes files.
+ if not is_sourceful:
+ continue
+ match = debsrc_orig_re.search(file)
+ if match and match.group(1) == sourcename:
+ if not is_native:
+ (oldupstreamver, olddebianver) = parse_versions(match.group(2))
+ if apt_pkg.VersionCompare(oldupstreamver, newupstreamver) < 0:
+ self._logger.debug('old upstream tarball "%s" version %s < %s, tagging for deletion' % (file, oldupstreamver, newupstreamver))
+ oldfiles.append((file, target))
+ continue
+ else:
+ self._logger.debug('keeping upstream tarball "%s" version %s' % (file, oldupstreamver))
+ continue
+ else:
+ self._logger.debug('old native tarball "%s", tagging for deletion' % (file,))
+ oldfiles.append((file, target))
+ continue
+ match = debsrc_native_re.search(file)
+ if match and match.group(1) in map(lambda x: x[2], newfiles):
+ oldfiles.append((file, target))
+ continue
+
+ self._clean_targets = map(lambda x: x[1], oldfiles)
+ allrenames = oldfiles + map(lambda x: x[:2], newfiles)
+ try:
+ while not allrenames == []:
+ (oldname, newname) = allrenames[0]
+ do_rename(oldname, newname)
+ completed.append(allrenames[0])
+ allrenames = allrenames[1:]
+ except OSError, e:
+ logger.exception("Failed to do rename (%s); attempting rollback" % (e.strerror,))
+ try:
+ self._logger.error(traceback.format_tb(sys.exc_traceback))
+ except:
+ pass
+            # Unwind to previous state: completed holds (source, target)
+            # pairs, so rename each target back to its source
+            for (source, target) in completed:
+                do_rename(target, source)
+ raise
+        # remove the renamed-aside old files, then reset the clean list
+        self.clean()
+        self._clean_targets = []
+
+ def _run_script(self, changefilename, script):
+ if script:
+ script = os.path.expanduser(script)
+ cmd = '%s %s' % (script, changefilename)
+ self._logger.info('Running \"%s\"' % (cmd,))
+ if not no_act:
+ if not os.access(script, os.X_OK):
+ self._logger.error("Can't execute script \"%s\"" % (script,))
+ return 1
+ pid = os.fork()
+ if pid == 0:
+ os.execlp(script, script, changefilename)
+ sys.exit(1)
+ (pid, status) = os.waitpid(pid, 0)
+ if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)):
+ self._logger.error("script \"%s\" exited with error code %d" % (cmd, os.WEXITSTATUS(status)))
+ return 1
+ return 0
+
+ def _reject_changefile(self, changefilename, changefile, exception):
+ sourcename = changefile['source']
+ version = changefile['version']
+ incomingdir = os.path.dirname(changefilename)
+ try:
+ f = open(os.path.join(rejectdir, "%s_%s.reason" % (sourcename, version)), 'w')
+ if type(exception) == type('string'):
+ f.write(exception)
+ else:
+ traceback.print_exception(Exception, exception, None, None, f)
+ f.close()
+ for file in map(lambda x: x[4], changefile.getFiles()):
+ if os.access(os.path.join(incomingdir, file), os.R_OK):
+ file = os.path.join(incomingdir, file)
+ else:
+ file = self._abspath(file)
+ target = os.path.join(rejectdir, os.path.basename(file))
+ do_rename(file, target)
+ self._logger.info('Rejecting "%s": %s' % (changefilename, `exception`))
+ except Exception:
+ self._logger.error("Unhandled exception while rejecting %s; archive may be in inconsistent state" % (changefilename,))
+ raise
+
+ def clean(self):
+ self._logger.debug('Removing old files')
+ for file in self._clean_targets:
+ self._logger.debug('Deleting "%s"' % (file,))
+ if not no_act:
+ os.unlink(file)
+
+class SimpleSubdirArchiveDir(ArchiveDir):
+ def __init__(self, *args, **kwargs):
+ apply(ArchiveDir.__init__, [self] + list(args), kwargs)
+ for arch in list(self._arches) + ['source']:
+ target = os.path.join(self._dir, arch)
+ do_mkdir(target)
+
+ def _read_source_dir(self):
+ return os.listdir(self._abspath('source'))
+
+ def _read_arch_dir(self, arch):
+ return os.listdir(self._abspath(arch))
+
+ def _arch_target(self, arch, file):
+ return self._abspath(arch, file)
+
+ def _source_target(self, file):
+ return self._arch_target('source', file)
+
+ def _get_package_versions(self):
+ ret = []
+ for arch in self._arches:
+ for file in self._read_arch_dir(arch):
+ match = debpackage_re.search(file)
+ if match:
+ ret.append((match.group(1), match.group(2), match.group(3)))
+ return ret
+
+
+class FlatArchiveDir(ArchiveDir):
+ def _read_source_dir(self):
+ return os.listdir(self._dir)
+
+ def _read_arch_dir(self, arch):
+ return os.listdir(self._dir)
+
+ def _arch_target(self, arch, file):
+ return self._abspath(file)
+
+ def _source_target(self, file):
+ return self._arch_target('source', file)
+
+ def _get_package_versions(self):
+ ret = []
+        for file in os.listdir(self._abspath('')):
+ match = debpackage_re.search(file)
+ if match:
+ ret.append((match.group(1), match.group(2), match.group(3)))
+ return ret
+
+class ArchiveDirIndexer(threading.Thread):
+ def __init__(self, dir, logger, configdict, use_dnotify=0, batch_mode=1):
+ self._dir = dir
+ self._name = os.path.basename(os.path.abspath(dir))
+ threading.Thread.__init__(self, name=self._name)
+ self._logger = logger
+ self._eventqueue = Queue.Queue()
+ for key in configdict.keys():
+ self._logger.debug("Setting \"%s\" => \"%s\" in archive \"%s\"" % ('_'+key, configdict[key], self._name))
+ self.__dict__['_' + key] = configdict[key]
+ do_mkdir(dir)
+ self._use_dnotify = use_dnotify
+ self._batch_mode = batch_mode
+ self._done_event = threading.Event()
+
+ def _abspath(self, *args):
+ return os.path.abspath(apply(os.path.join, [self._dir] + list(args)))
+
+ def _relpath(self, *args):
+ return apply(os.path.join, [self._name] + list(args))
+
+ def _make_indexfile(self, dir, type, name):
+ cmdline = ['apt-ftparchive', type, dir]
+ self._logger.debug("Running: " + string.join(cmdline, ' '))
+ if no_act:
+ return
+ (infd, outfd) = os.pipe()
+ pid = os.fork()
+ if pid == 0:
+ os.chdir(self._dir)
+ os.chdir('..')
+ os.close(infd)
+ misc.dup2(outfd, 1)
+ os.execvp('apt-ftparchive', cmdline)
+            os._exit(1)
+ os.close(outfd)
+ stdout = os.fdopen(infd)
+ packagesfile = open(os.path.join(dir, name), 'w')
+ zpackagesfile = gzip.GzipFile(os.path.join(dir, name + '.gz'), 'w')
+ buf = stdout.read(8192)
+ while buf != '':
+ packagesfile.write(buf)
+ zpackagesfile.write(buf)
+ buf = stdout.read(8192)
+ stdout.close()
+ (pid, status) = os.waitpid(pid, 0)
+ if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)):
+ raise DinstallException("apt-ftparchive exited with status code %d" % (status,))
+ packagesfile.close()
+ zpackagesfile.close()
+
+ def _make_packagesfile(self, dir):
+ self._make_indexfile(dir, 'packages', 'Packages')
+
+ def _make_sourcesfile(self, dir):
+ self._make_indexfile(dir, 'sources', 'Sources')
+
+ def _make_releasefile(self):
+ targetname = self._abspath('Release')
+ if not self._generate_release:
+ if os.access(targetname, os.R_OK):
+ self._logger.info("Release generation disabled, removing existing Release file")
+ try:
+ os.unlink(targetname)
+ except OSError, e:
+ pass
+ return
+ tmpname = targetname + tmp_new_suffix
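+        # Regenerate Release only if some index file is newer than the
+        # existing Release file (or no Release file exists yet).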
+ release_needed = 0
+ uncompr_indexfiles = self._get_all_indexfiles()
+ indexfiles = []
+ comprexts = ['.gz']
+ for index in uncompr_indexfiles:
+ indexfiles = indexfiles + [index]
+ for ext in comprexts:
+ indexfiles = indexfiles + [index + ext]
+ if os.access(targetname, os.R_OK):
+ release_mtime = os.stat(targetname)[stat.ST_MTIME]
+ for file in indexfiles:
+ if release_needed:
+ break
+ if os.stat(self._abspath(file))[stat.ST_MTIME] > release_mtime:
+ release_needed = 1
+ else:
+ release_needed = 1
+
+ if not release_needed:
+ self._logger.info("Skipping Release generation")
+ return
+ self._logger.info("Generating Release...")
+ if no_act:
+ self._logger.info("Release generation complete")
+ return
+ f = open(tmpname, 'w')
+ f.write('Origin: ' + self._release_origin + '\n')
+ f.write('Label: ' + self._release_label + '\n')
+ suite = self._release_suite
+ if not suite:
+ suite = self._name
+ f.write('Suite: ' + suite + '\n')
+ codename = self._release_codename
+ if not codename:
+ codename = suite
+ f.write('Codename: ' + codename + '\n')
+ f.write('Date: ' + time.strftime("%a, %d %b %Y %H:%M:%S UTC", time.gmtime()) + '\n')
+ f.write('Architectures: ' + string.join(self._arches, ' ') + '\n')
+ if self._release_description:
+ f.write('Description: ' + self._release_description + '\n')
+ f.write('MD5Sum:\n')
+ for file in indexfiles:
+ absfile = self._abspath(file)
+ md5sum = self._get_file_sum('md5', absfile)
+ size = os.stat(absfile)[stat.ST_SIZE]
+ f.write(' %s% 16d %s\n' % (md5sum, size, file))
+ f.write('SHA1:\n')
+ for file in indexfiles:
+ absfile = self._abspath(file)
+ shasum = self._get_file_sum('sha1', absfile)
+ size = os.stat(absfile)[stat.ST_SIZE]
+ f.write(' %s% 16d %s\n' % (shasum, size, file))
+ f.close()
+ if self._sign_releasefile(tmpname):
+ os.rename(tmpname, targetname)
+ self._logger.info("Release generation complete")
+
+ def _sign_releasefile(self, name):
+ if self._release_signscript:
+ try:
+ self._logger.debug("Running Release signing script: " + self._release_signscript)
+ if self._run_script(name, self._release_signscript, dir=self._abspath()):
+ return None
+ except:
+ self._logger.exception("failure while running Release signature script")
+ return None
+ return 1
+
+ # Copied from ArchiveDir
+ def _run_script(self, changefilename, script, dir=None):
+ if script:
+ script = os.path.expanduser(script)
+ cmd = '%s %s' % (script, changefilename)
+ self._logger.info('Running \"%s\"' % (cmd,))
+ if not no_act:
+ if not os.access(script, os.X_OK):
+ self._logger.error("Can't execute script \"%s\"" % (script,))
+ return 1
+ pid = os.fork()
+ if pid == 0:
+ if dir:
+ os.chdir(dir)
+ os.execlp(script, script, changefilename)
+ sys.exit(1)
+ (pid, status) = os.waitpid(pid, 0)
+ if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)):
+ self._logger.error("script \"%s\" exited with error code %d" % (cmd, os.WEXITSTATUS(status)))
+ return 1
+ return 0
+
+ # Hacked up from ChangeFile.py; FIXME: merge the two
+ def _get_file_sum(self, type, filename):
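+        # Prefer the external /usr/bin/md5sum or /usr/bin/sha1sum binary;
+        # fall back to the builtin md5/sha modules if it is missing.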
+ if os.access('/usr/bin/%ssum' % (type,), os.X_OK):
+ cmd = '/usr/bin/%ssum %s' % (type, filename,)
+ self._logger.debug("Running: %s" % (cmd,))
+ child = popen2.Popen3(cmd, 1)
+ child.tochild.close()
+ erroutput = child.childerr.read()
+ child.childerr.close()
+ if erroutput != '':
+ child.fromchild.close()
+ raise DinstallException("%ssum returned error output \"%s\"" % (type, erroutput,))
+ (sum, filename) = string.split(child.fromchild.read(), None, 1)
+ child.fromchild.close()
+ status = child.wait()
+ if not (status is None or (os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0)):
+ if os.WIFEXITED(status):
+ msg = "%ssum exited with error code %d" % (type, os.WEXITSTATUS(status),)
+ elif os.WIFSTOPPED(status):
+ msg = "%ssum stopped unexpectedly with signal %d" % (type, os.WSTOPSIG(status),)
+ elif os.WIFSIGNALED(status):
+ msg = "%ssum died with signal %d" % (type, os.WTERMSIG(status),)
+ raise DinstallException(msg)
+ return sum.strip()
+ if type == 'md5':
+ import md5
+ f = open(filename)
+ md5sum = md5.new()
+ buf = f.read(8192)
+ while buf != '':
+ md5sum.update(buf)
+ buf = f.read(8192)
+ return md5sum.hexdigest()
+ elif type == 'sha1':
+ import sha
+ f = open(filename)
+ shasum = sha.new()
+ buf = f.read(8192)
+ while buf != '':
+ shasum.update(buf)
+ buf = f.read(8192)
+ return shasum.hexdigest()
+ else:
+            raise DinstallException('cannot compute hash of type %s; no builtin method or /usr/bin/%ssum' % (type, type))
+
+ def _index_all(self, force=None):
+ self._index(self._arches + ['source'], force)
+
+ def run(self):
+ self._logger.info('Created new thread (%s) for archive indexer %s' % (self.getName(), self._name,))
+ self._logger.info('Entering batch mode...')
+ try:
+ self._index_all(1)
+ self._make_releasefile()
+ if not self._batch_mode:
+ # never returns
+ self._daemonize()
+ self._done_event.set()
+ except Exception, e:
+ self._logger.exception("Unhandled exception; shutting down")
+ die_event.set()
+ self._done_event.set()
+ self._logger.info('Thread \"%s\" exiting' % (self.getName(),))
+
+ def _daemon_event_ispending(self):
+ return die_event.isSet() or (not self._eventqueue.empty())
+ def _daemonize(self):
+ self._logger.info('Entering daemon mode...')
+ if self._dynamic_reindex:
+ self._dnotify = DirectoryNotifierFactory().create(self._get_dnotify_dirs(), use_dnotify=self._use_dnotify, poll_time=self._poll_time, cancel_event=die_event)
+
+ self._async_dnotify = DirectoryNotifierAsyncWrapper(self._dnotify, self._eventqueue, logger=self._logger, name=self._name + " Indexer")
+ self._async_dnotify.start()
+
+ # The main daemon loop
+ while 1:
+
+ # Wait until we have a pending event
+ while not self._daemon_event_ispending():
+ time.sleep(1)
+
+ if die_event.isSet():
+ break
+
+ self._logger.debug('Reading from event queue')
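+            # The queue carries three kinds of objects: a directory name
+            # (string) for a single-directory change, None for a general
+            # change, and a threading.Event posted by wait_reprocess().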
+ setevent = None
+ dir = None
+ obj = self._eventqueue.get()
+ if type(obj) == type(''):
+ self._logger.debug('got dir change')
+ dir = obj
+ elif type(obj) == type(None):
+ self._logger.debug('got general event')
+ setevent = None
+ elif obj.__class__ == threading.Event().__class__:
+ self._logger.debug('got wait_reprocess event')
+ setevent = obj
+ else:
+ self._logger.error("unknown object %s in event queue" % (obj,))
+ assert None
+
+ # This is to protect against both lots of activity, and to
+ # prevent race conditions, so we can rely on timestamps.
+ time.sleep(1)
+ if not self._reindex_needed():
+ if setevent:
+ self._logger.debug('setting wait_reprocess event')
+ setevent.set()
+ continue
+ if dir is None:
+ self._logger.debug('Got general change')
+ self._index_all(1)
+ else:
+ self._logger.debug('Got change in %s' % (dir,))
+ self._index([os.path.basename(os.path.abspath(dir))])
+ self._make_releasefile()
+ if setevent:
+ self._logger.debug('setting wait_reprocess event')
+ setevent.set()
+
+ def _reindex_needed(self):
+ reindex_needed = 0
+ if os.access(self._abspath('Release.gpg'), os.R_OK):
+ gpg_mtime = os.stat(self._abspath('Release.gpg'))[stat.ST_MTIME]
+ for dir in self._get_dnotify_dirs():
+ dir_mtime = os.stat(self._abspath(dir))[stat.ST_MTIME]
+ if dir_mtime > gpg_mtime:
+ reindex_needed = 1
+ else:
+ reindex_needed = 1
+ return reindex_needed
+
+ def _index(self, arches, force=None):
+ self._index_impl(arches, force=force)
+
+ def wait_reprocess(self):
+ e = threading.Event()
+ self._eventqueue.put(e)
+ self._logger.debug('waiting on reprocess')
+ while not (e.isSet() or die_event.isSet()):
+ time.sleep(0.5)
+ self._logger.debug('done waiting on reprocess')
+
+ def wait(self):
+ self._done_event.wait()
+
+ def notify(self):
+ self._eventqueue.put(None)
+
+class SimpleSubdirArchiveDirIndexer(ArchiveDirIndexer):
+ def __init__(self, *args, **kwargs):
+ apply(ArchiveDirIndexer.__init__, [self] + list(args), kwargs)
+ for arch in list(self._arches) + ['source']:
+ target = os.path.join(self._dir, arch)
+ do_mkdir(target)
+
+ def _index_impl(self, arches, force=None):
+ for arch in arches:
+ dirmtime = os.stat(self._relpath(arch))[stat.ST_MTIME]
+ if arch != 'source':
+ pkgsfile = self._relpath(arch, 'Packages')
+ if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]:
+ self._logger.info('Generating Packages file for %s...' % (arch,))
+ self._make_packagesfile(self._relpath(arch))
+ self._logger.info('Packages generation complete')
+ else:
+ self._logger.info('Skipping generation of Packages file for %s' % (arch,))
+
+ else:
+ pkgsfile = self._relpath(arch, 'Sources')
+ if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]:
+ self._logger.info('Generating Sources file for %s...' % (arch,))
+ self._make_sourcesfile(self._relpath('source'))
+ self._logger.info('Sources generation complete')
+ else:
+ self._logger.info('Skipping generation of Sources file for %s' % (arch,))
+
+ def _in_archdir(self, *args):
+ return apply(lambda x,self=self: self._abspath(x), args)
+
+ def _get_dnotify_dirs(self):
+ return map(lambda x, self=self: self._abspath(x), self._arches + ['source'])
+
+ def _get_all_indexfiles(self):
+ return map(lambda arch: os.path.join(arch, 'Packages'), self._arches) + ['source/Sources']
+
+class FlatArchiveDirIndexer(ArchiveDirIndexer):
+ def __init__(self, *args, **kwargs):
+ apply(ArchiveDirIndexer.__init__, [self] + list(args), kwargs)
+
+ def _index_impl(self, arches, force=None):
+ pkgsfile = self._abspath('Packages')
+ dirmtime = os.stat(self._relpath())[stat.ST_MTIME]
+ if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]:
+ self._logger.info('Generating Packages file...')
+ self._make_packagesfile(self._relpath())
+ self._logger.info('Packages generation complete')
+ else:
+ self._logger.info('Skipping generation of Packages file')
+ pkgsfile = self._abspath('Sources')
+ if force or (not os.access(pkgsfile, os.R_OK)) or dirmtime > os.stat(pkgsfile)[stat.ST_MTIME]:
+ self._logger.info('Generating Sources file...')
+ self._make_sourcesfile(self._relpath())
+ self._logger.info('Sources generation complete')
+ else:
+ self._logger.info('Skipping generation of Sources file')
+
+ def _in_archdir(self, *args):
+ return apply(lambda x,self=self: self._abspath(x), args[1:])
+
+ def _get_dnotify_dirs(self):
+ return [self._dir]
+
+ def _get_all_indexfiles(self):
+ return ['Packages', 'Sources']
+
+if os.access(lockfilename, os.R_OK):
+ logger.critical("lockfile \"%s\" exists (pid %s): is another mini-dinstall running?" % (lockfilename, open(lockfilename).read(10)))
+ logging.shutdown()
+ sys.exit(1)
+logger.debug('Creating lock file: ' + lockfilename)
+if not no_act:
+ lockfile = open(lockfilename, 'w')
+ lockfile.close()
+
+if not batch_mode:
+ # daemonize
+ logger.debug("Daemonizing...")
+ if os.fork() == 0:
+ os.setsid()
+ if os.fork() != 0:
+ sys.exit(0)
+ else:
+ sys.exit(0)
+ sys.stdin.close()
+ sys.stdout.close()
+ sys.stderr.close()
+ os.close(0)
+ os.close(1)
+ os.close(2)
+    # unix file descriptor allocation ensures that the following are fds 0, 1, 2
+ sys.stdin = open("/dev/null")
+ sys.stdout = open("/dev/null")
+ sys.stderr = open("/dev/null")
+ logger.debug("Finished daemonizing (pid %s)" % (os.getpid(),))
+
+lockfile = open(lockfilename, 'w')
+lockfile.write("%s" % (os.getpid(),))
+lockfile.close()
+
+if not (debug_mode or batch_mode):
+ # Don't log to stderr past this point
+ logger.removeHandler(stderr_handler)
+
+archivemap = {}
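+# archivemap maps each distribution name to a pair of
+# [ArchiveDir (installer), ArchiveDirIndexer (indexing thread)]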
+# Instantiate archive classes for installing files
+for dist in distributions.keys():
+ if distributions[dist]['archive_style'] == 'simple-subdir':
+ newclass = SimpleSubdirArchiveDir
+ else:
+ newclass = FlatArchiveDir
+ archivemap[dist] = [newclass(dist, logger, distributions[dist], batch_mode=batch_mode, keyrings=default_keyrings, extra_keyrings=default_extra_keyrings), None]
+
+# Create archive indexing threads, but don't start them yet
+for dist in distributions.keys():
+ targetdir = os.path.join(toplevel_directory, dist)
+ logger.info('Initializing archive indexer %s' % (dist,))
+ if distributions[dist]['archive_style'] == 'simple-subdir':
+ newclass = SimpleSubdirArchiveDirIndexer
+ else:
+ newclass = FlatArchiveDirIndexer
+ archive = newclass(targetdir, logger, distributions[dist], use_dnotify=use_dnotify, batch_mode=batch_mode)
+ archivemap[dist][1] = archive
+
+# Now: kick off the incoming processor
+logger.info('Initializing incoming processor')
+incoming = IncomingDir(incoming_subdir, archivemap, logger, trigger_reindex=trigger_reindex, poll_time=default_poll_time, max_retry_time=default_max_retry_time, batch_mode=batch_mode, verify_sigs=default_verify_sigs)
+logger.debug('Starting incoming processor')
+incoming.start()
+if batch_mode:
+ logger.debug('Waiting for incoming processor to finish')
+ incoming.wait()
+
+# Once we've installed everything, start the indexing threads
+for dist in distributions.keys():
+ archive = archivemap[dist][1]
+ logger.debug('Starting archive %s' % (archive.getName(),))
+ archive.start()
+
+# Wait for all the indexing threads to finish; none of these ever
+# return if we're in daemon mode
+if batch_mode:
+ for dist in distributions.keys():
+ archive = archivemap[dist][1]
+ logger.debug('Waiting for archive %s to finish' % (archive.getName(),))
+ archive.wait()
+else:
+ logger.debug("Waiting for die event")
+ die_event.wait()
+ logger.info('Die event caught; waiting for incoming processor to finish')
+ incoming.wait()
+ for dist in distributions.keys():
+ archive = archivemap[dist][1]
+ logger.info('Die event caught; waiting for archive %s to finish' % (archive.getName(),))
+ archive.wait()
+
+#logging.shutdown()
+logger.debug('Removing lock file: ' + lockfilename)
+os.unlink(lockfilename)
+logger.info("main thread exiting...")
+sys.exit(0)
+
+# vim:ts=4:sw=4:et: