Connecteur Societeinfo HubSpot : ajouter automatiquement des sociétés à HubSpot

HubSpot

Societeinfo est un outil hyper pratique pour effectuer de l'enrichissement sur une base de prospect français. En plus d'avoir un coût relativement faible, celui-ci offre une API très bien structurée. Avec l'API de Societeinfo et l'API d'Hubspot, il est possible d'ajouter automatiquement toutes les nouvelles boîtes créées avec des critères précis tels que : code APE, nombre d'employés, type de société, etc.

L'objectif du connecteur Societeinfo => Hubspot

Créer à intervalle de 24h dans Hubspot un contact pour chaque nouvelle société créées en France qui est associé à un code APE précis.

Les informations envoyées vers Hubspot

  • SIRET / SIREN
  • Date de création
  • Nom de la société
  • Source = sociétéinfo
  • Code postal
  • Ville
  • Email si existant
  • Numéro de téléphone si existant
  • Site web si existant
  • Prénom de l'associé principal
  • Nom de l'associé principal
  • Date d'anniversaire de l'associé principal
  • Titre de l'associé principal

Requis pour réaliser ce projet

  • La documentation de l'API Societeinfo ;
  • Un compte payant Societeinfo ;
  • La documentation de l'API Hubspot ;
  • Un compte Google Cloud (ou autres) ;
  • Fort probablement un développeur, ou quelqu'un avec beaucoup de volontés et de temps, ou nous.

Les scripts nécessaires

#!/usr/bin/python
# https://societeinfo.com/app/rest/api/v2/company.json/843630591?key=xxxxxx
import json
import requests
import logging
import time
import datetime
import pytz
import calendar
import os

# Change path to your file
LAST_PROCESSED_REGISTRATION_NUMBER_FILENAME = "./xxxx/last_processed_registration_number"
APE_CODE = os.environ.get("APE_CODE", None)
if not APE_CODE:
   raise Exception("APE code is missing")
SOCIETE_INFO_API_KEY = os.environ.get("SOCIETE_INFO_API_KEY", None)
if not SOCIETE_INFO_API_KEY:
   raise Exception("Societe-info API key is missing")
HUBSPOT_API_KEY = os.environ.get("HUBSPOT_API_KEY", None)
if not HUBSPOT_API_KEY:
   raise Exception("Hubspot API key is missing")
def log(text, level="info"):
   now = datetime.datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d - %H:%M:%S")
   line = "{} : {}".format(now, text)
   if level == "warning":
       logging.warning(line)
   elif level == "info":
       logging.info(line)
   else:
       logging.debug(line)
def get_companies(page):
   return "https://societeinfo.com/app/rest/api/v2/companies.json?key={}&sort=creationDateDesc&query={}&searchMode=keyword&page=
{}&limit=25&active=true&mincreationdate={}".format(SOCIETE_INFO_API_KEY, APE_CODE, page, (datetime.date.today() - datetime.timede
lta(1)).strftime("%Y%m%d"))
def get_company(registration_number):
   return "https://societeinfo.com/app/rest/api/v2/company.json/{}?key={}".format(registration_number, SOCIETE_INFO_API_KEY)
# def search_hs_contact(company_name):
#     return "https://api.hubapi.com/contacts/v1/search/query?q={}&hapikey={}&property=siren".format(company_name, HUBSPOT_API_KE
Y)
def format_date(date):
   utc = pytz.timezone('UTC')
   dt = datetime.datetime.combine(datetime.datetime.strptime(date, "%Y-%m-%d"), datetime.time(0, 0))
   utc_dt = utc.localize(dt, is_dst=None)
   return "{}000".format(calendar.timegm(utc_dt.timetuple()))
