diff --git a/write_to_db.py b/write_to_db.py
index b22820e..5a4ab40 100644
--- a/write_to_db.py
+++ b/write_to_db.py
@@ -1,7 +1,9 @@
 import os
 import pandas as pd
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, MetaData, Table
+from sqlalchemy.dialects.mysql import insert
 from dotenv import load_dotenv
+import numpy as np
 
 # Load environment variables from .env file
 load_dotenv()
@@ -27,6 +29,9 @@ file_paths = [
     ('sec_data/2015q1/pre.txt', 'pre', ['adsh', 'report', 'line'])
 ]
 
+# Initialize metadata
+metadata = MetaData()
+
 # Loop through each file and write the data to the database
 for i, (file_path, table_name, primary_keys) in enumerate(file_paths):
     print(f"\nAnalyzing {file_path} (File {i+1}/4)...")
@@ -44,18 +49,42 @@ for i, (file_path, table_name, primary_keys) in enumerate(file_paths):
     print("\nUpdated 'coreg' column (NaN values replaced with 'nocoreg'):")
     print(df[['coreg']].head(10)) # Display first 10 rows of the 'coreg' column for verification
 
-    # Dropping rows with any missing values in the primary keys and NOT NULL columns
+    # Dropping rows with any missing values in the primary keys
     df.dropna(subset=primary_keys, inplace=True)
 
     # Dropping duplicate rows based on primary keys
-    df.drop_duplicates(subset=primary_keys, keep='first', inplace=True)
+    # df.drop_duplicates(subset=primary_keys, keep='first', inplace=True)
+
+    # Replace NaN values with None to ensure compatibility with SQL NULL
+    df = df.replace([np.nan, np.inf, -np.inf], None)
 
     # Get Updated Information
     print("\nUpdated Information:")
     print(df.info())
 
-    # Write the cleaned DataFrame to the corresponding table in the MariaDB database
-    df.to_sql(table_name, con=engine, if_exists='append', index=False)
-    print(f"\nCleaned data from {file_path} has been written to the '{table_name}' table in the database.\n")
+    # Reflect the already existing table from the database schema
+    table = Table(table_name, metadata, autoload_with=engine)
 
-print("\nAll files have been processed and cleaned data has been written to the database.")
\ No newline at end of file
+    # Perform Upsert operation for each row in the DataFrame
+    # (engine.begin() opens a transaction that is committed on success; a plain
+    # engine.connect() would roll the inserts back on close in SQLAlchemy 1.4+/2.0)
+    with engine.begin() as conn:
+        for row in df.itertuples(index=False):
+            # Create a dictionary of the row data
+            data = {key: getattr(row, key) for key in df.columns}
+
+            # Prepare insert statement using SQLAlchemy with MySQL-specific ON DUPLICATE KEY UPDATE
+            insert_stmt = insert(table).values(**data)
+
+            # Construct the `ON DUPLICATE KEY UPDATE` part
+            update_stmt = insert_stmt.on_duplicate_key_update(
+                {col.name: insert_stmt.inserted[col.name] for col in table.columns}
+            )
+
+            # Execute the upsert statement
+            conn.execute(update_stmt)
+
+    print(f"\nCleaned data from {file_path} has been written to the '{table_name}' table in the database with upsert functionality.\n")
+
+print("\nAll files have been processed and cleaned data has been written to the database.")
+
+# FIXME: Foreign key constraint is still missing because the us-gaap taxonomy constantly lags behind: Q1 gaap data is based on the previous year's gaap taxonomy.
\ No newline at end of file