import Queue
import logging
import threading
import time
import subprocess
import os
from datetime import datetime, timedelta
from mirrors.libmirrors import t2s
class Singleton(type):
    """Metaclass that hands out a single shared instance per class.

    The first call to a class using this metaclass constructs the instance
    with the supplied arguments; every later call returns that same object
    and ignores its arguments.
    """

    # Maps each class using this metaclass to its one instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of *cls*, creating it on first use."""
        try:
            return cls._instances[cls]
        except KeyError:
            instance = super(Singleton, cls).__call__(*args, **kwargs)
            cls._instances[cls] = instance
            return instance
class Repo(object):
    """A single mirrored repository and the rsync thread that syncs it."""

    def __init__(self, name, config):
        """A repo object which stores info about a single repo.

        Validates the repo's config section, fills in defaults for optional
        options, and makes sure the destination and log directories exist.

        :param str name: Name of repo
        :param config: running config options
        :type config: ConfigParser.ConfigParser
        :raises RepoConfigError: if a required option is missing or the
            scheduling options conflict
        """
        # ConfigParser Object which all of the repo info is stored under the repo name
        self.config = config
        # Name of the Repo
        self.name = name
        # inactive repos will not run
        self.deactive = False
        # Status of Repo Queue
        self.queued = False
        # Singleton of RepoManager (must already have been constructed,
        # because no config is passed here)
        self.repo_manager = RepoManager()
        # Contains rsync_thread
        self.__sync = None

        self._validate_config()
        self._prepare_log_file()

        if self.deactive:
            logging.info("{0} loaded successfully, but disabled".format(self.name))
        else:
            logging.info("{0} loaded successfully".format(self.name))

    def _validate_config(self):
        """Check required options and write defaults for the optional ones."""
        section = self.name
        config = self.config

        if not config.has_option(section, 'source'):
            raise RepoConfigError("No Source Defined", section)

        if not config.has_option(section, 'destination'):
            config.set(section, 'destination', './distro/')
        directory = os.path.dirname(config.get(section, 'destination'))
        # An empty dirname means "current directory"; os.makedirs('') would raise.
        if directory and not os.path.exists(directory):
            logging.info("Creating {0}".format(directory))
            os.makedirs(directory)

        if not config.has_option(section, 'rsync_args'):
            raise RepoConfigError("No rsync_args Defined", section)

        if not config.has_option(section, 'weight'):
            config.set(section, 'weight', '0')

        if config.has_option(section, 'deactive'):
            self.deactive = config.getboolean(section, 'deactive')
        else:
            config.set(section, 'deactive', 'False')

        # Exactly one of the two scheduling options must be present.
        # NOTE(review): only async_sleep is read anywhere in this class;
        # a repo configured with hourly_sync passes validation but
        # rsync_thread.run() and time_remaining() will fail to find
        # async_sleep -- confirm the intended hourly_sync behaviour.
        has_async = config.has_option(section, 'async_sleep')
        has_hourly = config.has_option(section, 'hourly_sync')
        if has_async and has_hourly:
            raise RepoConfigError("Both async_sleep and hourly_sync cannot be defined", section)
        if not has_async and not has_hourly:
            raise RepoConfigError("Either async_sleep or hourly_sync must be defined", section)

        # NOTE(review): pre_command is given a default here but nothing in
        # this class ever executes it -- confirm whether it should run
        # before rsync.
        for option in ('pre_command', 'post_command'):
            if not config.has_option(section, option):
                config.set(section, option, '')

        if not config.has_option(section, 'log_file'):
            default_log = './log/{0}.log'.format(section)
            config.set(section, 'log_file', default_log)
            logging.info("No log_file declared in {0}, defaulting to '{1}'".format(section, default_log))

    def _prepare_log_file(self):
        """Create the log directory if needed and check the log is writable.

        Failures are logged, not raised, so a bad log path does not prevent
        the repo from loading.
        """
        log_file = self.config.get(self.name, "log_file")
        directory = os.path.dirname(log_file)
        if directory and not os.path.exists(directory):
            logging.info("Creating {0}".format(directory))
            try:
                os.makedirs(directory)
            except (IOError, OSError):
                # os.makedirs raises OSError, which is distinct from IOError on Python 2
                logging.error("Failed to create {0}".format(directory))
        try:
            open(log_file, 'a').close()
            logging.debug("{0} log file good for writing".format(self.name))
        except (IOError, OSError):
            logging.error("Error opening {0} for writing".format(self.name))

    def is_alive(self):
        """Bool of syncing status.

        :rtype: bool
        :returns: True while the sync thread holds an rsync process object
        """
        if self.__sync:
            return bool(self.__sync.p)
        return False

    def running_time(self):
        """Total running time of active sync.

        :rtype: datetime.timedelta
        :returns: Time elapsed since the sync started, truncated to whole seconds
        :rtype: None
        :returns: None if not syncing
        """
        if self.__sync:
            if self.is_alive():
                delta = datetime.now() - self.__sync.start_time
                return delta - timedelta(microseconds=delta.microseconds)

    def sleep_time(self):
        """Sleep duration of sleeping sync.

        :rtype: datetime.timedelta
        :returns: Time elapsed since sleeping, truncated to whole seconds
        :rtype: None
        :returns: None if not in sleeping state
        """
        if self.__sync:
            if self.__sync.sleep_start:
                delta = datetime.now() - self.__sync.sleep_start
                return delta - timedelta(microseconds=delta.microseconds)

    def time_remaining(self):
        """Return time left until sleep is over.

        :rtype: datetime.timedelta
        :returns: Time remaining in sleep state, truncated to whole seconds
        :rtype: None
        :returns: None if not in sleeping state
        """
        if self.__sync:
            if self.__sync.sleep_start:
                delta = timedelta(seconds=t2s(self.config.get(self.name, "async_sleep"))) - self.sleep_time()
                return delta - timedelta(microseconds=delta.microseconds)

    def terminate(self):
        """Send SIGTERM To the rsync process."""
        if self.is_alive():
            logging.info("Terminating {0}".format(self.name))
            self.__sync.p.terminate()

    def kill(self):
        """Send SIGKILL To the rsync process."""
        if self.is_alive():
            logging.info("KIlling {0}".format(self.name))
            self.__sync.p.kill()

    def __rebuild(self):
        """Destroy and recreate the rsync object and settings.

        This will wipe all currently running rsync timers
        """
        self.__sync = self.rsync_thread(self.name, self.config)

    def start_sync(self):
        """Run an rsync against the repo source."""
        self.__rebuild()
        self.__sync.start()

    class rsync_thread(threading.Thread):
        """Extended threading.Thread class to control rsync via subprocess.

        :param str name: Name of repo
        :param config: Running config options
        :type config: Configparser.Configparser
        """

        def __init__(self, name, config):
            threading.Thread.__init__(self)
            self.config = config
            # Popen object of the running rsync; None when nothing is running
            self.p = None
            self.name = name
            # Singleton of RepoManager
            self.repo_manager = RepoManager()
            self.start_time = None
            self.finish_time = None
            # Time the thread started sleeping; read by Repo.sleep_time() and
            # Repo.time_remaining(), so it must exist before run() finishes.
            self.sleep_start = None
            self.post_cmd = None
            self.thread_timer = None
            self.daemon = True

        @staticmethod
        def _build_command(rsync_args, source, destination):
            """Return the rsync argv list.

            Only ``rsync_args`` is split on whitespace; source and destination
            are passed through as single arguments so paths containing spaces
            survive intact.
            """
            return ["rsync"] + rsync_args.split() + [source, destination]

        def run(self):
            """Run rsync, then the optional post_command, then schedule a re-queue.

            The sync slot, process handle and log file are always released,
            even if launching rsync or the post command fails.
            """
            section = self.name
            log_file = None
            output_file = None
            try:
                log_file = self.config.get(section, 'log_file')
                logging.debug("Opening {0} for writing".format(log_file))
                output_file = open(log_file, 'a')

                rsync_args = self.config.get(section, "rsync_args")
                source = self.config.get(section, "source")
                destination = self.config.get(section, "destination")
                logging.debug("Running rsync with {0} {1} {2}".format(rsync_args, source, destination))

                self.start_time = datetime.now()
                logging.info("Starting sync {0} at {1}".format(self.name, self.start_time))
                self.p = subprocess.Popen(
                    self._build_command(rsync_args, source, destination),
                    shell=False,
                    stdout=output_file,
                    stderr=subprocess.STDOUT)
                # block until the subprocess is done
                self.p.wait()

                post_command = self.config.get(section, "post_command")
                if post_command:
                    logging.debug("running post_cmd {0}".format(post_command))
                    self.post_cmd = subprocess.Popen(
                        post_command,
                        shell=True,
                        stdout=output_file,
                        stderr=subprocess.STDOUT)
                    self.post_cmd.wait()
                    logging.info("Done running post_command for {0}".format(self.name))
            except Exception:
                # Top of the thread: log and fall through so the repo is
                # still rescheduled, as it is after a failed rsync exit.
                logging.exception("Sync of {0} failed".format(self.name))
            finally:
                # clear out the current process when it finishes
                self.p = None
                # Remove state from running_syncs
                self.repo_manager.running_syncs -= 1
                self.finish_time = datetime.now()
                if output_file is not None:
                    logging.debug("closing {0}".format(log_file))
                    output_file.close()

            # Start the timer only after self.p is cleared: enqueue() refuses
            # repos that still report is_alive().
            sleep_for = self.config.get(section, "async_sleep")
            # Time that thread starts sleeping
            self.sleep_start = datetime.now()
            self.thread_timer = threading.Timer(t2s(sleep_for), self.repo_manager.enqueue, [self.name])
            self.thread_timer.start()
            logging.info("finished {0} at {1}, sleeping for {2}".format(self.name, self.finish_time, sleep_for))
