hubverse-transform: supply a parquet schema #14
@annakrystalli I know you've wrestled with the mis-matched schema implications of [...]. Curious to hear your thoughts: do you think it's reasonable to force those two column types to str when we're transforming the model-output files as part of syncing to the cloud? Location seems like a good candidate for that type of brute-force operation--it doesn't really make sense to have FIPS codes coming in as integers. Am less sure about output_type_id (FWIW, when creating the DuckDB file, I used the [...]).
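A rough sketch (in R, for illustration only; the transform itself runs in Python with pyarrow, and the file paths here are hypothetical) of the kind of brute-force coercion being floated above:

```r
library(arrow)
library(dplyr)

# Hypothetical incoming model-output file
infile <- "raw/teama-model/2023-10-14-teama-model.parquet"

# Read the file, force the two columns to string regardless of how they were
# written, and write the result back out as parquet.
read_parquet(infile) %>%
  mutate(location = as.character(location),
         output_type_id = as.character(output_type_id)) %>%
  write_parquet("model-output/teama-model/2023-10-14-teama-model.parquet")
```
|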
can we generate a schema based on the contents of [...]? Edit: Lucie pointed me to this function.
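Presumably the function in question is `hubData::create_hub_schema()`, which appears later in this thread; a minimal sketch of generating a schema from a hub's config (hub path borrowed from the reproduction below):

```r
# Build an Arrow schema from the hub's tasks config.
config_json <- hubUtils::read_config("./example-complex-forecast-hub/", "tasks")
hub_schema <- hubData::create_hub_schema(config_json)
hub_schema
```
|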
The location column is coming out as character when you do collect, so it seems a bit strange that the filter wasn't working. |
I am curious about the solution; I encountered the same issue in the past, and the only solution I found was to fix the input file. I first created 2 parquet files in a model-output folder that I created, with one file with a numeric location column:

```r
hub_path <- "./example-complex-forecast-hub/"
hub_con <- hubData::connect_hub(hub_path)
test <- hubData::create_model_out_submit_tmpl(hub_con, round_id = "2023-05-27")

a <- dplyr::filter(test, output_type == "quantile", location != "US") %>%
  dplyr::mutate(value = sample(seq(0, 100, by = 0.1), nrow(.), replace = T),
                location = as.numeric(location))
b <- test %>%
  dplyr::mutate(value = sample(seq(0, 100, by = 0.1), nrow(.), replace = T))

arrow::write_parquet(a, "model-output/teama-model/2023-10-14-teama-model.parquet")
arrow::write_parquet(b, "model-output/teamb-model/2023-10-14-teamb-model.parquet")
```

And when I try to read the files with one of the `hubData` functions:

```r
hubData::connect_model_output("model-output/", file_format = "parquet") %>%
  dplyr::filter(output_type == "quantile", location == "US") %>%
  hubData::collect_hub()
```

I got the error:

```
Error in `arrow::open_dataset()`:
! Type error: Unable to merge: Field location has incompatible types: double vs string
```

even if I remove the filter:

```r
hubData::connect_model_output("model-output/", file_format = "parquet") %>%
  #dplyr::filter(output_type == "quantile", location != "US") %>%
  hubData::collect_hub()
```

```
Error in `arrow::open_dataset()`:
! Type error: Unable to merge: Field location has incompatible types: double vs string
Run `rlang::last_trace()` to see where the error occurred.
```

And when I add the schema:

```r
config_json <- hubUtils::read_config(hub_path, "tasks")
or_schema <- hubData::create_hub_schema(config_json)
hubData::connect_model_output("model-output/", file_format = "parquet",
                              schema = or_schema) %>%
  dplyr::filter(output_type == "quantile", location == "US") %>%
  hubData::collect_hub()
```

```
Error in `dplyr::collect()`:
! NotImplemented: Function 'equal' has no kernel matching input types (string, double)
```

However, if I comment out the filter:

```r
hubData::connect_model_output("model-output/", file_format = "parquet",
                              schema = or_schema) %>%
  # dplyr::filter(output_type == "quantile", location == "US") %>%
  hubData::collect_hub()
```

```
# A tibble: 1,028,224 × 9
   model_id  reference_date target horizon location target_end_date output_type output_type_id value
 * <chr>     <date>         <chr>    <int> <chr>    <date>          <chr>       <chr>          <dbl>
 1 teama-mo… 2023-05-27     wk in…       0 1        2022-10-22      quantile    0.01            88.6
 2 teama-mo… 2023-05-27     wk in…       1 1        2022-10-22      quantile    0.01            74.6
 3 teama-mo… 2023-05-27     wk in…       2 1        2022-10-22      quantile    0.01            44.9
 4 teama-mo… 2023-05-27     wk in…       3 1        2022-10-22      quantile    0.01            17.6
 5 teama-mo… 2023-05-27     wk in…       0 2        2022-10-22      quantile    0.01            57.1
 6 teama-mo… 2023-05-27     wk in…       1 2        2022-10-22      quantile    0.01            69.8
 7 teama-mo… 2023-05-27     wk in…       2 2        2022-10-22      quantile    0.01            69.4
 8 teama-mo… 2023-05-27     wk in…       3 2        2022-10-22      quantile    0.01            96.6
 9 teama-mo… 2023-05-27     wk in…       0 4        2022-10-22      quantile    0.01            17.4
10 teama-mo… 2023-05-27     wk in…       1 4        2022-10-22      quantile    0.01            63.8
# ℹ 1,028,214 more rows
# ℹ Use `print(n = ...)` to see more rows
```
|
When you add the schema option to the [...]. In the first case (when you are not overriding [...]), [...]. In the second case, when you override [...], [...]. In the absence of the [...], [...]. You can even force the error that you see in the second scenario (with [...]).
|
Just to follow on from above, you can cast to character, i.e. [...]:

```
ExecPlan with 4 nodes:
3:SinkNode{}
2:ProjectNode{projection=[reference_date, target, horizon, location, target_end_date, output_type, output_type_id, value, model_id]}
1:FilterNode{filter=(cast(location, {to_type=string, allow_int_overflow=false, allow_time_truncate=false, allow_time_overflow=false, allow_decimal_truncate=false, allow_float_truncate=false, allow_invalid_utf8=false}) == "US")}
0:SourceNode{}
```

Or, equivalently:

```r
ScannerBuilder$create(x)$Filter(arrow:::cast(Expression$field_ref("location"), string()) == "US")$Finish()$ToTable()
```
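The dplyr-level call that produces a plan like the one above was elided; presumably it looks something like this sketch (reusing the `or_schema` object from the reproduction earlier in the thread), with the cast to character happening inside the filter:

```r
# Sketch (assumed): cast location to character inside the filter so the
# comparison happens on strings, matching the FilterNode shown above.
hubData::connect_model_output("model-output/", file_format = "parquet",
                              schema = or_schema) %>%
  dplyr::filter(as.character(location) == "US") %>%
  hubData::collect_hub()
```
|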
There's a lot going on here. I've done some experiments, which can be [...]. All the code can be found in [...].

**Differences in reading csvs between arrow and readr**

The first issue appears to be a result of how [...]. Below I create two csvs, both with [...]:

```r
library(dplyr)

dir.create("test", showWarnings = FALSE)

arrow::read_csv_arrow(
  "model-output/HUBuni-simexamp/2021-03-07-HUBuni-simexamp.csv"
) %>%
  filter(location != "US") %>%
  slice(1:1000) %>%
  arrow::write_csv_arrow(file.path("test", "2021-03-07-HUBuni-simexamp-arrow.csv"))

readr::read_csv("model-output/HUBuni-simexamp/2021-03-07-HUBuni-simexamp.csv") %>%
  filter(location != "US") %>%
  slice(1:1000) %>%
  readr::write_csv(file.path("test", "2021-03-07-HUBuni-simexamp-readr.csv"))
#> Rows: 838656 Columns: 8
#> ── Column specification ────────────────────────────────────────────────────────
#> Delimiter: ","
#> chr  (4): scenario_id, location, target, output_type
#> dbl  (3): horizon, output_type_id, value
#> date (1): origin_date
#>
#> ℹ Use `spec()` to retrieve the full column specification for this data.
#> ℹ Specify the column types or set `show_col_types = FALSE` to quiet this message.
```

CSVs can often be difficult to work with because they don't explicitly store column type information. Looking at the structure of the two files created: [...]
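The structure comparison itself wasn't captured above; one quick way to see the difference (a sketch, assuming the two files written in the previous chunk) is to peek at the raw text and at the schema arrow infers when re-reading each file:

```r
# Peek at the first few lines of each csv as written.
readLines(file.path("test", "2021-03-07-HUBuni-simexamp-arrow.csv"), n = 3)
readLines(file.path("test", "2021-03-07-HUBuni-simexamp-readr.csv"), n = 3)

# And compare the schemas arrow infers when re-reading them.
arrow::open_dataset(file.path("test", "2021-03-07-HUBuni-simexamp-arrow.csv"), format = "csv")$schema
arrow::open_dataset(file.path("test", "2021-03-07-HUBuni-simexamp-readr.csv"), format = "csv")$schema
```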
|
Thank you all for the detailed answer! I agree with Anna that the "best way to handle this is to make sure that the schema is preserved when writing out the parquet files and test for it as well." That's what I ended up doing too. I am also curious about the "[..] purpose of schemas for .parquet files if they are not applied in the same way as for csv files. I think this is a question for the arrow team." |
Thanks for all of the great digging and background info--this is great! It's worth noting that this class of error is occurring on validated files. So I agree that we need to get this right with schemas to the extent possible, since different downstream tools are able to infer mixed schemas with varying results.

In the meantime, I updated the R snippet on the gist to this incantation that works with the problematic filter. This seems related to the prior note from @lmullany? [not saying this is the ideal permanent solution, just wanted to give people a way to work with the files in their current state]

```r
hub_path_cloud <- s3_bucket('cdcepi-flusight-forecast-hub/')
data_cloud <- connect_hub(hub_path_cloud, file_format = "parquet") %>% collect()

filtered <- data_cloud %>%
  filter(output_type == "quantile", location == "US")
```

Doing the [...] |
One thing I'm struggling to understand is whether or not we'd get this same class of error for a non-cloud hub that receives parquet-based submissions. If individual teams are submitting parquet files, wouldn't we have the same issue with potentially mis-matched parquet schemas? Does anyone know of a hubverse hub that is accepting parquet model-outputs? |
Yes, we will have the same issue. I encountered the same issue in SMH. |
Are they validated files as submitted, or are they files that have been converted to parquet via arrow? If they are files that actually passed validation on the way in, then we need to flag those and understand what's going on a bit better.

This will work, but it means loading the full dataset into memory before filtering. That's really the opposite of what the workflow is designed to do, so I would fix the data instead and change the snippet back when possible. |
They are files that were validated when submitted as .csvs and then converted to parquet via arrow (hence the question about other hubs that receive parquet submissions).
For sure--but while we're still sorting through how to fix the data, it's worth presenting people with a short-term option for interacting with the FluSight data (which aren't large) using R. I'm still looking to verify whether or not we're going to have the mis-matched schema issue on parquet files submitted by modelers (i.e., no cloud transforms). @LucieContamin has weighed in; my next step is to refer to Nick's list of hubverse hubs and see if there's anything on the list that uses parquet. |
I think this is related to the first issue I highlighted at the beginning of my comment regarding [...]. |
I should mention that if a parquet file schema does not conform to the hub schema on the way in, it does not pass validation. So once validated, a parquet file is actually more robust than a CSV file 😄! |
I should also mention, to clarify, that the SMH file issue was on a file that was NOT validated with hubValidations and will probably fail the validation. |
@LucieContamin Thanks for the clarification--that's helpful! I hope it's clear that I'm not pushing back on the idea of making the cloud transforms schema-aware or advocating for less-optimal methods for data access! My goal is to fully understand the problem space and apply the most straightforward fix at the correct point in the process, so I appreciate y'all's patience as I poke around. |
I just wanted to note that, along with the data type issue, the transformation has also changed some of the location values:

```r
library(dplyr)
library(hubData)

hub_path_cloud <- s3_bucket('cdcepi-flusight-forecast-hub/')
data_cloud <- connect_hub(hub_path_cloud, file_format = "parquet") %>% collect()
data_cloud$location |> unique()
#>  [1] "6"  "01" "02" "04" "05" "06" "08" "09" "10" "11" "12" "13" "15" "16" "17"
#> [16] "18" "19" "20" "21" "22" "23" "24" "25" "26" "27" "28" "29" "30" "31" "32"
#> [31] "33" "34" "35" "36" "37" "38" "39" "40" "41" "42" "44" "45" "46" "47" "48"
#> [46] "49" "50" "51" "53" "54" "55" "56" "72" "US" "1"  "2"  "5"  "9"  "4"  "8"
```

Given this, I think the best place to supply the schema is when reading in GitHub files instead of writing out cloud files, as the latter will likely still result in dropping of the leading zeros. |
Yep, that makes sense. I was looking at the FIPS codes also and have a somewhat related question about the validation process: how do we want/expect modelers to respond to validation warnings vs errors?

When running the validation process against a parquet file without all 50 states and no string value (like "US"), the validation will fail because the 01-09 FIPS codes get their leading zero dropped and don't validate against the list of valid locations in [...]. But if you then remove all model-output entries for locations 01-09, the validation returns the "data types do not match hub schema" warning but not the error (because two-digit FIPS codes do match the list of valid locations).

It's a contrived edge case that probably isn't worth spending a lot of time on...more just an operational question. We don't expect downstream data access tools to work smoothly if hub users ignore validation errors. Do we have the same stance for warnings? [none of this matters for people using the cloud data, if we're fixing this up in the transform--it would only impact people working with local model-output data...I was able to replicate the [...]] |
FWIW, I moved this issue into the board's "Up Next" column because it makes sense to have the transform be schema-aware and act accordingly. It's also a good case for moving people to the cloud when we can, because the transform function is an opportunity to provide a degree of cleanup if teams ignore errors and warnings when submitting model-output data. Appreciate the great convo here! |
This is expected: when matching values to expected valid values, we transform everything to character, as trying to check numbers for equality is generally dodgy. So we check that everything, transformed to character, matches combinations of values in the expanded grid of valid values. You could argue the results of the checks are an accurate assessment of the issues, because coercing from character to integer and back to character does not change the actual value when there are no leading zeros, but does in the cases where there are. Hence fixing the data type in files without 1-9 values will fix any issues, whereas files with 1-9 values will also need leading zeros added.
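A tiny illustration (not from the thread) of the round trip described above:

```r
# Coercing character -> integer -> character is lossless without a leading zero...
as.character(as.integer("12"))  # "12": still matches the valid location "12"
# ...but lossy with one, which is why files containing locations 01-09 also fail:
as.character(as.integer("01"))  # "1": no longer matches the valid location "01"
```
|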
I'd be interested to see an example of this, as my understanding is that all FIPS codes are two characters. |
TL;DR: My example is a very contrived edge case, and I agree that our time is best spent making the cloud transform schema-aware. If we want to dive into this further, it's probably worth switching to a synchronous convo.

To reproduce:

```r
hubValidations::validate_submission(
  hub_path = "~/code/rsv-forecast-hub/",
  file_path = "parquet-test_data/2024-05-05-parquet-test_data.parquet")
```

```
> rsv_path <- "~/code/rsv-forecast-hub"
> rsv_con <- connect_hub(rsv_path)
> rsv_con %>% filter(location=="US") %>% collect()
Error in `compute.arrow_dplyr_query()`:
! NotImplemented: Function 'equal' has no kernel matching input types (string, int64)
```

Again, this is a very contrived example, and I don't think it's worth spending a ton of time on. Essentially, in this scenario, location values are interpreted as integers when the modeler writes out the parquet file. However, two-digit integers match the list of accepted locations, so the validation process throws the warning but no error. If the modeler had included a location value of [...] |
I should mention that the [...] |
During the recent Hubverse retreat, @annakrystalli, @LucieContamin, and I determined that the best way forward here is to port the R-based [...]. Then the transform package can enforce a schema when reading incoming model-output files. As a first step, this PR refactors the transform function to ensure that it will have a more straightforward way to access a hub's [...]. |
Am moving this out of the "in progress" state in favor of the temporary, forced schema fix outlined in #24. Inferring a hub's correct schema from the config files is still the plan, but that work has temporarily taken a back seat to other cloud-based concerns. |
Update 2025-01-15

This is an old issue, and hubverse-transform currently works around the schema issue by forcing any fields named `location` and `output_type_id` to a string data type. My .02 is that we should put this on hold until the Hubverse has a Python-based function to determine a hub's schema. Then we can use it here.

Currently, hubverse-transform infers the parquet schema to apply when converting incoming model-output data to parquet. Because each file arrives and is transformed as a single unit, pyarrow has a limited amount of information to use when determining the parquet schema.

This resulted in some annoying behavior when working with the CDC's FluSight data. For example: [...]

The current state is kind of workable but not ideal: [...] `hubData` can handle it [...]

Per the conversation below, we decided to update hubverse-transform to detect a hub's schema and apply it when reading model-output files that are synced to the cloud. Rather than submitting a large PR to make the transform package "schema aware," we'll do it in a series of smaller PRs, merging them into a dev branch until the feature is complete:

- [...] `ModelOutputHandler` so it is aware of a hub's high-level location (not just the location of the model-output data)
- [...] `ModelOutputHandler` class to use the Arrow schema when reading a model-output file