-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathsupport.py
91 lines (79 loc) · 3.26 KB
/
support.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from __future__ import absolute_import
import types
import unittest
import uuid
from tempfile import mkdtemp
from ftmstore import get_dataset
from random import randrange
from servicelayer.archive import init_archive
from servicelayer.taskqueue import Task
from servicelayer.tags import Tags
from servicelayer.archive.util import ensure_path
from servicelayer import settings as service_settings
from ftmstore import settings as ftmstore_settings
from ingestors import settings as ingestors_settings
from ingestors.manager import Manager
def emit_entity(self, entity, fragment=None):
self.entities.append(entity)
self.writer.put(entity.to_dict(), fragment=fragment)
def queue_entity(self, entity):
self.ingest_entity(entity)
class TestCase(unittest.TestCase):
def setUp(self):
# Force tests to use fake configuration
ingestors_settings.TESTING = True
service_settings.REDIS_URL = None
service_settings.ARCHIVE_TYPE = "file"
service_settings.ARCHIVE_PATH = mkdtemp()
service_settings.QUEUE_ALEPH = "ingesttest-aleph-queue"
service_settings.QUEUE_INDEX = "ingesttest-index-queue"
service_settings.QUEUE_INGEST = "ingesttest-ingest-queue"
ftmstore_settings.DATABASE_URI = "sqlite://"
dataset = get_dataset("test", origin=ingestors_settings.STAGE_INGEST)
Tags("ingest_cache").delete()
priority = randrange(1, service_settings.RABBITMQ_MAX_PRIORITY + 1)
task = Task(
task_id=uuid.uuid4().hex,
job_id=uuid.uuid4().hex,
collection_id="test",
operation=ingestors_settings.STAGE_INGEST,
delivery_tag="",
context={},
payload={},
priority=priority,
)
self.manager = Manager(dataset, task)
self.manager.entities = []
self.manager.emit_entity = types.MethodType(emit_entity, self.manager)
self.manager.queue_entity = types.MethodType(queue_entity, self.manager)
self.archive = init_archive()
self.manager._archive = self.archive
def fixture(self, fixture_path):
"""Returns a fixture path and a dummy entity"""
# clear out entities
self.manager.entities = []
self.manager.dataset.delete()
cur_path = ensure_path(__file__).parent
cur_path = cur_path.joinpath("fixtures")
path = cur_path.joinpath(fixture_path)
entity = self.manager.make_entity("Document")
if not path.exists():
raise RuntimeError(path)
if path.is_file():
checksum = self.manager.store(path)
entity.make_id(path.name, checksum)
entity.set("contentHash", checksum)
entity.set("fileSize", path.stat().st_size)
entity.set("fileName", path.name)
else:
entity.make_id(fixture_path)
return path, entity
def get_emitted(self, schema=None):
entities = list(self.manager.dataset.iterate())
if schema is not None:
entities = [e for e in entities if e.schema.is_a(schema)]
return entities
def get_emitted_by_id(self, id):
return self.manager.dataset.get(id)
def assertSuccess(self, entity):
self.assertEqual(entity.first("processingStatus"), self.manager.STATUS_SUCCESS)