#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Send late-notice reminder e-mails for overdue sales invoices.

The script resolves the organization by name, lists its overdue invoices and,
for every invoice whose overdue date / last notice is older than the
configured delay, asks the API to generate a late-notice PDF and e-mails it
to the client (or to the litigation mailbox, see ``LEVEL_OF_ACCEPTANCE``).
"""
import sys
import os
import re
import logging
import datetime
import json
import unicodedata
import smtplib
import tempfile
import httpx
from os.path import basename
from urllib.parse import quote
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formatdate

# Configure logging
logging.basicConfig(
    filename='/var/log/lateNotice.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

#------------------------------------------------
# Configuration Variables
#------------------------------------------------
URL = 'https://demo.wyzio.com'
TOKEN = 'ffffffff-ffff-ffff-ffff-ffffffffffff'
MAIL_SERVER = '10.10.10.10'
ORGANIZATION_NAME = 'Acme SA'
NB_DAYS_FOR_LATE_NOTICE_1 = 20
NB_DAYS_FOR_LATE_NOTICE_2 = 10
NB_DAYS_FOR_LATE_NOTICE_3 = 5
LEVEL_OF_ACCEPTANCE = 3
DEFAULT_EMAIL_ADDRESS = 'abc@def.com'
SEND_FROM_EMAIL = 'abc@def.com'
# Recipient used when the client has no e-mail address or the acceptance
# level has been reached.
LITIGATION_EMAIL = 'litigation@wyzio.com'
SMTP_PORT = 587
SMTP_USER = "email"
SMTP_PASSWORD = "password"
HTTP_TIMEOUT = 10

# Per-language e-mail parts: subject template, text, footer, invoice label,
# date label, status label.
# NOTE(review): the <br> separators were reconstructed from the
# "Replace HTML <br> with newlines" logic in format_email_text -- confirm
# against the deployed templates.
_LANGUAGE_FIELDS = {
    'FR': (
        'Rappel concernant la facture {invoice_number}',
        'Cher client,<br><br>Veuillez trouver en annexe un rappel concernant une facture impayée.',
        'Meilleures salutations,<br>Votre équipe Wyzio',
        'Facture # :',
        'Date de facture: ',
        'Statut: ',
    ),
    'DE': (
        'Verspätete Benachrichtigung für Rechnung {invoice_number}',
        'Sehr geehrter Kunde,<br><br>Bitte finden Sie im Anhang eine Verspätungsbescheid über eine unbezahlte Rechnung.',
        'Besondere Grüße,<br>Ihr Wyzio Team',
        'Rechnung # :',
        'Rechnungsdatum: ',
        'Status: ',
    ),
    'IT': (
        'Avviso in ritardo per la fattura {invoice_number}',
        'Gentile cliente,<br><br>Si prega di trovare in allegato un avviso tardivo relativo a una fattura non pagata.',
        'Ti migliori saluti,<br>Il vostro team Wyzio',
        'Fattura # :',
        'Data fattura: ',
        'Stato: ',
    ),
    # The signature lives only in the footer; the original English text also
    # embedded it, which printed it twice in the final e-mail.
    'EN': (
        'Late Notice for invoice {invoice_number}',
        'Dear customer,<br><br>Please find attached a late notice concerning an unpaid invoice.',
        'Best regards,<br>Your Wyzio\'s Team',
        'Invoice # :',
        'Invoice date: ',
        'Status: ',
    ),
}

_BR_TAG = re.compile(r'<br\s*/?>', re.IGNORECASE)
_ANY_TAG = re.compile(r'<[^>]+>')


#------------------------------------------------
# Helper Functions
#------------------------------------------------
def _org_headers(organization_id):
    """Return the request headers that target *organization_id*."""
    return {
        'User-Agent': 'Wyzio/1.0',
        'weal-token': TOKEN,
        'cache-control': 'no-cache',
        'target-organization-id': str(organization_id),
    }


def _get_json(service_url, headers):
    """GET *service_url* and return the decoded JSON body.

    Raises:
        httpx.HTTPError: on transport errors or a non-2xx status.
        ValueError: when the body is not valid JSON.
    """
    response = httpx.get(service_url, headers=headers, timeout=HTTP_TIMEOUT)
    response.raise_for_status()
    return response.json()


def _html_to_plain(fragment):
    """Turn ``<br>`` into newlines and drop any other HTML tag."""
    return _ANY_TAG.sub('', _BR_TAG.sub('\n', fragment))


def get_overdue_invoices(organization_id):
    """Return the list of overdue invoices of the organization.

    Returns an empty list (after logging a warning) when the request fails or
    the payload is not a list.
    """
    service_url = (
        f"{URL}/api-v1/late-notice/client?sizePerPage=10000"
        f"&sortingField=overdueAmount&direction=DESC&organizationId={organization_id}"
    )
    try:
        invoices = _get_json(service_url, _org_headers(organization_id))
    except (httpx.HTTPError, ValueError) as e:
        logging.warning(f"Failed to get overdue invoices for organization {organization_id}: {e}")
        return []
    if not isinstance(invoices, list):
        logging.warning(f"Failed to get overdue invoices for organization {organization_id}: unexpected payload")
        return []
    return invoices


def get_invoice_details(organization_id, invoice_number):
    """Return the sales-invoice records matching *invoice_number*.

    Returns an empty list (after logging a warning) when the request fails or
    the payload is not a list.
    """
    service_url = f"{URL}/api-v1/sales-invoice?number={invoice_number}"
    try:
        details = _get_json(service_url, _org_headers(organization_id))
    except (httpx.HTTPError, ValueError) as e:
        logging.warning(f"Cannot get invoice details for invoice {invoice_number}: {e}")
        return []
    if not isinstance(details, list):
        logging.warning(f"Cannot get invoice details for invoice {invoice_number}: unexpected payload")
        return []
    return details


def send_mail(send_from, send_to, subject, html_content, pdf_file,
              smtp_server=MAIL_SERVER, subtype="html"):
    """E-mail *html_content* with *pdf_file* attached.

    Args:
        send_from: envelope and header sender address.
        send_to: recipient address.
        subject: message subject.
        html_content: message body.
        pdf_file: path of the PDF to attach.
        smtp_server: SMTP host; the connection uses STARTTLS on ``SMTP_PORT``.
        subtype: MIME text subtype of the body ("html" by default, pass
            "plain" for a plain-text body so line breaks are preserved).

    Returns:
        True when the message was handed to the SMTP server, False when the
        attachment could not be read or sending failed (both are logged).
    """
    msg = MIMEMultipart()
    msg['From'] = send_from
    msg['To'] = send_to
    msg['Date'] = formatdate(localtime=True)
    msg['Subject'] = subject
    msg.attach(MIMEText(html_content, subtype, "utf-8"))

    # Attach PDF file
    try:
        with open(pdf_file, "rb") as f:
            part = MIMEApplication(f.read(), _subtype='pdf')
    except OSError as e:
        logging.warning(f"Could not attach PDF file {pdf_file}: {e}")
        return False
    part.add_header('Content-Disposition', 'attachment', filename=basename(pdf_file))
    msg.attach(part)

    try:
        with smtplib.SMTP(smtp_server, SMTP_PORT) as smtp:
            smtp.set_debuglevel(1)
            smtp.ehlo()
            smtp.starttls()
            smtp.ehlo()
            smtp.login(SMTP_USER, SMTP_PASSWORD)
            smtp.sendmail(send_from, send_to, msg.as_string())
    except (smtplib.SMTPException, OSError) as e:
        logging.warning(f"Failed to send mail to {send_to}: {e}")
        return False
    return True


def determine_language_fields(lang, invoice_number):
    """Return subject, text, footer, invoice label, due date label, and status label based on language.

    Unknown or missing languages fall back to English.
    """
    subject_template, *parts = _LANGUAGE_FIELDS.get(
        (lang or 'EN').upper(), _LANGUAGE_FIELDS['EN']
    )
    return (subject_template.format(invoice_number=invoice_number), *parts)


def format_email_text(subject, text, currency, total_amount, status_label,
                      invoice_status, due_date_label, invoice_date, footer):
    """Build the plain-text e-mail body from the HTML-flavoured parts.

    ``<br>`` tags in *text*, *footer* and *status_label* become newlines and
    the invoice total, status and date are inserted between text and footer.
    *subject* is accepted for interface compatibility and is not used.
    """
    return (
        f"{_html_to_plain(text)}\n\n"
        f"Total ({currency}): {total_amount}\n"
        f"{_html_to_plain(status_label)}{invoice_status}\n"
        f"{due_date_label}{invoice_date}\n\n"
        f"{_html_to_plain(footer)}"
    )


def _notice_schedule(invoice, now):
    """Return ``(deadline, reference_date, unacceptable)`` for *invoice*.

    Returns None when three or more notices were already generated, i.e. no
    further notice must be sent. Raises KeyError/TypeError/ValueError when the
    reference date is missing or malformed.
    """
    quantity = invoice.get('noticeGeneratedQuantity') or 0
    if quantity >= 3:
        return None
    if quantity == 1:
        days, date_field = NB_DAYS_FOR_LATE_NOTICE_2, 'lastNoticeGeneratedDate'
    elif quantity == 2:
        days, date_field = NB_DAYS_FOR_LATE_NOTICE_3, 'lastNoticeGeneratedDate'
    else:
        days, date_field = NB_DAYS_FOR_LATE_NOTICE_1, 'overdueDate'
    reference_date = datetime.datetime.strptime(invoice[date_field][:10], '%Y-%m-%d')
    # The notice about to be generated is number quantity + 1.
    unacceptable = quantity in (0, 1, 2) and LEVEL_OF_ACCEPTANCE == quantity + 1
    return now - datetime.timedelta(days=days), reference_date, unacceptable


def _lookup_recipient(invoice, org_headers):
    """Return ``(email or None, language)`` of the invoice's client contact."""
    contact_id = invoice.get('clientContactId')
    client = _get_json(f"{URL}/api-v1/contact?id={contact_id}", org_headers)
    if not client:
        logging.warning(f"Cannot get client id for contact {contact_id}")
        return None, 'EN'
    contact_service = f"{URL}/api-v1/contact/{client[0]['contactType']}/{client[0]['id']}"
    client_details = _get_json(contact_service, org_headers)
    return client_details.get('email') or None, client_details.get('defaultLanguage') or 'EN'


