clean slate

This commit is contained in:
Leonard Excoffier
2024-08-31 19:14:02 -04:00
parent 91272506e3
commit fa64f81cb8
2 changed files with 30 additions and 68 deletions

View File

@@ -1,3 +1,13 @@
-- @block
-- Reset db
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS num;
DROP TABLE IF EXISTS pre;
DROP TABLE IF EXISTS sub;
DROP TABLE IF EXISTS tag;
SET FOREIGN_KEY_CHECKS = 1;
-- @end
-- @block -- @block
-- Create tables -- Create tables
-- Create SUB table -- Create SUB table
@@ -90,17 +100,6 @@ CREATE TABLE pre (
); );
-- @end -- @end
-- @block
-- Reset db
SET FOREIGN_KEY_CHECKS = 0;
DROP TABLE IF EXISTS num;
DROP TABLE IF EXISTS pre;
DROP TABLE IF EXISTS sub;
DROP TABLE IF EXISTS tag;
SET FOREIGN_KEY_CHECKS = 1;
-- @end\
-- baph is 12 in new 20 in old -- baph is 12 in new 20 in old
-- doc is 2048 in new unlimited in old -- doc is 2048 in new unlimited in old

View File

@@ -19,80 +19,43 @@ connection_string = f"mariadb+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PO
# Create the SQLAlchemy engine # Create the SQLAlchemy engine
engine = create_engine(connection_string) engine = create_engine(connection_string)
# Define a list of file paths and corresponding table names # Define a list of file paths and corresponding table names with primary keys
file_paths = [ file_paths = [
('sec_data/2015q1/sub.txt', 'sub'), ('sec_data/2015q1/sub.txt', 'sub', ['adsh']),
('sec_data/2015q1/tag.txt', 'tag'), ('sec_data/2015q1/tag.txt', 'tag', ['tag', 'version']),
('sec_data/2015q1/num.txt', 'num'), ('sec_data/2015q1/num.txt', 'num', ['adsh', 'tag', 'version', 'coreg', 'ddate', 'qtrs', 'uom']),
('sec_data/2015q1/pre.txt', 'pre') ('sec_data/2015q1/pre.txt', 'pre', ['adsh', 'report', 'line'])
] ]
# Define the expected column names for each table
expected_columns = {
'sub': [
'adsh', 'cik', 'name', 'sic', 'countryba', 'stprba', 'cityba', 'zipba',
'bas1', 'bas2', 'baph', 'countryma', 'stprma', 'cityma', 'zipma',
'mas1', 'mas2', 'countryinc', 'stprinc', 'ein', 'former', 'changed',
'afs', 'wksi', 'fye', 'form', 'period', 'fy', 'fp', 'filed',
'accepted', 'prevrpt', 'detail', 'instance', 'nciks', 'aciks'],
'tag': [
'tag', 'version', 'custom', 'abstract', 'datatype', 'iord', 'crdr',
'tlabel', 'doc'],
'num': [
'adsh', 'tag', 'version', 'coreg', 'ddate', 'qtrs', 'uom', 'value',
'footnote'],
'pre': [
'adsh', 'report', 'line', 'stmt', 'inpth', 'rfile', 'tag',
'version', 'plabel']
}
# Loop through each file and write the data to the database # Loop through each file and write the data to the database
for i, (file_path, table_name) in enumerate(file_paths): for i, (file_path, table_name, primary_keys) in enumerate(file_paths):
print(f"\nAnalyzing {file_path} (File {i+1}/4)...") print(f"\nAnalyzing {file_path} (File {i+1}/4)...")
# Read the data into a Pandas DataFrame # Read the data into a Pandas DataFrame
df = pd.read_csv(file_path, sep='\t') df = pd.read_csv(file_path, sep='\t')
# Inspect the DataFrame
print("First rows of the DataFrame:")
print(df.head(10))
# Get the DataFrame Information # Get the DataFrame Information
print("\nSummary Information:") print("\nSummary Information:")
print(df.info()) print(df.info())
# Check if there are any missing values in the DataFrame
missing_values = df.isnull().sum()
print("\nMissing Values:")
print(missing_values)
# Check if the column names match the expected columns
if table_name in expected_columns:
expected = expected_columns[table_name]
print("\nEnsuring that columns match for table:", table_name)
df.columns = [col.lower() for col in df.columns]
if set(df.columns) != set(expected):
missing_cols = set(expected) - set(df.columns)
extra_cols = set(df.columns) - set(expected)
if missing_cols:
print(f"Missing columns in {table_name}: {missing_cols}")
if extra_cols:
print(f"Extra columns found in {table_name}: {extra_cols}")
df = df.drop(columns=list(extra_cols))
df = df.reindex(columns=expected)
else:
print(f"Column names in {table_name} match the expected schema.")
# If the file being processed is 'num.txt', fix the `coreg` column # If the file being processed is 'num.txt', fix the `coreg` column
if table_name == 'num': if table_name == 'num':
df['coreg'] = df['coreg'].fillna('nocoreg') df['coreg'] = df['coreg'].fillna('nocoreg')
print("\nUpdated 'coreg' column (NaN values replaced with 'nocoreg'):") print("\nUpdated 'coreg' column (NaN values replaced with 'nocoreg'):")
print(df[['coreg']].head(10)) # Display first 10 rows of the 'coreg' column for verification print(df[['coreg']].head(10)) # Display first 10 rows of the 'coreg' column for verification
# Write the DataFrame to the corresponding table in the MariaDB database # Dropping rows with any missing values in the primary keys and NOT NULL columns
# df.to_sql(table_name, con=engine, if_exists='append', index=False) df.dropna(subset=primary_keys, inplace=True)
# print(f"\nData from {file_path} written to the '{table_name}' table in the database.")
# print("\nAll files have been processed and written to the database.") # Dropping duplicate rows based on primary keys
df.drop_duplicates(subset=primary_keys, keep='first', inplace=True)
# Get Updated Information
print("\nUpdated Information:")
print(df.info())
# Write the cleaned DataFrame to the corresponding table in the MariaDB database
df.to_sql(table_name, con=engine, if_exists='append', index=False)
print(f"\nCleaned data from {file_path} has been written to the '{table_name}' table in the database.\n")
print("\nAll files have been processed and cleaned data has been written to the database.")