- Organisiert eine Vielzahl von Skripten aus dem Root-Verzeichnis in thematische Unterordner, um die Übersichtlichkeit zu verbessern und die Migration vorzubereiten.
- Verschiebt SuperOffice-bezogene Test- und Hilfsskripte in einen eigenen Unterordner [Ordnername beim Export verloren gegangen].
- Verschiebt Notion-bezogene Synchronisations- und Import-Skripte in einen eigenen Unterordner [Ordnername beim Export verloren gegangen].
- Archiviert eindeutig veraltete und ungenutzte Skripte in ein Archivverzeichnis [Ordnername beim Export verloren gegangen].
- Die zentralen Helfer [Dateinamen beim Export verloren gegangen] bleiben im Root, da sie von mehreren Tools als Abhängigkeit genutzt werden.
93 lines
3.2 KiB
Python
93 lines
3.2 KiB
Python
import csv
|
|
from collections import Counter
|
|
import os
|
|
import argparse
|
|
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime
|
|
from sqlalchemy.orm import sessionmaker
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from datetime import datetime
|
|
import logging
|
|
|
|
# --- Standalone Configuration ---
DATABASE_URL = "sqlite:////app/companies_v3_fixed_2.db"
LOG_FILE = "/app/Log_from_docker/standalone_importer.log"

# --- Logging Setup ---
# BUGFIX: the log directory must exist BEFORE basicConfig runs.
# logging.FileHandler opens the log file immediately on creation, and this
# module-level setup executes at import time — i.e. before the makedirs call
# in the __main__ block ever gets a chance to run. Without this line the
# script crashes on import when /app/Log_from_docker is missing.
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

# Log to both the file and stderr so Docker log collection and interactive
# runs each get the output.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
|
# --- SQLAlchemy Models (simplified, only what's needed) ---

Base = declarative_base()


class RawJobTitle(Base):
    """Declarative model for one unique raw job title and its occurrence count.

    Mirrors only the columns this standalone importer needs; the canonical
    model presumably lives in the main application — keep in sync (TODO confirm).
    """

    __tablename__ = 'raw_job_titles'

    id = Column(Integer, primary_key=True)
    # Unique + indexed: the importer looks titles up by exact string match.
    title = Column(String, unique=True, index=True)
    # How many times this title occurred in the imported data.
    count = Column(Integer, default=1)
    # Origin of the row; the importer below overrides this with "csv_import".
    source = Column(String, default="import")
    # Whether the raw title has been mapped to a normalized title elsewhere.
    is_mapped = Column(Boolean, default=False)
    # NOTE(review): datetime.utcnow yields naive (timezone-unaware) timestamps
    # and is deprecated in Python 3.12+; kept as-is to match existing rows.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
# --- Database Connection ---
# Engine is created eagerly at import; SQLAlchemy connects lazily on first use.
engine = create_engine(DATABASE_URL)

# Session factory: explicit commit/flush only — the importer controls the
# transaction boundaries itself.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
def import_job_titles_standalone(file_path: str) -> None:
    """Read job titles from the first column of a CSV file and upsert them.

    Duplicate titles within the file are aggregated with a Counter; for each
    unique title, an existing row only gets its ``count`` refreshed, while a
    new title is inserted with ``source="csv_import"``. The whole import is a
    single transaction: committed on success, rolled back on any error.

    Args:
        file_path: Path to a UTF-8 CSV file; one job title per row, column 0.
            Rows with an empty/whitespace-only first column are skipped.
    """
    db = SessionLocal()
    try:
        logger.info(f"Starting standalone import of job titles from {file_path}")

        job_title_counts = Counter()
        total_rows = 0

        # BUGFIX: newline='' is required by the csv module so that quoted
        # fields containing embedded newlines are parsed correctly.
        with open(file_path, 'r', encoding='utf-8', newline='') as f:
            reader = csv.reader(f)
            for row in reader:
                if row and row[0].strip():
                    title = row[0].strip()
                    job_title_counts[title] += 1
                    total_rows += 1

        logger.info(f"Read {total_rows} total job title entries. Found {len(job_title_counts)} unique titles.")

        added_count = 0
        updated_count = 0

        # One lookup per unique title. NOTE(review): O(unique titles) round
        # trips to SQLite; acceptable for a one-off import, and safer than a
        # single IN(...) query given SQLite's bound-variable limit.
        for title, count in job_title_counts.items():
            existing_title = db.query(RawJobTitle).filter(RawJobTitle.title == title).first()
            if existing_title:
                # Only touch the row (and its updated_at) when the count changed.
                if existing_title.count != count:
                    existing_title.count = count
                    updated_count += 1
            else:
                new_title = RawJobTitle(title=title, count=count, source="csv_import", is_mapped=False)
                db.add(new_title)
                added_count += 1

        db.commit()
        logger.info(f"Standalone import complete. Added {added_count} new unique titles, updated {updated_count} existing titles.")

    except Exception as e:
        # Best-effort importer: log with traceback, roll back, and return
        # normally rather than crashing the caller.
        logger.error(f"Error during standalone job title import: {e}", exc_info=True)
        db.rollback()
    finally:
        db.close()
|
def _parse_args():
    """Parse command-line arguments for the standalone importer."""
    parser = argparse.ArgumentParser(description="Standalone script to import job titles from a CSV file.")
    parser.add_argument("file_path", type=str, help="Path to the CSV file containing job titles.")
    return parser.parse_args()


if __name__ == "__main__":
    cli_args = _parse_args()

    # Make sure the directory for the log file exists before anything logs.
    os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)

    import_job_titles_standalone(cli_args.file_path)