def _send_late_notice(invoice, organization_id, org_headers, unacceptable):
    """Generate the late-notice PDF for *invoice* and e-mail it."""
    invoice_number = invoice.get('salesInvoiceNumber')
    invoice_details = get_invoice_details(organization_id, invoice_number)
    if not invoice_details:
        return
    inv = invoice_details[0]
    currency = inv['currency']
    total_amount = str(inv['totalAmount'])
    invoice_date = str(inv['date'])[:10]
    invoice_status = inv['status']

    send_to, lang = _lookup_recipient(invoice, org_headers)
    subject, text, footer, _invoice_label, due_date_label, status_label = \
        determine_language_fields(lang, invoice_number)
    # If unacceptable or missing email, override recipient
    if unacceptable or not send_to:
        send_to = LITIGATION_EMAIL

    email_body = format_email_text(subject, text, currency, total_amount, status_label,
                                   invoice_status, due_date_label, invoice_date, footer)

    # NOTE(review): local time is formatted with a literal "Z" suffix, as in
    # the original script -- confirm the API does not expect real UTC.
    now_str = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.000Z')
    pdf_service = (
        f"{URL}/api-v1/generate-pdf?useLetterhead=true&overdueDate=%3C{now_str}"
        f"&id={invoice.get('id')}&organizationId={organization_id}&type=LATE_NOTICE"
        f"&lateNoticePdfType=REMINDER&increaseLateNoticeNumber=true"
    )
    try:
        pdf_response = httpx.get(pdf_service, headers=org_headers, timeout=HTTP_TIMEOUT)
    except httpx.HTTPError as e:
        logging.warning(f"Error generating PDF for invoice {invoice_number}: {e}")
        return
    if pdf_response.status_code != 200:
        logging.warning(f"PDF generation failed for invoice {invoice_number}")
        return

    # A private temporary directory avoids predictable /tmp paths and is
    # removed automatically; the file name is kept because it becomes the
    # attachment name.
    pdf_name = re.sub(r'[^\w.-]', '_', str(inv.get('number', invoice_number)))
    with tempfile.TemporaryDirectory() as tmp_dir:
        pdf_file = os.path.join(tmp_dir, f"{pdf_name}.pdf")
        with open(pdf_file, 'wb') as f:
            f.write(pdf_response.content)
        # The body is plain text, so it must not be declared as HTML.
        if send_mail(SEND_FROM_EMAIL, send_to, subject, email_body, pdf_file, subtype="plain"):
            logging.warning(f"The late notice for invoice {invoice_number} was sent!")
        else:
            logging.warning(f"Could not send late notice for invoice {invoice_number}")


