Skip to content

Commit 22bc045

Browse files
author
Sean Sullivan
committed
Update geo_shapes to use a more accurate min/max for colors, and switch to sub-aggregations instead of issuing a separate query for each bucket
1 parent bc771c9 commit 22bc045

File tree

1 file changed

+36
-11
lines changed

1 file changed

+36
-11
lines changed

elastic_datashader/tilegen.py

+36-11
Original file line numberDiff line numberDiff line change
@@ -537,10 +537,11 @@ def get_span_upper_bound(span_range: str, estimated_points_per_tile: Optional[in
537537
return math.log(1e9)
538538

539539
if span_range == "ultrawide":
540+
return math.log(1e30)
540541
return math.log(1e308)
541542

542543
assert estimated_points_per_tile is not None
543-
return math.log(max(estimated_points_per_tile * 2, 2))
544+
return math.log(max(math.pow(estimated_points_per_tile,2), 2))
544545

545546
def get_span_none(span_upper_bound: Optional[float]) -> Optional[List[float]]:
546547
if span_upper_bound is None:
@@ -1116,6 +1117,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
11161117
composite_agg_size = int(max_bins / inner_agg_size) - 1
11171118
field_type = get_field_type(config.elastic_hosts, headers, params,geopoint_field, idx)
11181119
partial_data = False # TODO can we get partial data?
1120+
span = None
11191121
if field_type == "geo_point":
11201122
resp = ScanAggs(
11211123
tile_s,
@@ -1138,10 +1140,7 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
11381140
)
11391141
estimated_points_per_tile = get_estimated_points_per_tile(span_range, global_bounds, z, global_doc_cnt)
11401142
elif field_type == "geo_shape":
1141-
searches = []
1142-
estimated_points_per_tile = 10000
11431143
zoom = 0
1144-
#span_range = "ultrawide"
11451144
if resolution == "coarse":
11461145
zoom = 5
11471146
spread = 7
@@ -1151,24 +1150,41 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_
11511150
elif resolution == "finest":
11521151
zoom = 7
11531152
spread = 1
1153+
geotile_precision = current_zoom+zoom
1154+
searches = []
1155+
if category_field:
1156+
max_value_s = copy.copy(base_s)
1157+
bucket = max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
1158+
bucket.metric("sum","sum",field=category_field,missing=0)
1159+
resp = max_value_s.execute()
1160+
estimated_points_per_tile = resp.aggregations.comp.buckets[0].sum['value']
1161+
span = [0,estimated_points_per_tile]
1162+
else:
1163+
max_value_s = copy.copy(base_s)
1164+
max_value_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=1)
1165+
resp = max_value_s.execute()
1166+
estimated_points_per_tile = resp.aggregations.comp.buckets[0].doc_count
1167+
span = [0,estimated_points_per_tile]
1168+
logger.info("EST Points: %s",estimated_points_per_tile)
1169+
11541170
searches = []
11551171
composite_agg_size = 65536#max agg bucket size
1156-
geotile_precision = current_zoom+zoom
11571172
subtile_bb_dict = create_bounding_box_for_tile(x, y, z)
11581173
subtile_s = copy.copy(base_s)
11591174
subtile_s = subtile_s[0:0]
11601175
subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: subtile_bb_dict})
1161-
subtile_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=composite_agg_size,bounds=subtile_bb_dict)
1176+
bucket = subtile_s.aggs.bucket("comp", "geotile_grid", field=geopoint_field,precision=geotile_precision,size=composite_agg_size,bounds=subtile_bb_dict)
1177+
if category_field:
1178+
bucket.metric("sum","sum",field=category_field,missing=0)
11621179
searches.append(subtile_s)
1163-
#logger.info(inner_aggs)
11641180
cmap = "bmy" #todo have front end pass the cmap for none categorical
11651181
def calc_aggregation(bucket,search):
11661182
#get bounds from bucket.key
11671183
#do search for sum of values on category_field
11681184
z, x, y = [ int(x) for x in bucket.key.split("/") ]
11691185
bucket_bb_dict = create_bounding_box_for_tile(x, y, z)
11701186
subtile_s = copy.copy(base_s)
1171-
subtile_s.aggs.bucket("sum","median_absolute_deviation",field=category_field,missing=0)
1187+
subtile_s.aggs.bucket("sum","avg",field=category_field,missing=0)
11721188
subtile_s = subtile_s[0:0]
11731189
subtile_s = subtile_s.filter("geo_bounding_box", **{geopoint_field: bucket_bb_dict})
11741190
response = subtile_s.execute()
@@ -1180,9 +1196,16 @@ def calc_aggregation(bucket,search):
11801196
search.total_failed += response._shards.failed # pylint: disable=W0212
11811197
bucket.doc_count = response.aggregations.sum['value'] #replace with sum of category_field
11821198
return bucket
1199+
1200+
def remap_bucket(bucket,search):
1201+
#get bounds from bucket.key
1202+
#remap sub aggregation for sum of values to the doc count
1203+
bucket.doc_count = bucket.sum['value']
1204+
return bucket
11831205
bucket_callback = None
11841206
if category_field:
1185-
bucket_callback = calc_aggregation
1207+
#bucket_callback = calc_aggregation #don't run a sub query. sub aggregation worked But we might want to leave this in for cross index searches
1208+
bucket_callback = remap_bucket
11861209
resp = Scan(searches,timeout=config.query_timeout_seconds,bucket_callback=bucket_callback)
11871210
df = pd.DataFrame(
11881211
convert_composite(
@@ -1291,8 +1314,10 @@ def calc_aggregation(bucket,search):
12911314
# bins that have 1000 or more items will be colored full
12921315
# scale
12931316
span_upper_bound = get_span_upper_bound(span_range, estimated_points_per_tile)
1294-
span = get_span_zero(span_upper_bound)
1295-
logger.debug("Span %s %s", span, span_range)
1317+
if span is None:
1318+
span = get_span_zero(span_upper_bound)
1319+
logger.info("Span %s %s", span, span_range)
1320+
logger.info("aggs min:%s max:%s",float(agg.min()),float(agg.max()))
12961321
img = tf.shade(agg, cmap=cc.palette[cmap], how="log", span=span)
12971322

12981323
###############################################################

0 commit comments

Comments
 (0)