-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
Copy pathcsv_file_parser.hpp
142 lines (119 loc) · 5.62 KB
/
csv_file_parser.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#ifndef OSRM_UPDATER_CSV_FILE_PARSER_HPP
#define OSRM_UPDATER_CSV_FILE_PARSER_HPP
#include "updater/source.hpp"
#include "util/exception.hpp"
#include "util/exception_utils.hpp"
#include "util/log.hpp"
#include <tbb/parallel_for.h>
#include <tbb/parallel_sort.h>
#include <tbb/spin_mutex.h>
#include <boost/exception/diagnostic_information.hpp>
#include <boost/filesystem.hpp>
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/spirit/include/phoenix.hpp>
#include <boost/spirit/include/qi.hpp>
#include <vector>
namespace osrm
{
namespace updater
{
// Functor to parse a list of CSV files using the "key,value,comment" grammar and
// merge the parsed entries into a single lookup table.
// Key and Value structures must be a model of Random Access Sequence.
// Key must additionally support operator< and operator== (used for sorting and
// de-duplication below).
// Also the Value structure must have a `source` member that will be filled
// with `start_index` plus the index of the file in the CSV filenames vector.
template <typename Key, typename Value> struct CSVFilesParser
{
    using Iterator = boost::iostreams::mapped_file_source::iterator;
    using KeyRule = boost::spirit::qi::rule<Iterator, Key()>;
    using ValueRule = boost::spirit::qi::rule<Iterator, Value()>;

    // start_index: offset added to a file's position in the filenames vector to
    //              form the id stored in Value::source.
    // key_rule:    rule that parses the first CSV column into a Key.
    // value_rule:  rule that parses the second CSV column into a Value.
    // Both rules are copied into the parser.
    CSVFilesParser(std::size_t start_index, const KeyRule &key_rule, const ValueRule &value_rule)
        : start_index(start_index), key_rule(key_rule), value_rule(value_rule)
    {
    }

    // Parses all given CSV files (one parallel task per file), merges the results and
    // returns a LookupTable<Key, Value> constructed from the merged entries, which are
    // sorted in descending (key, source) order and contain one entry per key.
    // A tbb::captured_exception escaping the parallel section is rethrown as
    // util::exception.
    auto operator()(const std::vector<std::string> &csv_filenames) const
    {
        try
        {
            tbb::spin_mutex mutex;
            std::vector<std::pair<Key, Value>> lookup;
            tbb::parallel_for(std::size_t{0},
                              csv_filenames.size(),
                              [&](const std::size_t idx) {
                                  // Parsing happens outside of the lock, only the merge is guarded
                                  auto local = ParseCSVFile(csv_filenames[idx], start_index + idx);

                                  { // Merge local CSV results into a flat global vector
                                      tbb::spin_mutex::scoped_lock _{mutex};
                                      lookup.insert(end(lookup),
                                                    std::make_move_iterator(begin(local)),
                                                    std::make_move_iterator(end(local)));
                                  }
                              });

            // With flattened map-ish view of all the files, sort on key and source
            // and unique them on key to keep only the value with the largest file index.
            // The operands order is swapped to make descending ordering on (key, source).
            // NOTE(review): tbb::parallel_sort is documented as neither stable nor
            // deterministic, so when a key occurs several times within the same file
            // it is unspecified which of those entries survives the unique pass below.
            tbb::parallel_sort(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) {
                return std::tie(rhs.first, rhs.second.source) <
                       std::tie(lhs.first, lhs.second.source);
            });

            // Unique only on key to take the source precedence into account and remove duplicates.
            const auto it =
                std::unique(begin(lookup), end(lookup), [](const auto &lhs, const auto &rhs) {
                    return lhs.first == rhs.first;
                });
            lookup.erase(it, end(lookup));

            util::Log() << "In total loaded " << csv_filenames.size() << " file(s) with a total of "
                        << lookup.size() << " unique values";

            // The local vector is not used afterwards, hand it over instead of copying it
            return LookupTable<Key, Value>{std::move(lookup)};
        }
        catch (const tbb::captured_exception &e)
        {
            throw util::exception(e.what() + SOURCE_REF);
        }
    }

  private:
    // Parse a single CSV file and return the result as a vector of (Key, Value) pairs
    // in the order produced by the parser. Every parsed Value gets its `source` member
    // set to file_id.
    // Throws util::exception if the file content does not match the grammar (the message
    // contains the line number and the offending line) or if a boost::exception is
    // raised while accessing the file.
    auto ParseCSVFile(const std::string &filename, std::size_t file_id) const
    {
        namespace qi = boost::spirit::qi;

        std::vector<std::pair<Key, Value>> result;
        try
        {
            // Zero-sized files yield no entries and are never memory-mapped
            // (presumably because mapping an empty file fails -- TODO confirm)
            if (boost::filesystem::file_size(filename) == 0)
                return result;

            boost::iostreams::mapped_file_source mmap(filename);
            auto first = mmap.begin();
            const auto last = mmap.end();

            // Only checked in builds with assertions enabled
            BOOST_ASSERT(file_id <= std::numeric_limits<std::uint8_t>::max());

            // Wrap the value rule to additionally tag the parsed value with the file id
            ValueRule value_source =
                value_rule[qi::_val = qi::_1, bind(&Value::source, qi::_val) = file_id];
            // key,value with an optional trailing ",comment" up to the end of line
            qi::rule<Iterator, std::pair<Key, Value>()> csv_line =
                (key_rule >> ',' >> value_source) >> -(',' >> *(qi::char_ - qi::eol));
            const auto ok = qi::parse(first, last, -(csv_line % qi::eol) >> *qi::eol, result);

            if (!ok || first != last)
            {
                // `first` points to where parsing stopped. Walk back to the beginning of
                // that line without ever forming an iterator before the mapped region.
                auto begin_of_line = first;
                while (begin_of_line != mmap.begin() && *(begin_of_line - 1) != '\n')
                    --begin_of_line;

                const auto line_number = std::count(mmap.begin(), first, '\n') + 1;
                const auto message = boost::format("CSV file %1% malformed on line %2%:\n %3%\n") %
                                     filename % std::to_string(line_number) %
                                     std::string(begin_of_line, std::find(first, last, '\n'));
                throw util::exception(message.str() + SOURCE_REF);
            }

            util::Log() << "Loaded " << filename << " with " << result.size() << " values";

            // Plain return of the local allows copy elision / implicit move
            return result;
        }
        catch (const boost::exception &e)
        {
            const auto message = boost::format("exception in loading %1%:\n %2%") % filename %
                                 boost::diagnostic_information(e);
            throw util::exception(message.str() + SOURCE_REF);
        }
    }

    const std::size_t start_index;
    const KeyRule key_rule;
    const ValueRule value_rule;
};
}
}
#endif