#------------------------------------------------
# Main Script Logic
#------------------------------------------------
def main():
    """Check every overdue invoice and send the late notices that are due."""
    common_headers = {'User-Agent': 'Wyzio/1.0', 'weal-token': TOKEN, 'cache-control': 'no-cache'}
    org_service = f"{URL}/api-v1/organization?name={quote(ORGANIZATION_NAME)}"
    try:
        response = httpx.get(org_service, headers=common_headers, timeout=HTTP_TIMEOUT)
    except httpx.HTTPError as e:
        logging.warning(f"Cannot connect to server: {e}")
        sys.exit("Cannot connect to server!")
    try:
        organization_id = response.json()[0]['id']
    except (ValueError, LookupError, TypeError) as e:
        logging.warning(f"Organization {ORGANIZATION_NAME} does not exist: {e}")
        sys.exit(f"Organization {ORGANIZATION_NAME} does not exist!")

    org_headers = _org_headers(organization_id)
    now = datetime.datetime.now()

    logging.warning("Start of late notices check!")
    overdue_invoices = get_overdue_invoices(organization_id)
    logging.info(f"{len(overdue_invoices)} late notices to look at!")

    for invoice in overdue_invoices:
        invoice_number = invoice.get('salesInvoiceNumber')
        logging.info(f"Should generate a late notice for invoice {invoice_number}!")
        try:
            schedule = _notice_schedule(invoice, now)
        except (KeyError, TypeError, ValueError) as e:
            logging.warning(f"Error parsing notice dates for invoice {invoice_number}: {e}")
            continue
        if schedule is None:
            # Three notices were already generated: nothing more to send.
            continue
        deadline, last_notice_date, unacceptable = schedule
        if last_notice_date >= deadline:
            continue

        logging.info(f"Will generate a late notice for invoice {invoice_number}!")
        try:
            _send_late_notice(invoice, organization_id, org_headers, unacceptable)
        except Exception as e:
            # Per-invoice boundary: one bad invoice must not stop the batch.
            logging.warning(f"Error processing invoice {invoice_number}: {e}")

    logging.warning("End of late notices check!")


