diff --git a/.dev_session/SESSION_INFO b/.dev_session/SESSION_INFO
index be71f7bc..dcabcef6 100644
--- a/.dev_session/SESSION_INFO
+++ b/.dev_session/SESSION_INFO
@@ -1 +1 @@
-{"task_id": "30e88f42-8544-804e-ac61-ed061d57563a", "token": "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", "session_start_time": "2026-02-21T21:27:02.599458"}
\ No newline at end of file
+{"task_id": "30e88f42-8544-804e-ac61-ed061d57563a", "token": "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", "session_start_time": "2026-02-22T08:20:16.802201"}
\ No newline at end of file
diff --git a/check_benni.py b/check_benni.py
new file mode 100644
index 00000000..7f24607d
--- /dev/null
+++ b/check_benni.py
@@ -0,0 +1,41 @@
+"""Diagnostic script: show stored address fields and Impressum scrape data for company 33."""
+import sqlite3
+import os
+import json
+
+DB_PATH = "companies_v3_fixed_2.db"
+
+def check_company_33():
+    """Print company 33's standard address columns and its website_scrape impressum."""
+    if not os.path.exists(DB_PATH):
+        print(f"❌ Database not found at {DB_PATH}")
+        return
+
+    try:
+        # 'with' guarantees the connection is released even if a query fails.
+        with sqlite3.connect(DB_PATH) as conn:
+            cursor = conn.cursor()
+
+            print("🔍 Checking Company ID 33 (Bennis Playland)...")
+            # Check standard fields
+            cursor.execute("SELECT id, name, city, street, zip_code FROM companies WHERE id = 33")
+            row = cursor.fetchone()
+            if row:
+                print(f" Standard: City='{row[2]}', Street='{row[3]}', Zip='{row[4]}'")
+            else:
+                print(" ❌ Company 33 not found in DB.")
+
+            # Check Enrichment
+            cursor.execute("SELECT content FROM enrichment_data WHERE company_id = 33 AND source_type = 'website_scrape'")
+            enrich_row = cursor.fetchone()
+            if enrich_row:
+                data = json.loads(enrich_row[0])
+                imp = data.get("impressum")
+                print(f" Impressum Data: {json.dumps(imp, indent=2) if imp else 'None'}")
+            else:
+                print(" ❌ No website_scrape found for Company 33.")
+    except (sqlite3.Error, json.JSONDecodeError) as e:
+        print(f"❌ Error: {e}")
+
+if __name__ == "__main__":
+    check_company_33()
diff --git a/check_silly_billy.py b/check_silly_billy.py
new file mode 100644
index 00000000..2f1db0dd
--- /dev/null
+++ b/check_silly_billy.py
@@ -0,0 +1,54 @@
+"""Diagnostic script: locate 'Silly Billy' and dump its stored CRM + Impressum data."""
+import sqlite3
+import os
+import json
+
+DB_PATH = "companies_v3_fixed_2.db"
+
+def check_company():
+    """Search companies named like 'Silly Billy' and print fields plus scrape impressum."""
+    if not os.path.exists(DB_PATH):
+        print(f"❌ Database not found at {DB_PATH}")
+        return
+
+    try:
+        # 'with' releases the connection even if a query below raises.
+        with sqlite3.connect(DB_PATH) as conn:
+            cursor = conn.cursor()
+
+            print(f"🔍 Searching for 'Silly Billy' in {DB_PATH}...")
+            cursor.execute("SELECT id, name, crm_id, ai_opener, ai_opener_secondary, city, crm_vat, status FROM companies WHERE name LIKE '%Silly Billy%'")
+            rows = cursor.fetchall()
+
+            if not rows:
+                print("❌ No company found matching 'Silly Billy'")
+            else:
+                for row in rows:
+                    company_id = row[0]
+                    print("\n✅ Company Found:")
+                    print(f" ID: {company_id}")
+                    print(f" Name: {row[1]}")
+                    print(f" CRM ID: {row[2]}")
+                    print(f" Status: {row[7]}")
+                    print(f" City: {row[5]}")
+                    print(f" VAT: {row[6]}")
+                    print(f" Opener (Primary): {row[3][:50]}..." if row[3] else " Opener (Primary): None")
+
+                    # Impressum data captured by the website scraper
+                    print(f"\n 🔍 Checking Enrichment Data for ID {company_id}...")
+                    cursor.execute("SELECT content FROM enrichment_data WHERE company_id = ? AND source_type = 'website_scrape'", (company_id,))
+                    enrich_row = cursor.fetchone()
+                    if enrich_row:
+                        try:
+                            data = json.loads(enrich_row[0])
+                            imp = data.get("impressum")
+                            print(f" Impressum Data in Scrape: {json.dumps(imp, indent=2) if imp else 'None'}")
+                        except (json.JSONDecodeError, TypeError) as e:
+                            print(f" ❌ Error parsing JSON: {e}")
+                    else:
+                        print(" ❌ No website_scrape enrichment data found.")
+    except sqlite3.Error as e:
+        print(f"❌ Error reading DB: {e}")
+
+if __name__ == "__main__":
+    check_company()
diff --git a/company-explorer/backend/app.py b/company-explorer/backend/app.py
index 480435aa..5acd1409 100644
--- a/company-explorer/backend/app.py
+++ b/company-explorer/backend/app.py
@@ -104,6 +104,8 @@ class ProvisioningResponse(BaseModel):
# Enrichment Data for Write-Back
address_city: Optional[str] = None
+ address_zip: Optional[str] = None
+ address_street: Optional[str] = None
address_country: Optional[str] = None
vat_id: Optional[str] = None
@@ -278,12 +280,19 @@ def provision_superoffice_contact(
# 2. Find Contact (Person)
if req.so_person_id is None:
- # Just a company sync, no texts needed
+ # Just a company sync, but return all company-level metadata
return ProvisioningResponse(
status="success",
company_name=company.name,
website=company.website,
- vertical_name=company.industry_ai
+ vertical_name=company.industry_ai,
+ opener=company.ai_opener,
+ opener_secondary=company.ai_opener_secondary,
+ address_city=company.city,
+ address_street=company.street,
+ address_zip=company.zip_code,
+ address_country=company.country,
+ vat_id=company.crm_vat
)
person = db.query(Contact).filter(Contact.so_person_id == req.so_person_id).first()
@@ -353,6 +362,8 @@ def provision_superoffice_contact(
opener_secondary=company.ai_opener_secondary,
texts=texts,
address_city=company.city,
+ address_street=company.street,
+ address_zip=company.zip_code,
address_country=company.country,
# TODO: Add VAT field to Company model if not present, for now using crm_vat if available
vat_id=company.crm_vat
diff --git a/company-explorer/backend/database.py b/company-explorer/backend/database.py
index 7406da64..a796e71e 100644
--- a/company-explorer/backend/database.py
+++ b/company-explorer/backend/database.py
@@ -34,6 +34,8 @@ class Company(Base):
industry_ai = Column(String, nullable=True) # The AI suggested industry
# Location (Golden Record)
+ street = Column(String, nullable=True) # NEW: Street + Number
+ zip_code = Column(String, nullable=True) # NEW: Postal Code
city = Column(String, nullable=True)
country = Column(String, default="DE")
diff --git a/company-explorer/backend/services/classification.py b/company-explorer/backend/services/classification.py
index beee1c86..f01fa161 100644
--- a/company-explorer/backend/services/classification.py
+++ b/company-explorer/backend/services/classification.py
@@ -220,8 +220,49 @@ AUSGABE: NUR den fertigen Satz.
logger.error(f"Opener Error: {e}")
return None
+ def _sync_company_address_data(self, db: Session, company: Company):
+ """Extracts address and VAT data from website scrape if available."""
+ from ..database import EnrichmentData
+ enrichment = db.query(EnrichmentData).filter_by(
+ company_id=company.id, source_type="website_scrape"
+ ).order_by(EnrichmentData.created_at.desc()).first()
+
+ if enrichment and enrichment.content and "impressum" in enrichment.content:
+ imp = enrichment.content["impressum"]
+ if imp and isinstance(imp, dict):
+ changed = False
+ # City
+ if imp.get("city") and not company.city:
+ company.city = imp.get("city")
+ changed = True
+ # Street
+ if imp.get("street") and not company.street:
+ company.street = imp.get("street")
+ changed = True
+ # Zip / PLZ
+ zip_val = imp.get("zip") or imp.get("plz")
+ if zip_val and not company.zip_code:
+ company.zip_code = zip_val
+ changed = True
+ # Country
+ if imp.get("country_code") and (not company.country or company.country == "DE"):
+ company.country = imp.get("country_code")
+ changed = True
+ # VAT ID
+ if imp.get("vat_id") and not company.crm_vat:
+ company.crm_vat = imp.get("vat_id")
+ changed = True
+
+ if changed:
+ db.commit()
+ logger.info(f"Updated Address/VAT from Impressum for {company.name}: City={company.city}, VAT={company.crm_vat}")
+
def classify_company_potential(self, company: Company, db: Session) -> Company:
logger.info(f"--- Starting FULL Analysis v3.0 for {company.name} ---")
+
+ # Ensure metadata is synced from scrape
+ self._sync_company_address_data(db, company)
+
industries = self._load_industry_definitions(db)
website_content, _ = self._get_website_content_and_url(db, company)
if not website_content or len(website_content) < 100:
diff --git a/connector-superoffice/config.py b/connector-superoffice/config.py
index 02382247..ffd7cd68 100644
--- a/connector-superoffice/config.py
+++ b/connector-superoffice/config.py
@@ -32,6 +32,8 @@ class Settings:
self.UDF_INTRO = os.getenv("UDF_INTRO", "SuperOffice:6")
self.UDF_SOCIAL_PROOF = os.getenv("UDF_SOCIAL_PROOF", "SuperOffice:7")
self.UDF_VERTICAL = os.getenv("UDF_VERTICAL", "SuperOffice:5")
+ self.UDF_OPENER = os.getenv("UDF_OPENER", "SuperOffice:6")
+ self.UDF_OPENER_SECONDARY = os.getenv("UDF_OPENER_SECONDARY", "SuperOffice:7")
# Global instance
settings = Settings()
\ No newline at end of file
diff --git a/connector-superoffice/queue_manager.py b/connector-superoffice/queue_manager.py
index 077f9c23..b54c1523 100644
--- a/connector-superoffice/queue_manager.py
+++ b/connector-superoffice/queue_manager.py
@@ -104,3 +104,24 @@ class JobQueue:
cursor = conn.cursor()
cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
return dict(cursor.fetchall())
+
+    def get_recent_jobs(self, limit=50):
+        with sqlite3.connect(DB_PATH) as conn:
+            conn.row_factory = sqlite3.Row
+            cursor = conn.cursor()
+            cursor.execute("""
+                SELECT id, event_type, status, created_at, updated_at, error_msg, payload
+                FROM jobs
+                ORDER BY updated_at DESC, created_at DESC
+                LIMIT ?
+            """, (limit,))
+            rows = cursor.fetchall()
+            results = []
+            for row in rows:
+                r = dict(row)
+                try:
+                    r['payload'] = json.loads(r['payload'])
+                except (json.JSONDecodeError, TypeError):
+                    pass  # keep the raw string payload if it is not valid JSON
+                results.append(r)
+            return results
diff --git a/connector-superoffice/tests/test_e2e_flow.py b/connector-superoffice/tests/test_e2e_flow.py
index a8969b6f..fd15c3f8 100644
--- a/connector-superoffice/tests/test_e2e_flow.py
+++ b/connector-superoffice/tests/test_e2e_flow.py
@@ -198,6 +198,8 @@ class TestE2EFlow(unittest.TestCase):
mock_settings.UDF_SUBJECT = "SuperOffice:Subject"
mock_settings.UDF_INTRO = "SuperOffice:Intro"
mock_settings.UDF_SOCIAL_PROOF = "SuperOffice:SocialProof"
+ mock_settings.UDF_OPENER = "SuperOffice:Opener"
+ mock_settings.UDF_OPENER_SECONDARY = "SuperOffice:OpenerSecondary"
mock_settings.VERTICAL_MAP_JSON = '{"Logistics - Warehouse": 23, "Healthcare - Hospital": 24}'
mock_settings.PERSONA_MAP_JSON = '{"Operativer Entscheider": 99}'
mock_settings.ENABLE_WEBSITE_SYNC = True
@@ -213,11 +215,22 @@ class TestE2EFlow(unittest.TestCase):
company = db.query(Company).filter(Company.crm_id == "100").first()
company.status = "ENRICHED"
company.industry_ai = "Logistics - Warehouse"
+ company.city = "Koeln"
+ company.crm_vat = "DE813016729"
+ company.ai_opener = "Positive observation about Silly Billy"
+ company.ai_opener_secondary = "Secondary observation"
db.commit()
db.close()
process_job(job, self.mock_so_client) # SUCCESS
- assert self.mock_so_client.contacts[100]["UserDefinedFields"]["SuperOffice:Vertical"] == "23"
+
+ # Verify Contact Updates (Standard Fields & UDFs)
+ contact = self.mock_so_client.contacts[100]
+ self.assertEqual(contact["UserDefinedFields"]["SuperOffice:Vertical"], "23")
+ self.assertEqual(contact["UserDefinedFields"]["SuperOffice:Opener"], "Positive observation about Silly Billy")
+ self.assertEqual(contact["UserDefinedFields"]["SuperOffice:OpenerSecondary"], "Secondary observation")
+ self.assertEqual(contact.get("PostalAddress", {}).get("City"), "Koeln")
+ self.assertEqual(contact.get("OrgNumber"), "DE813016729")
# --- Step 2: Person Created (Get Logistics Texts) ---
print("[TEST] Step 2: Create Person...")
diff --git a/connector-superoffice/verify_superoffice_data.py b/connector-superoffice/verify_superoffice_data.py
new file mode 100644
index 00000000..141ec4d8
--- /dev/null
+++ b/connector-superoffice/verify_superoffice_data.py
@@ -0,0 +1,57 @@
+"""Diagnostic: fetch a live SuperOffice contact and print address, OrgNr and UDF values."""
+from superoffice_client import SuperOfficeClient
+import json
+import logging
+
+# Setup minimal logging
+logging.basicConfig(level=logging.ERROR)
+
+def verify_contact(contact_id):
+    """Print the real SuperOffice field values for contact_id (standard fields + UDFs)."""
+    print(f"🔍 Verifying REAL SuperOffice Data for Contact {contact_id}...")
+
+    client = SuperOfficeClient()
+    if not client.access_token:
+        print("❌ Auth failed.")
+        return
+
+    contact = client.get_contact(contact_id)
+    if not contact:
+        print("❌ Contact not found.")
+        return
+
+    # 1. Standard Fields
+    print("\n--- Standard Fields ---")
+    print(f"Name: {contact.get('Name')}")
+    print(f"OrgNr: {contact.get('OrgNr')}")  # WebAPI field is 'OrgNr', not 'OrgNumber'
+
+    addr = contact.get("Address", {})  # WebAPI nests under 'Address', not 'PostalAddress'
+    print(f"Raw Address JSON: {json.dumps(addr, indent=2)}")
+
+    if addr:
+        postal = addr.get("Postal", {})
+        street = addr.get("Street", {})
+        print(f"Postal City: {postal.get('City')}")
+        print(f"Street City: {street.get('City')}")
+    else:
+        print("Address: (Empty)")
+
+    # 2. UDFs
+    print("\n--- User Defined Fields (UDFs) ---")
+    udfs = contact.get("UserDefinedFields", {})
+    if not udfs:
+        print("(No UDFs found)")
+    else:
+        for k, v in udfs.items():
+            # Only show connector-managed UDF keys ('SuperOffice:' prefix)
+            if "SuperOffice:" in k:
+                # Values may be SO list refs like '[I:26]'; print raw for inspection
+                val_str = str(v)
+                print(f"{k}: {val_str}")
+
+if __name__ == "__main__":
+    import sys
+    c_id = 6
+    if len(sys.argv) > 1:
+        c_id = int(sys.argv[1])
+    verify_contact(c_id)
diff --git a/connector-superoffice/webhook_app.py b/connector-superoffice/webhook_app.py
index 3c90a2ff..b714bb89 100644
--- a/connector-superoffice/webhook_app.py
+++ b/connector-superoffice/webhook_app.py
@@ -1,4 +1,5 @@
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
+from fastapi.responses import HTMLResponse
import logging
import os
import json
@@ -51,6 +52,104 @@ def health():
def stats():
return queue.get_stats()
+@app.get("/api/jobs")
+def get_jobs():
+ return queue.get_recent_jobs(limit=100)
+
+@app.get("/dashboard", response_class=HTMLResponse)
+def dashboard():
+ html_content = """
+
+
+
+ Connector Dashboard
+
+
+
+
+
+
+
🔌 SuperOffice Connector Dashboard
+
+
+
+
+
+
+ | ID |
+ Status |
+ Updated |
+ Event |
+ Payload / Error |
+
+
+
+ | Loading... |
+
+
+
+
+
+
+
+ """
+ return HTMLResponse(content=html_content, status_code=200)
+
if __name__ == "__main__":
import uvicorn
uvicorn.run("webhook_app:app", host="0.0.0.0", port=8000, reload=True)
diff --git a/connector-superoffice/worker.py b/connector-superoffice/worker.py
index 099fa834..ab06848f 100644
--- a/connector-superoffice/worker.py
+++ b/connector-superoffice/worker.py
@@ -141,7 +141,10 @@ def process_job(job, so_client: SuperOfficeClient):
except Exception as e:
logger.warning(f"Failed to fetch contact details for {contact_id}: {e}")
- # 2. Call Company Explorer Provisioning API
+ # --- 2. PREPARE UPDATES (Atomic Strategy) ---
+ # We will fetch the contact ONCE, calculate all needed changes (Standard + UDFs),
+ # and push them in a single operation if possible to avoid race conditions.
+
ce_url = f"{settings.COMPANY_EXPLORER_URL}/api/provision/superoffice-contact"
ce_req = {
"so_contact_id": contact_id,
@@ -152,7 +155,6 @@ def process_job(job, so_client: SuperOfficeClient):
"crm_industry_name": crm_industry_name
}
- # Simple Basic Auth for internal API
ce_auth = (os.getenv("API_USER", "admin"), os.getenv("API_PASSWORD", "gemini"))
try:
@@ -173,82 +175,116 @@ def process_job(job, so_client: SuperOfficeClient):
logger.info(f"CE Response for Contact {contact_id}: {json.dumps(provisioning_data)}")
- # 2b. Sync Vertical to SuperOffice (Company Level)
- vertical_name = provisioning_data.get("vertical_name")
+ # Fetch fresh Contact Data for comparison
+ try:
+ contact_data = so_client.get_contact(contact_id)
+ if not contact_data:
+ logger.error(f"Contact {contact_id} not found in SuperOffice.")
+ return "FAILED"
+ except Exception as e:
+ logger.error(f"Failed to fetch contact {contact_id}: {e}")
+ return "RETRY"
+
+ dirty_standard = False
+ dirty_udfs = False
+ # Ensure nested dicts exist
+ if "UserDefinedFields" not in contact_data: contact_data["UserDefinedFields"] = {}
+ if "PostalAddress" not in contact_data or contact_data["PostalAddress"] is None: contact_data["PostalAddress"] = {}
+
+ # --- A. Vertical Sync ---
+ vertical_name = provisioning_data.get("vertical_name")
if vertical_name:
try:
vertical_map = json.loads(settings.VERTICAL_MAP_JSON)
- except:
- vertical_map = {}
- logger.error("Failed to parse VERTICAL_MAP_JSON from config.")
-
- vertical_id = vertical_map.get(vertical_name)
-
- if vertical_id:
- logger.info(f"Identified Vertical '{vertical_name}' -> ID {vertical_id}")
- try:
- # Check current value to avoid loops
- current_contact = so_client.get_contact(contact_id)
- current_udfs = current_contact.get("UserDefinedFields", {})
-
- # Use Config UDF key
+ vertical_id = vertical_map.get(vertical_name)
+ if vertical_id:
udf_key = settings.UDF_VERTICAL
- current_val = current_udfs.get(udf_key, "")
-
- # Normalize SO list ID format (e.g., "[I:26]" -> "26")
- if current_val and current_val.startswith("[I:"):
- current_val = current_val.split(":")[1].strip("]")
+ current_val = contact_data["UserDefinedFields"].get(udf_key, "")
+ # Normalize "[I:26]" -> "26"
+ if current_val and str(current_val).startswith("[I:"):
+ current_val = str(current_val).split(":")[1].strip("]")
if str(current_val) != str(vertical_id):
- logger.info(f"Updating Contact {contact_id} Vertical: {current_val} -> {vertical_id}")
- so_client.update_entity_udfs(contact_id, "Contact", {udf_key: str(vertical_id)})
- else:
- logger.info(f"Vertical for Contact {contact_id} already in sync ({vertical_id}).")
- except Exception as e:
- logger.error(f"Failed to sync vertical for Contact {contact_id}: {e}")
- else:
- logger.warning(f"Vertical '{vertical_name}' not found in internal mapping.")
+ logger.info(f"Change detected: Vertical {current_val} -> {vertical_id}")
+ contact_data["UserDefinedFields"][udf_key] = str(vertical_id)
+ dirty_udfs = True
+ except Exception as e:
+ logger.error(f"Vertical sync error: {e}")
- # 2b.2 Sync Address & VAT (Standard Fields)
- # Check if we have address data to sync
+ # --- B. Address & VAT Sync ---
ce_city = provisioning_data.get("address_city")
- ce_country = provisioning_data.get("address_country") # Assuming 'DE' code or similar
+ ce_street = provisioning_data.get("address_street")
+ ce_zip = provisioning_data.get("address_zip")
ce_vat = provisioning_data.get("vat_id")
- if ce_city or ce_vat:
+ # Check if ANY address component is present
+ if ce_city or ce_street or ce_zip:
+ # Initialize Address object if missing
+ if "Address" not in contact_data or contact_data["Address"] is None:
+ contact_data["Address"] = {"Street": {}, "Postal": {}}
+
+ addr_obj = contact_data["Address"]
+ if "Postal" not in addr_obj or addr_obj["Postal"] is None: addr_obj["Postal"] = {}
+ if "Street" not in addr_obj or addr_obj["Street"] is None: addr_obj["Street"] = {}
+
+ # Update Helper
+ def update_addr_field(field_name, new_val, log_name):
+ nonlocal dirty_standard
+ if new_val:
+ # Sync to both Postal and Street for best visibility
+ for type_key in ["Postal", "Street"]:
+ cur = addr_obj[type_key].get(field_name, "")
+ if cur != new_val:
+ logger.info(f"Change detected: {type_key} {log_name} '{cur}' -> '{new_val}'")
+ addr_obj[type_key][field_name] = new_val
+ dirty_standard = True
+
+ update_addr_field("City", ce_city, "City")
+ update_addr_field("Address1", ce_street, "Street")
+ update_addr_field("Zipcode", ce_zip, "Zip")
+
+ if ce_vat:
+ # Field is 'OrgNr' in WebAPI, not 'OrgNumber'
+ current_vat = contact_data.get("OrgNr", "")
+ if current_vat != ce_vat:
+ logger.info(f"Change detected: VAT '{current_vat}' -> '{ce_vat}'")
+ contact_data["OrgNr"] = ce_vat
+ dirty_standard = True
+
+ # --- C. AI Openers Sync ---
+ ce_opener = provisioning_data.get("opener")
+ ce_opener_secondary = provisioning_data.get("opener_secondary")
+
+ if ce_opener and ce_opener != "null":
+ current_opener = contact_data["UserDefinedFields"].get(settings.UDF_OPENER, "")
+ if current_opener != ce_opener:
+ logger.info("Change detected: Primary Opener")
+ contact_data["UserDefinedFields"][settings.UDF_OPENER] = ce_opener
+ dirty_udfs = True
+
+ if ce_opener_secondary and ce_opener_secondary != "null":
+ current_opener_sec = contact_data["UserDefinedFields"].get(settings.UDF_OPENER_SECONDARY, "")
+ if current_opener_sec != ce_opener_secondary:
+ logger.info("Change detected: Secondary Opener")
+ contact_data["UserDefinedFields"][settings.UDF_OPENER_SECONDARY] = ce_opener_secondary
+ dirty_udfs = True
+
+ # --- D. Apply Updates (Single Transaction) ---
+ if dirty_standard or dirty_udfs:
+ logger.info(f"Pushing combined updates for Contact {contact_id} (Standard={dirty_standard}, UDFs={dirty_udfs})...")
try:
- # Re-fetch contact to be safe (or use cached if optimal)
- contact_data = so_client.get_contact(contact_id)
- changed = False
-
- # City (PostalAddress)
- if ce_city:
- # SuperOffice Address structure is complex. Simplified check on PostalAddress.
- # Address: { "PostalAddress": { "City": "..." } }
- current_city = contact_data.get("PostalAddress", {}).get("City", "")
- if current_city != ce_city:
- if "PostalAddress" not in contact_data: contact_data["PostalAddress"] = {}
- contact_data["PostalAddress"]["City"] = ce_city
- changed = True
- logger.info(f"Updating City: {current_city} -> {ce_city}")
-
- # VAT (OrgNumber)
- if ce_vat:
- current_vat = contact_data.get("OrgNumber", "")
- if current_vat != ce_vat:
- contact_data["OrgNumber"] = ce_vat
- changed = True
- logger.info(f"Updating VAT: {current_vat} -> {ce_vat}")
-
- if changed:
- logger.info(f"Pushing standard field updates for Contact {contact_id}...")
- so_client._put(f"Contact/{contact_id}", contact_data)
-
+ # We PUT the whole modified contact object back
+ # This handles both standard fields and UDFs in one atomic-ish go
+ so_client._put(f"Contact/{contact_id}", contact_data)
+ logger.info("✅ Contact Update Successful.")
except Exception as e:
- logger.error(f"Failed to sync Address/VAT for Contact {contact_id}: {e}")
+ logger.error(f"Failed to update Contact {contact_id}: {e}")
+ raise
+ else:
+ logger.info("No changes needed for Contact.")
- # 2c. Sync Website (Company Level)
+ # 2d. Sync Person Position (Role) - if Person exists
# TEMPORARILY DISABLED TO PREVENT LOOP (SO API Read-after-Write latency or field mapping issue)
# Re-enable via config if needed
if settings.ENABLE_WEBSITE_SYNC:
diff --git a/debug_connector_status.py b/debug_connector_status.py
new file mode 100644
index 00000000..472f22b5
--- /dev/null
+++ b/debug_connector_status.py
@@ -0,0 +1,50 @@
+"""Diagnostic: dump connector queue stats and the 10 most recent jobs."""
+import sqlite3
+import json
+import os
+
+DB_PATH = "connector_queue.db"
+
+def inspect_queue():
+    """Print job-status counts and a summary of the last 10 jobs in the queue DB."""
+    if not os.path.exists(DB_PATH):
+        print(f"❌ Database not found at {DB_PATH}")
+        return
+
+    print(f"🔍 Inspecting Queue: {DB_PATH}")
+    try:
+        # 'with' releases the connection even if a query fails.
+        with sqlite3.connect(DB_PATH) as conn:
+            conn.row_factory = sqlite3.Row
+            cursor = conn.cursor()
+
+            # Get stats
+            cursor.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
+            stats = dict(cursor.fetchall())
+            print(f"\n📊 Stats: {stats}")
+
+            # Get recent jobs
+            print("\n📝 Last 10 Jobs:")
+            cursor.execute("SELECT id, event_type, status, error_msg, updated_at, payload FROM jobs ORDER BY updated_at DESC LIMIT 10")
+            rows = cursor.fetchall()
+
+            for row in rows:
+                # A single corrupt payload must not abort the whole listing.
+                try:
+                    payload = json.loads(row['payload'])
+                except (json.JSONDecodeError, TypeError):
+                    payload = {}
+                # Try to identify entity
+                entity = "Unknown"
+                if "PrimaryKey" in payload: entity = f"ID {payload['PrimaryKey']}"
+                if "ContactId" in payload: entity = f"Contact {payload['ContactId']}"
+
+                print(f" - Job #{row['id']} [{row['status']}] {row['event_type']} ({entity})")
+                print(f" Updated: {row['updated_at']}")
+                if row['error_msg']:
+                    print(f" ❌ ERROR: {row['error_msg']}")
+    except sqlite3.Error as e:
+        print(f"❌ Error reading DB: {e}")
+
+if __name__ == "__main__":
+    inspect_queue()
diff --git a/fix_benni_data.py b/fix_benni_data.py
new file mode 100644
index 00000000..a8ebe42f
--- /dev/null
+++ b/fix_benni_data.py
@@ -0,0 +1,43 @@
+"""One-off data fix: set the missing street/zip for company 33 (Bennis Playland)."""
+from sqlalchemy import create_engine, Column, Integer, String
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.ext.declarative import declarative_base
+
+# Setup DB
+DB_PATH = "sqlite:///companies_v3_fixed_2.db"
+engine = create_engine(DB_PATH)
+SessionLocal = sessionmaker(bind=engine)
+session = SessionLocal()
+
+Base = declarative_base()
+
+class Company(Base):
+    """Minimal mapping of the companies table (only the columns this fix touches)."""
+    __tablename__ = "companies"
+    id = Column(Integer, primary_key=True)
+    street = Column(String)
+    zip_code = Column(String)
+
+def fix_benni(company_id=33, street="Eriagstraße 58", zip_code="85053"):
+    """Write the given street/zip onto the company row.
+
+    Defaults are hardcoded from the earlier check_benni.py output to be safe/fast.
+    """
+    print(f"🔧 Fixing Address for Company ID {company_id}...")
+
+    company = session.query(Company).filter_by(id=company_id).first()
+    if not company:
+        print("❌ Company not found.")
+        return
+
+    company.street = street
+    company.zip_code = zip_code
+
+    session.commit()
+    print(f"✅ Database updated: Street='{company.street}', Zip='{company.zip_code}'")
+
+if __name__ == "__main__":
+    try:
+        fix_benni()
+    finally:
+        session.close()
diff --git a/fix_silly_billy_data.py b/fix_silly_billy_data.py
new file mode 100644
index 00000000..5908bc33
--- /dev/null
+++ b/fix_silly_billy_data.py
@@ -0,0 +1,84 @@
+"""One-off data fix: copy Impressum scrape data onto company 32 (Silly Billy)."""
+from sqlalchemy import create_engine, Column, Integer, String, JSON
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.ext.declarative import declarative_base
+
+# Setup DB
+DB_PATH = "sqlite:///companies_v3_fixed_2.db"
+engine = create_engine(DB_PATH)
+SessionLocal = sessionmaker(bind=engine)
+session = SessionLocal()
+
+Base = declarative_base()
+
+class Company(Base):
+    """Minimal mapping of the companies table (only columns this fix touches)."""
+    __tablename__ = "companies"
+    id = Column(Integer, primary_key=True)
+    name = Column(String)
+    city = Column(String)
+    country = Column(String)
+    crm_vat = Column(String)
+    street = Column(String)
+    zip_code = Column(String)
+
+class EnrichmentData(Base):
+    """Minimal mapping of the enrichment_data table."""
+    __tablename__ = "enrichment_data"
+    id = Column(Integer, primary_key=True)
+    company_id = Column(Integer)
+    source_type = Column(String)
+    content = Column(JSON)
+
+def fix_data(company_id=32):
+    """Copy impressum fields from the newest website_scrape row onto the company."""
+    print(f"🔧 Fixing Data for Company ID {company_id}...")
+
+    company = session.query(Company).filter_by(id=company_id).first()
+    if not company:
+        print("❌ Company not found.")
+        return
+
+    # Pick the newest scrape (highest id) instead of an arbitrary row.
+    enrichment = session.query(EnrichmentData).filter_by(
+        company_id=company_id, source_type="website_scrape"
+    ).order_by(EnrichmentData.id.desc()).first()
+
+    if not (enrichment and enrichment.content):
+        print("⚠️ No enrichment data found.")
+        return
+
+    imp = enrichment.content.get("impressum")
+    if not imp:
+        print("⚠️ No impressum data in enrichment.")
+        return
+
+    print(f"📄 Found Impressum: {imp}")
+
+    # (impressum key, company attribute, display label)
+    field_map = [
+        ("city", "city", "City"),
+        ("vat_id", "crm_vat", "VAT"),
+        ("country_code", "country", "Country"),
+        ("street", "street", "Street"),
+        ("zip", "zip_code", "Zip"),
+    ]
+    changed = False
+    for imp_key, attr, label in field_map:
+        value = imp.get(imp_key)
+        if value:
+            setattr(company, attr, value)
+            changed = True
+            print(f" -> Set {label}: {value}")
+
+    if changed:
+        session.commit()
+        print("✅ Database updated.")
+    else:
+        print("ℹ️ No changes needed.")
+
+if __name__ == "__main__":
+    try:
+        fix_data()
+    finally:
+        session.close()
diff --git a/list_all_companies.py b/list_all_companies.py
new file mode 100644
index 00000000..dad1fa55
--- /dev/null
+++ b/list_all_companies.py
@@ -0,0 +1,31 @@
+"""Diagnostic: list the 20 most recently added companies with CRM/city/VAT info."""
+import sqlite3
+import os
+
+DB_PATH = "companies_v3_fixed_2.db"
+
+def list_companies():
+    """Print id, name, crm_id, city and VAT for the newest 20 companies."""
+    if not os.path.exists(DB_PATH):
+        print(f"❌ Database not found at {DB_PATH}")
+        return
+
+    try:
+        # 'with' releases the connection even if the query fails.
+        with sqlite3.connect(DB_PATH) as conn:
+            cursor = conn.cursor()
+
+            print(f"🔍 Listing companies in {DB_PATH}...")
+            cursor.execute("SELECT id, name, crm_id, city, crm_vat FROM companies ORDER BY id DESC LIMIT 20")
+            rows = cursor.fetchall()
+
+            if not rows:
+                print("❌ No companies found")
+            else:
+                for row in rows:
+                    print(f" ID: {row[0]} | Name: {row[1]} | CRM ID: {row[2]} | City: {row[3]} | VAT: {row[4]}")
+    except sqlite3.Error as e:
+        print(f"❌ Error reading DB: {e}")
+
+if __name__ == "__main__":
+    list_companies()
diff --git a/nginx-proxy.conf b/nginx-proxy.conf
index 32654727..0daae8a8 100644
--- a/nginx-proxy.conf
+++ b/nginx-proxy.conf
@@ -168,12 +168,19 @@ http {
}
location /connector/ {
- # SuperOffice Connector Webhook
- # Disable Basic Auth for Webhooks as SO cannot provide it easily
+ # SuperOffice Connector Webhook & Dashboard
+        # Auth note: SuperOffice cannot attach Basic Auth credentials to webhook
+        # calls, so auth is disabled for the whole /connector/ prefix.
+
+        # This also leaves /connector/dashboard unauthenticated. Nginx cannot
+        # override auth_basic for a nested path without duplicating the location
+        # block, so the dashboard currently relies on the app's own checks.
+        # TODO: split /connector/webhook (open) from /connector/dashboard (protected).
auth_basic off;
# Forward to FastAPI app
# Trailing Slash STRIPS the /connector/ prefix!
+ # So /connector/dashboard -> /dashboard
proxy_pass http://connector-superoffice:8000/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
diff --git a/readme.md b/readme.md
index 2d753d67..3154fafd 100644
--- a/readme.md
+++ b/readme.md
@@ -978,3 +978,37 @@ Als eine zukünftige, sehr wertvolle Erweiterung ist die detaillierte, automatis
* Der technische Aufwand für ein robustes System, das Karriereseiten findet, die verschiedenen Job-Portale parst und die relevanten Informationen extrahiert, ist immens.
* **Status:**
* Diese Erweiterung wird für eine spätere Entwicklungsphase vorgemerkt und sollte aufgrund der Komplexität in einem klar abgegrenzten, überschaubaren Rahmen umgesetzt werden.
+
+
+----
+
+## 14. Test-Übersicht & Qualitätssicherung
+
+Um die Stabilität und Korrektheit der automatisierten Prozesse zu gewährleisten, verfügt das Projekt über eine Reihe von automatisierten Tests, die in verschiedene Kategorien unterteilt sind.
+
+### A. SuperOffice Connector Tests
+Diese Tests befinden sich im Ordner `connector-superoffice/tests/` und validieren die Kommunikation zwischen SuperOffice und dem Company Explorer.
+
+* **`test_e2e_flow.py`**: Der umfassendste Integrationstest. Er simuliert den gesamten Datenzyklus:
+ 1. Anlage eines Accounts in SuperOffice (Webhook-Simulation).
+ 2. Provisionierung im Company Explorer (CE).
+ 3. Automatisches Zurückschreiben der Branche (Vertical) nach SuperOffice.
+ 4. Anlage einer Person & Generierung personalisierter Marketing-Texte basierend auf Rolle und Branche.
+ 5. Simulation einer manuellen Branchenänderung im CRM inkl. Kaskaden-Update für alle zugehörigen Personen.
+* **`test_client.py`**: Verifiziert die grundlegende Funktionalität des SuperOffice API-Clients (Authentifizierung, Token-Refresh, einfache GET/POST/PUT Operationen).
+
+### B. Company Explorer Backend Tests
+Diese Tests befinden sich in `company-explorer/backend/tests/` und prüfen die internen Logik-Komponenten.
+
+* **`test_metric_parser.py`**: **Kritische Regressions-Tests** für die numerische Extraktion (deutsche Lokalisierung). Prüft komplexe Fälle wie:
+ * Tausender-Trennzeichen vs. Dezimalpunkte.
+ * Verkettete Zahlen und Jahre (z.B. "802020").
+ * Ignorieren von Jahreszahlen als Kennzahlen.
+* **`test_classification_service.py`**: Validiert die KI-basierte Einstufung von Unternehmen in Branchen-Verticals und die Bewertung des Automatisierungspotenzials.
+* **`test_opener_logic.py`**: Prüft die Generierung der personalisierten Einleitungssätze (Openers) basierend auf Website-Scrapes und Branchen-Pains.
+
+### C. Infrastruktur & API Tests
+Allgemeine Tests im Hauptverzeichnis zur Absicherung der Schnittstellen.
+
+* **`test_opener_api.py`**: Testet spezifisch den `/api/provision/superoffice-contact` Endpunkt des Company Explorers.
+* **`test_core_functionality.py`**: Basis-Checks für die System-Integrität (Datenbank-Verbindung, API-Health).
diff --git a/test_provisioning_api.py b/test_provisioning_api.py
new file mode 100644
index 00000000..b2d179ef
--- /dev/null
+++ b/test_provisioning_api.py
@@ -0,0 +1,14 @@
+"""Smoke test: call the Company Explorer provisioning endpoint and dump the response."""
+import requests
+import json
+
+url = "http://company-explorer:8000/api/provision/superoffice-contact"
+payload = {"so_contact_id": 4}
+auth = ("admin", "gemini")
+
+try:
+    # Explicit timeout so the script cannot hang forever on a dead service.
+    resp = requests.post(url, json=payload, auth=auth, timeout=30)
+    print(json.dumps(resp.json(), indent=2))
+except Exception as e:
+    print(f"Error: {e}")
diff --git a/trigger_resync.py b/trigger_resync.py
new file mode 100644
index 00000000..9af068cd
--- /dev/null
+++ b/trigger_resync.py
@@ -0,0 +1,28 @@
+"""Manually enqueue a contact.changed job so the connector re-syncs one contact."""
+import sqlite3
+import json
+import sys
+
+DB_PATH = "connector_queue.db"
+
+def trigger_resync(contact_id):
+    """Insert a PENDING contact.changed job for contact_id into the queue DB."""
+    print(f"🚀 Triggering manual resync for Contact {contact_id}...")
+
+    payload = {
+        "Event": "contact.changed",
+        "PrimaryKey": contact_id,
+        "ContactId": contact_id,
+        "Changes": ["UserDefinedFields", "Name"]  # Dummy changes to pass filters
+    }
+
+    with sqlite3.connect(DB_PATH) as conn:
+        conn.execute(
+            "INSERT INTO jobs (event_type, payload, status) VALUES (?, ?, ?)",
+            ("contact.changed", json.dumps(payload), 'PENDING')
+        )
+    print("✅ Job added to queue.")
+
+if __name__ == "__main__":
+    # Default: Contact 6 (Bennis Playland). Override via CLI: python trigger_resync.py <id>
+    trigger_resync(int(sys.argv[1]) if len(sys.argv) > 1 else 6)