#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Victor Calderon
# Created : 2018-05-02
# Last Modified: 2018-05-02
from __future__ import absolute_import, division, print_function
__author__ = ['Victor Calderon']
__copyright__ = ["Copyright 2018 Victor Calderon"]
__email__ = ['victor.calderon@vanderbilt.edu']
__maintainer__ = ['Victor Calderon']
__all__ = [ "Program_Msg",
"Index",
"get_immediate_subdirectories",
"Path_Folder",
"File_Exists",
"File_Download_needed",
"MarkParametrize"]
"""
Utilities for verifying file existence, directory paths, etc.
"""
## Import modules
import os
import subprocess
import time
import numpy as np
from pathlib import Path
from cosmo_utils.utils import file_utils as fd
from cosmo_utils.custom_exceptions import LSSUtils_Error
## Functions
# Message for a filename. It displays the basename of the filename
[docs]def Program_Msg(filename):
"""
Program message for `filename`
Parameters
----------
filename : string
Path to the filename being used
Returns
----------
prog_msg : string
String message for given `filename`
"""
try:
assert(os.path.exists(filename))
# Creating message
prog_msg = '\n\t\t==> {} >>> '.format(os.path.basename(filename))
except:
msg = '>>> `filename` {} not found! Exiting!'.format(filename)
raise ValueError(msg)
return prog_msg
## Compiles the array of files with matching pattern
[docs]def Index(pathdir, datatype, sort=True, basename=False):
"""
Indexes the files in a directory `pathdir` with a specific data type
`datatype`.
Parameters
----------
pathdir : string
Path to the directory being analyzed
datatype : string
Type of documents to look for.
sort : `bool`, optional (default = True)
If this is set to True, the output list is sorted by name
basename : `bool`, optional
If this is set to True, the output list will contain only the
basename of the files in `pathdir`
Returns
----------
file_arr : np.ndarray
List of (sorted) files in `pathdir` with datatype `.datatype`
"""
# Checking that directory exists
if os.path.exists(pathdir):
# List of files
path_obj = Path(os.path.abspath(pathdir))
file_arr = list(path_obj.rglob('*{0}'.format(datatype)))
file_arr = np.array([x.as_posix() for x in file_arr])
# Sorting
if sort:
file_arr = np.sort(file_arr)
# Basenames
if basename:
file_arr = np.array([os.path.basename(x) for x in file_arr])
else:
msg = '{0} `pathdir` {1} not found!'.format(Program_Msg(__file__),
pathdir)
raise LSSUtils_Error(msg)
return file_arr
## Immediate subdirectories for a given directory
## Creates a Folder if needed
[docs]def Path_Folder(pathdir, time_sleep=0.5):
"""
Creates a folder if it does not exist already
Parameters
----------
pathdir : str
Path to the desired directory
time_sleep : float, optional
Amount of time in seconds to `sleep` or wait for the process to
finish. By default `time_slee` is set to 0.5 seconds.
"""
if os.path.exists(pathdir):
pass
else:
while True:
try:
os.makedirs(pathdir)
break
except OSError as e:
if e.errno != 17:
raise
## Adjusting time_sleep
time.sleep(time_sleep)
pass
## Checking if a file exists
[docs]def File_Exists(filename):
"""
Detrmines if file exists or not
Parameters
-----------
filename : str
Absolute path to the file
Raises
-----------
LSSUtils_Error : Exception
"""
if os.path.exists(os.path.abspath(filename)):
try:
assert(os.path.isfile(os.path.abspath(filename)))
except:
msg = '{0} `filename` {1} not found!'.format(Program_Msg(__file__),
filename)
raise LSSUtils_Error(msg)
else:
msg = '{0} `filename` {1} not found!'.format(Program_Msg(__file__),
filename)
raise LSSUtils_Error(msg)
## Downloading a file from a remote server
[docs]def File_Download_needed(localpath, remotepath):
"""
Determines if there exists a local copy of a file.
If not, the file is downloaded from the remote server and a copy of
the file is saved locally
Parameters
----------
localpath : str
Local path to the file.
remotepath : str
Remote path to the file. This is the URL of the file, if there is
no local copy of the file
"""
file_msg = Program_Msg(__file__)
# File extension - Remotely
web_ext = os.path.splitext(remotepath)[1]
# Checking if there is a local copy of the file
if (os.path.exists(localpath) and os.path.isfile(localpath)):
pass
else:
## Downloading file
# Testing for different kinds of files
if web_ext == '.gz':
cmd = 'wget {0} -O {0}{1}'.format(remotepath, localpath, web_ext)
print('{0} {1}'.format(file_msg, cmd))
subprocess.call(cmd, shell=True)
# Unzipping file
cmd = 'gzip -d {0}'.format(localpath + web_ext)
print('{0} {1}'.format(file_msg, cmd))
subprocess.call(cmd, shell=True)
if web_ext == '.tar':
cmd = 'wget {0} -O {0}{1}'.format(remotepath, localpath, web_ext)
print('{0} {1}'.format(file_msg, cmd))
subprocess.call(cmd, shell=True)
# Unzipping file
local_dir = os.path.dirname(localpath)
cmd = 'tar zxf {0} - C {1}'.format(localpath + web_ext, local_dir)
print('{0} {1}'.format(file_msg, cmd))
subprocess.call(cmd, shell=True)
else:
cmd = 'wget {0} -O {1}'.format(remotepath, localpath)
print('{0} {1}'.format(file_msg, cmd))
subprocess.call(cmd, shell=True)
##
## Checking that file exists
File_Exists(localpath)
## Parametrize set of inputs to a function
[docs]class MarkParametrize(object):
"""
Parametrizes a set of values and changes the input variables
of a function.
"""
def __init__(self, argname, argvalues):
"""
Initializes class object.
Parameters
----------
argname : `str`
Key of the element to change in main dictionary.
It can only contain 1 word at a time.
argvalues : array-like
List of argvalues for each of the `argnames`.
This list will be used to loop over the values and
replace them into the main dictionary.
Notes
----------
This function loops over the many different elements in `argvalues`.
This function is meant to be used as a `decorator` for some
function whose input a dictionary.
"""
file_msg = fd.Program_Msg(__file__)
## Check input parameters
# `argname`
if not (isinstance(argname, str)):
msg = '{0} `argname` ({1}) must be a string'.format(
file_msg, type(argname))
raise TypeError(msg)
# `argvalues`
if not (isinstance(argvalues, (tuple, list))):
msg = '{0} `argvalues` ({1}) must be a tuple or list'.format(
file_msg, type(argvalues))
raise TypeError(msg)
## Assigning to class variables
self.argname = argname
self.argvalues = argvalues
self.file_msg = file_msg
##
## Decorator
[docs] def __call__(self, func):
"""
This function gets executed when calling the function.
This functions changes the value of arguments in `p_dict`, and
runs the function `func` with the given parameters.
Parameters
----------
func : `function`
Function that will be decorated.
"""
def wrapped_func(*args, **kwargs):
"""
This function changes the elements in `kwargs`
"""
# Extracting main dictionary
try:
p_dict = kwargs['p_dict']
except KeyError:
try:
p_dict = args[0]
except IndexError:
msg = '{0} Could not read the input dictionary `p_dict`.'
msg += 'Please check your function!'
msg = msg.format(self.file_msg)
raise LSSUtils_Error(msg)
# Checking if `p_dict` is a dictionary
if not (isinstance(p_dict, dict)):
msg = '{0} `p_dict` must be a dictionary!'.format(
self.file_msg, type(p_dict))
raise TypeError(msg)
## Parsing elements
# `argvalues`
argvalues = np.asarray(self.argvalues)
##
## Looping over elements
for argval in argvalues:
p_dict[self.argname] = argval
func(p_dict=p_dict)
return wrapped_func