if __name__ == "__main__":
    main()
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Generate interest invoices for customers with an outstanding balance.

For the quarter preceding the current one, the script reads each customer's
ledger on accounts 10200, 10201 and 10204, computes interest lines per
journal entry and posts one sales invoice per customer through the API.
"""
import httpx
import sys
import json
import logging
import datetime
import unicodedata
import calendar
from urllib.parse import quote

logging.basicConfig(
    filename='/var/log/generateInterestInvoices.log',
    level=logging.WARNING,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

HTTP_TIMEOUT = 10
ORGANIZATION_NAME = 'Acme Ltd.'
# Ledger accounts scanned per customer, paired with the ``delay`` flag passed
# to get_new_details (only account 10204 gets the 30-day delay).
LEDGER_ACCOUNTS = ((10200, 0), (10201, 0), (10204, 1))


#------------------------------------------------
# Helper Functions
#------------------------------------------------
def _parse_day(value):
    """Parse the leading ``YYYY-MM-DD`` of an API timestamp."""
    return datetime.datetime.strptime(value[:10], '%Y-%m-%d')


def get_new_details(journal_entries, start_date_range, end_date_range, rate, delay=0):
    """
    Compute interest details for a list of journal entries.

    Each entry bears interest from its own date (the range start for the first
    entry) until the next entry's date (the range end for the last entry).
    When *delay* is truthy, 30 days are added to every period start. Periods
    are capped at 90 days and interest is rounded to the nearest 0.05.

    Returns a list where every two elements represent a description and an
    interest amount. Entries with an unreadable ``companyTotal``, a
    non-positive principal or an empty period are skipped.
    """
    new_invoice_line_list = []
    last_index = len(journal_entries) - 1
    start_range_dt = _parse_day(start_date_range)
    end_range_dt = _parse_day(end_date_range)
    grace = datetime.timedelta(days=30) if delay else datetime.timedelta(0)

    for d, entry in enumerate(journal_entries):
        try:
            principal = float(entry['companyTotal'])
        except (KeyError, TypeError, ValueError) as e:
            logging.warning(f"Error parsing companyTotal for entry {d}: {e}")
            continue

        # Determine start date for the period
        period_origin = start_range_dt if d == 0 else _parse_day(entry['entryDate'])
        start_date = period_origin + grace

        # Determine end date for the period
        if d == last_index:
            end_date = end_range_dt
        else:
            end_date = _parse_day(journal_entries[d + 1]['entryDate'])

        # A period covering the full range counts as a fixed 90 days.
        if start_date == start_range_dt and end_date == end_range_dt:
            nb_days = 90
        else:
            nb_days = (end_date - start_date).days
            if d == last_index:
                nb_days += 1  # the range end date is inclusive
        nb_days = max(0, min(nb_days, 90))

        if principal > 0 and nb_days > 0 and rate > 0:
            # Round interest to the nearest 0.05 (i.e. increments of 1/20)
            interest = round(principal * rate * nb_days / 365 * 20) / 20
            description = (
                f"Intérêts de {rate * 100}% sur CHF {principal:,} du "
                f"{start_date.strftime('%d-%m-%Y')} au {end_date.strftime('%d-%m-%Y')}, soit {nb_days} jours"
            )
            new_invoice_line_list.extend([description, interest])

    return new_invoice_line_list


def new_line_detail(detail, account, temp_id):
    """
    Build a new invoice line detail dictionary.

    *detail* is a dict with ``description`` and ``interest`` keys; the
    interest is carried as the quantity with a net unit price of 1.
    Returns a tuple of (line dictionary, interest amount).
    """
    interest = detail['interest']
    new_detail = {
        "discount": 0,
        "saleAccountId": account,
        "vatRateId": None,
        "totalPrice": interest,
        "netUnitPrice": 1,
        "vat": None,
        "quantity": interest,
        "departmentId": None,
        "lineType": "ARTICLE",
        "articleId": 65475909,
        "totalNet": interest,
        "id": None,
        "description": detail['description'],
        "allocationKey": None,
        "appliedAllocationRules": [],
        "tempId": str(temp_id)
    }
    return new_detail, interest


def new_line_layout(temp_id):
    """
    Build a new invoice line layout dictionary.

    The layout points at the invoice line whose temporary id is
    ``temp_id + 1``.
    """
    return {
        "children": None,
        "parentId": None,
        "position": temp_id + 3,
        "name": None,
        "lineType": "NO_ARTICLE",
        "tempId": str(temp_id),
        "salesInvoiceLineId": str(temp_id + 1),
        "collapsed": False,
        "id": str(temp_id),
        "leaf": False
    }


def _get_json(service):
    """GET *service* with the current ``headers`` and return the JSON body.

    Raises httpx.HTTPError on transport errors or a non-2xx status and
    ValueError when the body is not valid JSON.
    """
    response = httpx.get(service, headers=headers, timeout=HTTP_TIMEOUT)
    response.raise_for_status()
    return response.json()


def fetch_account(account_number, label):
    """Return the id (as a string) of chart account *account_number*, or None on failure."""
    service = f"{url}/api-v1/chart-accounts?accountNumber={account_number}"
    try:
        return str(_get_json(service)[0]['id'])
    except (httpx.HTTPError, ValueError, LookupError, TypeError) as e:
        logging.warning(f"Cannot get {label} detail: {e}")
        return None


def fetch_ledger(customer_id, customer_name, account):
    """Return the customer's ledger entries on *account* within the date range.

    Returns an empty list (after logging a warning) when the request fails or
    the payload is not a list.
    """
    service = (f"{url}/api-v1/ledger?contactIds={customer_id}&account={account}"
               f"&entryDate={start_date_range}..{end_date_range}&pageNumber=1"
               f"&isBalanceCarried=true&sortingField=entryDate&direction=ASC"
               f"&sizePerPage=10000&isAppRequest=true")
    try:
        entries = _get_json(service)
    except (httpx.HTTPError, ValueError) as e:
        logging.warning(f"Cannot get total of account {account} for customer {customer_name}: {e}")
        return []
    if not isinstance(entries, list):
        logging.warning(f"Cannot get total of account {account} for customer {customer_name}: unexpected payload")
        return []
    return entries


#------------------------------------------------
# Configuration Variables
#------------------------------------------------
url = 'https://demo.wyzio.com'  # was 'demo.wyzio,com' (comma), an unreachable host
token = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
headers = {
    'User-Agent': 'Wyzio/1.0',
    'weal-token': token,
    'cache-control': 'no-cache'
}

today = datetime.datetime.now()
month = today.month
year = today.strftime('%Y')
time_suffix = 'T00:00:00.000Z'
# Payment is requested for the last day of the current month.
overdue_date = today.replace(day=calendar.monthrange(today.year, today.month)[1])
overdue_date = overdue_date.strftime('%Y-%m-%d') + time_suffix
rate = 0.05

# Determine date ranges based on current month (the previous quarter)
if month <= 3:
    start_date_range = f"{int(year)-1}-10-01" + time_suffix
    end_date_range = f"{int(year)-1}-12-31" + time_suffix
elif month <= 6:
    start_date_range = year + "-01-01" + time_suffix
    end_date_range = year + "-03-31" + time_suffix
elif month <= 9:
    start_date_range = year + "-04-01" + time_suffix
    end_date_range = year + "-06-30" + time_suffix
else:
    start_date_range = year + "-07-01" + time_suffix
    end_date_range = year + "-09-30" + time_suffix

new_invoice_date = end_date_range

# Get organization info
org_service = f"{url}/api-v1/organization?name={quote(ORGANIZATION_NAME)}"
try:
    response = httpx.get(org_service, headers=headers, timeout=HTTP_TIMEOUT)
except httpx.HTTPError as e:
    logging.warning(f"Cannot connect to server: {e}")
    sys.exit("Cannot connect to server!")

try:
    organization_id = response.json()[0]['id']
except (ValueError, LookupError, TypeError) as e:
    logging.warning(f"Organization {ORGANIZATION_NAME} does not exist: {e}")
    sys.exit(f"Organization {ORGANIZATION_NAME} does not exist!")

# (Override organization_id if necessary)
organization_id = '999999999999999'

headers = {
    'weal-token': token,
    'cache-control': 'no-cache',
    'target-organization-id': organization_id,
    'Content-Type': 'application/json'
}

# Get customer list
customer_service = f"{url}/api-v1/contact?sizePerPage=10000&isCustomer=true"
try:
    customer_list = _get_json(customer_service)
except (httpx.HTTPError, ValueError) as e:
    logging.warning(f"Cannot get list of customers: {e}")
    customer_list = []
if not isinstance(customer_list, list):
    logging.warning("Cannot get list of customers: unexpected payload")
    customer_list = []

# Get account details
accounting_account = fetch_account(44021, "accounting account")
BL_account = fetch_account(10204, "BL account")
payment_account = fetch_account(10011, "payment account")

logging.info(f"{len(customer_list)} customer(s) to treat.")

#------------------------------------------------
# Main Processing Loop
#------------------------------------------------
for customer in customer_list:
    customer_id = customer['id']
    customer_name = customer['name']

    # Fetch ledger details for each account within the date range
    ledgers = [
        (fetch_ledger(customer_id, customer_name, account), delay)
        for account, delay in LEDGER_ACCOUNTS
    ]

    # Sum the totals from the last ledger entry of each account if available;
    # coerce like get_new_details does, since the API value may not be a float.
    total_due = sum(
        float(ledger[-1].get('companyTotal') or 0) for ledger, _ in ledgers if ledger
    )
    if total_due <= 0:
        continue

    new_invoice_line_list = []
    for ledger, delay in ledgers:
        new_invoice_line_list.extend(
            get_new_details(ledger, start_date_range, end_date_range, rate, delay=delay)
        )

    # Build a list of details from pairs of (description, interest)
    new_detail = [
        {'description': desc, 'interest': interest}
        for desc, interest in zip(new_invoice_line_list[0::2], new_invoice_line_list[1::2])
    ]
    if not new_detail:
        continue

    # Get client contact detail
    contact_service = f"{url}/api-v1/contact/{customer_id}/client"
    try:
        contact_detail = _get_json(contact_service)
    except (httpx.HTTPError, ValueError) as e:
        logging.warning(f"Cannot get client detail for customer {customer_name}: {e}")
        continue

    # Find invoice address matching invoiceAddressId, otherwise use first address
    addresses = contact_detail.get('addresses') or []
    invoice_address_id = contact_detail.get('invoiceAddressId')
    address = next(
        (addr for addr in addresses if addr.get('id') == invoice_address_id),
        addresses[0] if addresses else None,
    )
    if address:
        address['id'] = None  # Reset address id

    end_range_label = datetime.datetime.strptime(end_date_range[:10], '%Y-%m-%d').strftime('%d-%m-%Y')
    new_invoice = [{
        "id": None,
        "clientAddressId": invoice_address_id,
        "clientId": customer_id,
        "comments": None,
        "currencyId": 1214,
        "date": new_invoice_date,
        "timesheetInfoItems": [],
        "excludedFromVat": False,
        "receivableAccountId": BL_account,
        "vatTypeCodeId": None,
        "clientName": customer_name,
        "description": f"Intérêts au {end_range_label}",
        "address": address,
        "invoiceLines": [],
        "invoiceLinesLayout": [],
        "number": None,
        "reference": None,
        "shipments": [],
        "payments": [{
            "bankCharges": 0,
            "paidAmount": 0,
            "requestedAmount": 0,
            "overdueDate": overdue_date,
            "receivableByAccountId": payment_account,
            "id": None,
            "lossOnDebtors": 0,
            "requestedDate": overdue_date,
            "valueDate": None
        }],
        "warnings": None
    }]

    # Lines and layouts get consecutive negative temporary ids:
    # line -1 / layout -2, line -3 / layout -4, ...
    total_amount = 0
    index = 0
    new_invoice_line = []
    new_layout = []
    for detail in new_detail:
        index -= 1
        line, total = new_line_detail(detail, accounting_account, index)
        new_invoice_line.append(line)
        total_amount += float(total)
        index -= 1
        new_layout.append(new_line_layout(index))

    # Invoices of 1 or less are not issued.
    if total_amount <= 1:
        continue

    new_invoice[0]['invoiceLines'] = new_invoice_line
    new_invoice[0]['invoiceLinesLayout'] = new_layout
    new_invoice[0]['payments'][0]['requestedAmount'] = f"{total_amount:.2f}"

    # Post the new interest invoice
    invoice_service = f"{url}/api-v1/sales-invoice/batch"
    try:
        # content= sends the pre-encoded JSON body; data= is for form fields.
        post_response = httpx.post(invoice_service, content=json.dumps(new_invoice),
                                   headers=headers, timeout=HTTP_TIMEOUT).json()
        if post_response.get('items', [{}])[0].get('errorCode', {}).get('errorCode') == 'NO_ERROR':
            logging.debug(f"New interest invoice for {customer_name} was successfully created!")
        else:
            logging.warning(f"Could not insert interest invoice for {customer_name}!")
    except (httpx.HTTPError, ValueError, LookupError, AttributeError) as e:
        logging.warning(f"Could not insert interest invoice for {customer_name}: {e}")

# End of processing
logging.info("Finished processing interest invoices.")
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Import contacts from an Excel workbook into Wyzio through the batch API."""
import sys
import httpx
import logging
import datetime
import json
import unicodedata
from dataclasses import dataclass
from xlrd import open_workbook
from os.path import basename

