# Copyright (c) 2018, Vanessa Sochat All rights reserved.
# See the LICENSE in the main repository at:
# https://www.github.com/openbases/openbases-python
from openbases.logger import bot
from subprocess import (
Popen,
PIPE,
STDOUT
)
import errno
import json
import os
import re
import shutil
import sys
import yaml
################################################################################
## FOLDER OPERATIONS ###########################################################
################################################################################
def mkdir_p(path):
    '''Create a directory and any missing parents, like ``mkdir -p``.

    A directory that already exists is not treated as an error. Any other
    failure is reported through ``bot.error`` and the process exits with
    status 1.

    Parameters
    ==========
    path: the path to create.
    '''
    try:
        os.makedirs(path)
    except OSError as error:
        # An existing directory is the one failure we tolerate
        already_there = error.errno == errno.EEXIST and os.path.isdir(path)
        if not already_there:
            bot.error("Error creating path %s, exiting." % path)
            sys.exit(1)
def get_installdir():
    '''Return the absolute path of the directory that contains this module.
    '''
    module_path = os.path.abspath(__file__)
    return os.path.dirname(module_path)
def find_subdirectories(basepath):
    '''Collect the names of every directory found beneath a base path.

    The tree is walked recursively with ``os.walk``. Only bare directory
    names are collected (not paths), and a name that was already collected
    higher up or elsewhere in the tree is not added again.

    Parameters
    ==========
    basepath: the directory to start walking from.
    '''
    found = []
    for _root, subdirs, _files in os.walk(basepath):
        # Build the list of unseen names first, then append them in one go
        found.extend([name for name in subdirs if name not in found])
    return found
def find_directories(root, fullpath=True):
    '''List the non-hidden directories directly inside a folder.

    Only one level is inspected (no recursion). Entries whose name starts
    with a dot are skipped.

    Parameters
    ==========
    root: the folder whose immediate children are inspected.
    fullpath: if True, return absolute paths; otherwise return the bare
              directory names.
    '''
    found = []
    for entry in os.listdir(root):
        # Skip hidden entries
        if re.match("^[.]", entry):
            continue
        candidate = os.path.join(root, entry)
        if not os.path.isdir(candidate):
            continue
        found.append(os.path.abspath(candidate) if fullpath else entry)
    return found
def find_files(root, pattern, fullpath=True):
    '''Recursively find files whose name matches a regular expression.

    The tree below ``root`` is walked with ``os.walk``, so files in
    sub-directories are included as well.

    Parameters
    ==========
    root: the directory to start walking from.
    pattern: regular expression applied with ``search`` to each file name
             (the name only, not the directory it lives in).
    fullpath: if True (default), each match is returned joined onto the
              directory it was found in; if False, only the file name is
              returned.
    '''
    # Compile once instead of re-resolving the pattern for every file
    matcher = re.compile(pattern)
    files = []
    # A distinct loop variable keeps the ``root`` argument intact
    for dirpath, _dirnames, filenames in os.walk(root):
        for filename in filenames:
            if not matcher.search(filename):
                continue
            files.append(
                os.path.join(dirpath, filename) if fullpath else filename)
    return files
def copy_directory(src, dest, force=False):
    '''Recursively copy a directory to a new location.

    If the source turns out not to be a directory, it is copied as a
    single file instead. Any other copy failure is reported through
    ``bot.error`` and the process exits with status 1.

    Parameters
    ==========
    src: the directory (or file) to copy.
    dest: the destination path.
    force: if True and the destination exists, it is removed first.
    '''
    if force is True and os.path.exists(dest):
        shutil.rmtree(dest)

    try:
        shutil.copytree(src, dest)
    except OSError as error:
        if error.errno != errno.ENOTDIR:
            bot.error('Directory not copied. Error: %s' % error)
            sys.exit(1)
        # The source was not a directory: fall back to a plain file copy
        shutil.copy(src, dest)
################################################################################
## FILE OPERATIONS #############################################################
################################################################################
def write_file(filename, content, mode="w"):
    '''Write content to a file and return the file name.

    Parameters
    ==========
    filename: the path of the file to write.
    content: handed to ``writelines``, so it may be a single string or an
             iterable of strings; no line endings are added.
    mode: the mode used to open the file (default "w").
    '''
    with open(filename, mode) as handle:
        handle.writelines(content)
    return filename
def write_json(json_obj, filename, mode="w", print_pretty=True):
    '''Serialize an object to json, write it to a file and return the
    file name.

    Parameters
    ==========
    json_obj: the object (e.g. a dict) to serialize.
    filename: the output file to write to.
    mode: the mode used to open the file (default "w").
    print_pretty: if True, indent by 4 spaces and use ',' / ': ' as
                  separators; otherwise use the json module defaults.
    '''
    if print_pretty:
        dump_options = {"indent": 4, "separators": (',', ': ')}
    else:
        dump_options = {}

    with open(filename, mode) as handle:
        handle.write(json.dumps(json_obj, **dump_options))
    return filename
def read_file(filename, mode="r", readlines=True):
    '''Read a file and return its content.

    Parameters
    ==========
    filename: the path of the file to read.
    mode: the mode used to open the file (default "r").
    readlines: if True, return a list of lines (line endings kept);
               otherwise return the whole content as one object.
    '''
    with open(filename, mode) as handle:
        return handle.readlines() if readlines is True else handle.read()
def read_json(filename, mode='r'):
    '''Load a json file and return the decoded data structure
    (typically a dict).

    Parameters
    ==========
    filename: the json file to read.
    mode: the mode used to open the file (default 'r').
    '''
    with open(filename, mode) as handle:
        return json.load(handle)
