feat: process files in batched inserts, but fewer rows seem to be written — needs investigation

This commit is contained in:
Leonard Excoffier
2024-08-31 00:50:17 -04:00
parent 9c5afa7670
commit ab8f1fcf8f

View File

@@ -26,6 +26,10 @@ json_files = glob.glob('./sec_data/companyfacts/*.json')
# Progress bookkeeping: total number of files found by the glob above and a
# 1-based counter echoed while each file is processed.
file_count = len(json_files)
current_file = 1
# Batch size configuration - process `batch_size` files at a time
batch_size = 50  # Adjust this number to your preference or based on system resources
# Row dictionaries accumulated across the files of the current batch; flushed
# to the database after each batch.
rows = []
# Data type mapping for the DataFrame to SQL conversion # Data type mapping for the DataFrame to SQL conversion
dtype_map = { dtype_map = {
'entity_cik': Integer, 'entity_cik': Integer,
@@ -46,69 +50,71 @@ dtype_map = {
'frame': String(255) 'frame': String(255)
} }
# Iterate through the JSON files in batches: rows from up to `batch_size`
# files are accumulated in `rows` and flushed to the database in a single
# to_sql() call per batch, instead of one insert per file.
for i in range(0, file_count, batch_size):
    batch_files = json_files[i:i + batch_size]

    for json_file in batch_files:
        try:
            # Load the JSON data
            with open(json_file) as f:
                data = json.load(f)

            # Informing the user about the current file being processed
            print(f"Processing file {current_file}/{file_count}: {json_file}")
            current_file += 1

            # Check if the JSON has the keys we're interested in
            cik = data.get('cik', None)
            entity_name = data.get('entityName', None)
            facts = data.get('facts', {})

            # Skip files that don't have facts to process
            if not facts:
                print(f"File {json_file} has no facts to process. Skipping...")
                continue

            # Process the facts dynamically: facts -> taxonomy -> fact ->
            # unit -> list of detail records, one output row per record.
            for taxonomy, fact_items in facts.items():
                for fact_id, fact_data in fact_items.items():
                    label = fact_data.get('label', None)
                    description = fact_data.get('description', None)
                    units = fact_data.get('units', {})

                    for unit, details_list in units.items():
                        for details in details_list:
                            # Fixed identifying columns first, then every
                            # remaining key from the detail record
                            # (presumably val/accn/fy/fp/form/... — the
                            # exact keys depend on the SEC data).
                            row = {
                                'entity_cik': cik,
                                'entity_name': entity_name,
                                'fact_id': fact_id,
                                'fact_taxonomy': taxonomy,
                                'fact_label': label,
                                'fact_description': description,
                                'fact_unit': unit
                            }
                            row.update(details)
                            rows.append(row)

        except json.JSONDecodeError as e:
            print(f"Failed to process file {json_file}: Invalid JSON format. Error: {e}")
        except Exception as e:
            print(f"An error occurred while processing file {json_file}: {e}")

    # After processing batch_files, insert accumulated rows into the database.
    # BUGFIX: this insert was previously unguarded — a single failing batch
    # (e.g. a batch DataFrame whose column set, the union of all detail keys
    # seen in the batch, no longer matches the existing table) aborted the
    # whole run, silently losing every remaining file's rows ("writes fewer
    # rows"). Failures are now reported and the run continues; `rows` is
    # cleared in `finally` so a failed batch can never bleed into the next
    # one and produce duplicate inserts.
    if rows:
        try:
            df = pd.DataFrame(rows)
            # Write DataFrame to the 'data' table, appending if table exists
            df.to_sql('data', con=engine, if_exists='append', index=False, dtype=dtype_map)
        except Exception as e:
            print(f"Failed to insert batch of {len(rows)} rows ending at file index {i + len(batch_files)}: {e}")
        finally:
            # Clear the list of rows for the next batch
            rows.clear()

print("Processing complete. All files have been handled.")