# Configure logging
# NOTE(review): this is the same log file as the late-notice script --
# confirm that sharing it is intended.
logging.basicConfig(
    filename='/var/log/lateNotice.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


# eq=False keeps identity-based equality and hashing, as with a plain class.
@dataclass(eq=False)
class Contact:
    """One contact row of the import sheet; fields follow the column order."""

    contact_type: str
    default_language: str
    company_name: str
    last_name: str
    first_name: str
    phone: str
    fax: str
    mobile: str
    email: str
    web_site: str
    address_line1: str
    address_line2: str
    city_name: str
    post_code: str
    region_code: str
    state_name: str
    country_code: str
    country_id: str
    to_the_attention_of: str
    currency_id: str
    payment_terms: str
    credit_limit: str
    client_discount_rate: str

    # (label printed by __str__, attribute name), in display order.
    _STR_LABELS = (
        ("contactType", "contact_type"),
        ("defaultLanguage", "default_language"),
        ("companyName", "company_name"),
        ("lastName", "last_name"),
        ("firstName", "first_name"),
        ("Phone", "phone"),
        ("Fax", "fax"),
        ("Mobile", "mobile"),
        ("Email", "email"),
        ("webSite", "web_site"),
        ("addressLine1", "address_line1"),
        ("addressLine2", "address_line2"),
        ("cityName", "city_name"),
        ("postCode", "post_code"),
        ("regionCode", "region_code"),
        ("stateName", "state_name"),
        ("countryCode", "country_code"),
        ("countryId", "country_id"),
        ("toTheAttentionOf", "to_the_attention_of"),
        ("currencyId", "currency_id"),
        ("paymentTerms", "payment_terms"),
        ("creditLimit", "credit_limit"),
        ("clientDiscountRate", "client_discount_rate"),
    )

    def __str__(self):
        """Return a multi-line ``label = value`` dump of every field."""
        lines = "".join(
            f" {label} = {getattr(self, attr)}\n" for label, attr in self._STR_LABELS
        )
        return f"Contact object:\n{lines}"


def _cell_to_str(value):
    """Return a cell value as text, dropping ``.0`` from whole numbers only.

    Text cells are returned untouched (so "0041" keeps its leading zeros) and
    fractional numbers keep their decimals (0.05 stays "0.05").
    """
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value)


