Skip to content

Commit

Permalink
Merge pull request #33 from guewen/7.0-export-dependency+lock
Browse files Browse the repository at this point in the history
Add mechanisms for the export of records
  • Loading branch information
sebastienbeau committed Oct 6, 2014
2 parents e0a6d3a + 510fe01 commit 132a67c
Showing 1 changed file with 176 additions and 1 deletion.
177 changes: 176 additions & 1 deletion magentoerpconnect/unit/export_synchronizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@
##############################################################################

import logging

from contextlib import contextmanager
from datetime import datetime

import psycopg2

from openerp import SUPERUSER_ID
from openerp.tools.translate import _
from openerp.tools import DEFAULT_SERVER_DATETIME_FORMAT
from openerp.addons.connector.queue.job import job, related_action
from openerp.addons.connector.unit.synchronizer import ExportSynchronizer
from openerp.addons.connector.exception import IDMissingInBackend
from openerp.addons.connector.exception import (IDMissingInBackend,
RetryableJobError)
from .import_synchronizer import import_record
from ..connector import get_environment
from ..related_action import unwrap_binding
Expand Down Expand Up @@ -117,6 +124,11 @@ def run(self, binding_id, *args, **kwargs):
result = self._run(*args, **kwargs)

self.binder.bind(self.magento_id, self.binding_id)
# Commit so we keep the external ID when there are several
# exports (due to dependencies) and one of them fails.
# The commit will also release the lock acquired on the binding
# record
self.session.commit()
return result

def _run(self):
Expand All @@ -135,10 +147,169 @@ def __init__(self, environment):
super(MagentoExporter, self).__init__(environment)
self.binding_record = None

def _lock(self):
""" Lock the binding record.
Lock the binding record so we are sure that only one export
job is running for this record if concurrent jobs have to export the
same record.
When concurrent jobs try to export the same record, the first one
will lock and proceed, the others will fail to lock and will be
retried later.
This behavior works also when the export becomes multilevel
with :meth:`_export_dependencies`. Each level will set its own lock
on the binding record it has to export.
"""
sql = ("SELECT id FROM %s WHERE ID = %%s FOR UPDATE NOWAIT" %
self.model._table)
try:
self.session.cr.execute(sql, (self.binding_id, ),
log_exceptions=False)
except psycopg2.OperationalError:
_logger.info('A concurrent job is already exporting the same '
'record (%s with id %s). Job delayed later.',
self.model._name, self.binding_id)
raise RetryableJobError(
'A concurrent job is already exporting the same record '
'(%s with id %s). The job will be retried later.' %
(self.model._name, self.binding_id))

def _has_to_skip(self):
""" Return True if the export can be skipped """
return False

@contextmanager
def _retry_unique_violation(self):
""" Context manager: catch Unique constraint error and retry the
job later.
When we execute several jobs workers concurrently, it happens
that 2 jobs are creating the same record at the same time (binding
record created by :meth:`_export_dependency`), resulting in:
IntegrityError: duplicate key value violates unique
constraint "magento_product_product_openerp_uniq"
DETAIL: Key (backend_id, openerp_id)=(1, 4851) already exists.
In that case, we'll retry the import just later.
.. warning:: The unique constraint must be created on the
binding record to prevent 2 bindings to be created
for the same Magento record.
"""
try:
yield
except psycopg2.IntegrityError as err:
if err.pgcode == psycopg2.errorcodes.UNIQUE_VIOLATION:
raise RetryableJobError(
'A database error caused the failure of the job:\n'
'%s\n\n'
'Likely due to 2 concurrent jobs wanting to create '
'the same record. The job will be retried later.' % err)
else:
raise

def _export_dependency(self, relation, binding_model, exporter_class=None,
binding_field='magento_bind_ids',
binding_extra_vals=None):
"""
Export a dependency. The exporter class is a subclass of
``MagentoExporter``. If a more precise class need to be defined,
it can be passed to the ``exporter_class`` keyword argument.
.. warning:: a commit is done at the end of the export of each
dependency. The reason for that is that we pushed a record
on the backend and we absolutely have to keep its ID.
So you *must* take care not to modify the OpenERP
database during an export, excepted when writing
back the external ID or eventually to store
external data that we have to keep on this side.
You should call this method only at the beginning
of the exporter synchronization,
in :meth:`~._export_dependencies`.
:param relation: record to export if not already exported
:type relation: :py:class:`openerp.osv.orm.browse_record`
:param binding_model: name of the binding model for the relation
:type binding_model: str | unicode
:param exporter_cls: :py:class:`openerp.addons.connector\
.connector.ConnectorUnit`
class or parent class to use for the export.
By default: MagentoExporter
:type exporter_cls: :py:class:`openerp.addons.connector\
.connector.MetaConnectorUnit`
:param binding_field: name of the one2many field on a normal
record that points to the binding record
(default: magento_bind_ids).
It is used only when the relation is not
a binding but is a normal record.
:type binding_field: str | unicode
:binding_extra_vals: In case we want to create a new binding
pass extra values for this binding
:type binding_extra_vals: dict
"""
if not relation:
return
if exporter_class is None:
exporter_class = MagentoExporter
rel_binder = self.get_binder_for_model(binding_model)
# wrap is typically True if the relation is for instance a
# 'product.product' record but the binding model is
# 'magento.product.product'
wrap = relation._model._name != binding_model

if wrap and hasattr(relation, binding_field):
domain = [('openerp_id', '=', relation.id),
('backend_id', '=', self.backend_record.id)]
binding_ids = self.session.search(binding_model, domain)
if binding_ids:
assert len(binding_ids) == 1, (
'only 1 binding for a backend is '
'supported in _export_dependency')
binding_id = binding_ids[0]
# we are working with a unwrapped record (e.g.
# product.category) and the binding does not exist yet.
# Example: I created a product.product and its binding
# magento.product.product and we are exporting it, but we need to
# create the binding for the product.category on which it
# depends.
else:
ctx = {'connector_no_export': True}
with self.session.change_context(ctx):
with self.session.change_user(SUPERUSER_ID):
bind_values = {'backend_id': self.backend_record.id,
'openerp_id': relation.id}
if binding_extra_vals:
bind_values.update(binding_extra_vals)
# If 2 jobs create it at the same time, retry
# one later. A unique constraint (backend_id,
# openerp_id) should exist on the binding model
with self._retry_unique_violation():
binding_id = self.session.create(binding_model,
bind_values)
# Eager commit to avoid having 2 jobs
# exporting at the same time. The constraint
# will pop if an other job already created
# the same binding. It will be caught and
# raise a RetryableJobError.
self.session.commit()
else:
# If magento_bind_ids does not exist we are typically in a
# "direct" binding (the binding record is the same record).
# If wrap is True, relation is already a binding record.
binding_id = relation.id

if not rel_binder.to_backend(binding_id):
exporter = self.get_connector_unit_for_model(exporter_class,
binding_model)
exporter.run(binding_id)

def _export_dependencies(self):
""" Export the dependencies for the record"""
return
Expand Down Expand Up @@ -195,6 +366,10 @@ def _run(self, fields=None):
# export the missing linked resources
self._export_dependencies()

# prevent other jobs to export the same record
# will be released on commit (or rollback)
self._lock()

map_record = self._map_data()

if self.magento_id:
Expand Down

0 comments on commit 132a67c

Please # to comment.