def compute_contact_properties(registration_number):
   payload = { "properties": [] }
   res = requests.get(get_company(registration_number))
   company = json.loads(res.text)["result"]
   if res.status_code == 200:
       payload["properties"].append({ "property": "siren", "value": registration_number })
       payload["properties"].append({ "property": "siret", "value": company["organization"]["full_registration_number"] })
       payload["properties"].append({ "property": "date_creation_societe", "value": format_date(company["organization"]["creatio
n_date"]) })
       payload["properties"].append({ "property": "company", "value": company["organization"]["name"] })
       payload["properties"].append({ "property": "origine", "value": "societeinfo_creation" })
       if "street" in company["organization"]["address"]:
           payload["properties"].append({ "property": "address", "value": company["organization"]["address"]["street"] })
       if "postal_code" in company["organization"]["address"]:
           payload["properties"].append({ "property": "zip", "value": company["organization"]["address"]["postal_code"] })
       if "city" in company["organization"]["address"]:
           payload["properties"].append({ "property": "city", "value": company["organization"]["address"]["city"] })
       if "email" in company["contacts"]:
           payload["properties"].append({ "property": "email", "value": company["contacts"]["email"] })
       if len(company["contacts"]["phones"]) > 0:
           payload["properties"].append({ "property": "phone", "value": company["contacts"]["phones"][0]["value"] })
       if len(company["contacts"]["phones"]) > 1:
           payload["properties"].append({ "property": "mobile", "value": company["contacts"]["phones"][1]["value"] })
       if "website_url" in company["web_infos"]:
           payload["properties"].append({ "property": "website", "value": company["web_infos"]["website_url"] })
       if len(company["contacts"]["corporate_officiers"]) > 0:
           corporate_officier = company["contacts"]["corporate_officiers"][0]
           first_name = corporate_officier["firstName"].split(" ")[0]
           payload["properties"].append({ "property": "firstname", "value": first_name })
           payload["properties"].append({ "property": "lastname", "value": corporate_officier["lastName"] })
           payload["properties"].append({ "property": "date_anniversaire", "value": format_date(corporate_officier["birth_date"]
) })
           payload["properties"].append({ "property": "jobtitle", "value": corporate_officier["role"] })
   return payload
def get_new_results(last_processed_registration_number):
   log("Fetching last companies...")
   registration_numbers = []
   companies = {}
   page = 1
   stop = False
   while not stop:
       res = requests.get(get_companies(page))
       log("page {}".format(page))
       if res.status_code == 200:
           result = json.loads(res.text)
           if len(result["result"]) > 0:
               for company in result["result"]:
                   if company["registration_number"] == last_processed_registration_number:
                       stop = True
                       break
                   else:
                       registration_numbers.append(company["registration_number"])
                       companies[company["registration_number"]] = { "name": company["name"] }
               if not stop:
                   page += 1
           else:
               stop = True
       else:
           stop = True
   registration_numbers.reverse()
   return [registration_numbers, companies]
def create_new_contacts(registration_numbers, companies):
   log("Creating new contacts from companies...")
   for registration_number in registration_numbers:
       # company = companies[registration_number]
       # res = requests.get(search_hs_contact(company["name"]))
       # if res.status_code == 200:
           # result = json.loads(res.text)
           # if result["total"] == 0:
       payload = compute_contact_properties(registration_number)
       log(payload, "debug")
       res = requests.post("https://api.hubapi.com/contacts/v1/contact/?hapikey={}".format(HUBSPOT_API_KEY), data=json.dumps(pay
load))
       if res.status_code == 200:
           result = json.loads(res.text)
           log("New contact created for company with registration number {}. vid is {}".format(registration_number, result["vid"
]))
           with open(LAST_PROCESSED_REGISTRATION_NUMBER_FILENAME, 'w') as f:
               f.write(registration_number)
       elif res.status_code == 409:
           log("Conflict with existing hubspot contact for company with registration number {}".format(registration_number), "wa
rning")
       elif res.status_code == 400:
           log(res.text, "warning")
           log("Invalid contact creation request for company with registration number {}".format(registration_number), "warning"
)
       else:
           log("Error encountered while creating contact for company with registration number {}".format(registration_number), "
warning")
           # else:
           #     suspicious_vids = list(map(lambda contact: str(contact["vid"]), result["contacts"]))
           #     log("Company with registration number {} might already exists in hubspot. Look for records : {}".format(registr
ation_number, ", ".join(suspicious_vids)), "warning")
if __name__== "__main__":
   logging.basicConfig(filename='./hubspot_integration/ceres_cron/populate_company.log', filemode='w', level=logging.DEBUG)
   with open(LAST_PROCESSED_REGISTRATION_NUMBER_FILENAME) as f:
       last_processed_registration_number = f.readline().rstrip()
   if last_processed_registration_number:
       registration_numbers, companies = get_new_results(last_processed_registration_number)
       log("Found {} new companies...".format(len(registration_numbers)), "info")
       if len(registration_numbers) > 0:
           create_new_contacts(registration_numbers, companies)
   else:
       log("Unknown last processed registration number", "warning")
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149

Voici la tâche Cron qui permet de déclencher le script à un intervalle régulier :

from crontab import CronTab

user = None
APE_CODE = None
SOCIETE_INFO_API_KEY = None
HUBSPOT_API_KEY = None
hour = None
minute = None

while not user:
   user = input('Enter user : ')
while not APE_CODE:
   APE_CODE = input('Enter APE_CODE : ')

while not SOCIETE_INFO_API_KEY:
   SOCIETE_INFO_API_KEY = input('Enter SOCIETE_INFO_API_KEY : ')
while not HUBSPOT_API_KEY:
   HUBSPOT_API_KEY = input('Enter HUBSPOT_API_KEY : ')
while not hour:
   hour = input('Hour : ')
while not minute:
   minute = input('Minute : ')
cron = CronTab(user=user)
cron.remove_all(comment='populate_company')
job = cron.new(command="APE_CODE={} SOCIETE_INFO_API_KEY={} HUBSPOT_API_KEY={} python3 /home/{}/hubspot_integration
/ceres_cron/populate_company_contact.py".format(APE_CODE, SOCIETE_INFO_API_KEY, HUBSPOT_API_KEY, user), user=user,
comment='populate_company')
job.hour.on(hour)
job.minute.on(minute)
job.enable()
if job.is_enabled():
   print("Job is enabled")
else:
   print("Job is disabled")
if job.is_valid():
   print("Job is valid")
else:
   print("Job is invalid")
cron.write()
if cron.render():
   print("cron line : {}".format(cron.render()))
schedule = job.schedule()
datetime = schedule.get_next()
print("next execution : {}".format(datetime))
1234567891011121314151617181920212223242526272829303132333435363738394041424344

Une fois déployé sur Google Cloud, le script ajoutera à votre Hubspot toutes les boîtes correspondantes au code APE précisé.

Vous pouvez donc par exemple :

  • Envoyer un mail automatiquement à une nouvelle société ;
  • Ajouter une tâche Hubspot d'appel ;
  • Envoyer automatiquement une pub par la poste ;
  • Envoyer un SMS ;
  • Toutes ces options.

Si vous avez besoin d'un connecteur Societeinfo=>Hubspot ou si vous avez des questions, faites-moi signe !

vous souhaitez passer au niveau SUPÉRIEUR

Démarrez maintenant