def read_contacts_from_excel(filename):
    """Read every sheet of *filename* and return the rows as Contact objects.

    The first row of each sheet is treated as a header and skipped. A row
    that cannot be mapped onto Contact (wrong column count) is logged and
    skipped.
    """
    # NOTE(review): xlrd 2.0+ only reads legacy .xls files; opening an .xlsx
    # workbook requires xlrd < 2.0 -- confirm the installed version.
    wb = open_workbook(filename)
    contacts = []
    for sheet in wb.sheets():
        for row in range(1, sheet.nrows):
            values = [_cell_to_str(sheet.cell(row, col).value) for col in range(sheet.ncols)]
            try:
                contacts.append(Contact(*values))
            except TypeError as e:
                logging.warning(f"Error creating Contact from row {row}: {e}")
    return contacts


def _empty_staff_record():
    """Return the blank staff sub-record sent with a person contact."""
    return {
        "contactId": -1,
        "id": None,
        "avsNumber": None,
        "payrollCurrencyId": None,
        "employmentStartDate": None,
        "employmentEndDate": None,
        "employeeDocumentType": None,
        "documentNumber": None,
        "documentExpiryDate": None,
        "contractType": None,
        "maritalType": None,
        "maritalDate": None,
        "isSpouseWorking": None,
        "familyMembers": [],
        "timesheetPricingLines": [],
        "staffPayslips": [],
        "workPermit": None,
        "permitExpiryDate": None
    }


