Source code for mirrors.repo

import Queue
import logging
import threading
import time
import subprocess
import os
from datetime import datetime, timedelta
from mirrors.libmirrors import t2s


class Singleton(type):
    """Singleton Class for RepoManager."""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


[docs]class Repo(object):
[docs] def __init__(self, name, config): """A repo object which stores info about a single repo. :param str name: Name of repo :param config: running config options :type config: ConfigParser.ConfigParser """ # ConfigParser Object which all of the repo info is stored under the repo name self.config = config # Name of the Repo self.name = name # inactive repos will not run self.deactive = False # Status of Repo Queue self.queued = False # Singleton of RepoManager self.repo_manager = RepoManager() # Contains rsync_thread self.__sync = None # Config Validation Section if not self.config.has_option(self.name, 'source'): raise RepoConfigError("No Source Defined".format(self.name), self.name) if not self.config.has_option(self.name, 'destination'): self.config.set(self.name, 'destination', './distro/') directory = os.path.dirname(self.config.get(self.name, 'destination')) if not os.path.exists(directory): logging.info("Creating {0}".format(directory)) os.makedirs(directory) if not self.config.has_option(self.name, 'rsync_args'): raise RepoConfigError("No rsync_args Defined".format(self.name), self.name) if not self.config.has_option(self.name, 'weight'): self.config.set(self.name, 'weight', '0') if self.config.has_option(self.name, 'deactive'): self.deactive = self.config.getboolean(self.name, 'deactive') else: self.config.set(self.name, 'deactive', 'False') if self.config.has_option(self.name, 'async_sleep') and self.config.has_option(self.name, 'hourly_sync'): raise RepoConfigError("Both async_sleep and hourly_sync cannot be defined".format(self.name), self.name) elif not self.config.has_option(self.name, 'async_sleep') and not self.config.has_option(self.name, 'hourly_sync'): raise RepoConfigError("Either async_sleep or hourly_sync must be defined".format(self.name), self.name) if not self.config.has_option(self.name, 'pre_command'): self.config.set(self.name, 'pre_command', '') if not self.config.has_option(self.name, 'post_command'): self.config.set(self.name, 'post_command', '') if not self.config.has_option(self.name, 'log_file'): self.config.set(self.name, 'log_file', './log/{0}.log'.format(self.name)) logging.info("No log_file declared in {0}, defaulting to '{0}.log'".format(self.name)) # end config validation section log_file = self.config.get(self.name, "log_file") directory = os.path.dirname(log_file) if not os.path.exists(directory): logging.info("Creating {0}".format(directory)) try: os.makedirs(directory) except IOError: logging.error("Failed to create {0}".format(directory)) try: open(log_file, 'a').close() logging.debug("{0} log file good for writing".format(self.name)) except IOError: logging.error("Error opening {0} for writing".format(self.name)) if(self.deactive): logging.info("{0} loaded successfully, but disabled".format(self.name)) else: logging.info("{0} loaded successfully".format(self.name))
[docs] def is_alive(self): """Bool of syncing status.""" if self.__sync: return bool(self.__sync.p) return False
[docs] def running_time(self): """Total running time of active sync. :rtype: int :returns: An int of total syncing time elapsed :rtype: None :returns: None if not syncing """ if self.__sync: if self.is_alive(): delta = datetime.now() - self.__sync.start_time return delta - timedelta(microseconds=delta.microseconds)
[docs] def sleep_time(self): """Sleep duration of sleeping sync. :rtype: int :returns: A int of time elapsed since sleeping :rtype: None :returns: None if not in sleeping state """ if self.__sync: if self.__sync.sleep_start: delta = datetime.now() - self.__sync.sleep_start return delta - timedelta(microseconds=delta.microseconds)
[docs] def time_remaining(self): """Return time left until sleep is over. :rtype: int :returns: A int of time remaining in sleep state :rtype: None :returns: None if not in sleeping state """ if self.__sync: if self.__sync.sleep_start: delta = timedelta(seconds=t2s(self.config.get(self.name, "async_sleep"))) - self.sleep_time() return delta - timedelta(microseconds=delta.microseconds)
[docs] def terminate(self): """Send SIGTERM To the rsync process.""" if self.is_alive(): logging.info("Terminating {0}".format(self.name)) self.__sync.p.terminate()
[docs] def kill(self): """Send SIGKILL To the rsync process.""" if self.is_alive(): logging.info("KIlling {0}".format(self.name)) self.__sync.p.kill()
def __rebuild(self): """Destroy and recreate the rsync object and settings. This will wipe all currently running rsync timers """ self.__sync = self.rsync_thread(self.name, self.config)
[docs] def start_sync(self): """Run an rsync against the repo source.""" self.__rebuild() self.__sync.start()
[docs] class rsync_thread(threading.Thread): """Extended threading.Thread class to control rsync via subprocess. :param str name: Name of repo :param config: Running config options :type config: Configparser.Configparser """ def __init__(self, name, config): threading.Thread.__init__(self) self.config = config self.p = None self.name = name # Singleton of RepoManager self.repo_manager = RepoManager() self.start_time = None self.finish_time = None self.start_sleep = None self.thread_timer = None self.daemon = True def run(self): logging.debug("Opening {0} for writing".format(self.config.get(self.name, 'log_file'))) output_file = open(self.config.get(self.name, 'log_file'), 'a') logging.debug("Running rsync with {0} {1} {2}".format( self.config.get(self.name, "rsync_args"), self.config.get(self.name, "source"), self.config.get(self.name, "destination"))) self.start_time = datetime.now() logging.info("Starting sync {0} at {1}".format(self.name, self.start_time)) self.p = subprocess.Popen("rsync {0} {1} {2}".format( self.config.get(self.name, "rsync_args"), self.config.get(self.name, "source"), self.config.get(self.name, "destination")).split(), shell=False, stdout=output_file, stderr=subprocess.STDOUT) # bock until the subprocess is done self.p.wait() if self.config.get(self.name, "post_command"): logging.debug("running post_cmd {0}".format(self.config.get(self.name, "post_command"))) self.post_cmd = subprocess.Popen("{0}".format( self.config.get(self.name, "post_command")), shell=True, stdout=output_file, stderr=subprocess.STDOUT) self.post_cmd.wait() logging.info("Done running post_command for {0}".format(self.name)) t = t2s(self.config.get(self.name, "async_sleep")) self.thread_timer = threading.Timer(t, self.repo_manager.enqueue, [self.name]) self.thread_timer.start() # Time that thread starts sleeping self.sleep_start = datetime.now() # clear out the current process when it finishes self.p = None # Remove state from running_syncs self.repo_manager.running_syncs -= 1 self.finish_time = datetime.now() logging.info("finished {0} at {1}, sleeping for {2}".format(self.name, self.finish_time, self.config.get(self.name, "async_sleep"))) logging.debug("closing {0}".format(self.config.get(self.name, 'log_file'))) output_file.close()
[docs]class RepoManager(object): __metaclass__ = Singleton
[docs] def __init__(self, config): """Singleton manager of the repositories and threading. :param config: Running config options :type config: Configparser.Configparser """ # configparser object which all of the repomanager configs are stored under the GLOBAL Section self.config = config # priority queue for async processing self.repo_queue = Queue.PriorityQueue(0) # list of repo objects self._repo_dict = dict() if not self.config.has_section("GLOBAL"): raise GlobalError("Config requires GLOBAL Section") if not self.config.has_option('GLOBAL', 'async_processes'): raise GlobalError("No async_processes value defined in GLOBAL") if not self.config.has_option('GLOBAL', 'check_sleep'): config.set("GLOBAL", 'check_sleep', '30') # current running syncs: compared against max set in config self.running_syncs = 0 self.async_thread = threading.Thread(name="async_control", target=self.__check_queue) self.async_thread.daemon = True self.async_thread.start()
def __check_queue(self): """Queue loop checker for async_control.""" while(True): # Check for inactive repos found = None while not found: repo = self.repo_queue.get()[1] if not repo.deactive: found = True else: # If inactive, toss aside break if self.running_syncs <= self.config.getint("GLOBAL", "async_processes"): logging.debug("Acquired {0}".format(repo.name)) repo.queued = False self.running_syncs += 1 logging.debug("Running Sync {0}, {1} slots available".format(repo.name, self.config.getint("GLOBAL", "async_processes")-self.running_syncs)) repo.start_sync() else: logging.debug("Requeuing {0}, no open threads".format(repo.name)) self.repo_queue.put([-11, repo]) time.sleep(30)
[docs] def get_repo(self, name): """Return repo object if exists. :param str name: name of repo :rtype: Repo :returns: Repo Object :rtype: None :returns: None if no repo exists by passed in name """ if name in self._repo_dict: return self._repo_dict[name]
[docs] def gen_repo(self): """Generator for repo_dict. :rtype: Repo :returns: Repo Object """ for name in self._repo_dict: yield self._repo_dict[name]
[docs] def add_repo(self, name): """Create a repo for a section in the running config. :param str name: Name of repo :raises Repo.RepoConfigError: if no config exists for given repo name """ if self.get_repo(name): raise RepoConfigError("Cannon create repo {0}, already created".format(name), name) if self.config.has_section(name): repo = Repo(name, self.config) self._repo_dict[name] = repo else: raise RepoConfigError("Cannot create repo, section {0} does not exist".format(name), name)
[docs] def deactivate(self, name): """Deactivate repo from syncing. :param str name: Name of repo :raises Repo.RepoError: if no repo exists by given name """ if self.get_repo(name): if self.get_repo(name).deactive: # nothing to do, already deactive return self.get_repo(name).deactive = True logging.info("Deactivating {0}".format(name)) else: raise RepoError("No Repo Named {0}".format(name), name)
[docs] def activate(self, name): """Activate repo for syncing. :param str name: Name of Repo :raises Repo.RepoError: if no repo exists by given name """ if self.get_repo(name): if not self.get_repo(name).deactive: # nothing to do, already active return self.get_repo(name).deactive = False self.enqueue(name) logging.info("Activating {0}".format(name)) else: raise RepoError("No Repo Named {0}".format(name), name)
[docs] def status(self, name): """Return status of Repo. :param str name: Name of Repo :rtype: str :returns: str status of Repo """ if not self.get_repo(name): raise RepoError("Repo {0} doesn't exist".format(name), name) if self.get_repo(name).deactive: return "{0} is deactive".format(name) elif self.get_repo(name).queued: return "{0} is queued".format(name) elif self.get_repo(name).is_alive(): return "{0} is syncing, active for {1}".format(name, self.get_repo(name).running_time()) else: return "{0} is sleeping, sync in {1}".format(name, self.get_repo(name).time_remaining())
[docs] def del_repo(self, name): """Delete repo object from dict. :param str name: Name of repo :raises Repo.RepoError: if no repo exists by passed in name. """ if self.get_repo(name): del self._repo_dict[name] else: raise RepoError("Cannot delete repo, repo {0} does not exist".format(name))
[docs] def enqueue(self, name): """Add repo to the queue. :param str name: Name of repo :raises Repo.RepoError: if repo is already queued or doesn't exist """ if not self.get_repo(name): raise RepoError("Repo {0} doesn't exist".format(name), name) if self.get_repo(name).deactive: raise RepoError("Failed to queue repo, {0} is deactive.".format(name), name) if self.get_repo(name).queued: raise RepoError("Failed to queue repo, {0} already queued.".format(name), name) if self.get_repo(name).is_alive(): raise RepoError("Failed to queue Repo, {0} is syncing.".format(name), name) self.repo_queue.put([self.config.get(name, "weight"), self.get_repo(name)]) self.get_repo(name).queued = True
class GlobalError(Exception): def __init__(self, message): """Fatal GLOBAL Section Config Error.""" Exception.__init__(self, message) self.message = message class RepoConfigError(Exception): def __init__(self, message, name): """Non-Fatal Repo config Error.""" Exception.__init__(self, message) self.message = message self.name = name class RepoError(Exception): def __init__(self, message, name): """Non-Fatal Repo Error.""" Exception.__init__(self, message) self.message = message self.name = name