Files
Brancheneinstufung2/ARCHIVE_legacy_scripts/standalone_importer.py
Floke d021b6b71c refactor: [30388f42] Strukturiere Root-Skripte thematisch neu
- Organisiert eine Vielzahl von Skripten aus dem Root-Verzeichnis in thematische Unterordner, um die Übersichtlichkeit zu verbessern und die Migration vorzubereiten.
- Verschiebt SuperOffice-bezogene Test- und Hilfsskripte in .
- Verschiebt Notion-bezogene Synchronisations- und Import-Skripte in .
- Archiviert eindeutig veraltete und ungenutzte Skripte in `ARCHIVE_legacy_scripts/`.
- Die zentralen Helfer  und  bleiben im Root, da sie von mehreren Tools als Abhängigkeit genutzt werden.
2026-03-06 10:16:08 +00:00

93 lines
3.2 KiB
Python

import csv
from collections import Counter
import os
import argparse
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import logging
# --- Standalone Configuration ---
# SQLite database file the importer writes to (absolute path).
DATABASE_URL = "sqlite:////app/companies_v3_fixed_2.db"
# Log file that receives a copy of everything also sent to the console.
LOG_FILE = "/app/Log_from_docker/standalone_importer.log"

# --- Logging Setup ---
# logging.FileHandler opens LOG_FILE as soon as it is constructed and raises
# FileNotFoundError when the parent directory is missing. The directory must
# therefore exist *before* the handlers are built; creating it later (e.g. in
# the __main__ guard) is too late on a fresh environment.
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8 so non-ASCII text in messages (file paths, error
        # details) is written independently of the process locale.
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
# --- SQLAlchemy Models (simplified, only what's needed) ---
Base = declarative_base()


class RawJobTitle(Base):
    """ORM mapping for one row of the ``raw_job_titles`` table.

    Each row holds a distinct job title string together with an occurrence
    count and bookkeeping columns. Only the columns this script needs are
    declared here.
    """
    __tablename__ = 'raw_job_titles'
    # Surrogate integer primary key.
    id = Column(Integer, primary_key=True)
    # Job title text; declared unique and indexed, so one row per title.
    title = Column(String, unique=True, index=True)
    # Number of occurrences recorded for this title (defaults to 1).
    count = Column(Integer, default=1)
    # Free-text origin marker for the row (defaults to "import").
    source = Column(String, default="import")
    # Boolean flag, False by default.
    # NOTE(review): presumably set once the title has been mapped elsewhere - confirm.
    is_mapped = Column(Boolean, default=False)
    # Naive UTC timestamp taken from datetime.utcnow when the row is inserted.
    created_at = Column(DateTime, default=datetime.utcnow)
    # Same default as created_at, and refreshed via onupdate on every UPDATE.
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# --- Database Connection ---
# Engine for the SQLite database addressed by DATABASE_URL.
engine = create_engine(DATABASE_URL)
# Session factory bound to that engine; sessions neither auto-flush nor
# auto-commit, so callers must commit explicitly.
SessionLocal = sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
)
def _count_job_titles(file_path: str) -> Counter:
    """Count the non-empty, whitespace-stripped first-column values of a CSV file."""
    title_counts = Counter()
    # encoding='utf-8-sig' transparently drops a leading BOM (common in Excel
    # exports); with plain 'utf-8' the BOM would stay glued to the first title
    # because str.strip() does not treat U+FEFF as whitespace.
    # newline='' is what the csv module requires so that quoted fields with
    # embedded line breaks are parsed correctly.
    with open(file_path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.reader(f):
            # Blank lines yield an empty row; rows with an empty or
            # whitespace-only first cell are skipped as well.
            title = row[0].strip() if row else ''
            if title:
                title_counts[title] += 1
    return title_counts


def import_job_titles_standalone(file_path: str) -> None:
    """Import job titles from a CSV file into the ``raw_job_titles`` table.

    The first column of every row is read, stripped and counted. For each
    distinct title a row is inserted (``source="csv_import"``,
    ``is_mapped=False``) if none exists yet; otherwise the stored ``count``
    is overwritten with the count found in this file when the two differ.
    All changes are committed in a single transaction.

    Args:
        file_path: Path to the CSV file. No header row is skipped; every row
            with a non-empty first cell counts as a title.

    Returns:
        None. Any exception (unreadable file, database error, ...) is logged
        with its traceback and the transaction is rolled back; the exception
        is not propagated to the caller.
    """
    db = SessionLocal()
    try:
        logger.info(f"Starting standalone import of job titles from {file_path}")
        job_title_counts = _count_job_titles(file_path)
        # Every counted entry is one accepted CSV row, so the sum of the
        # counts equals the number of title rows read.
        total_rows = sum(job_title_counts.values())
        logger.info(f"Read {total_rows} total job title entries. Found {len(job_title_counts)} unique titles.")

        added_count = 0
        updated_count = 0
        for title, count in job_title_counts.items():
            existing_title = db.query(RawJobTitle).filter(RawJobTitle.title == title).first()
            if existing_title is None:
                db.add(RawJobTitle(title=title, count=count, source="csv_import", is_mapped=False))
                added_count += 1
            elif existing_title.count != count:
                # The file's count replaces the stored one (it is not added to it).
                existing_title.count = count
                updated_count += 1

        db.commit()
        logger.info(f"Standalone import complete. Added {added_count} new unique titles, updated {updated_count} existing titles.")
    except Exception as e:
        # Top-level boundary of the script: record the failure and undo any
        # pending changes instead of leaving a half-applied import.
        logger.error(f"Error during standalone job title import: {e}", exc_info=True)
        db.rollback()
    finally:
        db.close()
if __name__ == "__main__":
    # Command-line interface: a single required positional argument, the CSV path.
    cli = argparse.ArgumentParser(
        description="Standalone script to import job titles from a CSV file."
    )
    cli.add_argument(
        "file_path",
        type=str,
        help="Path to the CSV file containing job titles.",
    )
    options = cli.parse_args()

    # Create the log directory if it is missing.
    # NOTE: the module-level logging configuration has already opened
    # LOG_FILE by the time this line runs.
    log_dir = os.path.dirname(LOG_FILE)
    os.makedirs(log_dir, exist_ok=True)

    import_job_titles_standalone(options.file_path)