From 5e38f46e8fca0c1cc82501486ee53122c9155ae2 Mon Sep 17 00:00:00 2001 From: Leonard Excoffier <48970393+excoffierleonard@users.noreply.github.com> Date: Sat, 31 Aug 2024 22:34:04 -0400 Subject: [PATCH] Temp experimentation: problem with duplicate primary keys (lowercase vs. uppercase) and insert-on-duplicate; need to find a way to bulk insert while still skipping rows that have duplicates. --- write_to_db.py | 43 +++++++++++++++---------------------------- 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/write_to_db.py b/write_to_db.py index 5a4ab40..e41f709 100644 --- a/write_to_db.py +++ b/write_to_db.py @@ -23,10 +23,10 @@ engine = create_engine(connection_string) # Define a list of file paths and corresponding table names with primary keys file_paths = [ - ('sec_data/2015q1/sub.txt', 'sub', ['adsh']), - ('sec_data/2015q1/tag.txt', 'tag', ['tag', 'version']), - ('sec_data/2015q1/num.txt', 'num', ['adsh', 'tag', 'version', 'coreg', 'ddate', 'qtrs', 'uom']), - ('sec_data/2015q1/pre.txt', 'pre', ['adsh', 'report', 'line']) + ('sec_data/2014q4/sub.txt', 'sub', ['adsh']), + ('sec_data/2014q4/tag.txt', 'tag', ['tag', 'version']), + #('sec_data/2015q1/num.txt', 'num', ['adsh', 'tag', 'version', 'coreg', 'ddate', 'qtrs', 'uom']), + #('sec_data/2015q1/pre.txt', 'pre', ['adsh', 'report', 'line']) ] # Initialize metadata @@ -46,17 +46,19 @@ for i, (file_path, table_name, primary_keys) in enumerate(file_paths): # If the file being processed is 'num.txt', fix the `coreg` column if table_name == 'num': df['coreg'] = df['coreg'].fillna('nocoreg') - print("\nUpdated 'coreg' column (NaN values replaced with 'nocoreg'):") - print(df[['coreg']].head(10)) # Display first 10 rows of the 'coreg' column for verification # Dropping rows with any missing values in the primary keys df.dropna(subset=primary_keys, inplace=True) - # Dropping duplicate rows based on primary keys - # df.drop_duplicates(subset=primary_keys, keep='first', inplace=True) - + # Ensure all primary key columns 
are in lower-case for case insensitive deduplication + for key in primary_keys: + df[key] = df[key].str.lower() + # Replace NaN values with None to ensure compatibility with SQL NULL df = df.replace([np.nan, np.inf, -np.inf], None) + + # Dropping duplicate rows based on primary keys + df.drop_duplicates(subset=primary_keys, keep='first', inplace=True) # Get Updated Information print("\nUpdated Information:") @@ -65,25 +67,10 @@ for i, (file_path, table_name, primary_keys) in enumerate(file_paths): # Reflect the already existing table from the database schema table = Table(table_name, metadata, autoload_with=engine) - # Perform Upsert operation for each row in the DataFrame - with engine.connect() as conn: - for row in df.itertuples(index=False): - # Create a dictionary of the row data - data = {key: getattr(row, key) for key in df.columns} - - # Prepare insert statement using SQLAlchemy with MySQL-specific ON DUPLICATE KEY UPDATE - insert_stmt = insert(table).values(**data) - - # Construct the `ON DUPLICATE KEY UPDATE` part - update_stmt = insert_stmt.on_duplicate_key_update( - {col.name: insert_stmt.inserted[col.name] for col in table.columns} - ) - - # Execute the upsert statement - conn.execute(update_stmt) + # Write the DataFrame to the corresponding table in the MariaDB database + df.to_sql(table_name, con=engine, if_exists='append', index=False) + print(f"\nData from {file_path} written to the '{table_name}' table in the database.") print(f"\nCleaned data from {file_path} has been written to the '{table_name}' table in the database with upsert functionality.\n") -print("\nAll files have been processed and cleaned data has been written to the database.") - -#FIXME: Foreign key missing because usgapp is in the past constantly, Q1 gaap is based on the year before gaap. \ No newline at end of file +print("\nAll files have been processed and cleaned data has been written to the database.") \ No newline at end of file