def create_contact_payload(item, organization_id):
    """Create a JSON payload for a contact based on its type.

    A contact whose type is ``ORGANIZATION`` (case-insensitive) is sent as a
    ``ContactDetailsOrganization``; anything else as a
    ``ContactDetailsPerson`` with a blank staff record. Both share the same
    identification, communication, address and client sections.
    """
    if item.contact_type.upper() == 'ORGANIZATION':
        specific = {
            "name": item.company_name,
            "type": "ContactDetailsOrganization",
        }
    else:
        specific = {
            "firstName": item.first_name,
            "lastName": item.last_name,
            "middleName": None,
            "nationalityCountryId": None,
            "gender": None,
            "birthday": None,
            "permissionsTree": [],
            "isImpersonateOthers": False,
            "isActive": True,
            "staff": _empty_staff_record(),
            "type": "ContactDetailsPerson",
        }

    return {
        "contactId": -1,
        "organizationId": organization_id,
        "warnings": {},
        "contactType": item.contact_type,
        "defaultLanguage": item.default_language,
        **specific,
        "webSite": item.web_site,
        "mobile": item.mobile,
        "phone": item.phone,
        "archived": False,
        "fax": item.fax,
        "email": item.email,
        "isCustomer": True,
        "isVendor": False,
        "addresses": [{
            "regionCode": item.region_code,
            "id": None,
            "name": "Main",
            "line1": item.address_line1,
            "line2": item.address_line2,
            "postCode": item.post_code,
            "city": item.city_name,
            "countryCode2": item.country_code,
            "countryId": item.country_id,
            "toTheAttentionOf": item.to_the_attention_of,
            "position": 0
        }],
        "banks": [],
        "supplier": {
            "paymentTerms": None
        },
        "client": {
            "orderAddressId": None,
            "invoiceAddressId": None,
            "paymentTerms": None,
            "lateNoticeMargin": None,
            "creditLimit": None,
            "clientDiscountRate": None,
            "orderAddressName": "Main",
            "invoiceAddressName": "Main"
        },
        "communications": []
    }


