# Files
# stockdb/write_to_db.py
#
# 129 lines
# 4.6 KiB
# Python
import os
import mariadb
import json
from dotenv import load_dotenv
# Load environment variables from .env file
# (connect_to_db() below reads DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME).
load_dotenv()
def connect_to_db():
    """Open a MariaDB connection using the DB_* environment variables.

    Returns:
        A ``mariadb`` connection object on success, or ``None`` if the
        connection fails or the configuration is missing/invalid.
    """
    port = os.getenv("DB_PORT")
    try:
        conn = mariadb.connect(
            user=os.getenv("DB_USER"),
            password=os.getenv("DB_PASSWORD"),
            host=os.getenv("DB_HOST"),
            # Fall back to MariaDB's standard port when DB_PORT is unset;
            # int(None) would raise TypeError, which the except clause
            # below did not catch in the original.
            port=int(port) if port else 3306,
            database=os.getenv("DB_NAME")
        )
        return conn
    except (mariadb.Error, ValueError) as e:
        # ValueError covers a non-numeric DB_PORT value.
        print(f"Error connecting to MariaDB: {e}")
        return None
def insert_entity(cursor, cik, entity_name):
    """Insert one row into ``entities``; existing CIKs are silently skipped."""
    query = "INSERT IGNORE INTO entities (cik, name) VALUES (?, ?)"
    cursor.execute(query, (cik, entity_name))
def insert_fact(cursor, taxonomy, fact_id, label, description, unit):
    """Insert one row into ``facts``; existing fact ids are silently skipped.

    Note: the table stores ``fact_id`` first even though the signature
    takes ``taxonomy`` first.
    """
    query = "INSERT IGNORE INTO facts (id, taxonomy, label, description, unit) VALUES (?, ?, ?, ?, ?)"
    params = (fact_id, taxonomy, label, description, unit)
    cursor.execute(query, params)
def insert_data(cursor, cik, fact_id, start, end, val, accn, fy, fp, form, filed, frame):
    """Insert one data point into ``data``; duplicate rows are silently skipped."""
    row = (cik, fact_id, start, end, val, accn, fy, fp, form, filed, frame)
    cursor.execute(
        """INSERT IGNORE INTO data (cik, fact_id, start, end, val, accn, fy, fp, form, filed, frame)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        row
    )
def cik_exists(cursor, cik):
    """Return True when ``entities`` already holds a row for this CIK."""
    cursor.execute("SELECT 1 FROM entities WHERE cik = ?", (cik,))
    row = cursor.fetchone()
    return row is not None
def parse_json_and_insert_data(file_path):
    """Load one SEC companyfacts JSON file and insert its contents.

    Opens a fresh connection per file, inserts the entity, its facts, and
    every data point inside one transaction.

    Returns:
        True when the file was fully committed; False when the connection
        fails, the CIK already exists, or any error forced a rollback.
    """
    # SEC JSON is UTF-8; be explicit so the platform default encoding
    # cannot break parsing.
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)
    cik = data.get('cik')
    # Start a new connection for each file
    conn = connect_to_db()
    if conn is None:
        return False
    # Pre-bind cursor so the finally block never hits a NameError when
    # conn.cursor() itself raises (the original left it unbound there).
    cursor = None
    try:
        cursor = conn.cursor()
        # Optional: Check if cik already exists in the database.
        # You can comment this block out if you do not want this check.
        if cik_exists(cursor, cik):
            print(f"CIK {cik} already exists in the database. Skipping file {file_path}.")
            return False
        # Insert the entity row first so the fact/data rows have a parent.
        entity_name = data.get('entityName')
        insert_entity(cursor, cik, entity_name)
        # 'facts' maps taxonomy -> {fact_id -> fact details}.
        for taxonomy, fact_details in data['facts'].items():
            for fact_id, fact in fact_details.items():
                label = fact.get('label')
                description = fact.get('description')
                for unit, unit_vals in fact.get('units', {}).items():
                    # One facts row per (fact_id, unit); INSERT IGNORE
                    # dedupes repeats.
                    insert_fact(cursor, taxonomy, fact_id, label, description, unit)
                    # Insert each data point; 'start' and 'frame' are
                    # optional in the SEC payload.
                    for entry in unit_vals:
                        insert_data(
                            cursor, cik, fact_id,
                            entry.get('start'),
                            entry['end'],
                            entry['val'],
                            entry['accn'],
                            entry['fy'],
                            entry['fp'],
                            entry['form'],
                            entry['filed'],
                            entry.get('frame'),
                        )
        # Commit the whole file as one transaction.
        conn.commit()
        return True
    except Exception as e:
        # Boundary handler: any failure aborts this file and rolls back.
        print(f"Error occurred while processing {file_path}: {e}")
        conn.rollback()
        return False
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()
def process_all_files_in_directory(directory_path):
    """Feed every ``*.json`` file in *directory_path* to the loader,
    printing per-file progress and a final summary."""
    json_files = [name for name in os.listdir(directory_path) if name.endswith('.json')]
    total_files = len(json_files)
    processed_files = 0
    for idx, file_name in enumerate(json_files, start=1):
        print(f"Processing file {idx} of {total_files}: {file_name}")
        full_path = os.path.join(directory_path, file_name)
        if parse_json_and_insert_data(full_path):
            processed_files += 1
            print(f"Successfully processed {file_name}")
        else:
            print(f"Failed to process {file_name}")
    print(f"Finished processing {processed_files} out of {total_files} files.")
def main():
    """Entry point: load every companyfacts JSON file into the database."""
    # Process all JSON files in the directory
    process_all_files_in_directory('./sec_data/companyfacts/')
if __name__ == "__main__":
    # Run only when executed as a script, not when imported.
    main()