"""Inspect one quarter of the SEC Financial Statement Data Sets and
optionally load it into MariaDB.

For each data file (``sub``, ``tag``, ``num``, ``pre``) the script prints a
preview, a column summary and per-column missing-value counts, then
normalises the column names to the schema in ``expected_columns``.  When
``WRITE_TO_DB`` is True the normalised rows are appended to the database
table of the same name.

Connection settings come from the environment (a ``.env`` file is loaded
first): DB_USER, DB_PASSWORD, DB_HOST, DB_NAME and, optionally, DB_PORT.
"""

import os

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

# Set to True to append the processed data to the database.  While False the
# script only inspects and normalises the files and needs no DB credentials.
WRITE_TO_DB = False

# Directory holding the quarter's sub/tag/num/pre text files.
DATA_DIR = os.path.join('sec_data', '2015q1')

# Load environment variables from .env file
load_dotenv()

# Get DB connection parameters from environment
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_NAME = os.getenv('DB_NAME')


def _create_db_engine():
    """Return a SQLAlchemy engine for the MariaDB database described by the
    DB_* environment variables.

    Raises:
        SystemExit: if DB_USER, DB_PASSWORD, DB_HOST or DB_NAME is unset.
        ValueError: if DB_PORT is set but is not an integer.
    """
    required = {
        'DB_USER': DB_USER,
        'DB_PASSWORD': DB_PASSWORD,
        'DB_HOST': DB_HOST,
        'DB_NAME': DB_NAME,
    }
    missing = [name for name, value in required.items() if value is None]
    if missing:
        raise SystemExit(
            "Missing required environment variable(s): " + ", ".join(missing)
        )
    # URL.create takes the raw credentials, so characters such as '@', ':'
    # or '/' in the password need no escaping; interpolating them into an
    # f-string URL would corrupt it.
    url = URL.create(
        drivername='mariadb+pymysql',
        username=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=int(DB_PORT) if DB_PORT else None,
        database=DB_NAME,
    )
    return create_engine(url)


# The engine (and the credential check) is only needed when writing.
engine = _create_db_engine() if WRITE_TO_DB else None

# Define a list of file paths and corresponding table names
file_paths = [
    (os.path.join(DATA_DIR, 'sub.txt'), 'sub'),
    (os.path.join(DATA_DIR, 'tag.txt'), 'tag'),
    (os.path.join(DATA_DIR, 'num.txt'), 'num'),
    (os.path.join(DATA_DIR, 'pre.txt'), 'pre'),
]

# Define the expected column names for each table
expected_columns = {
    'sub': [
        'adsh', 'cik', 'name', 'sic', 'countryba', 'stprba', 'cityba',
        'zipba', 'bas1', 'bas2', 'baph', 'countryma', 'stprma', 'cityma',
        'zipma', 'mas1', 'mas2', 'countryinc', 'stprinc', 'ein', 'former',
        'changed', 'afs', 'wksi', 'fye', 'form', 'period', 'fy', 'fp',
        'filed', 'accepted', 'prevrpt', 'detail', 'instance', 'nciks',
        'aciks',
    ],
    'tag': [
        'tag', 'version', 'custom', 'abstract', 'datatype', 'iord', 'crdr',
        'tlabel', 'doc',
    ],
    'num': [
        'adsh', 'tag', 'version', 'coreg', 'ddate', 'qtrs', 'uom', 'value',
        'footnote',
    ],
    'pre': [
        'adsh', 'report', 'line', 'stmt', 'inpth', 'rfile', 'tag', 'version',
        'plabel',
    ],
}

# Loop through each file, inspect it and (optionally) write it to the database
total_files = len(file_paths)
for i, (file_path, table_name) in enumerate(file_paths, start=1):
    print(f"\nAnalyzing {file_path} (File {i}/{total_files})...")

    # Read the tab-separated data into a Pandas DataFrame
    df = pd.read_csv(file_path, sep='\t')

    # Inspect the DataFrame
    print("First rows of the DataFrame:")
    print(df.head(10))

    # DataFrame.info() prints its report itself and returns None; wrapping
    # it in print() would emit a stray "None" line.
    print("\nSummary Information:")
    df.info()

    # Check if there are any missing values in the DataFrame
    missing_values = df.isnull().sum()
    print("\nMissing Values:")
    print(missing_values)

    # Check if the column names match the expected columns
    if table_name in expected_columns:
        expected = expected_columns[table_name]
        print("\nEnsuring that columns match for table:", table_name)
        df.columns = [col.lower() for col in df.columns]
        actual = set(df.columns)
        if actual != set(expected):
            missing_cols = set(expected) - actual
            extra_cols = actual - set(expected)
            if missing_cols:
                print(f"Missing columns in {table_name}: {missing_cols}")
            if extra_cols:
                print(f"Extra columns found in {table_name}: {extra_cols}")
                df = df.drop(columns=list(extra_cols))
            # Put columns in schema order; missing ones are added as NaN.
            df = df.reindex(columns=expected)
        else:
            print(f"Column names in {table_name} match the expected schema.")

    # If the file being processed is 'num.txt', fix the `coreg` column
    if table_name == 'num':
        df['coreg'] = df['coreg'].fillna('nocoreg')
        print("\nUpdated 'coreg' column (NaN values replaced with 'nocoreg'):")
        # Display the first 10 rows of the 'coreg' column for verification
        print(df[['coreg']].head(10))

    # Write the DataFrame to the corresponding table in the MariaDB database
    if WRITE_TO_DB:
        df.to_sql(table_name, con=engine, if_exists='append', index=False)
        print(f"\nData from {file_path} written to the '{table_name}' table in the database.")

if WRITE_TO_DB:
    print("\nAll files have been processed and written to the database.")