22__all__ = [
"ConvertRefcatManager",
"ConvertGaiaManager",
"ConvertGaiaXpManager"]
24from ctypes
import c_int
31import astropy.units
as u
42COUNTER = multiprocessing.Value(c_int, 0)
44FILE_PROGRESS = multiprocessing.Value(c_int, 0)
48 """Placeholder for ConfigurableField validation; refcat convert is
49 configured by the parent convert Task.
56 Convert a reference catalog from external files into the LSST HTM sharded
57 format, using a multiprocessing Pool to speed up the work.
61 filenames : `dict` [`int`, `str`]
62 The HTM pixel id and filenames to convert the catalog into.
63 config : `lsst.meas.algorithms.ConvertReferenceCatalogConfig`
64 The Task configuration holding the field names.
65 file_reader : `lsst.pipe.base.Task`
66 The file reader to use to load the files.
67 indexer : `lsst.meas.algorithms.HtmIndexer`
68 The class used to compute the HTM pixel per coordinate.
69 schema : `lsst.afw.table.Schema`
70 The schema of the output catalog.
71 key_map : `dict` [`str`, `lsst.afw.table.Key`]
72 The mapping from output field names to keys in the Schema.
73 htmRange : `tuple` [`int`]
74 The start and end HTM pixel ids.
75 addRefCatMetadata : callable
76 A function called to add extra metadata to each output Catalog.
77 log : `lsst.log.Log` or `logging.Logger`
78 The log to send messages to.
80 _flags = [
'photometric',
'resolved',
'variable']
81 _DefaultName =
'convertRefcatManager'
82 ConfigClass = ConvertRefcatManagerConfig
84 def __init__(self, filenames, config, file_reader, indexer,
85 schema, key_map, htmRange, addRefCatMetadata, log):
96 if self.
config.coord_err_unit
is not None:
100 def run(self, inputFiles):
101 """Index a set of input files from a reference catalog, and write the
102 output to the appropriate filenames, in parallel.
107 A list of file paths to read data from.
111 output : `dict` [`int`, `str`]
112 The htm ids and the filenames that were written to.
116 with multiprocessing.Manager()
as manager:
118 FILE_PROGRESS.value = 0
119 fileLocks = manager.dict()
122 fileLocks[i] = manager.Lock()
123 self.
log.info(
"File locks created.")
125 start_time = time.perf_counter()
126 with multiprocessing.Pool(self.
config.n_processes)
as pool:
127 result = pool.starmap(self.
_convertOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
128 end_time = time.perf_counter()
129 self.
log.info(
"Finished writing files. Elapsed time: %.2f seconds", end_time-start_time)
131 return {id: self.
filenames[id]
for item
in result
for id
in item}
134 """Read and process one file, and write its records to the correct
135 indexed files, while handling exceptions in a useful way so that they
136 don't get swallowed by the multiprocess pool.
142 fileLocks : `dict` [`int`, `multiprocessing.Lock`]
143 A Lock for each HTM pixel; each pixel gets one file written, and
144 we need to block when one process is accessing that file.
148 pixels, files : `list` [`int`]
149 The pixel ids that were written to.
151 self.
log.debug(
"Processing: %s", filename)
156 matchedPixels = self.
indexer.indexPoints(inputData[self.
config.ra_name],
157 inputData[self.
config.dec_name])
159 self.
log.error(
"Failure preparing data for: %s", filename)
162 pixel_ids = set(matchedPixels)
163 for pixelId
in pixel_ids:
164 with fileLocks[pixelId]:
166 self.
_doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
168 self.
log.error(
"Failure processing filename: %s, pixelId: %s",
171 with FILE_PROGRESS.get_lock():
172 oldPercent = 100 * FILE_PROGRESS.value / self.
nInputFiles
173 FILE_PROGRESS.value += 1
174 percent = 100 * FILE_PROGRESS.value / self.
nInputFiles
176 if np.floor(percent) - np.floor(oldPercent) >= 1:
177 self.
log.info(
"Completed %d / %d files: %d %% complete ",
183 def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
184 """Process one HTM pixel, appending to an existing catalog or creating
185 a new catalog, as needed.
189 inputData : `numpy.ndarray`
190 The data from one input file.
191 matchedPixels : `numpy.ndarray`
192 The row-matched pixel indexes corresponding to ``inputData``.
194 The pixel index we are currently processing.
195 fluxes : `dict` [`str`, `numpy.ndarray`]
196 The values that will go into the flux and fluxErr fields in the
198 coordErr : `dict` [`str`, `numpy.ndarray`]
199 The values that will go into the coord_raErr, coord_decErr, and
200 coord_ra_dec_Cov fields in the output catalog (in radians).
202 idx = np.where(matchedPixels == pixelId)[0]
204 for outputRow, inputRow
in zip(catalog[-len(idx):], inputData[idx]):
207 with COUNTER.get_lock():
208 self.
_setIds(inputData[idx], catalog)
211 for name, array
in fluxes.items():
212 catalog[self.
key_map[name]][-len(idx):] = array[idx]
215 for name, array
in coordErr.items():
216 catalog[name][-len(idx):] = array[idx]
218 catalog.writeFits(self.
filenames[pixelId])
221 """Fill the `id` field of catalog with a running index, filling the
222 last values up to the length of ``inputData``.
224 Fill with `self.config.id_name` if specified, otherwise use the
225 global running counter value.
229 inputData : `numpy.ndarray`
230 The input data that is being processed.
231 catalog : `lsst.afw.table.SimpleCatalog`
232 The output catalog to fill the ids.
234 size = len(inputData)
236 catalog[
'id'][-size:] = inputData[self.
config.id_name]
238 idEnd = COUNTER.value + size
239 catalog[
'id'][-size:] = np.arange(COUNTER.value, idEnd)
240 COUNTER.value = idEnd
243 """Get a catalog from disk or create it if it doesn't exist.
248 Identifier for catalog to retrieve
249 schema : `lsst.afw.table.Schema`
250 Schema to use in catalog creation it does not exist.
252 The number of new elements that will be added to the catalog,
253 so space can be preallocated.
257 catalog : `lsst.afw.table.SimpleCatalog`
258 The new or read-and-resized catalog specified by `dataId`.
261 if os.path.isfile(self.
filenames[pixelId]):
262 catalog = afwTable.SimpleCatalog.readFits(self.
filenames[pixelId])
263 catalog.resize(len(catalog) + nNewElements)
264 return catalog.copy(deep=
True)
265 catalog = afwTable.SimpleCatalog(schema)
266 catalog.resize(nNewElements)
272 """Create an ICRS coord. from a row of a catalog being converted.
276 row : `numpy.ndarray`
277 Row from catalog being converted.
279 Name of RA key in catalog being converted.
281 Name of Dec key in catalog being converted.
285 coord : `lsst.geom.SpherePoint`
291 """Compute the ra/dec error fields that will go into the output catalog.
295 inputData : `numpy.ndarray`
296 The input data to compute fluxes for.
300 coordErr : `dict` [`str`, `numpy.ndarray`]
301 The values that will go into the coord_raErr, coord_decErr, fields
302 in the output catalog (in radians).
306 This does not handle the ra/dec covariance field,
307 ``coord_ra_coord_dec_Cov``. That field is handled in
308 `_setCoordinateCovariance`.
311 if hasattr(self,
"coord_err_unit"):
312 result[
'coord_raErr'] = u.Quantity(inputData[self.
config.ra_err_name],
314 result[
'coord_decErr'] = u.Quantity(inputData[self.
config.dec_err_name],
319 """Set flags in an output record.
323 record : `lsst.afw.table.SimpleRecord`
324 Row from indexed catalog to modify.
325 row : `numpy.ndarray`
326 Row from catalog being converted.
328 names = record.schema.getNames()
331 attr_name =
'is_{}_name'.format(flag)
332 record.set(self.
key_map[flag], bool(row[getattr(self.
config, attr_name)]))
335 """Compute the flux fields that will go into the output catalog.
339 inputData : `numpy.ndarray`
340 The input data to compute fluxes for.
344 fluxes : `dict` [`str`, `numpy.ndarray`]
345 The values that will go into the flux and fluxErr fields in the
349 for item
in self.
config.mag_column_list:
350 result[item+
'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
351 if len(self.
config.mag_err_column_map) > 0:
352 for err_key
in self.
config.mag_err_column_map.keys():
353 error_col_name = self.
config.mag_err_column_map[err_key]
356 fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
357 inputData[err_key].copy())*1e9
358 result[err_key+
'_fluxErr'] = fluxErr
362 """Set proper motion fields in a record of an indexed catalog.
364 The proper motions are read from the specified columns,
365 scaled appropriately, and installed in the appropriate
366 columns of the output.
370 record : `lsst.afw.table.SimpleRecord`
371 Row from indexed catalog to modify.
372 row : structured `numpy.array`
373 Row from catalog being converted.
375 if self.
config.pm_ra_name
is None:
377 radPerOriginal = np.radians(self.
config.pm_scale)/(3600*1000)
378 record.set(self.
key_map[
"pm_ra"], row[self.
config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
379 record.set(self.
key_map[
"pm_dec"], row[self.
config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
381 if self.
config.pm_ra_err_name
is not None:
382 record.set(self.
key_map[
"pm_raErr"], row[self.
config.pm_ra_err_name]*radPerOriginal)
383 record.set(self.
key_map[
"pm_decErr"], row[self.
config.pm_dec_err_name]*radPerOriginal)
386 """Set the parallax fields in a record of a refcat.
388 if self.
config.parallax_name
is None:
390 scale = self.
config.parallax_scale*lsst.geom.milliarcseconds
391 record.set(self.
key_map[
'parallax'], row[self.
config.parallax_name]*scale)
392 record.set(self.
key_map[
'parallaxErr'], row[self.
config.parallax_err_name]*scale)
395 """Convert an epoch in native format to TAI MJD (a float).
397 return astropy.time.Time(nativeEpoch, format=self.
config.epoch_format,
398 scale=self.
config.epoch_scale).tai.mjd
401 """Set the off-diagonal position covariance in a record of an indexed
404 There is no generic way to determine covariance. Override this method
405 in a subclass specialized for your dataset.
409 record : `lsst.afw.table.SimpleRecord`
410 Row from indexed catalog to modify.
411 row : structured `numpy.array`
412 Row from catalog being converted.
414 raise NotImplementedError(
"There is no default method for setting the covariance. Override this "
415 "method in a subclass specialized for your dataset.")
418 """Set extra data fields in a record of an indexed catalog.
422 record : `lsst.afw.table.SimpleRecord`
423 Row from indexed catalog to modify.
424 row : structured `numpy.array`
425 Row from catalog being converted.
427 for extra_col
in self.
config.extra_col_names:
428 value = row[extra_col]
436 if isinstance(value, np.str_):
438 record.set(self.
key_map[extra_col], value)
441 """Fill a record in an indexed catalog to be persisted.
445 record : `lsst.afw.table.SimpleRecord`
446 Row from indexed catalog to modify.
447 row : structured `numpy.array`
448 Row from catalog being converted.
453 if self.
config.full_position_information:
461 """Special-case convert manager to deal with Gaia fluxes.
472 def gaiaFluxToFlux(flux, zeroPoint):
473 """Equations 5.19 and 5.30 from the Gaia calibration document define the
474 conversion from Gaia electron/second fluxes to AB magnitudes.
475 https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
477 result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
479 result[flux == 0] = 0
484 with np.errstate(invalid=
'ignore', divide=
'ignore'):
487 result[
'phot_g_mean_flux'] = gaiaFluxToFlux(input[
'phot_g_mean_flux'], 25.7934)
488 result[
'phot_bp_mean_flux'] = gaiaFluxToFlux(input[
'phot_bp_mean_flux'], 25.3806)
489 result[
'phot_rp_mean_flux'] = gaiaFluxToFlux(input[
'phot_rp_mean_flux'], 25.1161)
491 result[
'phot_g_mean_fluxErr'] = result[
'phot_g_mean_flux'] / input[
'phot_g_mean_flux_over_error']
492 result[
'phot_bp_mean_fluxErr'] = result[
'phot_bp_mean_flux'] / input[
'phot_bp_mean_flux_over_error']
493 result[
'phot_rp_mean_fluxErr'] = result[
'phot_rp_mean_flux'] / input[
'phot_rp_mean_flux_over_error']
498 """Set the off-diagonal position covariance in a record of an indexed
501 Convert the Gaia coordinate correlations into covariances.
505 record : `lsst.afw.table.SimpleRecord`
506 Row from indexed catalog to modify.
507 row : structured `numpy.array`
508 Row from catalog being converted.
510 inputParams = [
'ra',
'dec',
'parallax',
'pmra',
'pmdec']
511 outputParams = [
'coord_ra',
'coord_dec',
'parallax',
'pm_ra',
'pm_dec']
517 reorder = [0, 1, 4, 2, 3]
524 j_error = row[f
'{inputParams[j]}_error'] * inputUnits[j]
525 i_error = row[f
'{inputParams[i]}_error'] * inputUnits[i]
526 ij_corr = row[f
'{inputParams[j]}_{inputParams[i]}_corr']
527 cov = (i_error * j_error * ij_corr).to_value(self.
outputUnit)
531 a = (i
if (reorder[i] < reorder[j])
else j)
532 b = (j
if (reorder[i] < reorder[j])
else i)
534 record.set(self.
key_map[f
'{outputParams[a]}_{outputParams[b]}_Cov'], cov)
538 """Special-case convert manager for Gaia XP spectrophotometry catalogs,
539 that have fluxes/flux errors, instead of magnitudes/mag errors. The input
540 flux and error values are in units of W/Hz/(m^2) (Gaia Collaboration, Montegriffo et al. 2022).
541 The the flux and fluxErr fields in the output catalog have units of nJy.
546 for item
in self.
config.mag_column_list:
548 error_col_name = item.replace(
"_flux_",
"_flux_error_")
550 result[item +
"_flux"] = (
551 inputData[item] * u.Watt / u.Hz / u.meter / u.meter
553 result[item +
"_fluxErr"] = (
554 inputData[error_col_name] * u.Watt / u.Hz / u.meter / u.meter
_setCoordinateCovariance(self, record, row)
__init__(self, *args, **kwargs)
_getFluxes(self, inputData)
_setParallax(self, record, row)
computeCoord(row, ra_name, dec_name)
_setFlags(self, record, row)
_setProperMotion(self, record, row)
_fillRecord(self, record, row)
_convertOneFile(self, filename, fileLocks)
_epochToMjdTai(self, nativeEpoch)
__init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
_setExtra(self, record, row)
_setCoordinateCovariance(self, record, row)
_getCoordErr(self, inputData)
_getFluxes(self, inputData)
_doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr)
_setIds(self, inputData, catalog)
getCatalog(self, pixelId, schema, nNewElements)