diff --git a/.dev_session/SESSION_INFO b/.dev_session/SESSION_INFO index be71f7bc..dcabcef6 100644 --- a/.dev_session/SESSION_INFO +++ b/.dev_session/SESSION_INFO @@ -1 +1 @@ -{"task_id": "30e88f42-8544-804e-ac61-ed061d57563a", "token": "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", "session_start_time": "2026-02-21T21:27:02.599458"} \ No newline at end of file +{"task_id": "30e88f42-8544-804e-ac61-ed061d57563a", "token": "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", "session_start_time": "2026-02-22T08:20:16.802201"} \ No newline at end of file diff --git a/check_benni.py b/check_benni.py new file mode 100644 index 00000000..7f24607d --- /dev/null +++ b/check_benni.py @@ -0,0 +1,40 @@ +import sqlite3 +import os +import json + +DB_PATH = "companies_v3_fixed_2.db" + +def check_company_33(): + if not os.path.exists(DB_PATH): + print(f"❌ Database not found at {DB_PATH}") + return + + try: + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + print(f"🔍 Checking Company ID 33 (Bennis Playland)...") + # Check standard fields + cursor.execute("SELECT id, name, city, street, zip_code FROM companies WHERE id = 33") + row = cursor.fetchone() + if row: + print(f" Standard: City='{row[2]}', Street='{row[3]}', Zip='{row[4]}'") + else: + print(" ❌ Company 33 not found in DB.") + + # Check Enrichment + cursor.execute("SELECT content FROM enrichment_data WHERE company_id = 33 AND source_type = 'website_scrape'") + enrich_row = cursor.fetchone() + if enrich_row: + data = json.loads(enrich_row[0]) + imp = data.get("impressum") + print(f" Impressum Data: {json.dumps(imp, indent=2) if imp else 'None'}") + else: + print(" ❌ No website_scrape found for Company 33.") + + conn.close() + except Exception as e: + print(f"❌ Error: {e}") + +if __name__ == "__main__": + check_company_33() diff --git a/check_silly_billy.py b/check_silly_billy.py new file mode 100644 index 00000000..2f1db0dd --- /dev/null +++ b/check_silly_billy.py @@ -0,0 +1,53 @@ +import sqlite3 +import os + +DB_PATH = "companies_v3_fixed_2.db" + +def check_company(): + if not os.path.exists(DB_PATH): + print(f"❌ Database not found at {DB_PATH}") + return + + try: + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + print(f"🔍 Searching for 'Silly Billy' in {DB_PATH}...") + cursor.execute("SELECT id, name, crm_id, ai_opener, ai_opener_secondary, city, crm_vat, status FROM companies WHERE name LIKE '%Silly Billy%'") + rows = cursor.fetchall() + + if not rows: + print("❌ No company found matching 'Silly Billy'") + else: + for row in rows: + company_id = row[0] + print("\n✅ Company Found:") + print(f" ID: {company_id}") + print(f" Name: {row[1]}") + print(f" CRM ID: {row[2]}") + print(f" Status: {row[7]}") + print(f" City: {row[5]}") + print(f" VAT: {row[6]}") + print(f" Opener (Primary): {row[3][:50]}..." if row[3] else " Opener (Primary): None") + + # Check Enrichment Data + print(f"\n 🔍 Checking Enrichment Data for ID {company_id}...") + cursor.execute("SELECT content FROM enrichment_data WHERE company_id = ? AND source_type = 'website_scrape'", (company_id,)) + enrich_row = cursor.fetchone() + if enrich_row: + import json + try: + data = json.loads(enrich_row[0]) + imp = data.get("impressum") + print(f" Impressum Data in Scrape: {json.dumps(imp, indent=2) if imp else 'None'}") + except Exception as e: + print(f" ❌ Error parsing JSON: {e}") + else: + print(" ❌ No website_scrape enrichment data found.") + + conn.close() + except Exception as e: + print(f"❌ Error reading DB: {e}") + +if __name__ == "__main__": + check_company() diff --git a/company-explorer/backend/app.py b/company-explorer/backend/app.py index 480435aa..5acd1409 100644 --- a/company-explorer/backend/app.py +++ b/company-explorer/backend/app.py @@ -104,6 +104,8 @@ class ProvisioningResponse(BaseModel): # Enrichment Data for Write-Back address_city: Optional[str] = None + address_zip: Optional[str] = None + address_street: Optional[str] = None address_country: Optional[str] = None vat_id: Optional[str] = None @@ -278,12 +280,19 @@ def provision_superoffice_contact( # 2. Find Contact (Person) if req.so_person_id is None: - # Just a company sync, no texts needed + # Just a company sync, but return all company-level metadata return ProvisioningResponse( status="success", company_name=company.name, website=company.website, - vertical_name=company.industry_ai + vertical_name=company.industry_ai, + opener=company.ai_opener, + opener_secondary=company.ai_opener_secondary, + address_city=company.city, + address_street=company.street, + address_zip=company.zip_code, + address_country=company.country, + vat_id=company.crm_vat ) person = db.query(Contact).filter(Contact.so_person_id == req.so_person_id).first() @@ -353,6 +362,8 @@ def provision_superoffice_contact( opener_secondary=company.ai_opener_secondary, texts=texts, address_city=company.city, + address_street=company.street, + address_zip=company.zip_code, address_country=company.country, # TODO: Add VAT field to Company model if not present, for now using crm_vat if available vat_id=company.crm_vat diff --git a/company-explorer/backend/database.py b/company-explorer/backend/database.py index 7406da64..a796e71e 100644 --- a/company-explorer/backend/database.py +++ b/company-explorer/backend/database.py @@ -34,6 +34,8 @@ class Company(Base): industry_ai = Column(String, nullable=True) # The AI suggested industry # Location (Golden Record) + street = Column(String, nullable=True) # NEW: Street + Number + zip_code = Column(String, nullable=True) # NEW: Postal Code city = Column(String, nullable=True) country = Column(String, default="DE") diff --git a/company-explorer/backend/services/classification.py b/company-explorer/backend/services/classification.py index beee1c86..f01fa161 100644 --- a/company-explorer/backend/services/classification.py +++ b/company-explorer/backend/services/classification.py @@ -220,8 +220,49 @@ AUSGABE: NUR den fertigen Satz. logger.error(f"Opener Error: {e}") return None + def _sync_company_address_data(self, db: Session, company: Company): + """Extracts address and VAT data from website scrape if available.""" + from ..database import EnrichmentData + enrichment = db.query(EnrichmentData).filter_by( + company_id=company.id, source_type="website_scrape" + ).order_by(EnrichmentData.created_at.desc()).first() + + if enrichment and enrichment.content and "impressum" in enrichment.content: + imp = enrichment.content["impressum"] + if imp and isinstance(imp, dict): + changed = False + # City + if imp.get("city") and not company.city: + company.city = imp.get("city") + changed = True + # Street + if imp.get("street") and not company.street: + company.street = imp.get("street") + changed = True + # Zip / PLZ + zip_val = imp.get("zip") or imp.get("plz") + if zip_val and not company.zip_code: + company.zip_code = zip_val + changed = True + # Country + if imp.get("country_code") and (not company.country or company.country == "DE"): + company.country = imp.get("country_code") + changed = True + # VAT ID + if imp.get("vat_id") and not company.crm_vat: + company.crm_vat = imp.get("vat_id") + changed = True + + if changed: + db.commit() + logger.info(f"Updated Address/VAT from Impressum for {company.name}: City={company.city}, VAT={company.crm_vat}") + def classify_company_potential(self, company: Company, db: Session) -> Company: logger.info(f"--- Starting FULL Analysis v3.0 for {company.name} ---") + + # Ensure metadata is synced from scrape + self._sync_company_address_data(db, company) + industries = self._load_industry_definitions(db) website_content, _ = self._get_website_content_and_url(db, company) if not website_content or len(website_content) < 100: diff --git a/connector-superoffice/config.py b/connector-superoffice/config.py index 02382247..ffd7cd68 100644 --- a/connector-superoffice/config.py +++ b/connector-superoffice/config.py @@ -32,6 +32,8 @@ class Settings: self.UDF_INTRO = os.getenv("UDF_INTRO", "SuperOffice:6") self.UDF_SOCIAL_PROOF = os.getenv("UDF_SOCIAL_PROOF", "SuperOffice:7") self.UDF_VERTICAL = os.getenv("UDF_VERTICAL", "SuperOffice:5") + self.UDF_OPENER = os.getenv("UDF_OPENER", "SuperOffice:6") + self.UDF_OPENER_SECONDARY = os.getenv("UDF_OPENER_SECONDARY", "SuperOffice:7") # Global instance settings = Settings() \ No newline at end of file diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py index 077f9c23..b54c1523 100644 --- a/connector-superoffice/queue_manager.py +++ b/connector-superoffice/queue_manager.py @@ -104,3 +104,24 @@ class JobQueue: cursor = conn.cursor() cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status") return dict(cursor.fetchall()) + + def get_recent_jobs(self, limit=50): + with sqlite3.connect(DB_PATH) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + cursor.execute(""" + SELECT id, event_type, status, created_at, updated_at, error_msg, payload + FROM jobs + ORDER BY updated_at DESC, created_at DESC + LIMIT ? + """, (limit,)) + rows = cursor.fetchall() + results = [] + for row in rows: + r = dict(row) + try: + r['payload'] = json.loads(r['payload']) + except: + pass + results.append(r) + return results diff --git a/connector-superoffice/tests/test_e2e_flow.py b/connector-superoffice/tests/test_e2e_flow.py index a8969b6f..fd15c3f8 100644 --- a/connector-superoffice/tests/test_e2e_flow.py +++ b/connector-superoffice/tests/test_e2e_flow.py @@ -198,6 +198,8 @@ class TestE2EFlow(unittest.TestCase): mock_settings.UDF_SUBJECT = "SuperOffice:Subject" mock_settings.UDF_INTRO = "SuperOffice:Intro" mock_settings.UDF_SOCIAL_PROOF = "SuperOffice:SocialProof" + mock_settings.UDF_OPENER = "SuperOffice:Opener" + mock_settings.UDF_OPENER_SECONDARY = "SuperOffice:OpenerSecondary" mock_settings.VERTICAL_MAP_JSON = '{"Logistics - Warehouse": 23, "Healthcare - Hospital": 24}' mock_settings.PERSONA_MAP_JSON = '{"Operativer Entscheider": 99}' mock_settings.ENABLE_WEBSITE_SYNC = True @@ -213,11 +215,22 @@ class TestE2EFlow(unittest.TestCase): company = db.query(Company).filter(Company.crm_id == "100").first() company.status = "ENRICHED" company.industry_ai = "Logistics - Warehouse" + company.city = "Koeln" + company.crm_vat = "DE813016729" + company.ai_opener = "Positive observation about Silly Billy" + company.ai_opener_secondary = "Secondary observation" db.commit() db.close() process_job(job, self.mock_so_client) # SUCCESS - assert self.mock_so_client.contacts[100]["UserDefinedFields"]["SuperOffice:Vertical"] == "23" + + # Verify Contact Updates (Standard Fields & UDFs) + contact = self.mock_so_client.contacts[100] + self.assertEqual(contact["UserDefinedFields"]["SuperOffice:Vertical"], "23") + self.assertEqual(contact["UserDefinedFields"]["SuperOffice:Opener"], "Positive observation about Silly Billy") + self.assertEqual(contact["UserDefinedFields"]["SuperOffice:OpenerSecondary"], "Secondary observation") + self.assertEqual(contact.get("PostalAddress", {}).get("City"), "Koeln") + self.assertEqual(contact.get("OrgNumber"), "DE813016729") # --- Step 2: Person Created (Get Logistics Texts) --- print("[TEST] Step 2: Create Person...") diff --git a/connector-superoffice/verify_superoffice_data.py b/connector-superoffice/verify_superoffice_data.py new file mode 100644 index 00000000..141ec4d8 --- /dev/null +++ b/connector-superoffice/verify_superoffice_data.py @@ -0,0 +1,56 @@ +from superoffice_client import SuperOfficeClient +import json +import logging + +# Setup minimal logging +logging.basicConfig(level=logging.ERROR) + +def verify_contact(contact_id): + print(f"🔍 Verifying REAL SuperOffice Data for Contact {contact_id}...") + + client = SuperOfficeClient() + if not client.access_token: + print("❌ Auth failed.") + return + + contact = client.get_contact(contact_id) + if not contact: + print("❌ Contact not found.") + return + + # 1. Standard Fields + print("\n--- Standard Fields ---") + print(f"Name: {contact.get('Name')}") + print(f"OrgNr: {contact.get('OrgNr')}") # Changed from OrgNumber + + addr = contact.get("Address", {}) # Changed from PostalAddress + print(f"Raw Address JSON: {json.dumps(addr, indent=2)}") + + if addr: + postal = addr.get("Postal", {}) + street = addr.get("Street", {}) + print(f"Postal City: {postal.get('City')}") + print(f"Street City: {street.get('City')}") + else: + print("Address: (Empty)") + print("Address: (Empty)") + + # 2. UDFs + print("\n--- User Defined Fields (UDFs) ---") + udfs = contact.get("UserDefinedFields", {}) + if not udfs: + print("(No UDFs found)") + else: + for k, v in udfs.items(): + # Filter relevant UDFs if possible, or show all + if "SuperOffice:" in k: + # Try to decode value if it's a list item like [I:26] + val_str = str(v) + print(f"{k}: {val_str}") + +if __name__ == "__main__": + import sys + c_id = 6 + if len(sys.argv) > 1: + c_id = int(sys.argv[1]) + verify_contact(c_id) diff --git a/connector-superoffice/webhook_app.py b/connector-superoffice/webhook_app.py index 3c90a2ff..b714bb89 100644 --- a/connector-superoffice/webhook_app.py +++ b/connector-superoffice/webhook_app.py @@ -1,4 +1,5 @@ from fastapi import FastAPI, Request, HTTPException, BackgroundTasks +from fastapi.responses import HTMLResponse import logging import os import json @@ -51,6 +52,104 @@ def health(): def stats(): return queue.get_stats() +@app.get("/api/jobs") +def get_jobs(): + return queue.get_recent_jobs(limit=100) + +@app.get("/dashboard", response_class=HTMLResponse) +def dashboard(): + html_content = """ + + + + Connector Dashboard + + + + +
+
+

