Source code for gdt.missions.swift.bat.pha

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT
# WITH UNLIMITED RIGHTS
#
# Grant No.: 80NSSC21K0651
# Grantee Name: Universities Space Research Association
# Grantee Address: 425 3rd Street SW, Suite 950, Washington DC 20024
#
# Copyright 2024 by Universities Space Research Association (USRA). All rights
# reserved.
#
# Developed by: Corinne Fletcher
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# This work is a derivative of the Gamma-ray Data Tools (GDT), including the
# Core and Fermi packages, originally developed by the following:
#
#     William Cleveland and Adam Goldstein
#     Universities Space Research Association
#     Science and Technology Institute
#     https://sti.usra.edu
#
#     Daniel Kocevski
#     National Aeronautics and Space Administration (NASA)
#     Marshall Space Flight Center
#     Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
import numpy as np
import astropy.io.fits as fits

from gdt.core.pha import Pha
from gdt.core.data_primitives import Ebounds, Gti, EnergyBins
from gdt.core.file import FitsFileContextManager
from .headers import PhaHeaders
from ..time import Time

__all__ = ['BatPha']


[docs]class BatPha(Pha): def __init__(self): super().__init__() """PHA class for BAT spectra. """
[docs] @classmethod def from_data(cls, data, gti=None, trigger_time=None, filename=None, headers=None, channel_mask=None, header_type=PhaHeaders, **kwargs): """Create a PHA object from an :class:`~.data_primitives.EnergyBins` object. Args: data (:class:`~.data_primitives.EnergyBins`): The PHA count spectrum data gti (:class:`~.data_primitives.Gti`), optional): The good time intervals of the pectrum data. If omitted, then assumes the range (0, exposure). trigger_time (float, optional): The trigger time, if applicable. If provided, the data times will be shifted relative to the trigger time. Default is zero. headers (:class:`~.headers.FileHeaders`): The file headers channel_mask (np.array(dtype=bool)): A boolean array representing the valid channels. If omitted, assumes all non-zero count channels are valid. header_type (:class:`~.headers.FileHeaders`): Default file header class. Only used if ``headers`` is not defined Returns: (:class:`Pha`) """ obj = cls() obj._filename = filename # set data and ebounds if not isinstance(data, EnergyBins): raise TypeError('data must be of type EnergyBins') obj._data = data obj._ebounds = Ebounds.from_bounds(data.lo_edges, data.hi_edges) # set GTI if gti is not None: if not isinstance(gti, Gti): raise TypeError('gti must be of type Gti') else: gti = GTI.from_bounds(obj.column(3, 'START'),obj.column(3, 'STOP')) obj._gti = gti # update times to be relative to ...if trigtime is set if trigger_time is not None: if trigger_time < 0.0: raise ValueError('trigger_time must be non-negative') obj._trigtime = trigger_time # set headers if headers is not None: if not isinstance(headers, PhaHeaders): raise TypeError('headers must be of type FileHeaders') obj._headers = headers else: obj._headers = header_type() tstart, tstop = obj._gti.range obj._headers['PRIMARY']['TSTART'] = tstart obj._headers['PRIMARY']['TSTOP'] = tstop obj._headers['PRIMARY']['TRIGTIME'] = trigger_time obj._headers['SPECTRUM']['DETCHANS'] = data.size obj._headers['SPECTRUM']['EXPOSURE'] = obj._data.exposure obj._headers['SPECTRUM']['TELAPSE'] = tstop-tstart obj._headers['EBOUNDS']['DETCHANS'] = data.size # set the channel mask # if no channel mask is given, assume zero-count channels are bad if channel_mask is None: channel_mask = np.zeros(data.size, dtype=bool) channel_mask[data.counts > 0] = True try: iter(channel_mask) channel_mask = np.asarray(channel_mask).flatten().astype(bool) except: raise TypeError('channel_mask must be a Boolean array') if channel_mask.size != obj._data.size: raise ValueError('channel_mask must be the same size as the ' \ 'number of data bins') obj._channel_mask = channel_mask return obj
[docs] @classmethod def open(cls, file_path, **kwargs): """Open a BAT Pha FITS file and return the BatPha object Args: file_path (str): The file path of the FITS file Returns: (:class:`BatPha`) """ obj = super(Pha, cls).open(file_path, **kwargs) trigtime = None # get the headers hdrs = [hdu.header for hdu in obj.hdulist] if 'TRIGTIME' in hdrs[0].keys(): headers = PhaHeaders.from_headers(hdrs) trigtime = float(headers['PRIMARY']['TRIGTIME']) else: headers = PhaHeaders.from_headers(hdrs) #if 'TSTART' in hdrs[0].keys(): tstart = float(headers['PRIMARY']['TSTART']) tstop = float(headers['PRIMARY']['TSTOP']) # the channel energy bounds ebounds = Ebounds.from_bounds(obj.column(2, 'E_MIN'), obj.column(2, 'E_MAX')) channel = obj.column(1,'CHANNEL') rate = obj.column(1, 'RATE') stat_err = obj.column(1, 'STAT_ERR') sys_err = obj.column(1, 'SYS_ERR') #the good time intervals stdgti_start = obj.column(3, 'START') stdgti_stop = obj.column(3, 'STOP') if trigtime is not None: stdgti_start -= trigtime stdgti_stop -= trigtime stdgti = Gti.from_bounds(stdgti_start, stdgti_stop) exposure = headers['SPECTRUM']['EXPOSURE'] counts = obj.column(1, 'RATE') * exposure data = EnergyBins(counts, obj.column(2, 'E_MIN'), obj.column(2, 'E_MAX'), exposure, ) return cls.from_data(data, gti=stdgti, trigger_time=trigtime, filename=obj.filename, headers=headers)
def _build_hdulist(self): # create FITS and primary header hdulist = fits.HDUList() primary_hdu = fits.PrimaryHDU(header=self.headers['PRIMARY']) for key, val in self.headers['PRIMARY'].items(): primary_hdu.header[key] = val hdulist.append(primary_hdu) # the spectrum extension spectrum_hdu = self._spectrum_table() hdulist.append(spectrum_hdu) # the ebounds extension ebounds_hdu = self._ebounds_table() hdulist.append(ebounds_hdu) # the GTI extension stdgti_hdu = self._gti_table() hdulist.append(stdgti_hdu) return hdulist def _build_headers(self, trigtime, tstart, tstop, num_chans): headers = self.headers.copy() for hdu in headers: hdu['TSTART'] = tstart hdu['TSTOP'] = tstop if trigtime is not None: hdu['TRIGTIME'] = trigtime return headers def _ebounds_table(self): chan_col = fits.Column(name='CHANNEL', format='1I', array=np.arange(self.num_chans, dtype=int)) emin_col = fits.Column(name='E_MIN', format='1E', unit='keV', array=self.ebounds.low_edges()) emax_col = fits.Column(name='E_MAX', format='1E', unit='keV', array=self.ebounds.high_edges()) hdu = fits.BinTableHDU.from_columns([chan_col, emin_col, emax_col], header=self.headers['EBOUNDS']) for key, val in self.headers['EBOUNDS'].items(): hdu.header[key] = val return hdu def _spectrum_table(self): chan_col = fits.Column(name='CHANNEL', format='1I', array=np.arange(self.num_chans, dtype=int)) rates_col = fits.Column(name='RATE', format='1D', unit='count/s', array=self.data.rates) staterr_col = fits.Column(name='STAT_ERR', format='1D', unit='count/s', array=self.data.rate_uncertainty) syserr_col = fits.Column(name='SYS_ERR', format='1D', unit='count/s', array=self.data.rate_uncertainty) hdu = fits.BinTableHDU.from_columns([chan_col, rates_col, staterr_col, syserr_col], header=self.headers['SPECTRUM']) for key, val in self.headers['SPECTRUM'].items(): hdu.header[key] = val return hdu def _gti_table(self): tstart = np.array(self.gti.low_edges()) tstop = np.array(self.gti.high_edges()) if self.trigtime is not None: tstart += self.trigtime tstop += self.trigtime start_col = fits.Column(name='START', format='1D', unit='s', bzero=self.trigtime, array=tstart) stop_col = fits.Column(name='STOP', format='1D', unit='s', bzero=self.trigtime, array=tstop) hdu = fits.BinTableHDU.from_columns([start_col, stop_col], header=self.headers['STDGTI']) for key, val in self.headers['STDGTI'].items(): hdu.header[key] = val hdu.header.comments['TIMEZERO'] = 'Zero-point offset for TIME column' return hdu