From ab8f1fcf8ff46173c91be794ebd921bf4c4c6825 Mon Sep 17 00:00:00 2001 From: Leonard Excoffier <48970393+excoffierleonard@users.noreply.github.com> Date: Sat, 31 Aug 2024 00:50:17 -0400 Subject: [PATCH] feat: now use batches but seem to write less rows, not sure --- new_write.py | 112 +++++++++++++++++++++++++++------------------------ 1 file changed, 59 insertions(+), 53 deletions(-) diff --git a/new_write.py b/new_write.py index 5e9cabb..0a52a2d 100644 --- a/new_write.py +++ b/new_write.py @@ -26,6 +26,10 @@ json_files = glob.glob('./sec_data/companyfacts/*.json') file_count = len(json_files) current_file = 1 +# Batch size configuration - process `batch_size` files at a time +batch_size = 50 # Adjust this number to your preference or based on system resources +rows = [] + # Data type mapping for the DataFrame to SQL conversion dtype_map = { 'entity_cik': Integer, @@ -46,69 +50,71 @@ dtype_map = { 'frame': String(255) } -for json_file in json_files: - try: - # Load the JSON data - with open(json_file) as f: - data = json.load(f) +# Iterate through the JSON files in batches +for i in range(0, file_count, batch_size): + batch_files = json_files[i:i+batch_size] + + for json_file in batch_files: + try: + # Load the JSON data + with open(json_file) as f: + data = json.load(f) - # Informing the user about the current file being processed - print(f"Processing file {current_file}/{file_count}: {json_file}") - current_file += 1 + # Informing the user about the current file being processed + print(f"Processing file {current_file}/{file_count}: {json_file}") + current_file += 1 - # Check if the JSON has the keys we're interested in - cik = data.get('cik', None) - entity_name = data.get('entityName', None) - facts = data.get('facts', {}) + # Check if the JSON has the keys we're interested in + cik = data.get('cik', None) + entity_name = data.get('entityName', None) + facts = data.get('facts', {}) - # Initialize a list to hold rows - rows = [] + # Skip files that don't have facts to process + if not facts: + print(f"File {json_file} has no facts to process. Skipping...") + continue - # Skip files that don't have facts to process - if not facts: - print(f"File {json_file} has no facts to process. Skipping...") - continue + # Process the facts dynamically + for taxonomy, fact_items in facts.items(): + for fact_id, fact_data in fact_items.items(): + label = fact_data.get('label', None) + description = fact_data.get('description', None) + units = fact_data.get('units', {}) - # Process the facts dynamically - for taxonomy, fact_items in facts.items(): - for fact_id, fact_data in fact_items.items(): - label = fact_data.get('label', None) - description = fact_data.get('description', None) - units = fact_data.get('units', {}) + for unit, details_list in units.items(): + for details in details_list: + # Generate row dictionary dynamically, only updating non-None values + row = { + 'entity_cik': cik, + 'entity_name': entity_name, + 'fact_id': fact_id, + 'fact_taxonomy': taxonomy, + 'fact_label': label, + 'fact_description': description, + 'fact_unit': unit + } - for unit, details_list in units.items(): - for details in details_list: - # Generate row dictionary dynamically, only updating non-None values - row = { - 'entity_cik': cik, - 'entity_name': entity_name, - 'fact_id': fact_id, - 'fact_taxonomy': taxonomy, - 'fact_label': label, - 'fact_description': description, - 'fact_unit': unit - } + # Add the remaining keys in details to row + for key, value in details.items(): + row[key] = value + + # Append the row to rows list + rows.append(row) - # Add the remaining keys in details to row - for key, value in details.items(): - row[key] = value - - # Append the row to rows list - rows.append(row) + except json.JSONDecodeError as e: + print(f"Failed to process file {json_file}: Invalid JSON format. Error: {e}") - # Create DataFrame only if there are rows - if rows: - df = pd.DataFrame(rows) + except Exception as e: + print(f"An error occurred while processing file {json_file}: {e}") - # Write DataFrame to the 'data' table, appending if table exists - df.to_sql('data', con=engine, if_exists='append', index=False, dtype=dtype_map) - else: - print(f"No data rows were created for file {json_file}. Skipping insert...") + # After processing batch_files, insert accumulated rows into the database + if rows: + df = pd.DataFrame(rows) - except json.JSONDecodeError as e: - print(f"Failed to process file {json_file}: Invalid JSON format. Error: {e}") + # Write DataFrame to the 'data' table, appending if table exists + df.to_sql('data', con=engine, if_exists='append', index=False, dtype=dtype_map) - except Exception as e: - print(f"An error occurred while processing file {json_file}: {e}") + # Clear the list of rows for the next batch + rows.clear() print("Processing complete. All files have been handled.") \ No newline at end of file