refactor: total change — no longer ripping gigabytes, but going directly to the datasets; still need to find a way to automate the download.
This commit is contained in:
@@ -48,13 +48,13 @@ SUBMISSIONS_URL = (
|
|||||||
)
|
)
|
||||||
|
|
||||||
# File paths to save the zip files
|
# File paths to save the zip files
|
||||||
companyfacts_zip = os.path.join(SEC_DATA_DIR, "companyfacts.zip")
|
#companyfacts_zip = os.path.join(SEC_DATA_DIR, "companyfacts.zip")
|
||||||
#submissions_zip = os.path.join(SEC_DATA_DIR, "submissions.zip")
|
submissions_zip = os.path.join(SEC_DATA_DIR, "submissions.zip")
|
||||||
|
|
||||||
# Download the files
|
# Download the files
|
||||||
download_file(COMPANYFACTS_URL, companyfacts_zip)
|
#download_file(COMPANYFACTS_URL, companyfacts_zip)
|
||||||
#download_file(SUBMISSIONS_URL, submissions_zip)
|
download_file(SUBMISSIONS_URL, submissions_zip)
|
||||||
|
|
||||||
# Extract the files into respective directories
|
# Extract the files into respective directories
|
||||||
extract_zip(companyfacts_zip, COMPANYFACTS_DIR)
|
#extract_zip(companyfacts_zip, COMPANYFACTS_DIR)
|
||||||
#extract_zip(submissions_zip, SUBMISSIONS_DIR)
|
extract_zip(submissions_zip, SUBMISSIONS_DIR)
|
||||||
118
db_schema.sql
118
db_schema.sql
@@ -1,55 +1,75 @@
|
|||||||
CREATE TABLE IF NOT EXISTS entities (
|
CREATE TABLE num (
|
||||||
cik INT PRIMARY KEY, -- CIK is now the primary key, ensuring uniqueness
|
adsh VARCHAR(255),
|
||||||
name VARCHAR(255) NOT NULL -- Name of the company
|
tag VARCHAR(255),
|
||||||
|
version VARCHAR(255),
|
||||||
|
coreg VARCHAR(255),
|
||||||
|
ddate BIGINT,
|
||||||
|
qtrs BIGINT,
|
||||||
|
uom VARCHAR(50),
|
||||||
|
value DOUBLE PRECISION,
|
||||||
|
footnote TEXT
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS facts (
|
CREATE TABLE pre (
|
||||||
id VARCHAR(255) PRIMARY KEY, -- Unique identifier for the fact
|
adsh VARCHAR(255),
|
||||||
taxonomy VARCHAR(255), -- Taxonomy of the fact
|
report BIGINT,
|
||||||
label VARCHAR(255), -- Label of the fact
|
line BIGINT,
|
||||||
description TEXT, -- Description of the fact
|
stmt VARCHAR(255),
|
||||||
unit VARCHAR(255) -- Unit of the fact
|
inpth BIGINT,
|
||||||
|
rfile VARCHAR(255),
|
||||||
|
tag VARCHAR(255),
|
||||||
|
version VARCHAR(255),
|
||||||
|
plabel VARCHAR(255),
|
||||||
|
negating BIGINT
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS data (
|
CREATE TABLE sub (
|
||||||
cik INT, -- CIK of the company
|
adsh VARCHAR(255),
|
||||||
fact_id VARCHAR(255),
|
cik BIGINT,
|
||||||
end DATE,
|
name VARCHAR(255),
|
||||||
start DATE, -- Start date of the fact
|
sic DOUBLE PRECISION,
|
||||||
val INT,
|
countryba VARCHAR(100),
|
||||||
accn VARCHAR(255),
|
stprba VARCHAR(100),
|
||||||
fy INT,
|
cityba VARCHAR(255),
|
||||||
fp VARCHAR(255),
|
zipba VARCHAR(50),
|
||||||
form VARCHAR(255),
|
bas1 VARCHAR(255),
|
||||||
filed DATE,
|
bas2 VARCHAR(255),
|
||||||
frame VARCHAR(255),
|
baph VARCHAR(255),
|
||||||
PRIMARY KEY (cik, fact_id, end),
|
countryma VARCHAR(100),
|
||||||
FOREIGN KEY (cik) REFERENCES entities(cik),
|
stprma VARCHAR(100),
|
||||||
FOREIGN KEY (fact_id) REFERENCES facts(id)
|
cityma VARCHAR(255),
|
||||||
|
zipma VARCHAR(50),
|
||||||
|
mas1 VARCHAR(255),
|
||||||
|
mas2 VARCHAR(255),
|
||||||
|
countryinc VARCHAR(100),
|
||||||
|
stprinc VARCHAR(100),
|
||||||
|
ein BIGINT,
|
||||||
|
former VARCHAR(255),
|
||||||
|
changed DOUBLE PRECISION,
|
||||||
|
afs VARCHAR(255),
|
||||||
|
wksi BIGINT,
|
||||||
|
fye DOUBLE PRECISION,
|
||||||
|
form VARCHAR(50),
|
||||||
|
period DOUBLE PRECISION,
|
||||||
|
fy DOUBLE PRECISION,
|
||||||
|
fp VARCHAR(50),
|
||||||
|
filed BIGINT,
|
||||||
|
accepted VARCHAR(255),
|
||||||
|
prevrpt BIGINT,
|
||||||
|
detail BIGINT,
|
||||||
|
instance VARCHAR(255),
|
||||||
|
nciks BIGINT,
|
||||||
|
aciks VARCHAR(255)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
CREATE TABLE tag (
|
||||||
-- @block
|
tag VARCHAR(255),
|
||||||
CREATE TABLE IF NOT EXISTS data (
|
version VARCHAR(255),
|
||||||
entity_cik INT,
|
custom BIGINT,
|
||||||
entity_name VARCHAR(255),
|
abstract BIGINT,
|
||||||
fact_id VARCHAR(255),
|
datatype VARCHAR(255),
|
||||||
fact_taxonomy VARCHAR(255),
|
iord VARCHAR(50),
|
||||||
fact_label VARCHAR(255),
|
crdr VARCHAR(50),
|
||||||
fact_description TEXT,
|
tlabel VARCHAR(255),
|
||||||
fact_unit VARCHAR(255)
|
doc TEXT
|
||||||
end DATE,
|
);
|
||||||
val FLOAT,
|
|
||||||
accn VARCHAR(50),
|
|
||||||
fy INT,
|
|
||||||
fp VARCHAR(255),
|
|
||||||
form VARCHAR(255),
|
|
||||||
filed DATE,
|
|
||||||
frame VARCHAR(255),
|
|
||||||
start DATE,
|
|
||||||
PRIMARY KEY (entity_cik, fact_id, end, form),
|
|
||||||
|
|
||||||
)
|
|
||||||
|
|
||||||
-- @block
|
|
||||||
CREATE TABLE IF NOT EXISTS data ();
|
|
||||||
139
write_to_db.py
139
write_to_db.py
@@ -1,129 +1,18 @@
|
|||||||
import os
|
import pandas as pd
|
||||||
import mariadb
|
|
||||||
import json
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
|
|
||||||
# Load environment variables from .env file
|
# Read the data into a Pandas DataFrame
|
||||||
load_dotenv()
|
file_path = 'sec_data/2024q1/tag.txt'
|
||||||
|
df = pd.read_csv(file_path, sep='\t')
|
||||||
|
|
||||||
def connect_to_db():
|
# Inspect the DataFrame
|
||||||
try:
|
print("First rows of the DataFrame:")
|
||||||
# Read the connection parameters from the environment
|
print(df.head(10))
|
||||||
conn = mariadb.connect(
|
|
||||||
user=os.getenv("DB_USER"),
|
|
||||||
password=os.getenv("DB_PASSWORD"),
|
|
||||||
host=os.getenv("DB_HOST"),
|
|
||||||
port=int(os.getenv("DB_PORT")),
|
|
||||||
database=os.getenv("DB_NAME")
|
|
||||||
)
|
|
||||||
return conn
|
|
||||||
except mariadb.Error as e:
|
|
||||||
print(f"Error connecting to MariaDB: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def insert_entity(cursor, cik, entity_name):
|
# Get the DataFrame Information
|
||||||
cursor.execute(
|
print("\nSummary Information:")
|
||||||
"INSERT IGNORE INTO entities (cik, name) VALUES (?, ?)", (cik, entity_name))
|
print(df.info())
|
||||||
|
|
||||||
def insert_fact(cursor, taxonomy, fact_id, label, description, unit):
|
# Check if there are any missing values in the DataFrame
|
||||||
cursor.execute(
|
missing_values = df.isnull().sum()
|
||||||
"INSERT IGNORE INTO facts (id, taxonomy, label, description, unit) VALUES (?, ?, ?, ?, ?)",
|
print("\nMissing Values:")
|
||||||
(fact_id, taxonomy, label, description, unit)
|
print(missing_values)
|
||||||
)
|
|
||||||
|
|
||||||
def insert_data(cursor, cik, fact_id, start, end, val, accn, fy, fp, form, filed, frame):
|
|
||||||
cursor.execute(
|
|
||||||
"""INSERT IGNORE INTO data (cik, fact_id, end, start, val, accn, fy, fp, form, filed, frame)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
||||||
(cik, fact_id, end, start, val, accn, fy, fp, form, filed, frame)
|
|
||||||
)
|
|
||||||
|
|
||||||
def cik_exists(cursor, cik):
|
|
||||||
cursor.execute("SELECT 1 FROM entities WHERE cik = ?", (cik,))
|
|
||||||
return cursor.fetchone() is not None
|
|
||||||
|
|
||||||
def parse_json_and_insert_data(file_path):
|
|
||||||
with open(file_path, 'r') as file:
|
|
||||||
data = json.load(file)
|
|
||||||
|
|
||||||
cik = data.get('cik')
|
|
||||||
|
|
||||||
# Start a new connection for each file
|
|
||||||
conn = connect_to_db()
|
|
||||||
if conn is None:
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
cursor = conn.cursor()
|
|
||||||
|
|
||||||
# Optional: Check if cik already exists in the database.
|
|
||||||
# You can comment this block out if you do not want this check.
|
|
||||||
if cik_exists(cursor, cik):
|
|
||||||
print(f"CIK {cik} already exists in the database. Skipping file {file_path}.")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Insert the entity
|
|
||||||
entity_name = data.get('entityName')
|
|
||||||
insert_entity(cursor, cik, entity_name)
|
|
||||||
|
|
||||||
# Iterate over facts
|
|
||||||
for taxonomy, fact_details in data['facts'].items():
|
|
||||||
for fact_id, fact in fact_details.items():
|
|
||||||
# Get fact details
|
|
||||||
label = fact.get('label')
|
|
||||||
description = fact.get('description')
|
|
||||||
|
|
||||||
for unit, unit_vals in fact.get('units', {}).items():
|
|
||||||
# Insert fact
|
|
||||||
insert_fact(cursor, taxonomy, fact_id, label, description, unit)
|
|
||||||
|
|
||||||
# Insert each data point
|
|
||||||
for entry in unit_vals:
|
|
||||||
start = entry.get('start', None)
|
|
||||||
end = entry['end']
|
|
||||||
val = entry['val']
|
|
||||||
accn = entry['accn']
|
|
||||||
fy = entry['fy']
|
|
||||||
fp = entry['fp']
|
|
||||||
form = entry['form']
|
|
||||||
filed = entry['filed']
|
|
||||||
frame = entry.get('frame', None)
|
|
||||||
|
|
||||||
insert_data(cursor, cik, fact_id, start, end, val, accn, fy, fp, form, filed, frame)
|
|
||||||
|
|
||||||
# Commit transaction
|
|
||||||
conn.commit()
|
|
||||||
return True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error occurred while processing {file_path}: {e}")
|
|
||||||
conn.rollback()
|
|
||||||
return False
|
|
||||||
finally:
|
|
||||||
cursor.close()
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
def process_all_files_in_directory(directory_path):
|
|
||||||
files = [f for f in os.listdir(directory_path) if f.endswith('.json')]
|
|
||||||
total_files = len(files)
|
|
||||||
processed_files = 0
|
|
||||||
|
|
||||||
for idx, file_name in enumerate(files, start=1):
|
|
||||||
file_path = os.path.join(directory_path, file_name)
|
|
||||||
print(f"Processing file {idx} of {total_files}: {file_name}")
|
|
||||||
|
|
||||||
if parse_json_and_insert_data(file_path):
|
|
||||||
processed_files += 1
|
|
||||||
print(f"Successfully processed {file_name}")
|
|
||||||
else:
|
|
||||||
print(f"Failed to process {file_name}")
|
|
||||||
|
|
||||||
print(f"Finished processing {processed_files} out of {total_files} files.")
|
|
||||||
|
|
||||||
def main():
|
|
||||||
# Process all JSON files in the directory
|
|
||||||
directory_path = './sec_data/companyfacts/'
|
|
||||||
process_all_files_in_directory(directory_path)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
|
|||||||
Reference in New Issue
Block a user