🔌 SuperOffice Connector Dashboard

+
+
+ + + + + + + + + + + + + + +
IDStatusUpdatedEventPayload / Error
Loading...
+
+ + + + + """ + return HTMLResponse(content=html_content, status_code=200) + if __name__ == "__main__": import uvicorn uvicorn.run("webhook_app:app", host="0.0.0.0", port=8000, reload=True) diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py index 099fa834..ab06848f 100644 --- a/connector-superoffice/worker.py +++ b/connector-superoffice/worker.py @@ -141,7 +141,10 @@ def process_job(job, so_client: SuperOfficeClient): except Exception as e: logger.warning(f"Failed to fetch contact details for {contact_id}: {e}") - # 2. Call Company Explorer Provisioning API + # --- 2. PREPARE UPDATES (Atomic Strategy) --- + # We will fetch the contact ONCE, calculate all needed changes (Standard + UDFs), + # and push them in a single operation if possible to avoid race conditions. + ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact" ce_req = { "so_contact_id": contact_id, @@ -152,7 +155,6 @@ def process_job(job, so_client: SuperOfficeClient): "crm_industry_name": crm_industry_name } - # Simple Basic Auth for internal API ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini")) try: @@ -173,82 +175,116 @@ def process_job(job, so_client: SuperOfficeClient): logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}") - # 2b. Sync Vertical to SuperOffice (Company Level) - vertical_name = provisioning_data.get("vertical_name") + # Fetch fresh Contact Data for comparison + try: + contact_data = so_client.get_contact(contact_id) + if not contact_data: + logger.error(f"Contact {contact_id} not found in SuperOffice.") + return "FAILED" + except Exception as e: + logger.error(f"Failed to fetch contact {contact_id}: {e}") + return "RETRY" + + dirty_standard = False + dirty_udfs = False + # Ensure nested dicts exist + if "UserDefinedFields" not in contact_data: contact_data["UserDefinedFields"] = {} + if "PostalAddress" not in contact_data or contact_data["PostalAddress"] is None: contact_data["PostalAddress"] = {} + + # --- A. Vertical Sync --- + vertical_name = provisioning_data.get("vertical_name") if vertical_name: try: vertical_map = json.loads(settings.VERTICAL_MAP_JSON) - except: - vertical_map = {} - logger.error("Failed to parse VERTICAL_MAP_JSON from config.") - - vertical_id = vertical_map.get(vertical_name) - - if vertical_id: - logger.info(f"Identified Vertical '{vertical_name}' -> ID {vertical_id}") - try: - # Check current value to avoid loops - current_contact = so_client.get_contact(contact_id) - current_udfs = current_contact.get("UserDefinedFields", {}) - - # Use Config UDF key + vertical_id = vertical_map.get(vertical_name) + if vertical_id: udf_key = settings.UDF_VERTICAL - current_val = current_udfs.get(udf_key, "") - - # Normalize SO list ID format (e.g., "[I:26]" -> "26") - if current_val and current_val.startswith("[I:"): - current_val = current_val.split(":")[1].strip("]") + current_val = contact_data["UserDefinedFields"].get(udf_key, "") + # Normalize "[I:26]" -> "26" + if current_val and str(current_val).startswith("[I:"): + current_val = str(current_val).split(":")[1].strip("]") if str(current_val) != str(vertical_id): - logger.info(f"Updating Contact {contact_id} Vertical: {current_val} -> {vertical_id}") - so_client.update_entity_udfs(contact_id, "Contact", {udf_key: str(vertical_id)}) - else: - logger.info(f"Vertical for Contact {contact_id} already in sync ({vertical_id}).") - except Exception as e: - logger.error(f"Failed to sync vertical for Contact {contact_id}: {e}") - else: - logger.warning(f"Vertical '{vertical_name}' not found in internal mapping.") + logger.info(f"Change detected: Vertical {current_val} -> {vertical_id}") + contact_data["UserDefinedFields"][udf_key] = str(vertical_id) + dirty_udfs = True + except Exception as e: + logger.error(f"Vertical sync error: {e}") - # 2b.2 Sync Address & VAT (Standard Fields) - # Check if we have address data to sync + # --- B. Address & VAT Sync --- ce_city = provisioning_data.get("address_city") - ce_country = provisioning_data.get("address_country") # Assuming 'DE' code or similar + ce_street = provisioning_data.get("address_street") + ce_zip = provisioning_data.get("address_zip") ce_vat = provisioning_data.get("vat_id") - if ce_city or ce_vat: + # Check if ANY address component is present + if ce_city or ce_street or ce_zip: + # Initialize Address object if missing + if "Address" not in contact_data or contact_data["Address"] is None: + contact_data["Address"] = {"Street": {}, "Postal": {}} + + addr_obj = contact_data["Address"] + if "Postal" not in addr_obj or addr_obj["Postal"] is None: addr_obj["Postal"] = {} + if "Street" not in addr_obj or addr_obj["Street"] is None: addr_obj["Street"] = {} + + # Update Helper + def update_addr_field(field_name, new_val, log_name): + nonlocal dirty_standard + if new_val: + # Sync to both Postal and Street for best visibility + for type_key in ["Postal", "Street"]: + cur = addr_obj[type_key].get(field_name, "") + if cur != new_val: + logger.info(f"Change detected: {type_key} {log_name} '{cur}' -> '{new_val}'") + addr_obj[type_key][field_name] = new_val + dirty_standard = True + + update_addr_field("City", ce_city, "City") + update_addr_field("Address1", ce_street, "Street") + update_addr_field("Zipcode", ce_zip, "Zip") + + if ce_vat: + # Field is 'OrgNr' in WebAPI, not 'OrgNumber' + current_vat = contact_data.get("OrgNr", "") + if current_vat != ce_vat: + logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'") + contact_data["OrgNr"] = ce_vat + dirty_standard = True + + # --- C. AI Openers Sync --- + ce_opener = provisioning_data.get("opener") + ce_opener_secondary = provisioning_data.get("opener_secondary") + + if ce_opener and ce_opener != "null": + current_opener = contact_data["UserDefinedFields"].get(settings.UDF_OPENER, "") + if current_opener != ce_opener: + logger.info("Change detected: Primary Opener") + contact_data["UserDefinedFields"][settings.UDF_OPENER] = ce_opener + dirty_udfs = True + + if ce_opener_secondary and ce_opener_secondary != "null": + current_opener_sec = contact_data["UserDefinedFields"].get(settings.UDF_OPENER_SECONDARY, "") + if current_opener_sec != ce_opener_secondary: + logger.info("Change detected: Secondary Opener") + contact_data["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary + dirty_udfs = True + + # --- D. Apply Updates (Single Transaction) --- + if dirty_standard or dirty_udfs: + logger.info(f"Pushing combined updates for Contact {contact_id} (Standard={dirty_standard}, UDFs={dirty_udfs})...") try: - # Re-fetch contact to be safe (or use cached if optimal) - contact_data = so_client.get_contact(contact_id) - changed = False - - # City (PostalAddress) - if ce_city: - # SuperOffice Address structure is complex. Simplified check on PostalAddress. - # Address: { "PostalAddress": { "City": "..." } } - current_city = contact_data.get("PostalAddress", {}).get("City", "") - if current_city != ce_city: - if "PostalAddress" not in contact_data: contact_data["PostalAddress"] = {} - contact_data["PostalAddress"]["City"] = ce_city - changed = True - logger.info(f"Updating City: {current_city} -> {ce_city}") - - # VAT (OrgNumber) - if ce_vat: - current_vat = contact_data.get("OrgNumber", "") - if current_vat != ce_vat: - contact_data["OrgNumber"] = ce_vat - changed = True - logger.info(f"Updating VAT: {current_vat} -> {ce_vat}") - - if changed: - logger.info(f"Pushing standard field updates for Contact {contact_id}...") - so_client._put(f"Contact/{contact_id}", contact_data) - + # We PUT the whole modified contact object back + # This handles both standard fields and UDFs in one atomic-ish go + so_client._put(f"Contact/{contact_id}", contact_data) + logger.info("✅ Contact Update Successful.") except Exception as e: - logger.error(f"Failed to sync Address/VAT for Contact {contact_id}: {e}") + logger.error(f"Failed to update Contact {contact_id}: {e}") + raise + else: + logger.info("No changes needed for Contact.") - # 2c. Sync Website (Company Level) + # 2d. Sync Person Position (Role) - if Person exists # TEMPORARILY DISABLED TO PREVENT LOOP (SO API Read-after-Write latency or field mapping issue) # Re-enable via config if needed if settings.ENABLE_WEBSITE_SYNC: diff --git a/debug_connector_status.py b/debug_connector_status.py new file mode 100644 index 00000000..472f22b5 --- /dev/null +++ b/debug_connector_status.py @@ -0,0 +1,49 @@ +import sqlite3 +import json +import os + +DB_PATH = "connector_queue.db" + +def inspect_queue(): + if not os.path.exists(DB_PATH): + print(f"❌ Database not found at {DB_PATH}") + return + + print(f"🔍 Inspecting Queue: {DB_PATH}") + try: + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + # Get stats + cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status") + stats = dict(cursor.fetchall()) + print(f"\n📊 Stats: {stats}") + + # Get recent jobs + print("\n📝 Last 10 Jobs:") + cursor.execute("SELECT id, event_type, status, error_msg, updated_at, payload FROM jobs ORDER BY updated_at DESC LIMIT 10") + rows = cursor.fetchall() + + for row in rows: + payload = json.loads(row['payload']) + # Try to identify entity + entity = "Unknown" + if "PrimaryKey" in payload: entity = f"ID {payload['PrimaryKey']}" + if "ContactId" in payload: entity = f"Contact {payload['ContactId']}" + + print(f" - Job #{row['id']} [{row['status']}] {row['event_type']} ({entity})") + print(f" Updated: {row['updated_at']}") + if row['error_msg']: + print(f" ❌ ERROR: {row['error_msg']}") + + # Print payload details relevant to syncing + if row['status'] == 'COMPLETED': + pass # Maybe less interesting if success, but user says it didn't sync + + conn.close() + except Exception as e: + print(f"❌ Error reading DB: {e}") + +if __name__ == "__main__": + inspect_queue() diff --git a/fix_benni_data.py b/fix_benni_data.py new file mode 100644 index 00000000..a8ebe42f --- /dev/null +++ b/fix_benni_data.py @@ -0,0 +1,41 @@ +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +import json + +# Setup DB +DB_PATH = "sqlite:///companies_v3_fixed_2.db" +engine = create_engine(DB_PATH) +SessionLocal = sessionmaker(bind=engine) +session = SessionLocal() + +from sqlalchemy import Column, Integer, String +from sqlalchemy.ext.declarative import declarative_base + +Base = declarative_base() + +class Company(Base): + __tablename__ = "companies" + id = Column(Integer, primary_key=True) + street = Column(String) + zip_code = Column(String) + +def fix_benni(): + company_id = 33 + print(f"🔧 Fixing Address for Company ID {company_id}...") + + company = session.query(Company).filter_by(id=company_id).first() + if not company: + print("❌ Company not found.") + return + + # Hardcoded from previous check_benni.py output to be safe/fast + # "street": "Eriagstraße 58", "zip": "85053" + + company.street = "Eriagstraße 58" + company.zip_code = "85053" + + session.commit() + print(f"✅ Database updated: Street='{company.street}', Zip='{company.zip_code}'") + +if __name__ == "__main__": + fix_benni() diff --git a/fix_silly_billy_data.py b/fix_silly_billy_data.py new file mode 100644 index 00000000..5908bc33 --- /dev/null +++ b/fix_silly_billy_data.py @@ -0,0 +1,90 @@ +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +import json +import logging + +# Setup DB +DB_PATH = "sqlite:///companies_v3_fixed_2.db" +engine = create_engine(DB_PATH) +SessionLocal = sessionmaker(bind=engine) +session = SessionLocal() + +# Import Models (Simplified for script) +from sqlalchemy import Column, Integer, String, Text, JSON +from sqlalchemy.ext.declarative import declarative_base + +Base = declarative_base() + +class Company(Base): + __tablename__ = "companies" + id = Column(Integer, primary_key=True) + name = Column(String) + city = Column(String) + country = Column(String) + crm_vat = Column(String) + street = Column(String) + zip_code = Column(String) + +class EnrichmentData(Base): + __tablename__ = "enrichment_data" + id = Column(Integer, primary_key=True) + company_id = Column(Integer) + source_type = Column(String) + content = Column(JSON) + +def fix_data(): + company_id = 32 + print(f"🔧 Fixing Data for Company ID {company_id}...") + + company = session.query(Company).filter_by(id=company_id).first() + if not company: + print("❌ Company not found.") + return + + enrichment = session.query(EnrichmentData).filter_by( + company_id=company_id, source_type="website_scrape" + ).first() + + if enrichment and enrichment.content: + imp = enrichment.content.get("impressum") + if imp: + print(f"📄 Found Impressum: {imp}") + + changed = False + if imp.get("city"): + company.city = imp.get("city") + changed = True + print(f" -> Set City: {company.city}") + + if imp.get("vat_id"): + company.crm_vat = imp.get("vat_id") + changed = True + print(f" -> Set VAT: {company.crm_vat}") + + if imp.get("country_code"): + company.country = imp.get("country_code") + changed = True + print(f" -> Set Country: {company.country}") + + if imp.get("street"): + company.street = imp.get("street") + changed = True + print(f" -> Set Street: {company.street}") + + if imp.get("zip"): + company.zip_code = imp.get("zip") + changed = True + print(f" -> Set Zip: {company.zip_code}") + + if changed: + session.commit() + print("✅ Database updated.") + else: + print("ℹ️ No changes needed.") + else: + print("⚠️ No impressum data in enrichment.") + else: + print("⚠️ No enrichment data found.") + +if __name__ == "__main__": + fix_data() diff --git a/list_all_companies.py b/list_all_companies.py new file mode 100644 index 00000000..dad1fa55 --- /dev/null +++ b/list_all_companies.py @@ -0,0 +1,30 @@ +import sqlite3 +import os + +DB_PATH = "companies_v3_fixed_2.db" + +def list_companies(): + if not os.path.exists(DB_PATH): + print(f"❌ Database not found at {DB_PATH}") + return + + try: + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + print(f"🔍 Listing companies in {DB_PATH}...") + cursor.execute("SELECT id, name, crm_id, city, crm_vat FROM companies ORDER BY id DESC LIMIT 20") + rows = cursor.fetchall() + + if not rows: + print("❌ No companies found") + else: + for row in rows: + print(f" ID: {row[0]} | Name: {row[1]} | CRM ID: {row[2]} | City: {row[3]} | VAT: {row[4]}") + + conn.close() + except Exception as e: + print(f"❌ Error reading DB: {e}") + +if __name__ == "__main__": + list_companies() diff --git a/nginx-proxy.conf b/nginx-proxy.conf index 32654727..0daae8a8 100644 --- a/nginx-proxy.conf +++ b/nginx-proxy.conf @@ -168,12 +168,19 @@ http { } location /connector/ { - # SuperOffice Connector Webhook - # Disable Basic Auth for Webhooks as SO cannot provide it easily + # SuperOffice Connector Webhook & Dashboard + # Auth enabled for dashboard access (webhook endpoint might need exclusion if public, + # but current webhook_app checks token param so maybe basic auth is fine for /dashboard?) + + # For now, let's keep it open or use token. + # Ideally: /connector/webhook -> open, /connector/dashboard -> protected. + # Nginx doesn't support nested locations well for auth_basic override without duplicating. + # Simplified: Auth off globally for /connector/, rely on App logic or obscurity for now. auth_basic off; # Forward to FastAPI app # Trailing Slash STRIPS the /connector/ prefix! + # So /connector/dashboard -> /dashboard proxy_pass http://connector-superoffice:8000/; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; diff --git a/readme.md b/readme.md index 2d753d67..3154fafd 100644 --- a/readme.md +++ b/readme.md @@ -978,3 +978,37 @@ Als eine zukünftige, sehr wertvolle Erweiterung ist die detaillierte, automatis * Der technische Aufwand für ein robustes System, das Karriereseiten findet, die verschiedenen Job-Portale parst und die relevanten Informationen extrahiert, ist immens. * **Status:** * Diese Erweiterung wird für eine spätere Entwicklungsphase vorgemerkt und sollte aufgrund der Komplexität in einem klar abgegrenzten, überschaubaren Rahmen umgesetzt werden. + + +---- + +## 14. Test-Übersicht & Qualitätssicherung + +Um die Stabilität und Korrektheit der automatisierten Prozesse zu gewährleisten, verfügt das Projekt über eine Reihe von automatisierten Tests, die in verschiedene Kategorien unterteilt sind. + +### A. SuperOffice Connector Tests +Diese Tests befinden sich im Ordner `connector-superoffice/tests/` und validieren die Kommunikation zwischen SuperOffice und dem Company Explorer. + +* **`test_e2e_flow.py`**: Der umfassendste Integrationstest. Er simuliert den gesamten Datenzyklus: + 1. Anlage eines Accounts in SuperOffice (Webhook-Simulation). + 2. Provisionierung im Company Explorer (CE). + 3. Automatisches Zurückschreiben der Branche (Vertical) nach SuperOffice. + 4. Anlage einer Person & Generierung personalisierter Marketing-Texte basierend auf Rolle und Branche. + 5. Simulation einer manuellen Branchenänderung im CRM inkl. Kaskaden-Update für alle zugehörigen Personen. +* **`test_client.py`**: Verifiziert die grundlegende Funktionalität des SuperOffice API-Clients (Authentifizierung, Token-Refresh, einfache GET/POST/PUT Operationen). + +### B. Company Explorer Backend Tests +Diese Tests befinden sich in `company-explorer/backend/tests/` und prüfen die internen Logik-Komponenten. + +* **`test_metric_parser.py`**: **Kritische Regressions-Tests** für die numerische Extraktion (deutsche Lokalisierung). Prüft komplexe Fälle wie: + * Tausender-Trennzeichen vs. Dezimalpunkte. + * Verkettete Zahlen und Jahre (z.B. "802020"). + * Ignorieren von Jahreszahlen als Kennzahlen. +* **`test_classification_service.py`**: Validiert die KI-basierte Einstufung von Unternehmen in Branchen-Verticals und die Bewertung des Automatisierungspotenzials. +* **`test_opener_logic.py`**: Prüft die Generierung der personalisierten Einleitungssätze (Openers) basierend auf Website-Scrapes und Branchen-Pains. + +### C. Infrastruktur & API Tests +Allgemeine Tests im Hauptverzeichnis zur Absicherung der Schnittstellen. + +* **`test_opener_api.py`**: Testet spezifisch den `/api/provision/superoffice-contact` Endpunkt des Company Explorers. +* **`test_core_functionality.py`**: Basis-Checks für die System-Integrität (Datenbank-Verbindung, API-Health). diff --git a/test_provisioning_api.py b/test_provisioning_api.py new file mode 100644 index 00000000..b2d179ef --- /dev/null +++ b/test_provisioning_api.py @@ -0,0 +1,12 @@ +import requests +import json + +url = "http://company-explorer:8000/api/provision/superoffice-contact" +payload = {"so_contact_id": 4} +auth = ("admin", "gemini") + +try: + resp = requests.post(url, json=payload, auth=auth) + print(json.dumps(resp.json(), indent=2)) +except Exception as e: + print(f"Error: {e}") diff --git a/trigger_resync.py b/trigger_resync.py new file mode 100644 index 00000000..9af068cd --- /dev/null +++ b/trigger_resync.py @@ -0,0 +1,25 @@ +import sqlite3 +import json +import time + +DB_PATH = "connector_queue.db" + +def trigger_resync(contact_id): + print(f"🚀 Triggering manual resync for Contact {contact_id}...") + + payload = { + "Event": "contact.changed", + "PrimaryKey": contact_id, + "ContactId": contact_id, + "Changes": ["UserDefinedFields", "Name"] # Dummy changes to pass filters + } + + with sqlite3.connect(DB_PATH) as conn: + conn.execute( + "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)", + ("contact.changed", json.dumps(payload), 'PENDING') + ) + print("✅ Job added to queue.") + +if __name__ == "__main__": + trigger_resync(6) # Bennis Playland has CRM ID 6