class RepoManager(object):
    """Singleton manager of the repositories and the async sync queue."""

    __metaclass__ = Singleton

    # Seconds to wait before re-checking the queue when check_sleep is unusable.
    DEFAULT_CHECK_SLEEP = 30

    def __init__(self, config):
        """Singleton manager of the repositories and threading.

        Validates the GLOBAL section and starts the daemon thread that pulls
        repos off the priority queue.

        :param config: Running config options
        :type config: Configparser.Configparser
        :raises GlobalError: if the GLOBAL section or async_processes is missing
        """
        # configparser object which all of the repomanager configs are stored under the GLOBAL Section
        self.config = config
        # priority queue for async processing
        self.repo_queue = Queue.PriorityQueue(0)
        # repo objects keyed by repo name
        self._repo_dict = dict()

        if not self.config.has_section("GLOBAL"):
            raise GlobalError("Config requires GLOBAL Section")
        if not self.config.has_option('GLOBAL', 'async_processes'):
            raise GlobalError("No async_processes value defined in GLOBAL")
        if not self.config.has_option('GLOBAL', 'check_sleep'):
            config.set("GLOBAL", 'check_sleep', '30')

        # current running syncs: compared against max set in config
        self.running_syncs = 0
        self.async_thread = threading.Thread(name="async_control", target=self.__check_queue)
        self.async_thread.daemon = True
        self.async_thread.start()

    def _check_sleep_seconds(self):
        """Return the GLOBAL check_sleep value in seconds, or the default if it is not an integer."""
        try:
            return self.config.getint("GLOBAL", "check_sleep")
        except ValueError:
            logging.error("Invalid check_sleep in GLOBAL, using {0}".format(self.DEFAULT_CHECK_SLEEP))
            return self.DEFAULT_CHECK_SLEEP

    def __check_queue(self):
        """Queue loop checker for async_control.

        Blocks on the queue, discards repos that were deactivated while they
        were waiting, and starts a sync when a slot is free. When all slots
        are busy the repo is put back at the front of the queue and the loop
        sleeps for check_sleep seconds.
        """
        while True:
            repo = self.repo_queue.get()[1]
            if repo.deactive:
                # If inactive, toss aside. Reset the flag so that a later
                # activate() is able to queue the repo again.
                logging.debug("Discarding {0}, repo is deactive".format(repo.name))
                repo.queued = False
                continue

            max_syncs = self.config.getint("GLOBAL", "async_processes")
            # Strictly less-than: "<=" would start one sync more than allowed.
            if self.running_syncs < max_syncs:
                logging.debug("Acquired {0}".format(repo.name))
                repo.queued = False
                self.running_syncs += 1
                logging.debug("Running Sync {0}, {1} slots available".format(repo.name, max_syncs - self.running_syncs))
                repo.start_sync()
            else:
                logging.debug("Requeuing {0}, no open threads".format(repo.name))
                # -11 puts the repo ahead of normally weighted entries
                self.repo_queue.put([-11, repo])
                time.sleep(self._check_sleep_seconds())

    def get_repo(self, name):
        """Return repo object if exists.

        :param str name: name of repo
        :rtype: Repo
        :returns: Repo Object
        :rtype: None
        :returns: None if no repo exists by passed in name
        """
        return self._repo_dict.get(name)

    def gen_repo(self):
        """Generator for repo_dict.

        :rtype: Repo
        :returns: Repo Object
        """
        for name in self._repo_dict:
            yield self._repo_dict[name]

    def add_repo(self, name):
        """Create a repo for a section in the running config.

        :param str name: Name of repo
        :raises Repo.RepoConfigError: if the repo already exists or no config
            section exists for the given repo name
        """
        if self.get_repo(name):
            raise RepoConfigError("Cannot create repo {0}, already created".format(name), name)
        if not self.config.has_section(name):
            raise RepoConfigError("Cannot create repo, section {0} does not exist".format(name), name)
        self._repo_dict[name] = Repo(name, self.config)

    def deactivate(self, name):
        """Deactivate repo from syncing.

        :param str name: Name of repo
        :raises Repo.RepoError: if no repo exists by given name
        """
        repo = self.get_repo(name)
        if not repo:
            raise RepoError("No Repo Named {0}".format(name), name)
        if repo.deactive:
            # nothing to do, already deactive
            return
        repo.deactive = True
        logging.info("Deactivating {0}".format(name))

    def activate(self, name):
        """Activate repo for syncing.

        :param str name: Name of Repo
        :raises Repo.RepoError: if no repo exists by given name, or if the
            repo cannot be queued (see :meth:`enqueue`)
        """
        repo = self.get_repo(name)
        if not repo:
            raise RepoError("No Repo Named {0}".format(name), name)
        if not repo.deactive:
            # nothing to do, already active
            return
        repo.deactive = False
        self.enqueue(name)
        logging.info("Activating {0}".format(name))

    def status(self, name):
        """Return status of Repo.

        :param str name: Name of Repo
        :rtype: str
        :returns: str status of Repo
        :raises Repo.RepoError: if no repo exists by given name
        """
        repo = self.get_repo(name)
        if not repo:
            raise RepoError("Repo {0} doesn't exist".format(name), name)
        if repo.deactive:
            return "{0} is deactive".format(name)
        elif repo.queued:
            return "{0} is queued".format(name)
        elif repo.is_alive():
            return "{0} is syncing, active for {1}".format(name, repo.running_time())
        else:
            return "{0} is sleeping, sync in {1}".format(name, repo.time_remaining())

    def del_repo(self, name):
        """Delete repo object from dict.

        :param str name: Name of repo
        :raises Repo.RepoError: if no repo exists by passed in name.
        """
        if not self.get_repo(name):
            raise RepoError("Cannot delete repo, repo {0} does not exist".format(name), name)
        del self._repo_dict[name]

    def enqueue(self, name):
        """Add repo to the queue.

        :param str name: Name of repo
        :raises Repo.RepoError: if repo doesn't exist, is deactive, is already
            queued or syncing, or has a non-integer weight
        """
        repo = self.get_repo(name)
        if not repo:
            raise RepoError("Repo {0} doesn't exist".format(name), name)
        if repo.deactive:
            raise RepoError("Failed to queue repo, {0} is deactive.".format(name), name)
        if repo.queued:
            raise RepoError("Failed to queue repo, {0} already queued.".format(name), name)
        if repo.is_alive():
            raise RepoError("Failed to queue Repo, {0} is syncing.".format(name), name)
        # Queue priorities must be integers: string weights sort
        # lexicographically ("10" < "9") and never compare sensibly with the
        # integer requeue priority used by the queue checker.
        try:
            weight = self.config.getint(name, "weight")
        except ValueError:
            raise RepoError("Failed to queue repo, {0} has a non-integer weight.".format(name), name)
        # Mark as queued before publishing, so the consumer thread cannot
        # clear the flag before it has been set.
        repo.queued = True
        self.repo_queue.put([weight, repo])
class GlobalError(Exception):
    """Fatal GLOBAL Section Config Error."""

    def __init__(self, message):
        """Build the error.

        :param str message: Description of the GLOBAL config problem
        """
        super(GlobalError, self).__init__(message)
        # Kept as an explicit attribute so handlers can read err.message
        self.message = message
class RepoConfigError(Exception):
    """Non-Fatal Repo config Error."""

    def __init__(self, message, name):
        """Build the error.

        :param str message: Description of the config problem
        :param str name: Name of the repo the problem belongs to
        """
        super(RepoConfigError, self).__init__(message)
        # Kept as explicit attributes so handlers can read them directly
        self.message = message
        self.name = name
class RepoError(Exception):
    """Non-Fatal Repo Error."""

    def __init__(self, message, name):
        """Build the error.

        :param str message: Description of the problem
        :param str name: Name of the repo the problem belongs to
        """
        super(RepoError, self).__init__(message)
        # Kept as explicit attributes so handlers can read them directly
        self.message = message
        self.name = name