"""Batch-load SEC "company facts" JSON files into a MySQL ``data`` table.

Reads every ``*.json`` file under ``./sec_data/companyfacts/``, flattens the
nested ``facts`` structure into one row per reported value, and appends the
rows to the ``data`` table in batches via pandas/SQLAlchemy.  Database
credentials are read from a ``.env`` file.
"""
import glob
import json
import os

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import Date, Float, Integer, String, Text, create_engine

# Load environment variables from .env file
load_dotenv()

# Database connection settings (all taken from the environment)
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')

# Create the connection string.  create_engine() is lazy: no connection is
# actually opened until the first to_sql() write below.
db_connection_str = f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
engine = create_engine(db_connection_str)

# List all JSON files in the target directory
json_files = glob.glob('./sec_data/companyfacts/*.json')

# Progress counters for user-facing status messages
file_count = len(json_files)
current_file = 1

# Batch size configuration - process `batch_size` files at a time
batch_size = 50  # Adjust to your preference or based on system resources

# Rows accumulated for the current batch; flushed to the DB per batch.
rows = []

# Explicit SQL column types for DataFrame.to_sql (keys match the row dicts;
# the start/end/val/accn/... keys come verbatim from the SEC detail records).
dtype_map = {
    'entity_cik': Integer,
    'entity_name': String(255),
    'fact_id': String(255),
    'fact_taxonomy': String(255),
    'fact_label': String(255),
    'fact_description': Text,
    'fact_unit': String(255),
    'start': Date,
    'end': Date,
    'val': Float,
    'accn': String(50),
    'fy': Integer,
    'fp': String(255),
    'form': String(255),
    'filed': Date,
    'frame': String(255),
}


def _rows_from_facts(cik, entity_name, facts):
    """Flatten one filing's ``facts`` dict into a list of flat row dicts.

    ``facts`` is keyed taxonomy -> fact id -> {label, description, units};
    each unit maps to a list of reported detail dicts (start/end/val/accn/
    fy/fp/form/filed/frame) whose keys are merged verbatim into the row.
    """
    result = []
    for taxonomy, fact_items in facts.items():
        for fact_id, fact_data in fact_items.items():
            label = fact_data.get('label', None)
            description = fact_data.get('description', None)
            units = fact_data.get('units', {})
            for unit, details_list in units.items():
                for details in details_list:
                    row = {
                        'entity_cik': cik,
                        'entity_name': entity_name,
                        'fact_id': fact_id,
                        'fact_taxonomy': taxonomy,
                        'fact_label': label,
                        'fact_description': description,
                        'fact_unit': unit,
                    }
                    # Merge the remaining reported keys into the row
                    row.update(details)
                    result.append(row)
    return result


# Iterate through the JSON files in batches, flushing rows to the DB after
# each batch so memory stays bounded.
for i in range(0, file_count, batch_size):
    batch_files = json_files[i:i + batch_size]
    for json_file in batch_files:
        try:
            # Load the JSON data (explicit encoding: SEC files are UTF-8)
            with open(json_file, encoding='utf-8') as f:
                data = json.load(f)

            # Informing the user about the current file being processed
            print(f"Processing file {current_file}/{file_count}: {json_file}")
            current_file += 1

            # Check if the JSON has the keys we're interested in
            cik = data.get('cik', None)
            entity_name = data.get('entityName', None)
            facts = data.get('facts', {})

            # Skip files that don't have facts to process.
            # BUG FIX: the original message contained a raw newline inside the
            # f-string literal, which is a SyntaxError in Python.
            if not facts:
                print(f"File {json_file} has no facts to process. Skipping...")
                continue

            # Process the facts dynamically
            rows.extend(_rows_from_facts(cik, entity_name, facts))
        except json.JSONDecodeError as e:
            print(f"Failed to process file {json_file}: Invalid JSON format. Error: {e}")
        except Exception as e:  # keep going: one bad file must not abort the run
            print(f"An error occurred while processing file {json_file}: {e}")

    # After processing batch_files, insert accumulated rows into the database
    if rows:
        df = pd.DataFrame(rows)
        # Write DataFrame to the 'data' table, appending if table exists
        df.to_sql('data', con=engine, if_exists='append', index=False, dtype=dtype_map)
        # Clear the list of rows for the next batch
        rows.clear()

print("Processing complete. All files have been handled.")