-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathspherical_harmonics.py
209 lines (190 loc) · 7.03 KB
/
spherical_harmonics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import healpy as hp
import numpy as np
import logging
from astropy import units as u
import pixell.enmap, pixell.curvedsky
from .. import mpi, utils
log = logging.getLogger("pysm3")
def apply_smoothing_and_coord_transform(
input_map,
fwhm=None,
beam_window=None,
rot=None,
lmax=None,
output_nside=None,
output_car_resol=None,
return_healpix=True,
return_car=False,
input_alm=False,
map2alm_lsq_maxiter=None,
map_dist=None,
):
"""Apply smoothing and coordinate rotation to an input map
it applies the `healpy.smoothing` Gaussian smoothing kernel if `map_dist`
is None, otherwise applies distributed smoothing with `libsharp`.
In the distributed case, no rotation is supported.
Parameters
----------
input_map : ndarray
Input map, of shape `(3, npix)`
This is assumed to have no beam at this point, as the
simulated small scale template on which the simulations are based
have no beam.
fwhm : astropy.units.Quantity
Full width at half-maximum, defining the
Gaussian kernels to be applied.
beam_window: array, optional
Custom beam window function (:math:`B_\ell`)
rot: hp.Rotator
Apply a coordinate rotation give a healpy `Rotator`, e.g. if the
inputs are in Galactic, `hp.Rotator(coord=("G", "C"))` rotates
to Equatorial
output_nside : int
HEALPix output map Nside, if None, use the same as the input
lmax : int
lmax for the map2alm step, if None, it is set to 2.5 * nside
if output_nside is equal or higher than nside.
It is set to 1.5 * nside if output_nside is lower than nside
output_car_resol : astropy.Quantity
CAR output map resolution, generally in arcmin
return_healpix : bool
Whether to return the HEALPix map
return_car : bool
Whether to return the CAR map
input_alm : bool
Instead of starting from a map, `input_map` is a set of Alm
map2alm_lsq_maxiter : int
Number of iteration for the least squares map to Alm transform,
setting it to 0 uses the standard map2alm, the default of 10
makes the transform slow if the input map is not band limited,
for example if has point sources or sharp features.
If ell_max is <= 1.5 nside, this setting is ignored
and `map2alm` with pixel weights is used.
Returns
-------
smoothed_map : np.ndarray or tuple of np.ndarray
Array containing the smoothed sky or tuple of HEALPix and CAR maps
"""
if not input_alm:
nside = hp.get_nside(input_map)
if output_nside is None:
output_nside = nside
unit = input_map.unit if hasattr(input_map, "unit") else 1
if lmax is None:
if nside == output_nside:
lmax = int(2.5 * output_nside)
elif output_nside > nside:
lmax = int(2.5 * nside)
elif output_nside < nside:
lmax = int(1.5 * nside)
log.info("Setting lmax to %d", lmax)
output_maps = []
if map_dist is None:
if input_alm:
alm = input_map.copy()
else:
alm = map2alm(input_map, nside, lmax, map2alm_lsq_maxiter)
if fwhm is not None:
assert beam_window is None, "Either FWHM or beam_window"
log.info("Smoothing with fwhm of %s", str(fwhm))
hp.smoothalm(alm, fwhm=fwhm.to_value(u.rad), inplace=True, pol=True)
if beam_window is not None:
assert fwhm is None, "Either FWHM or beam_window"
log.info("Smoothing with a custom isotropic beam")
# smoothalm does not support polarized beam
for i in range(3):
try:
beam_window_i = beam_window[:, i]
log.info("Using polarized beam")
except IndexError:
beam_window_i = beam_window
log.info("Using the same beam for all components")
hp.smoothalm(alm[i], beam_window=beam_window_i, inplace=True)
if rot is not None:
log.info("Rotate Alm")
rot.rotate_alm(alm, inplace=True)
if return_healpix:
log.info("Alm to map HEALPix")
if input_alm:
assert (
output_nside is not None
), "If inputting Alms, specify output_nside"
output_maps.append(
u.Quantity(
hp.alm2map(alm, nside=output_nside, pixwin=False), unit, copy=False
)
)
if return_car:
log.info("Alm to map CAR")
shape, wcs = pixell.enmap.fullsky_geometry(
output_car_resol.to_value(u.radian),
dims=(3,),
variant="fejer1",
)
ainfo = pixell.curvedsky.alm_info(lmax=lmax)
output_maps.append(
pixell.curvedsky.alm2map(
alm, pixell.enmap.empty(shape, wcs), ainfo=ainfo
)
)
else:
assert (rot is None) or (
rot.coordin == rot.coordout
), "No rotation supported in distributed smoothing"
output_maps.append(mpi.mpi_smoothing(input_map, fwhm, map_dist))
assert not return_car, "No CAR output supported in Libsharp smoothing"
return output_maps[0] if len(output_maps) == 1 else tuple(output_maps)
def map2alm(input_map, nside, lmax, map2alm_lsq_maxiter=None):
"""Compute alm from a map using healpy.
Automatically selects the most appropriate method based on
the target lmax
Parameters
----------
input_map : np.ndarray
Input HEALPix map
nside : int
Resolution parameter of the input map
lmax : int
Maximum multipole of the alm
map2alm_lsq_maxiter : int, optional
Maximum number of iterations for map2alm_lsq, by default 10
Returns
-------
alm: np.ndarray
alm array"""
if map2alm_lsq_maxiter is None:
map2alm_lsq_maxiter = 10
nside = hp.get_nside(input_map)
if lmax <= 1.5 * nside:
log.info("Using map2alm with pixel weights")
alm = hp.map2alm(
input_map,
lmax=lmax,
use_pixel_weights=True if nside > 16 else False,
)
elif map2alm_lsq_maxiter == 0:
alm = hp.map2alm(input_map, lmax=lmax, iter=0)
log.info("Using map2alm with no weights and no iterations")
else:
alm, error, n_iter = hp.map2alm_lsq(
input_map,
lmax=lmax,
mmax=lmax,
tol=1e-7,
maxiter=map2alm_lsq_maxiter,
)
if n_iter == map2alm_lsq_maxiter:
log.warning(
"hp.map2alm_lsq did not converge in %d iterations,"
+ " residual relative error is %.2g",
n_iter,
error,
)
else:
log.info(
"Used map2alm_lsq, converged in %d iterations,"
+ "residual relative error %.2g",
n_iter,
error,
)
return alm