# SEC quarterly financial data loader: reads tab-separated EDGAR dump files
# and writes cleaned rows into a MariaDB database.
"""Load SEC EDGAR quarterly financial-statement dumps into a MariaDB database."""

import os

import numpy as np
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.dialects.mysql import insert

# Pull DB credentials from a local .env file into the process environment.
load_dotenv()

# Database connection parameters (expected to be set via the environment/.env).
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')

# SQLAlchemy engine for MariaDB through the PyMySQL driver.
engine = create_engine(
    f"mariadb+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# One entry per dump file: (file path, destination table, primary-key columns).
file_paths = [
    ('sec_data/2014q4/sub.txt', 'sub', ['adsh']),
    ('sec_data/2014q4/tag.txt', 'tag', ['tag', 'version']),
    #('sec_data/2015q1/num.txt', 'num', ['adsh', 'tag', 'version', 'coreg', 'ddate', 'qtrs', 'uom']),
    #('sec_data/2015q1/pre.txt', 'pre', ['adsh', 'report', 'line'])
]

# Shared metadata object used to reflect the already-existing table schemas.
metadata = MetaData()
# Load each dump file, clean it, and append it to its destination table.
for i, (file_path, table_name, primary_keys) in enumerate(file_paths):
    # Progress counter derives from the real list length (was hard-coded "/4").
    print(f"\nAnalyzing {file_path} (File {i+1}/{len(file_paths)})...")

    # SEC dump files are tab-separated text.
    df = pd.read_csv(file_path, sep='\t')

    print("\nSummary Information:")
    print(df.info())

    # 'coreg' is part of num's composite primary key but is frequently empty;
    # fill with a sentinel so those rows survive the PK dropna below.
    if table_name == 'num':
        df['coreg'] = df['coreg'].fillna('nocoreg')

    # Rows missing any primary-key value can never be inserted — drop them.
    df.dropna(subset=primary_keys, inplace=True)

    # Lower-case string PK columns for case-insensitive deduplication.
    # Guard on dtype: numeric key columns (e.g. num's ddate/qtrs) have no .str
    # accessor, and the original unguarded call would raise AttributeError.
    for key in primary_keys:
        if df[key].dtype == object:
            df[key] = df[key].str.lower()

    # Map NaN/inf to None so they become SQL NULL on insert.
    df = df.replace([np.nan, np.inf, -np.inf], None)

    # Keep only the first occurrence of each primary-key combination.
    df.drop_duplicates(subset=primary_keys, keep='first', inplace=True)

    print("\nUpdated Information:")
    print(df.info())

    # Reflect the destination table from the live schema; this fails fast
    # (before any insert) if the table does not exist.
    table = Table(table_name, metadata, autoload_with=engine)

    # NOTE(review): plain append — rows whose PK already exists in the table
    # will raise an IntegrityError. No upsert is performed, despite the
    # unused `insert` import from the mysql dialect.
    df.to_sql(table_name, con=engine, if_exists='append', index=False)
    print(f"\nData from {file_path} written to the '{table_name}' table in the database.")

    # Corrected message: the original falsely claimed "upsert functionality".
    print(f"\nCleaned data from {file_path} has been written to the '{table_name}' table in the database.\n")

print("\nAll files have been processed and cleaned data has been written to the database.")