Reference

moodys_datahub.tools

Sftp

A class to manage SFTP connections and file operations for data transfer.

__init__(hostname=None, username=None, port=22, privatekey=None, data_product_template=None)

Constructor Method

Constructor Parameters
  • hostname (str, optional): Hostname of the SFTP server (default is CBS SFTP server).
  • username (str, optional): Username for authentication (default is CBS SFTP server).
  • port (int, optional): Port number for the SFTP connection (default is 22).
  • privatekey (str, optional): Path to the private key file for authentication (required for SFTP access).
  • data_product_template (str, optional): Template for managing data products during SFTP operations.

Object Attributes:
  • connection (pysftp.Connection or None): The current SFTP connection; initially None.
  • hostname (str): Hostname of the SFTP server.
  • username (str): Username for SFTP authentication.
  • privatekey (str or None): Path to the private key file for SFTP authentication.
  • port (int): Port number for the SFTP connection (default is 22).

File Handling Attributes:
  • output_format (list of str): Supported output formats for files (default is ['.csv']).
  • file_size_mb (int): Maximum file size in MB before splitting files (default is 500 MB).
  • delete_files (bool): Whether to delete processed files (default is False).
  • concat_files (bool): Whether to concatenate processed files (default is True).
  • query (str, function, or None): Query string or function for filtering data (default is None).
  • query_args (list or None): Arguments for the query string or function (default is None).
  • dfs (DataFrame or None): Stores concatenated DataFrames when concatenation is enabled.
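The file_size_mb threshold can be illustrated with a minimal sketch. The helper name should_split is hypothetical; the library's actual splitting logic is internal.

```python
def should_split(file_size_bytes, file_size_mb=500):
    """Return True when a file exceeds the file_size_mb threshold.

    Hypothetical helper illustrating the file_size_mb attribute;
    the real splitting logic lives inside the library.
    """
    return file_size_bytes / (1024 * 1024) > file_size_mb

print(should_split(600 * 1024 * 1024))  # a 600 MB file exceeds the 500 MB default
```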

connect()

Establish an SFTP connection.

Input Variables: - self: Implicit reference to the instance.

Returns: - SFTP connection object.

copy_obj()

Create a deep copy of the current instance and initialize its defaults.

This method creates a deep copy of the instance, calls the _object_defaults() method to set default values, and then invokes the select_data() method to prepare the copied object for use.

define_options()

Asynchronously define and set file operation options using interactive widgets.

This method allows the user to configure file operation settings such as whether to delete files after processing, concatenate files, specify output format, and define the maximum file size. These options are displayed as interactive widgets, and the user can select their preferred values. Once the options are selected, the instance variables are updated with the new configurations.

Workflow:
  • A config dictionary is initialized with the current values of the key file operation settings (delete_files, concat_files, output_format, file_size_mb).
  • An instance of _SelectOptions is created with this configuration, displaying the interactive widgets.
  • The user's selections are awaited asynchronously with the await keyword, ensuring non-blocking behavior.
  • Once the user makes selections, the corresponding instance variables (delete_files, concat_files, output_format, file_size_mb) are updated with the new values.
  • A summary of the selected options is printed for confirmation.

Internal Async Function (f):
  • The internal function f manages the asynchronous behavior, allowing the user to interact with the widget without blocking the main thread.
  • After the user selects the options, the configuration is validated and applied to the class attributes.

Notes:
  • This method uses asyncio.ensure_future to execute the async function f concurrently, without blocking other tasks.
  • The config dictionary is updated with the options chosen by the user.
  • If the user makes no changes, the original configuration is kept.
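The await-a-widget pattern can be sketched in plain asyncio. FakeOptionsWidget is a stand-in for the real _SelectOptions widget, and the simulated user change is an assumption; define_options itself schedules f with asyncio.ensure_future because a notebook event loop is already running, whereas a standalone script would use asyncio.run.

```python
import asyncio

class FakeOptionsWidget:
    """Stand-in for the interactive _SelectOptions widget (hypothetical)."""
    def __init__(self, config):
        self.config = config

    async def selected(self):
        # A real widget would await user input here; we simulate one change.
        await asyncio.sleep(0)
        return {**self.config, 'delete_files': True}

async def f(config):
    # Await the widget without blocking the event loop, then return the result.
    widget = FakeOptionsWidget(config)
    return await widget.selected()

config = {'delete_files': False, 'concat_files': True,
          'output_format': ['.csv'], 'file_size_mb': 500}
new_config = asyncio.run(f(config))
```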

Example
# Launch the options configuration process
self.define_options()

Expected Outputs: - Updates the instance variables based on user input: - self.delete_files: Whether to delete files after processing. - self.concat_files: Whether to concatenate files. - self.output_format: The output format of processed files (e.g., .csv, .parquet). - self.file_size_mb: Maximum file size (in MB) before splitting output files.

  • Prints the selected options:
    • Delete Files: True/False
    • Concatenate Files: True/False
    • Output Format: List of formats (e.g., ['.csv'])
    • Output File Size: File size in MB (e.g., 500 MB)

Raises: - ValueError: If the selected options are invalid or conflict with other settings.

Example Output

The following options were selected: Delete Files: True Concatenate Files: False Output Format: ['.csv'] Output File Size: 500 MB

download_all(num_workers=None)

Initiates downloading of all remote files using parallel processing.

This method starts a new process to download files based on the selected data product and table. It uses parallel processing if num_workers is specified, defaulting to cpu_count() - 2 if not. The process is managed using the fork method, which is supported only on Unix systems.

Input Variables: - num_workers (int, optional): Number of workers for parallel processing. Defaults to cpu_count() - 2.

Notes:
  • This method requires the fork start method for parallel processing, which is supported only on Unix systems.
  • If self._set_data_product or self._set_table is None, the method calls select_data() to initialize them.
  • Sets self.delete_files to False before starting the download process.
  • Starts a new process using multiprocessing.Process to run the process_all method for downloading.
  • Sets self._download_finished to False when starting the process, indicating that the download is in progress.
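The documented cpu_count() - 2 default can be sketched as follows; the floor of 1 worker is an added safeguard for machines with few cores, not something the source states.

```python
from multiprocessing import cpu_count

def default_workers(num_workers=None):
    """Resolve the worker count, mirroring the documented default.

    The max(..., 1) floor is an assumption for low-core machines.
    """
    if num_workers is None:
        num_workers = max(cpu_count() - 2, 1)
    return num_workers

print(default_workers(4))  # an explicit value is passed through unchanged
```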

Example

self.download_all(num_workers=4)

get_column_names(save_to=False, files=None)

Retrieve column names from a DataFrame or dictionary and save them to a file.

Input Variables:
  • self: Implicit reference to the instance.
  • save_to (str or bool, optional): Format to save results. If False (default), results are not saved.
  • files (list, optional): List of files to retrieve column names from.

Returns: - List of column names or None if no valid source is provided.

orbis_to_moodys(file)

Match headings from an Orbis output file to headings in Moody's DataHub.

This method reads headings from an Orbis output file and matches them to headings in Moody's DataHub. The function returns a DataFrame with matched headings and a list of headings not found in Moody's DataHub.

Input Variables: - file (str): Path to the Orbis output file.

Returns:
  • tuple: A tuple where:
    • The first element is a DataFrame containing matched headings.
    • The second element is a list of headings that were not found.

Notes:
  • Headings from the Orbis file are processed to remove any extra lines and to ensure uniqueness.
  • The DataFrame is sorted by the number of unique headings for each 'Data Product'.
  • If no headings are found, an empty DataFrame is returned.
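The heading clean-up described above can be sketched with the standard library. clean_headings is a hypothetical helper, not the library's own function.

```python
def clean_headings(raw_headings):
    """Drop extra lines, trim whitespace, and keep first occurrences only."""
    seen, cleaned = set(), []
    for heading in raw_headings:
        # Keep only the first line of a multi-line heading, then trim it.
        heading = heading.splitlines()[0].strip() if heading else ''
        if heading and heading not in seen:
            seen.add(heading)
            cleaned.append(heading)
    return cleaned

print(clean_headings(['Company name\nLatin alphabet', 'Company name', ' City ']))
# → ['Company name', 'City']
```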

Example

matched_df, not_found_list = self.orbis_to_moodys('orbis_output.xlsx')

process_all(files=None, destination=None, num_workers=-1, select_cols=None, date_query=None, bvd_query=None, query=None, query_args=None, pool_method=None)

Read and process multiple files into DataFrames with optional filtering and parallel processing.

This method reads multiple files into Pandas DataFrames, with options for selecting specific columns, applying filters, and performing parallel processing. It can handle file processing sequentially or in parallel, depending on the number of workers specified.

Input Variables:
  • files (list, optional): List of files to process. Defaults to self.remote_files.
  • destination (str, optional): Path to save processed files.
  • num_workers (int, optional): Number of workers for parallel processing. Default is -1 (auto-determined).
  • select_cols (list, optional): Columns to select from files. Defaults to self._select_cols.
  • date_query (optional): Date query for filtering data. Defaults to self.time_period.
  • bvd_query (optional): BvD query for filtering data. Defaults to self._bvd_list[2].
  • query (str, optional): Additional query for filtering data.
  • query_args (list, optional): Arguments for the query.
  • pool_method (optional): Method for parallel processing (e.g., 'fork', 'threading').

Returns:
  • dfs: List of Pandas DataFrames with selected columns and filtered data.
  • file_names: List of file names processed.

Notes:
  • If files is None, the method uses self.remote_files.
  • If num_workers is less than 1, it is set automatically based on available system memory.
  • Uses parallel processing if num_workers is greater than 1; otherwise, processes files sequentially.
  • Handles file concatenation and deletion based on the instance attributes concat_files and delete_files.
  • If self.delete_files is True, the method prints the current working directory.
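The sequential-vs-parallel switch can be sketched with concurrent.futures. This is a hypothetical illustration using threads; the library itself may use process pools (the 'fork' pool_method) instead.

```python
from concurrent.futures import ThreadPoolExecutor

def process_files(files, worker, num_workers=-1):
    """Dispatch a per-file worker sequentially or in parallel.

    Hypothetical sketch of the dispatch logic only; real workers would
    read, filter, and write each file.
    """
    if num_workers > 1:
        with ThreadPoolExecutor(max_workers=num_workers) as pool:
            return list(pool.map(worker, files))
    return [worker(f) for f in files]  # sequential fallback

names = process_files(['a.csv', 'b.csv'], str.upper, num_workers=2)
print(names)  # → ['A.CSV', 'B.CSV']
```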

Raises: - ValueError: If validation of arguments (files, destination, etc.) fails.

Example

dfs, file_names = self.process_all(files=['file1.csv', 'file2.csv'], destination='/processed', num_workers=4, select_cols=['col1', 'col2'], date_query='2023-01-01', query='col1 > 0')

process_one(save_to=False, files=None, n_rows=1000)

Retrieve a sample of data from a table and optionally save it to a file.

This method retrieves a sample of data from a specified table or file. If files is not provided, it uses the default file from self.remote_files. It processes the files, retrieves the specified number of rows, and saves the result to the specified format if save_to is provided.

Input Variables:
  • save_to (str or bool, optional): Format to save the sample data. If False (default), results are not saved. Other formats may be supported depending on the implementation.
  • files (list, optional): List of files to process. Defaults to self.remote_files. If an integer is provided, it is treated as a file identifier.
  • n_rows (int, optional): Number of rows to retrieve from the data (default is 1000).

Returns: - Pandas DataFrame: DataFrame containing the sample of the processed data.

Notes:
  • If files is None, the method uses the first file in self.remote_files and may trigger select_data() if the data product or table is not set.
  • The method processes all specified files or retrieves data from them as needed.
  • Results are saved in the format specified by save_to, if provided.

Example

df = self.process_one(save_to='parquet', n_rows=500)

search_bvd_changes(bvd_list, num_workers=-1)

Search for changes in BvD IDs based on the provided list.

This method retrieves changes in BvD IDs by processing the provided list of BvD IDs. It utilizes concurrent processing for efficiency and returns the new IDs, the newest IDs, and a filtered DataFrame containing relevant change information.

Input Variables:
  • bvd_list (list): A list of BvD IDs to check for changes.
  • num_workers (int, optional): Number of worker processes for concurrent operations. If set to -1 (default), uses the maximum available workers minus two to avoid overloading the system.

Returns:
  • tuple: A tuple containing:
    • new_ids: A list of newly identified BvD IDs.
    • newest_ids: A list of the most recent BvD IDs.
    • filtered_df: A DataFrame with relevant change information.

Example

new_ids, newest_ids, changes_df = obj.search_bvd_changes(['BVD123', 'BVD456'])

search_company_names(names, num_workers=-1, cut_off=90.1, company_suffixes=None)

Search for company names and find the best matches based on a fuzzy query.

This method performs a search for company names using fuzzy matching techniques, leveraging concurrent processing to improve performance. It processes the provided names against a dataset of firmographic data and returns the best matches based on the specified cut-off score.

Input Variables:
  • names (list): A list of company names to search for.
  • num_workers (int, optional): Number of worker processes for concurrent operations. If set to -1 (default), uses the maximum available workers minus two to avoid overloading the system.
  • cut_off (float, optional): The cut-off score for considering a match as valid. Default is 90.1.
  • company_suffixes (list, optional): A list of valid company suffixes to consider when searching for names.

Returns:
  • pandas.DataFrame: A DataFrame containing the best matches for the searched company names, including associated scores and other relevant information.

Example

results = obj.search_company_names(['Example Inc', 'Sample Ltd'], num_workers=4)

search_country_codes(search_word=None, search_cols={'Country': True, 'Code': True})

Search for country codes matching a search term.

Input Variables:
  • self: Implicit reference to the instance.
  • search_word (str, optional): Term to search for country codes.
  • search_cols (dict, optional): Dictionary indicating which columns to search (default is {'Country': True, 'Code': True}).

Returns:
  • Pandas DataFrame of country codes matching the search term.
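The column-toggle behavior of search_cols can be sketched over plain dicts. The helper name and the sample rows are hypothetical; the real method operates on a pandas DataFrame of country codes.

```python
def search_country_codes_sketch(rows, search_word,
                                search_cols={'Country': True, 'Code': True}):
    """Case-insensitive substring search over the enabled columns.

    rows is a list of dicts standing in for the country-code table.
    """
    active = [col for col, enabled in search_cols.items() if enabled]
    return [row for row in rows
            if any(search_word.lower() in str(row[col]).lower() for col in active)]

rows = [{'Country': 'Denmark', 'Code': 'DK'}, {'Country': 'Germany', 'Code': 'DE'}]
print(search_country_codes_sketch(rows, 'den'))
# → [{'Country': 'Denmark', 'Code': 'DK'}]
```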

search_dictionary(save_to=False, search_word=None, search_cols={'Data Product': True, 'Table': True, 'Column': True, 'Definition': True}, letters_only=False, exact_match=False, data_product=None, table=None)

Search for a term in a column/variable dictionary and save results to a file.

Args:
  • self: Implicit reference to the instance.
  • save_to (str or bool, optional): Format to save results. If False, results are not saved (default is False).
  • search_word (str, optional): Search term. If None, no term is searched.
  • search_cols (dict, optional): Dictionary indicating which columns to search. Columns are 'Data Product', 'Table', 'Column', and 'Definition', each defaulting to True.
  • letters_only (bool, optional): If True, search only for alphabetic characters in the search term (default is False).
  • exact_match (bool, optional): If True, search for an exact match of the search term; otherwise, search for partial matches (default is False).
  • data_product (str, optional): Specific data product to filter results by. If None, no filtering by data product (default is None).
  • table (str, optional): Specific table to filter results by. If None, no filtering by table (default is None).

Returns: - pandas.DataFrame: A DataFrame containing the search results. If no results are found, an empty DataFrame is returned.

Notes:
  • If data_product is provided and does not match any records, a message is printed and an empty DataFrame is returned.
  • If table is provided and does not match any records, a case-insensitive partial match search is attempted.
  • If search_word is provided and no matches are found, a message is printed indicating that no results were found.
  • If letters_only is True, the search term is reduced to alphabetic characters before searching.
  • If save_to is specified, the query results are saved in the specified format.
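The letters_only preprocessing can be sketched with a regular expression. The helper name letters_only_term is hypothetical.

```python
import re

def letters_only_term(search_word):
    # Reduce the search term to alphabetic characters, mirroring letters_only=True
    return re.sub(r'[^a-zA-Z]', '', search_word)

print(letters_only_term('bvd_id (2023)'))  # → 'bvdid'
```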

select_columns()

Asynchronously select and set columns for a specified data product and table using interactive widgets.

This method performs the following steps:
  1. Checks whether the data product and table are set. If not, it calls select_data() to set them.
  2. Searches the dictionary for columns corresponding to the set data product and table.
  3. Displays an interactive widget for the user to select columns based on their names and definitions.
  4. Sets the selected columns to self._select_cols and prints them.

If no columns are found for the specified table, a ValueError is raised.

Args: - self: Implicit reference to the instance.

Notes:
  • This method uses asyncio.ensure_future to run the asynchronous function f, which handles the widget interaction.
  • The function f combines column names and definitions for display, maps selected items to their indices, and then extracts the selected columns based on those indices.

Raises: - ValueError: If no columns are found for the specified table.

Example

self.select_columns()

select_data()

Asynchronously select and set the data product and table using interactive widgets.

This method facilitates user interaction for selecting a data product and table from a backup of available tables. It leverages asynchronous widgets, allowing the user to make selections and automatically updating instance attributes for the selected data product and table. The selections are applied to self.set_data_product and self.set_table, which can be used in subsequent file operations.

Workflow:
  • An instance of the _SelectData class is created, using _tables_backup to populate the widget options.
  • The user selects a data product and table through the interactive widget.
  • The method validates the selected options and updates the instance's set_data_product and set_table attributes.
  • If multiple data products match the selection, a list of options is displayed for further refinement.
  • The selected data product and table are printed to confirm the operation.

Internal Async Function (f):
  • The method contains an internal asynchronous function f that handles the widget display and data selection.
  • It uses the await keyword to ensure non-blocking behavior while the user interacts with the widget.
  • The selected values are processed and validated before being assigned to the instance variables.

Notes:
  • This method uses asyncio.ensure_future so that the asynchronous function f runs concurrently without blocking other operations.
  • If multiple matches for the selected data product are found, the user is prompted to specify the data product further.
  • The method uses a copy of _tables_backup to preserve the original data structure during filtering.

Example
# Trigger the data selection process
self.select_data()

Raises: - ValueError: If no valid data product or table is selected after interaction.

Expected Outputs:
  • Updates self.set_data_product and self.set_table based on user selections.
  • Prints confirmation of the selected data product and table.

table_dates(save_to=False, data_product=None, table=None)

Retrieve and save the available date columns for a specified data product and table.

This method performs the following steps:
  1. Ensures that the available tables and table dates are loaded.
  2. Filters the dates data by the specified data product and table, if provided.
  3. Optionally saves the filtered results in a specified format.

Args:
  • self: Implicit reference to the instance.
  • save_to (str or bool, optional): Format to save results. If False, results are not saved (default is False).
  • data_product (str, optional): Specific data product to filter results by. If None, defaults to self.set_data_product.
  • table (str, optional): Specific table to filter results by. If None, defaults to self.set_table.

Returns: - pandas.DataFrame: A DataFrame containing the filtered dates for the specified data product and table. If no results are found, an empty DataFrame is returned.

Notes:
  • If data_product is provided and does not match any records, a message is printed and an empty DataFrame is returned.
  • If table is provided and does not match any records, a case-insensitive partial match search is attempted.
  • If save_to is specified, the query results are saved in the specified format.

Example

df = self.table_dates(save_to='csv', data_product='Product1', table='TableA')

tables_available(product_overview=None, save_to=False, reset=False)

Retrieve and optionally save available SFTP data products and tables.

This method fetches the available data products and tables from the SFTP server. It can optionally save the results to a file in the specified format and reset the data if needed.

Input Variables:
  • product_overview (str, optional): Overview of the data products to filter. Defaults to None.
  • save_to (str or bool, optional): Format to save the results (e.g., 'csv', 'xlsx'). If False (default), results are not saved.
  • reset (bool, optional): Flag to force a refresh and reset of the data products and tables. Defaults to False.

Returns: - Pandas DataFrame: DataFrame containing the available SFTP data products and tables.

Notes:
  • If reset is True, the method resets _tables_available to _tables_backup.
  • Old exports may be deleted from the SFTP server under certain conditions (e.g., a large CPU count).
  • The results are saved using the _save_to function.

Example

df = self.tables_available(product_overview='overview.xlsx', save_to='csv', reset=True)

fuzzy_query(df, names, match_column=None, return_column=None, cut_off=50, remove_str=None, num_workers=None)

Perform fuzzy string matching with a list of input strings against a specific column in a DataFrame.
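The kind of cut-off-based scoring this method performs can be sketched with the standard library's difflib. This is a stdlib stand-in, not the matcher the library uses internally; fuzzy_query_sketch and its 0-100 score scale are assumptions.

```python
from difflib import SequenceMatcher

def fuzzy_score(a, b):
    # 0-100 similarity score (stdlib stand-in for the real fuzzy matcher)
    return 100 * SequenceMatcher(None, a.lower(), b.lower()).ratio()

def fuzzy_query_sketch(names, candidates, cut_off=50):
    """Return (input, best match, score) for matches at or above cut_off."""
    results = []
    for name in names:
        best = max(candidates, key=lambda c: fuzzy_score(name, c))
        score = fuzzy_score(name, best)
        if score >= cut_off:
            results.append((name, best, round(score, 1)))
    return results

matches = fuzzy_query_sketch(['Exampel Inc'], ['Example Inc', 'Sample Ltd'])
print(matches[0][1])  # → 'Example Inc'
```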