Source code for openbases.utils.fileio

# Copyright (c) 2018, Vanessa Sochat All rights reserved.
# See the LICENSE in the main repository at:
#    https://www.github.com/openbases/openbases-python

from openbases.logger import bot
from subprocess import (
    Popen,
    PIPE,
    STDOUT
)
import errno
import json
import os
import re
import shutil
import sys
import yaml

################################################################################
## FOLDER OPERATIONS ###########################################################
################################################################################


[docs]def mkdir_p(path): '''mkdir_p attempts to get the same functionality as mkdir -p :param path: the path to create. ''' try: os.makedirs(path) except OSError as e: if e.errno == errno.EEXIST and os.path.isdir(path): pass else: bot.error("Error creating path %s, exiting." % path) sys.exit(1)
[docs]def get_installdir(): return os.path.dirname(os.path.abspath(__file__))
[docs]def find_subdirectories(basepath): ''' Return directories (and sub) starting from a base ''' directories = [] for root, dirnames, filenames in os.walk(basepath): new_directories = [d for d in dirnames if d not in directories] directories = directories + new_directories return directories
[docs]def find_directories(root,fullpath=True): ''' Return directories at one level specified by user (not recursive) ''' directories = [] for item in os.listdir(root): # Don't include hidden directories if not re.match("^[.]",item): if os.path.isdir(os.path.join(root, item)): if fullpath: directories.append(os.path.abspath(os.path.join(root, item))) else: directories.append(item) return directories
[docs]def find_files(root, pattern, fullpath=True): ''' Return files at one level specified by user (not recursive) ''' files = [] for root, dirnames, filenames in os.walk(root): new_files = [f for f in filenames if f not in files] new_files = [ os.path.join(root, f) for f in new_files if re.search(pattern, f)] files = files + new_files return files
[docs]def copy_directory(src, dest, force=False): ''' Copy an entire directory recursively ''' if os.path.exists(dest) and force is True: shutil.rmtree(dest) try: shutil.copytree(src, dest) except OSError as e: # If the error was caused because the source wasn't a directory if e.errno == errno.ENOTDIR: shutil.copy(src, dest) else: bot.error('Directory not copied. Error: %s' % e) sys.exit(1)
################################################################################ ## FILE OPERATIONS ############################################################# ################################################################################
[docs]def write_file(filename, content, mode="w"): '''write_file will open a file, "filename" and write content, "content" and properly close the file ''' with open(filename, mode) as filey: filey.writelines(content) return filename
[docs]def write_json(json_obj, filename, mode="w", print_pretty=True): '''write_json will (optionally,pretty print) a json object to file Parameters ========== json_obj: the dict to print to json filename: the output file to write to pretty_print: if True, will use nicer formatting ''' with open(filename, mode) as filey: if print_pretty: filey.writelines( json.dumps( json_obj, indent=4, separators=( ',', ': '))) else: filey.writelines(json.dumps(json_obj)) return filename
[docs]def read_file(filename, mode="r", readlines=True): '''write_file will open a file, "filename" and write content, "content" and properly close the file ''' with open(filename, mode) as filey: if readlines is True: content = filey.readlines() else: content = filey.read() return content
[docs]def read_json(filename, mode='r'): '''read_json reads in a json file and returns the data structure as dict. ''' with open(filename, mode) as filey: data = json.load(filey) return data
################################################################################ ## YAML ######################################################################## ################################################################################
[docs]def read_yaml(filename, mode='r', quiet=False): '''read a yaml file, only including sections between dashes ''' stream = read_file(filename, mode, readlines=False) return _read_yaml(stream, quiet=quiet)
[docs]def write_yaml(yaml_dict, filename, mode="w"): '''write a dictionary to yaml file Parameters ========== yaml_dict: the dict to print to yaml filename: the output file to write to pretty_print: if True, will use nicer formatting ''' with open(filename, mode) as filey: filey.writelines(yaml.dump(yaml_dict)) return filename
def _read_yaml(section, quiet=False): '''read yaml from a string, either read from file (read_frontmatter) or from yml file proper (read_yaml) Parameters ========== section: a string of unparsed yaml content. ''' metadata = {} docs = yaml.load_all(section) for doc in docs: if isinstance(doc, dict): for k,v in doc.items(): if not quiet: print('%s: %s' %(k,v)) metadata[k] = v return metadata
[docs]def read_frontmatter(filename, mode='r', quiet=False): '''read a yaml file, only including sections between dashes ''' stream = read_file(filename, mode, readlines=False) # The yml section always comes after the --- of the frontmatter section = stream.split('---')[1] return _read_yaml(section, quiet=quiet)
[docs]def read_markdown(filename, mode='r'): '''read the OTHER part of the markdown file (remove the frontend matter) ''' stream = read_file(filename, mode, readlines=False) # The yml section always comes after the --- of the frontmatter return stream.split('---')[-1]
################################################################################ # bibtex ################################################################################
[docs]def read_bibtex(filename, mode='r'): '''read a yaml file, only including sections between dashes ''' from pybtex.database.input import bibtex parser = bibtex.Parser() try: data = parser.parse_file(filename) return data.entries except Exception as e: bot.error(e)
################################################################################ # environment / options ################################################################################
[docs]def load_module(module_str): '''load a module based on a string name. Parameters ========== module_str: complete python path to module (and function). Note that this MUST be a python module (module.py) and not a function in an __init__.py ''' module_str, function = module_str.rsplit('.', 1) module = __import__(module_str, fromlist=['']) return getattr(module, function)
[docs]def convert2boolean(arg): '''convert2boolean is used for environmental variables that must be returned as boolean''' if not isinstance(arg, bool): return arg.lower() in ("yes", "true", "t", "1", "y") return arg
[docs]def getenv(variable_key, default=None, required=False, silent=True): '''getenv will attempt to get an environment variable. If the variable is not found, None is returned. :param variable_key: the variable name :param required: exit with error if not found :param silent: Do not print debugging information for variable ''' variable = os.environ.get(variable_key, default) if variable is None and required: bot.error("Cannot find environment variable %s, exiting." %variable_key) sys.exit(1) if not silent: if variable is not None: bot.verbose2("%s found as %s" %(variable_key,variable)) else: bot.verbose2("%s not defined (None)" %variable_key) return variable