Public Member Functions | |
| __init__ (self, ApdbCassandraConfig config) | |
| VersionTuple | apdbImplementationVersion (cls) |
| ApdbCassandraConfig | getConfig (self) |
| Table|None | tableDef (self, ApdbTables table) |
| ApdbCassandraConfig | init_database (cls, tuple[str,...] hosts, str keyspace, *, str|None schema_file=None, str|None ss_schema_file=None, int|None read_sources_months=None, int|None read_forced_sources_months=None, bool enable_replica=False, bool replica_skips_diaobjects=False, int|None port=None, str|None username=None, str|None prefix=None, str|None part_pixelization=None, int|None part_pix_level=None, bool time_partition_tables=True, str|None time_partition_start=None, str|None time_partition_end=None, str|None read_consistency=None, str|None write_consistency=None, int|None read_timeout=None, int|None write_timeout=None, tuple[str, str]|None ra_dec_columns=None, int|None replication_factor=None, bool drop=False, CreateTableOptions|None table_options=None) |
| ApdbCassandraReplica | get_replica (self) |
| pandas.DataFrame | getDiaObjects (self, sphgeom.Region region) |
| pandas.DataFrame|None | getDiaSources (self, sphgeom.Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time, astropy.time.Time|None start_time=None) |
| pandas.DataFrame|None | getDiaForcedSources (self, sphgeom.Region region, Iterable[int]|None object_ids, astropy.time.Time visit_time, astropy.time.Time|None start_time=None) |
| bool | containsVisitDetector (self, int visit, int detector, sphgeom.Region region, astropy.time.Time visit_time) |
| None | store (self, astropy.time.Time visit_time, pandas.DataFrame objects, pandas.DataFrame|None sources=None, pandas.DataFrame|None forced_sources=None) |
| None | reassignDiaSources (self, Mapping[int, int] idMap) |
| None | dailyJob (self) |
| int | countUnassociatedObjects (self) |
| ApdbMetadata | metadata (self) |
| ApdbCassandraAdmin | admin (self) |
Public Member Functions inherited from lsst.dax.apdb.apdb.Apdb | |
| Apdb | from_config (cls, ApdbConfig config) |
| Apdb | from_uri (cls, ResourcePathExpression uri) |
Protected Member Functions | |
| ConnectionContext | _context (self) |
| Timer | _timer (self, str name, *, Mapping[str, str|int]|None tags=None) |
| None | _versionCheck (self, DbVersions current_versions, DbVersions db_versions) |
| None | _makeSchema (cls, ApdbConfig config, *, bool drop=False, int|None replication_factor=None, CreateTableOptions|None table_options=None) |
| pandas.DataFrame | _getSources (self, sphgeom.Region region, Iterable[int]|None object_ids, float mjd_start, float mjd_end, ApdbTables table_name) |
| None | _storeReplicaChunk (self, ReplicaChunk replica_chunk) |
| Mapping[int, int] | _queryDiaObjectLastPartitions (self, Iterable[int] ids) |
| None | _deleteMovingObjects (self, pandas.DataFrame objs) |
| None | _storeDiaObjects (self, pandas.DataFrame objs, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk) |
| int|None | _storeDiaSources (self, ApdbTables table_name, pandas.DataFrame sources, ReplicaChunk|None replica_chunk) |
| None | _check_time_partitions (self, Iterable[int] partitions, ApdbCassandraTimePartitionRange time_partitions_range) |
| None | _storeDiaSourcesPartitions (self, pandas.DataFrame sources, astropy.time.Time visit_time, ReplicaChunk|None replica_chunk, int|None subchunk) |
| None | _storeObjectsPandas (self, pandas.DataFrame records, ApdbTables|ExtraTables table_name, Mapping|None extra_columns=None, int|None time_part=None) |
| None | _storeUpdateRecords (self, Iterable[ApdbUpdateRecord] records, ReplicaChunk chunk, *, bool store_chunk=False) |
| pandas.DataFrame | _add_apdb_part (self, pandas.DataFrame df) |
| pandas.DataFrame | _make_empty_catalog (self, ApdbTables table_name) |
| Iterator[tuple[cassandra.query.Statement, tuple]] | _combine_where (self, str prefix, list[tuple[str, tuple]] where1, list[tuple[str, tuple]] where2, tuple[str, tuple]|None where3=None, str|None suffix=None) |
| pandas.DataFrame | _fix_input_timestamps (self, pandas.DataFrame df) |
| int | _batch_size (self, ApdbTables|ExtraTables table) |
Protected Attributes | |
| _config = config | |
| _keyspace = config.keyspace | |
| _schema = ApdbSchema(config.schema_file, config.ss_schema_file) | |
| _session_factory = SessionFactory(config) | |
| ConnectionContext|None | _connection_context = None |
Implementation of APDB database with Apache Cassandra backend.
Parameters
----------
config : `ApdbCassandraConfig`
Configuration object.
|
protected |
Calculate spatial partition for each record and add it to a
DataFrame.
Parameters
----------
df : `pandas.DataFrame`
DataFrame which has to contain ra/dec columns; the names of these
columns are defined by the configuration ``ra_dec_columns`` field.
Returns
-------
df : `pandas.DataFrame`
DataFrame with ``apdb_part`` column which contains pixel index
for ra/dec coordinates.
Notes
-----
This overrides any existing column in the DataFrame with the same name
(``apdb_part``). The original DataFrame is not changed; a copy of the
DataFrame is returned.
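As an illustration of the pattern (not the actual implementation, which uses the configured `lsst.sphgeom` pixelization), a minimal sketch with a toy pixel-index function might look like:

```python
import pandas as pd

# Hypothetical stand-in for the configured pixelization; it maps an
# (ra, dec) pair in degrees to an integer pixel index. The real class
# delegates to lsst.sphgeom (e.g. MQ3C/HTM pixelization).
def pixel_index(ra: float, dec: float, level: int = 10) -> int:
    # Toy index on a regular lon/lat grid, for illustration only.
    n = 2 ** level
    return int((ra % 360.0) / 360.0 * n) * n + int((dec + 90.0) / 180.0 * n)

def add_apdb_part(df: pd.DataFrame, ra_col: str = "ra", dec_col: str = "dec") -> pd.DataFrame:
    """Return a copy of ``df`` with an ``apdb_part`` pixel-index column."""
    df = df.copy()  # original DataFrame is not modified
    df["apdb_part"] = [pixel_index(ra, dec) for ra, dec in zip(df[ra_col], df[dec_col])]
    return df

catalog = pd.DataFrame({"ra": [10.0, 200.0], "dec": [-5.0, 45.0]})
result = add_apdb_part(catalog)
```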
|
protected |
Calculate batch size based on config parameters.
|
protected |
Check that time partitions for new data actually exist.
Parameters
----------
partitions : `~collections.abc.Iterable` [`int`]
Time partitions for new data.
time_partitions_range : `ApdbCassandraTimePartitionRange`
Current time partition range.
|
protected |
Make the Cartesian product of two parts of a WHERE clause into a series
of statements to execute.
Parameters
----------
prefix : `str`
Initial statement prefix that comes before WHERE clause, e.g.
"SELECT * from Table"
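A hedged sketch of the idea (the function name and the clause format are illustrative, not the real implementation): each clause is a ``(expression, parameters)`` pair, and one full statement is produced per combination.

```python
from __future__ import annotations

from itertools import product
from typing import Iterator

def combine_where(
    prefix: str,
    where1: list[tuple[str, tuple]],
    where2: list[tuple[str, tuple]],
    suffix: str | None = None,
) -> Iterator[tuple[str, tuple]]:
    """Yield one full statement per combination of clauses from the two lists."""
    for (expr1, params1), (expr2, params2) in product(where1, where2):
        stmt = f"{prefix} WHERE {expr1} AND {expr2}"
        if suffix:
            stmt += f" {suffix}"
        yield stmt, params1 + params2

# One statement per spatial partition, each carrying combined parameters.
stmts = list(combine_where(
    'SELECT * FROM "DiaSource"',
    [('"apdb_part" = ?', (100,)), ('"apdb_part" = ?', (101,))],
    [('"apdb_time_part" = ?', (7,))],
))
```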
|
protected |
Establish connection if not established and return context.
|
protected |
Objects in DiaObjectsLast can move from one spatial partition to another. For those objects, inserting a new version does not replace the old one, so we need to explicitly remove old versions before inserting new ones.
|
protected |
Update timestamp columns in the input DataFrame to be of naive datetime type. Clients may or may not generate timezone-aware timestamps; code in this class assumes that timestamps are naive, so we convert them to UTC and drop the timezone.
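A minimal pandas sketch of this normalization (an illustration assuming the timestamps live in ordinary datetime columns; the real method may differ in detail):

```python
import pandas as pd

def fix_input_timestamps(df: pd.DataFrame) -> pd.DataFrame:
    """Convert timezone-aware datetime columns to naive UTC datetimes."""
    df = df.copy()
    for col in df.columns:
        if isinstance(df[col].dtype, pd.DatetimeTZDtype):
            # Normalize to UTC, then drop the timezone to get naive values.
            df[col] = df[col].dt.tz_convert("UTC").dt.tz_localize(None)
    return df

frame = pd.DataFrame(
    {"time_processed": pd.to_datetime(["2025-01-01T12:00:00+02:00"])}
)
fixed = fix_input_timestamps(frame)  # 12:00+02:00 becomes naive 10:00 UTC
```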
|
protected |
Return catalog of DiaSource instances given set of DiaObject IDs.
Parameters
----------
region : `lsst.sphgeom.Region`
Spherical region.
object_ids : iterable [`int`] or `None`
Collection of DiaObject IDs.
mjd_start : `float`
Lower bound of time interval.
mjd_end : `float`
Upper bound of time interval.
table_name : `ApdbTables`
Name of the table.
Returns
-------
catalog : `pandas.DataFrame`, or `None`
Catalog containing DiaSource records. Empty catalog is returned if
``object_ids`` is empty.
|
protected |
Make an empty catalog for a table with a given name.
Parameters
----------
table_name : `ApdbTables`
Name of the table.
Returns
-------
catalog : `pandas.DataFrame`
An empty catalog.
|
protected |
Return existing mapping of diaObjectId to its last partition.
|
protected |
Store catalog of DiaObjects from current visit.
Parameters
----------
objs : `pandas.DataFrame`
Catalog with DiaObject records.
visit_time : `astropy.time.Time`
Time of the current visit.
replica_chunk : `ReplicaChunk` or `None`
Replica chunk identifier if replication is configured.
|
protected |
Store catalog of DIASources or DIAForcedSources from current visit.
Parameters
----------
table_name : `ApdbTables`
Table where to store the data.
sources : `pandas.DataFrame`
Catalog containing DiaSource records.
visit_time : `astropy.time.Time`
Time of the current visit.
replica_chunk : `ReplicaChunk` or `None`
Replica chunk identifier if replication is configured.
Returns
-------
subchunk : `int` or `None`
Subchunk number for resulting replica data, `None` if replication is
not enabled or subchunking is not enabled.
|
protected |
Store mapping of diaSourceId to its partitioning values.
Parameters
----------
sources : `pandas.DataFrame`
Catalog containing DiaSource records.
visit_time : `astropy.time.Time`
Time of the current visit.
replica_chunk : `ReplicaChunk` or `None`
Replication chunk, or `None` when replication is disabled.
subchunk : `int` or `None`
Replication sub-chunk, or `None` when replication is disabled or
sub-chunking is not used.
|
protected |
Store generic objects.
Takes a Pandas catalog and stores its records in a table.
Parameters
----------
records : `pandas.DataFrame`
Catalog containing object records.
table_name : `ApdbTables`
Name of the table as defined in APDB schema.
extra_columns : `dict`, optional
Mapping (column_name, column_value) which gives fixed values for
columns in each row, overrides values in ``records`` if matching
columns exist there.
time_part : `int`, optional
If not `None` then insert into a per-partition table.
Notes
-----
If the Pandas catalog contains additional columns not defined in the
table schema, they are ignored. The catalog does not have to contain
all columns defined in a table, but partition and clustering keys must
be present in the catalog or in ``extra_columns``.
|
protected |
Store ApdbUpdateRecords in the replica table for those records.
Parameters
----------
records : `list` [`ApdbUpdateRecord`]
Records to store.
chunk : `ReplicaChunk`
Replica chunk for these records.
store_chunk : `bool`
If `True` then also store the replica chunk.
Raises
------
TypeError
Raised if replication is not enabled for this instance.
|
protected |
Create `Timer` instance given its name.
|
protected |
Check schema version compatibility.
| ApdbCassandraAdmin lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.admin | ( | self | ) |
Object providing administrative interface for APDB (`ApdbAdmin`).
Reimplemented from lsst.dax.apdb.apdb.Apdb.
| VersionTuple lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.apdbImplementationVersion | ( | cls | ) |
Return version number for current APDB implementation.
Returns
-------
version : `VersionTuple`
Version of the code defined in implementation class.
| bool lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.containsVisitDetector | ( | self, | |
| int | visit, | ||
| int | detector, | ||
| sphgeom.Region | region, | ||
| astropy.time.Time | visit_time ) |
Test whether any sources for a given visit-detector are present in
the APDB.
Parameters
----------
visit, detector : `int`
The ID of the visit-detector to search for.
region : `lsst.sphgeom.Region`
Region corresponding to the visit/detector combination.
visit_time : `astropy.time.Time`
Visit time (as opposed to visit processing time). This can be any
timestamp in the visit timespan, e.g. its begin or end time.
Returns
-------
present : `bool`
`True` if at least one DiaSource or DiaForcedSource record
may exist for the specified observation, `False` otherwise.
Reimplemented from lsst.dax.apdb.apdb.Apdb.
| int lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.countUnassociatedObjects | ( | self | ) |
Return the number of DiaObjects that have only one DiaSource
associated with them.
Used as part of ap_verify metrics.
Returns
-------
count : `int`
Number of DiaObjects with exactly one associated DiaSource.
Notes
-----
This method can be very inefficient or slow in some implementations.
Reimplemented from lsst.dax.apdb.apdb.Apdb.
| None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.dailyJob | ( | self | ) |
Implement daily activities like cleanup/vacuum. What should be done during daily activities is determined by the specific implementation.
Reimplemented from lsst.dax.apdb.apdb.Apdb.
| ApdbCassandraReplica lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.get_replica | ( | self | ) |
Return `ApdbReplica` instance for this database.
| ApdbCassandraConfig lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getConfig | ( | self | ) |
Return APDB configuration for this instance, including any updates
that may be read from database.
Returns
-------
config : `ApdbConfig`
APDB configuration.
Reimplemented from lsst.dax.apdb.apdb.Apdb.
| pandas.DataFrame | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getDiaForcedSources | ( | self, | |
| sphgeom.Region | region, | ||
| Iterable[int] | None | object_ids, | ||
| astropy.time.Time | visit_time, | ||
| astropy.time.Time | None | start_time = None ) |
Return catalog of DiaForcedSource instances from a given region.
Parameters
----------
region : `lsst.sphgeom.Region`
Region to search for DIASources.
object_ids : iterable [ `int` ], optional
List of DiaObject IDs to further constrain the set of returned
sources. If the list is empty then an empty catalog with the correct
schema is returned. If `None` then returned sources are not
constrained.
visit_time : `astropy.time.Time`
Time of the current visit. If APDB contains records later than this
time, they may also be returned.
start_time : `astropy.time.Time`, optional
Lower bound of time window for the query. If not specified then
it is calculated using ``visit_time`` and
``read_forced_sources_months`` configuration parameter.
Returns
-------
catalog : `pandas.DataFrame`, or `None`
Catalog containing DiaForcedSource records. `None` is returned if
``start_time`` is not specified and ``read_forced_sources_months``
configuration parameter is set to 0.
Raises
------
NotImplementedError
May be raised by some implementations if ``object_ids`` is `None`.
Notes
-----
This method returns DiaForcedSource catalog for a region with
additional filtering based on DiaObject IDs. Only a subset of DiaSource
history is returned limited by ``read_forced_sources_months`` config
parameter, w.r.t. ``visit_time``. If ``object_ids`` is empty then an
empty catalog is always returned with the correct schema
(columns/types). If ``object_ids`` is `None` then no filtering is
performed and some of the returned records may be outside the specified
region.
Reimplemented from lsst.dax.apdb.apdb.Apdb.
| pandas.DataFrame lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getDiaObjects | ( | self, | |
| sphgeom.Region | region ) |
Return catalog of DiaObject instances from a given region.
This method returns only the last version of each DiaObject,
and may return only the subset of the DiaObject columns needed
for AP association. Some
records in a returned catalog may be outside the specified region; it
is up to a client to ignore those records or clean up the catalog
before further use.
Parameters
----------
region : `lsst.sphgeom.Region`
Region to search for DIAObjects.
Returns
-------
catalog : `pandas.DataFrame`
Catalog containing DiaObject records for a region that may be a
superset of the specified region.
Reimplemented from lsst.dax.apdb.apdb.Apdb.
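Since the returned catalog may be a superset of the query region, a client can filter it before use. A sketch with a simple ra/dec box standing in for a real `lsst.sphgeom` region test (column names are illustrative):

```python
import pandas as pd

def filter_to_box(objects: pd.DataFrame, ra_range: tuple,
                  dec_range: tuple) -> pd.DataFrame:
    """Drop rows whose ra/dec fall outside a rectangular region."""
    mask = (
        objects["ra"].between(*ra_range) & objects["dec"].between(*dec_range)
    )
    return objects[mask]

# A catalog as it might come back from getDiaObjects(); the third row is
# outside the region of interest and gets filtered out client-side.
catalog = pd.DataFrame({"diaObjectId": [1, 2, 3],
                        "ra": [10.0, 10.5, 99.0],
                        "dec": [-1.0, 0.5, 0.0]})
inside = filter_to_box(catalog, (9.0, 11.0), (-2.0, 2.0))
```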
| pandas.DataFrame | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.getDiaSources | ( | self, | |
| sphgeom.Region | region, | ||
| Iterable[int] | None | object_ids, | ||
| astropy.time.Time | visit_time, | ||
| astropy.time.Time | None | start_time = None ) |
Return catalog of DiaSource instances from a given region.
Parameters
----------
region : `lsst.sphgeom.Region`
Region to search for DIASources.
object_ids : iterable [ `int` ], optional
List of DiaObject IDs to further constrain the set of returned
sources. If `None` then returned sources are not constrained. If the
list is empty then an empty catalog with the correct schema is
returned.
visit_time : `astropy.time.Time`
Time of the current visit. If APDB contains records later than this
time, they may also be returned.
start_time : `astropy.time.Time`, optional
Lower bound of time window for the query. If not specified then
it is calculated using ``visit_time`` and
``read_sources_months`` configuration parameter.
Returns
-------
catalog : `pandas.DataFrame`, or `None`
Catalog containing DiaSource records. `None` is returned if
``start_time`` is not specified and ``read_sources_months``
configuration parameter is set to 0.
Notes
-----
This method returns DiaSource catalog for a region with additional
filtering based on DiaObject IDs. Only a subset of DiaSource history
is returned limited by ``read_sources_months`` config parameter, w.r.t.
``visit_time``. If ``object_ids`` is empty then an empty catalog is
always returned with the correct schema (columns/types). If
``object_ids`` is `None` then no filtering is performed and some of the
returned records may be outside the specified region.
Reimplemented from lsst.dax.apdb.apdb.Apdb.
| ApdbCassandraConfig lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.init_database | ( | cls, | |
| tuple[str, ...] | hosts, | ||
| str | keyspace, | ||
| * | , | ||
| str | None | schema_file = None, | ||
| str | None | ss_schema_file = None, | ||
| int | None | read_sources_months = None, | ||
| int | None | read_forced_sources_months = None, | ||
| bool | enable_replica = False, | ||
| bool | replica_skips_diaobjects = False, | ||
| int | None | port = None, | ||
| str | None | username = None, | ||
| str | None | prefix = None, | ||
| str | None | part_pixelization = None, | ||
| int | None | part_pix_level = None, | ||
| bool | time_partition_tables = True, | ||
| str | None | time_partition_start = None, | ||
| str | None | time_partition_end = None, | ||
| str | None | read_consistency = None, | ||
| str | None | write_consistency = None, | ||
| int | None | read_timeout = None, | ||
| int | None | write_timeout = None, | ||
| tuple[str, str] | None | ra_dec_columns = None, | ||
| int | None | replication_factor = None, | ||
| bool | drop = False, | ||
| CreateTableOptions | None | table_options = None ) |
Initialize new APDB instance and make configuration object for it.
Parameters
----------
hosts : `tuple` [`str`, ...]
List of host names or IP addresses for Cassandra cluster.
keyspace : `str`
Name of the keyspace for APDB tables.
schema_file : `str`, optional
Location of (YAML) configuration file with APDB schema. If not
specified then the default location will be used.
ss_schema_file : `str`, optional
Location of (YAML) configuration file with SSO schema. If not
specified then the default location will be used.
read_sources_months : `int`, optional
Number of months of history to read from DiaSource.
read_forced_sources_months : `int`, optional
Number of months of history to read from DiaForcedSource.
enable_replica : `bool`, optional
If `True`, make additional tables used for replication to PPDB.
replica_skips_diaobjects : `bool`, optional
If `True` then do not fill regular ``DiaObject`` table when
``enable_replica`` is `True`.
port : `int`, optional
Port number to use for Cassandra connections.
username : `str`, optional
User name for Cassandra connections.
prefix : `str`, optional
Optional prefix for all table names.
part_pixelization : `str`, optional
Name of the MOC pixelization used for partitioning.
part_pix_level : `int`, optional
Pixelization level.
time_partition_tables : `bool`, optional
Create per-partition tables.
time_partition_start : `str`, optional
Starting time for per-partition tables, in yyyy-mm-ddThh:mm:ss
format, in TAI.
time_partition_end : `str`, optional
Ending time for per-partition tables, in yyyy-mm-ddThh:mm:ss
format, in TAI.
read_consistency : `str`, optional
Name of the consistency level for read operations.
write_consistency : `str`, optional
Name of the consistency level for write operations.
read_timeout : `int`, optional
Read timeout in seconds.
write_timeout : `int`, optional
Write timeout in seconds.
ra_dec_columns : `tuple` [`str`, `str`], optional
Names of ra/dec columns in DiaObject table.
replication_factor : `int`, optional
Replication factor used when creating a new keyspace; if the keyspace
already exists its replication factor is not changed.
drop : `bool`, optional
If `True` then drop existing tables before re-creating the schema.
table_options : `CreateTableOptions`, optional
Options used when creating Cassandra tables.
Returns
-------
config : `ApdbCassandraConfig`
Resulting configuration object for a created APDB instance.
| ApdbMetadata lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.metadata | ( | self | ) |
Object controlling access to APDB metadata (`ApdbMetadata`).
Reimplemented from lsst.dax.apdb.apdb.Apdb.
| None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.reassignDiaSources | ( | self, | |
| Mapping[int, int] | idMap ) |
Associate DiaSources with SSObjects, dis-associating them
from DiaObjects.
Parameters
----------
idMap : `Mapping`
Maps DiaSource IDs to their new SSObject IDs.
Raises
------
ValueError
Raised if DiaSource ID does not exist in the database.
Reimplemented from lsst.dax.apdb.apdb.Apdb.
| None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.store | ( | self, | |
| astropy.time.Time | visit_time, | ||
| pandas.DataFrame | objects, | ||
| pandas.DataFrame | None | sources = None, | ||
| pandas.DataFrame | None | forced_sources = None ) |
Store all three types of catalogs in the database.
Parameters
----------
visit_time : `astropy.time.Time`
Time of the visit.
objects : `pandas.DataFrame`
Catalog with DiaObject records.
sources : `pandas.DataFrame`, optional
Catalog with DiaSource records.
forced_sources : `pandas.DataFrame`, optional
Catalog with DiaForcedSource records.
Notes
-----
This method takes DataFrame catalogs; their schema must be
compatible with the schema of the APDB tables:
- column names must correspond to database table columns
- types and units of the columns must match database definitions,
no unit conversion is performed presently
- columns that have default values in database schema can be
omitted from catalog
- this method knows how to fill interval-related columns of DiaObject
(validityStart, validityEnd); they do not need to appear in a
catalog
- source catalogs have ``diaObjectId`` column associating sources
with objects
This operation need not be atomic, but DiaSources and DiaForcedSources
will not be stored until all DiaObjects are stored.
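A minimal pre-flight check a client could run before calling ``store`` (illustrative only; ``diaObjectId`` is the association column named in the Notes above, everything else is a hypothetical example):

```python
from __future__ import annotations

import pandas as pd

def check_catalogs(objects: pd.DataFrame, sources: pd.DataFrame | None) -> None:
    """Minimal pre-flight check for store() inputs (illustration only)."""
    if "diaObjectId" not in objects.columns:
        raise ValueError("DiaObject catalog must have a diaObjectId column")
    if sources is not None and "diaObjectId" not in sources.columns:
        raise ValueError("DiaSource catalog must associate sources via diaObjectId")

objects = pd.DataFrame({"diaObjectId": [1], "ra": [10.0], "dec": [0.0]})
sources = pd.DataFrame({"diaSourceId": [100], "diaObjectId": [1]})
check_catalogs(objects, sources)  # passes silently
```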
Reimplemented from lsst.dax.apdb.apdb.Apdb.
| Table | None lsst.dax.apdb.cassandra.apdbCassandra.ApdbCassandra.tableDef | ( | self, | |
| ApdbTables | table ) |
Return table schema definition for a given table.
Parameters
----------
table : `ApdbTables`
One of the known APDB tables.
Returns
-------
tableSchema : `.schema_model.Table` or `None`
Table schema description, `None` is returned if table is not
defined by this implementation.
Reimplemented from lsst.dax.apdb.apdb.Apdb.