################################################################################
## YAML ########################################################################
################################################################################
def read_yaml(filename, mode='r', quiet=False):
    '''Read a yaml file and return its content as a dictionary.

    The whole file is read as one string and handed to ``_read_yaml``
    for parsing.

    Parameters
    ==========
    filename: the yaml file to read.
    mode: the mode used to open the file (default 'r').
    quiet: passed through to ``_read_yaml``; if False, each key and value
           is printed as it is read.
    '''
    content = read_file(filename, mode, readlines=False)
    return _read_yaml(content, quiet=quiet)
def write_yaml(yaml_dict, filename, mode="w"):
    '''Dump a dictionary to a yaml file and return the file name.

    Parameters
    ==========
    yaml_dict: the dict to serialize with ``yaml.dump``.
    filename: the output file to write to.
    mode: the mode used to open the file (default "w").
    '''
    with open(filename, mode) as handle:
        handle.write(yaml.dump(yaml_dict))
    return filename
def _read_yaml(section, quiet=False):
    '''Parse yaml from a string and merge it into a single dictionary.

    The string may come from a markdown frontmatter (read_frontmatter) or
    from a yml file proper (read_yaml). Every yaml document in the string
    that is a mapping contributes its top-level keys; when a key appears
    more than once, the last value wins. Documents that are not mappings
    are ignored.

    Parameters
    ==========
    section: a string of unparsed yaml content.
    quiet: if False, print each "key: value" pair as it is read.
    '''
    metadata = {}

    # safe_load_all builds only plain yaml types. The bare
    # yaml.load_all(section) call needs an explicit Loader on current
    # PyYAML releases and could construct arbitrary objects on old ones.
    for doc in yaml.safe_load_all(section):
        if not isinstance(doc, dict):
            continue
        for key, value in doc.items():
            if not quiet:
                print('%s: %s' %(key, value))
            metadata[key] = value
    return metadata
def read_frontmatter(filename, mode='r', quiet=False):
    '''Read the yaml frontmatter of a file and return it as a dictionary.

    The frontmatter is taken to be the text between the first and second
    "---" in the file. An IndexError is raised if the file contains no
    "---" at all.

    Parameters
    ==========
    filename: the file to read.
    mode: the mode used to open the file (default 'r').
    quiet: passed through to ``_read_yaml``; if False, each key and value
           is printed as it is read.
    '''
    content = read_file(filename, mode, readlines=False)

    # The yml section is the piece right after the first --- delimiter
    pieces = content.split('---')
    return _read_yaml(pieces[1], quiet=quiet)
def read_markdown(filename, mode='r'):
    '''Read the body of a markdown file, without its yaml frontmatter.

    The frontmatter is the section enclosed by the first two "---"
    delimiters; everything after the second one is returned untouched. If
    the file contains no "---" at all, the whole content is returned.

    Parameters
    ==========
    filename: the markdown file to read.
    mode: the mode used to open the file (default 'r').
    '''
    stream = read_file(filename, mode, readlines=False)

    # Split on the two frontmatter delimiters only, so a later "---"
    # (e.g. a horizontal rule) does not cut the body short.
    return stream.split('---', 2)[-1]
################################################################################
# bibtex
################################################################################
def read_bibtex(filename, mode='r'):
    '''Parse a bibtex file with pybtex and return its entries.

    If parsing raises, the exception is logged with ``bot.error`` and
    None is returned.

    Parameters
    ==========
    filename: the bibtex file to parse.
    mode: currently unused.
    '''
    from pybtex.database.input import bibtex

    reader = bibtex.Parser()
    try:
        return reader.parse_file(filename).entries
    except Exception as error:
        bot.error(error)
    return None
################################################################################
# environment / options
################################################################################
def load_module(module_str):
    '''Import and return an attribute given its dotted path.

    The text after the last dot is the attribute name (e.g. a function);
    everything before it is the module to import.

    Parameters
    ==========
    module_str: complete python path to module (and function). Note that this
                MUST be a python module (module.py) and not a function in an
                __init__.py
    '''
    module_path, attribute = module_str.rsplit('.', 1)

    # A non-empty fromlist makes __import__ return the leaf module
    # rather than the top-level package
    return getattr(__import__(module_path, fromlist=['']), attribute)
def convert2boolean(arg):
    '''Convert a value (e.g. from an environment variable) to a boolean.

    A bool is returned unchanged. Anything else is lower-cased and is True
    only when it is one of "yes", "true", "t", "1" or "y".

    Parameters
    ==========
    arg: a bool, or a string to interpret.
    '''
    if isinstance(arg, bool):
        return arg
    truthy = ("yes", "true", "t", "1", "y")
    return arg.lower() in truthy
def getenv(variable_key, default=None, required=False, silent=True):
    '''Look up an environment variable, falling back to a default.

    Parameters
    ==========
    variable_key: the variable name.
    default: the value returned when the variable is not set (None unless
             given).
    required: if True and the resulting value is None, log an error and
              exit with status 1.
    silent: if False, report what was found through ``bot.verbose2``.
    '''
    value = os.environ.get(variable_key, default)
    missing = value is None

    if missing and required:
        bot.error("Cannot find environment variable %s, exiting." %variable_key)
        sys.exit(1)

    if not silent:
        if missing:
            bot.verbose2("%s not defined (None)" %variable_key)
        else:
            bot.verbose2("%s found as %s" %(variable_key,value))

    return value