Files

226 lines
8.0 KiB
Plaintext

import base64
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict
import jwt
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from django.conf import settings
from core.plugins.base_api import BaseAPI
from core.plugins.super_office.ticket_store import RedisTicketStore, TicketRecord
from core.plugins.super_office.utils import to_iso_z_datetime
class SuperOfficeAPI(BaseAPI):
    """Client for the SuperOffice web API, built on top of ``BaseAPI``.

    Authentication uses a ticket obtained from the SuperOffice partner
    system-user SOAP service. The ticket and the tenant's web API base URL
    are cached in a ``RedisTicketStore`` so they can be reused, and a
    refresh lock from the same store is used so that only one caller
    re-authenticates at a time.
    """

    __api_name__ = "SuperOfficeAPI"

    # Polling budget while someone else holds the refresh lock:
    # 10 attempts x 0.2 s, i.e. roughly two seconds in total.
    _LOCK_WAIT_ATTEMPTS = 10
    _LOCK_WAIT_INTERVAL = 0.2
    # Pause (seconds) between consecutive page requests, to be nice to the API.
    _PAGE_DELAY = 1

    def __init__(self):
        """Load the SOAP envelope template and start with no ticket cached."""
        super().__init__()
        # The template file lives next to this module.
        template_path = Path(__file__).with_name(
            "partnersystemuser_envelope_template.xml"
        )
        self.envelope_template = template_path.read_text(encoding="utf-8").strip()
        # Both are filled lazily by set_ticket_and_webapi_base() or
        # repair_authentication().
        self.webapi_base = None
        self.ticket = None
        self.redis_store = RedisTicketStore()

    def get_host(self) -> str:
        """Return the web API base URL, resolving it first if not yet known."""
        if self.webapi_base is None:
            self.set_ticket_and_webapi_base()
        return self.webapi_base

    def build_headers(self) -> Dict[str, str]:
        """Return the headers for a ticket-authenticated JSON request.

        Resolves the ticket first if this instance does not have one yet.
        """
        if self.ticket is None:
            self.set_ticket_and_webapi_base()
        return {
            "Authorization": f"SOTicket {self.ticket}",
            "SO-AppToken": settings.SUPER_OFFICE_APPLICATION_TOKEN,
            "Accept": "application/json",
        }

    def repair_authentication(self) -> bool:
        """Fetch a fresh ticket, persist it in the store and adopt it.

        Returns:
            ``True`` when a ticket and web API base URL were obtained and
            stored; ``False`` when authentication did not yield both values.
        """
        new_ticket_data = self.get_ticket()
        if not new_ticket_data:
            return False

        ticket = new_ticket_data.get("ticket")
        webapi_base = new_ticket_data.get("webapi_base")
        if not ticket or not webapi_base:
            return False

        self.redis_store.set(
            TicketRecord(
                ticket=ticket,
                webapi_base=webapi_base,
                issued_at=datetime.now(timezone.utc).timestamp(),
            )
        )
        self.ticket = ticket
        self.webapi_base = webapi_base
        return True

    @staticmethod
    def generate_signed_system_token() -> str:
        """Build the signed system-user token sent to the Authenticate call.

        The payload ``<system user token>.<UTC timestamp as YYYYMMDDHHMM>``
        is signed with the configured PEM private key using PKCS#1 v1.5
        padding and SHA-256, and the base64-encoded signature is appended
        as a third dot-separated part.

        Returns:
            ``"<system user token>.<timestamp>.<base64 signature>"``.
        """
        ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M")
        payload = f"{settings.SUPER_OFFICE_SYSTEM_USER_TOKEN}.{ts}"
        key = serialization.load_pem_private_key(
            settings.SUPER_OFFICE_PRIVATE_KEY_PEM.encode("utf-8"), password=None
        )
        signature = key.sign(
            payload.encode("utf-8"), padding.PKCS1v15(), hashes.SHA256()
        )
        sig_b64 = base64.b64encode(signature).decode("ascii")
        return f"{payload}.{sig_b64}"

    def get_ticket(self) -> dict | None:
        """Authenticate against the SOAP service and extract the ticket.

        Fills the envelope template with the application token, context
        identifier, signed system token and return token type from settings,
        POSTs it to ``settings.SUPER_OFFICE_SOAP_URL`` and decodes the JWT in
        the response.

        Returns:
            ``{"ticket": ..., "webapi_base": ...}`` taken from the JWT claims
            (either value may be ``None`` if the claim is absent), or ``None``
            when the service does not report success or returns no token.

        Raises:
            KeyError: if the response lacks the
                ``Envelope/Body/AuthenticationResponse`` structure.
        """
        signed_system_token = self.generate_signed_system_token()
        envelope = self.envelope_template.format(
            application_token=settings.SUPER_OFFICE_APPLICATION_TOKEN,
            context_identifier=settings.SUPER_OFFICE_CONTEXT_IDENTIFIER,
            signed_system_token=signed_system_token,
            return_token_type=settings.SUPER_OFFICE_RETURN_TOKEN_TYPE,
        )
        headers = {
            "Content-Type": "text/xml; charset=utf-8",
            "SOAPAction": "http://www.superoffice.com/superid/partnersystemuser/0.1/IPartnerSystemUserService/Authenticate",
        }
        resp = self.request(
            method="POST",
            path="",
            data=envelope.encode("utf-8"),
            headers=headers,
            custom_host=settings.SUPER_OFFICE_SOAP_URL,
            # The ticket headers cannot be used here: this call is what
            # produces the ticket in the first place.
            use_build_headers=False,
        )
        auth_response = resp["Envelope"]["Body"]["AuthenticationResponse"]

        # Check the success flag BEFORE touching the token, so a failed
        # authentication yields None instead of a KeyError on a missing Token.
        if str(auth_response.get("IsSuccessful")).lower() != "true":
            return None
        token = auth_response.get("Token")
        if not token:
            return None

        # NOTE(review): the JWT signature is deliberately not verified here,
        # so the claims are trusted purely because of the transport to the
        # SOAP endpoint. Consider validating against the issuer's certificate.
        claims = jwt.decode(token, options={"verify_signature": False})
        return {
            "ticket": claims.get("http://schemes.superoffice.net/identity/ticket"),
            "webapi_base": claims.get(
                "http://schemes.superoffice.net/identity/webapi_url"
            ),
        }

    def _adopt_stored_ticket(self) -> bool:
        """Copy the ticket record from the store onto this instance, if any.

        Returns ``True`` when the store returned a record, ``False`` otherwise.
        """
        ticket_data = self.redis_store.get()
        if not ticket_data:
            return False
        self.ticket = ticket_data.ticket
        self.webapi_base = ticket_data.webapi_base
        return True

    def set_ticket_and_webapi_base(self):
        """Populate ``self.ticket`` / ``self.webapi_base``.

        Uses the stored ticket when there is one. Otherwise tries to take the
        refresh lock: the lock holder re-authenticates, everyone else polls
        the store for a short while waiting for the holder to publish a
        ticket.

        Raises:
            RuntimeError: if no ticket shows up while waiting for another
                refresher, or if our own authentication attempt fails.
        """
        if self._adopt_stored_ticket():
            return

        lock_token = self.redis_store.acquire_refresh_lock()
        try:
            if lock_token is None:
                # Someone else is refreshing the ticket; wait and re-read.
                for _ in range(self._LOCK_WAIT_ATTEMPTS):  # ~2s total
                    time.sleep(self._LOCK_WAIT_INTERVAL)
                    if self._adopt_stored_ticket():
                        return
                raise RuntimeError("Failed to get ticket after waiting")

            # We hold the lock: refresh the ticket ourselves.
            # NOTE(review): the store is not re-read after acquiring the
            # lock, so a ticket published just before we got the lock is
            # refreshed again — confirm whether that is acceptable.
            if not self.repair_authentication():
                raise RuntimeError("SuperOffice Authenticate failed")
        finally:
            # Only release a lock we actually own.
            if lock_token:
                self.redis_store.release_refresh_lock(lock_token)

    @staticmethod
    def _filter_since(since: datetime | None) -> str:
        """Return the ``&$filter=...`` suffix selecting rows changed after ``since``.

        ``since`` is converted to UTC and truncated to whole seconds. Returns
        an empty string when ``since`` is ``None``.
        """
        if since:
            since = since.astimezone(timezone.utc).replace(microsecond=0)
            since = since.strftime("%Y-%m-%dT%H:%M:%S")
            # NOTE(review): " or " contains literal spaces while the rest of
            # the expression uses "+"; kept as-is, confirm the API accepts it.
            return f"&$filter=(registeredDate+afterTime+'{since}' or updatedDate+afterTime+'{since}')"
        return ""

    @staticmethod
    def _relative_api_path(next_link: str) -> str:
        """Reduce a next-page link to the part after its ``/api/`` segment.

        Splitting on the tenant-independent ``/api/`` marker (rather than on a
        specific customer id) keeps paging working for any tenant. A link
        without the marker is returned unchanged.
        """
        _, marker, tail = next_link.partition("/api/")
        return tail if marker else next_link

    def _query_with_paging(self, base_query: str, since: datetime | None = None):
        """Run a GET query and follow ``odata.nextLink`` until exhausted.

        Args:
            base_query: Relative path including the query string.
            since: Optional lower bound; appended as a filter via
                ``_filter_since``.

        Returns:
            ``{"value": [...]}`` with the ``value`` items of all pages
            concatenated in the order they were received.
        """
        # _filter_since() already returns "" when since is None.
        query = base_query + self._filter_since(since)
        values = []
        while True:
            resp = self.request(method="GET", path=query)
            values.extend(resp.get("value", []))
            next_link = resp.get("odata.nextLink")
            if not next_link:
                break
            query = self._relative_api_path(next_link)
            time.sleep(self._PAGE_DELAY)  # be nice to the API
        return {
            "value": values,
        }

    def get_all_projects(self, since: datetime | None = None):
        """Return all projects (``PrimaryKey``, ``name``), optionally only those changed after ``since``."""
        return self._query_with_paging(
            base_query="v1/Project?$select=PrimaryKey,name",
            since=since,
        )

    def get_all_contacts(self, since: datetime | None = None):
        """Return all contacts (``PrimaryKey``, ``name``), optionally only those changed after ``since``."""
        return self._query_with_paging(
            base_query="v1/Contact?$select=PrimaryKey,name",
            since=since,
        )

    def get_all_persons(self, since: datetime | None = None):
        """Return the response of ``v1/MDOList/Associate``.

        ``since`` is accepted but not used by this method.
        """
        return self.request(
            method="GET",
            path="v1/MDOList/Associate",
        )

    def get_all_sale_types(self, since: datetime | None = None):
        """Return the response of ``v1/List/SaleType/Items``.

        ``since`` is accepted but not used by this method.
        """
        return self.request(
            method="GET",
            path="v1/List/SaleType/Items",
        )

    def get_all_sources(self, since: datetime | None = None):
        """Return the response of ``v1/List/Source/Items`` wrapped as ``{"value": ...}``.

        ``since` is accepted but not used by this method.
        """
        return {
            "value": self.request(
                method="GET",
                path="v1/List/Source/Items",
            )
        }

    def get_contact_projects(self, contact_id: str):
        """Return the response of ``v1/Contact/<contact_id>/Projects``."""
        return self.request(
            method="GET",
            path=f"v1/Contact/{contact_id}/Projects",
        )

    def add_sale(self, **kwargs):
        """Create a sale by POSTing ``kwargs`` as JSON to ``v1/Sale``.

        Before sending, selected keys are reshaped if present:

        * ``Contact``, ``Project``, ``Person``, ``Associate``: the value is
          wrapped as ``{"<Key>Id": value}``.
        * ``SaleType``, ``Source``: the value is wrapped as ``{"Id": value}``.
        * ``Saledate``: passed through ``to_iso_z_datetime``.

        All other keyword arguments are sent unchanged.
        """
        for key in ("Contact", "Project", "Person", "Associate"):
            if key in kwargs:
                kwargs[key] = {f"{key}Id": kwargs[key]}
        for key in ("SaleType", "Source"):
            if key in kwargs:
                kwargs[key] = {"Id": kwargs[key]}
        if "Saledate" in kwargs:
            kwargs["Saledate"] = to_iso_z_datetime(kwargs["Saledate"])
        return self.request(
            method="POST",
            path="v1/Sale",
            json=kwargs,
        )