def send_contacts(payload, webservice, headers):
    """POST *payload* to *webservice* and return the new contact id.

    Returns None when the API answers with an error code. Any exception
    (transport error, unexpected response shape) is logged with its traceback
    and re-raised.
    """
    try:
        # content= sends the pre-encoded JSON body; data= is for form fields.
        response = httpx.post(webservice, content=json.dumps(payload), headers=headers, timeout=10)
        first_item = response.json()['items'][0]
        if first_item['errorCode']['errorCode'] != 'NO_ERROR':
            logging.warning("Could not insert new contact!")
            return None
        new_contact_id = first_item['freshRow']['id']
        logging.debug(f"New contact inserted with ID {new_contact_id}")
        return new_contact_id
    except Exception as e:
        logging.exception(f"Exception occurred while sending contacts: {e}")
        raise


def main():
    """Read the contacts workbook and send each contact to the batch endpoint."""
    # Read contacts from Excel file
    contacts = read_contacts_from_excel('Addresses_Test.xlsx')

    organization_id = '999999999999999'
    url = 'https://demo.wyzio.com'
    token = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
    headers = {
        'User-Agent': 'Wyzio/1.0',
        'weal-token': token,
        'cache-control': 'no-cache',
        'target-organization-id': organization_id,
        'Content-Type': 'application/json'
    }
    webservice = url + '/api-v1/contact/batch'

    for item in contacts:
        payload = create_contact_payload(item, organization_id)
        send_contacts(payload, webservice, headers)


if __name__ == '__main__':
    main()
# (Stray web-page text left over from extraction: "Avoir une démo" -- "Get a demo".)