Temporary experimentation: problem with duplicate primary keys (lowercase vs. uppercase variants) and INSERT ... ON DUPLICATE KEY; need to find a way to bulk insert while still skipping rows whose primary keys duplicate existing ones.
This commit is contained in:
@@ -23,10 +23,10 @@ engine = create_engine(connection_string)
|
|||||||
|
|
||||||
# Define a list of file paths and corresponding table names with primary keys
|
# Define a list of file paths and corresponding table names with primary keys
|
||||||
file_paths = [
|
file_paths = [
|
||||||
('sec_data/2015q1/sub.txt', 'sub', ['adsh']),
|
('sec_data/2014q4/sub.txt', 'sub', ['adsh']),
|
||||||
('sec_data/2015q1/tag.txt', 'tag', ['tag', 'version']),
|
('sec_data/2014q4/tag.txt', 'tag', ['tag', 'version']),
|
||||||
('sec_data/2015q1/num.txt', 'num', ['adsh', 'tag', 'version', 'coreg', 'ddate', 'qtrs', 'uom']),
|
#('sec_data/2015q1/num.txt', 'num', ['adsh', 'tag', 'version', 'coreg', 'ddate', 'qtrs', 'uom']),
|
||||||
('sec_data/2015q1/pre.txt', 'pre', ['adsh', 'report', 'line'])
|
#('sec_data/2015q1/pre.txt', 'pre', ['adsh', 'report', 'line'])
|
||||||
]
|
]
|
||||||
|
|
||||||
# Initialize metadata
|
# Initialize metadata
|
||||||
@@ -46,18 +46,20 @@ for i, (file_path, table_name, primary_keys) in enumerate(file_paths):
|
|||||||
# If the file being processed is 'num.txt', fix the `coreg` column
|
# If the file being processed is 'num.txt', fix the `coreg` column
|
||||||
if table_name == 'num':
|
if table_name == 'num':
|
||||||
df['coreg'] = df['coreg'].fillna('nocoreg')
|
df['coreg'] = df['coreg'].fillna('nocoreg')
|
||||||
print("\nUpdated 'coreg' column (NaN values replaced with 'nocoreg'):")
|
|
||||||
print(df[['coreg']].head(10)) # Display first 10 rows of the 'coreg' column for verification
|
|
||||||
|
|
||||||
# Dropping rows with any missing values in the primary keys
|
# Dropping rows with any missing values in the primary keys
|
||||||
df.dropna(subset=primary_keys, inplace=True)
|
df.dropna(subset=primary_keys, inplace=True)
|
||||||
|
|
||||||
# Dropping duplicate rows based on primary keys
|
# Ensure all primary key columns are in lower-case for case insensitive deduplication
|
||||||
# df.drop_duplicates(subset=primary_keys, keep='first', inplace=True)
|
for key in primary_keys:
|
||||||
|
df[key] = df[key].str.lower()
|
||||||
|
|
||||||
# Replace NaN values with None to ensure compatibility with SQL NULL
|
# Replace NaN values with None to ensure compatibility with SQL NULL
|
||||||
df = df.replace([np.nan, np.inf, -np.inf], None)
|
df = df.replace([np.nan, np.inf, -np.inf], None)
|
||||||
|
|
||||||
|
# Dropping duplicate rows based on primary keys
|
||||||
|
df.drop_duplicates(subset=primary_keys, keep='first', inplace=True)
|
||||||
|
|
||||||
# Get Updated Information
|
# Get Updated Information
|
||||||
print("\nUpdated Information:")
|
print("\nUpdated Information:")
|
||||||
print(df.info())
|
print(df.info())
|
||||||
@@ -65,25 +67,10 @@ for i, (file_path, table_name, primary_keys) in enumerate(file_paths):
|
|||||||
# Reflect the already existing table from the database schema
|
# Reflect the already existing table from the database schema
|
||||||
table = Table(table_name, metadata, autoload_with=engine)
|
table = Table(table_name, metadata, autoload_with=engine)
|
||||||
|
|
||||||
# Perform Upsert operation for each row in the DataFrame
|
# Write the DataFrame to the corresponding table in the MariaDB database
|
||||||
with engine.connect() as conn:
|
df.to_sql(table_name, con=engine, if_exists='append', index=False)
|
||||||
for row in df.itertuples(index=False):
|
print(f"\nData from {file_path} written to the '{table_name}' table in the database.")
|
||||||
# Create a dictionary of the row data
|
|
||||||
data = {key: getattr(row, key) for key in df.columns}
|
|
||||||
|
|
||||||
# Prepare insert statement using SQLAlchemy with MySQL-specific ON DUPLICATE KEY UPDATE
|
|
||||||
insert_stmt = insert(table).values(**data)
|
|
||||||
|
|
||||||
# Construct the `ON DUPLICATE KEY UPDATE` part
|
|
||||||
update_stmt = insert_stmt.on_duplicate_key_update(
|
|
||||||
{col.name: insert_stmt.inserted[col.name] for col in table.columns}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Execute the upsert statement
|
|
||||||
conn.execute(update_stmt)
|
|
||||||
|
|
||||||
print(f"\nCleaned data from {file_path} has been written to the '{table_name}' table in the database with upsert functionality.\n")
|
print(f"\nCleaned data from {file_path} has been written to the '{table_name}' table in the database with upsert functionality.\n")
|
||||||
|
|
||||||
print("\nAll files have been processed and cleaned data has been written to the database.")
|
print("\nAll files have been processed and cleaned data has been written to the database.")
|
||||||
|
|
||||||
#FIXME: Foreign key missing because the us-gaap taxonomy version consistently lags the filing period — Q1 filings reference the prior year's us-gaap version. (TODO: confirm against SEC taxonomy release schedule.)
|
|
||||||
Reference in New Issue
Block a user