casacore
Loading...
Searching...
No Matches
SiscoStManColumn.h
Go to the documentation of this file.
1#ifndef CASACORE_SISCO_ST_MAN_COLUMN_H_
2#define CASACORE_SISCO_ST_MAN_COLUMN_H_
3
4#include <casacore/tables/DataMan/StManColumn.h>
5
6#include <casacore/casa/Arrays/Array.h>
7#include <casacore/casa/Arrays/IPosition.h>
8
9#include <casacore/tables/Tables/ScalarColumn.h>
10
11#include "SiscoReader.h"
12#include "SiscoWriter.h"
13#include "ShapesFileReader.h"
14#include "ShapesFileWriter.h"
15
16#include <filesystem>
17#include <optional>
18
19namespace casacore {
20
21class SiscoStMan;
22
27class SiscoStManColumn final : public StManColumn {
28 public:
34 explicit SiscoStManColumn(SiscoStMan &parent, DataType dtype)
35 : StManColumn(dtype), parent_(parent) {
36 if (dtype != casacore::TpComplex) {
37 throw std::runtime_error(
38 "Sisco storage manager column can only be used for a data column "
39 "with single precision complex values");
40 }
41 }
42
47 bool isWritable() const final { return true; }
48
49 bool canChangeShape() const final { return true; }
50
51 void setShape(rownr_t, const IPosition &) final {
52 // Shape is implied from the array; explicit setting of the shape is not
53 // required.
54 }
55 void setShape(unsigned, const IPosition &) final {}
56
57 bool isShapeDefined(rownr_t row) final {
58 if ((writer_ && row >= current_row_) || !file_exists_) {
59 return false;
60 } else {
61 return true;
62 }
63 }
64 bool isShapeDefined(unsigned) final { return false; }
65
67 void setShapeColumn(const IPosition &shape) final {
68 if (shape.size() != 2) {
69 throw std::runtime_error(
70 "Sisco storage manager is used for a column with " +
71 std::to_string(shape.size()) +
72 " dimensions, but it can only be used for "
73 "columns with exactly 2 dimensions");
74 }
75 // This call is ignored; shape will always be determined from array size.
76 }
77
// Returns the shape of the array in `row`.
// When a writer is active and `row` is at or past current_row_, or when
// file_exists_ is false, IPosition{0, 0} is returned.
80 IPosition shape(rownr_t row) final {
81 if ((writer_ && row >= current_row_) || !file_exists_) {
82 return IPosition{0, 0};
83 } else {
// Reading only moves forward: (re)open the reader when none is active or when
// `row` lies before the current position, then skip rows until `row`.
84 if (!reader_ || row < current_row_) {
85 OpenReader();
86 }
87 while (current_row_ < row) {
88 SkipRow();
89 }
// NOTE(review): listing line 90 is absent from this extract; presumably it
// returns the buffered shape for `row` -- confirm against the real header.
91 }
92 }
93 IPosition shape(unsigned row) final {
94 return shape(static_cast<rownr_t>(row));
95 }
96
// Reads the values of `row` into `dataPtr`, which is cast to
// Array<std::complex<float>>.
// NOTE(review): listing lines 103, 112-113 and 134 are absent from this
// extract, so the declarations of `array` and `shape` and one statement before
// the row increment are not visible here -- confirm against the real header.
102 void getArrayV(rownr_t row, ArrayBase &dataPtr) final {
104 static_cast<Array<std::complex<float>> &>(dataPtr);
// Reading only moves forward: (re)open the reader when none is active or when
// `row` lies before the current position, then skip rows until `row`.
105 if (!reader_ || row < current_row_) {
106 OpenReader();
107 }
108 while (current_row_ < row) {
109 SkipRow();
110 }
111
114 if (shape.size() >= 2) {
115 const int n_polarizations = shape[0];
116 const size_t n_channels = shape[1];
117
118 if (n_channels) {
119 bool ownership;
120 Complex *storage = array.getStorage(ownership);
121 buffer_.resize(n_channels);
// One result (n_channels values) is fetched per polarization and scattered
// into the array storage at index channel * n_polarizations + polarization.
122 for (int polarization = 0; polarization != n_polarizations;
123 ++polarization) {
124 reader_->GetNextResult(buffer_);
125 for (size_t channel = 0; channel != n_channels; ++channel) {
126 storage[channel * n_polarizations + polarization] =
127 buffer_[channel];
128 }
129 }
// Hand the (possibly copied) storage back to the array.
130 array.putStorage(storage, ownership);
131 }
132 }
133
135 ++current_row_;
136 }
137
// Writes the values of `dataPtr` (cast to const Array<std::complex<float>>)
// as row `row`, followed by the array's shape in the shapes file.
// NOTE(review): listing lines 144 and 150 are absent from this extract, so the
// declaration of `array` and the body of the catch-up loop below are not
// visible here -- confirm against the real header.
143 void putArrayV(rownr_t row, const ArrayBase &dataPtr) final {
145 static_cast<const Array<std::complex<float>> &>(dataPtr);
// Writing only moves forward: writing a row before the current position
// (or writing without an active writer) restarts the writer from row 0.
146 if (!writer_ || row < current_row_) {
147 OpenWriter();
148 }
149 while (current_row_ < row) {
151 }
152
// The baseline is identified by these four table columns plus the
// polarization index (see GetBaselineId).
153 const int field_id = field_id_column_(current_row_);
154 const int data_desc_id = data_desc_id_column_(current_row_);
155 const int antenna1 = antenna1_column_(current_row_);
156 const int antenna2 = antenna2_column_(current_row_);
157 if (array.shape().size() >= 2) {
158 const int n_polarizations = array.shape()[0];
159 const size_t n_channels = array.shape()[1];
160
161 if (n_channels) {
162 bool ownership;
163 const std::complex<float> *storage = array.getStorage(ownership);
164 buffer_.resize(n_channels);
// For each polarization, gather its n_channels values from array storage
// (index channel * n_polarizations + polarization) and write them as one
// sequence under that polarization's baseline id.
165 for (int polarization = 0; polarization != n_polarizations;
166 ++polarization) {
167 const size_t baseline_id = GetBaselineId(
168 field_id, data_desc_id, antenna1, antenna2, polarization);
169 for (size_t channel = 0; channel != n_channels; ++channel) {
170 buffer_[channel] =
171 storage[channel * n_polarizations + polarization];
172 }
173 writer_->Write(baseline_id, buffer_);
174 }
175 array.freeStorage(storage, ownership);
176 }
177 }
178
// The shape is recorded for every written row, also when no data was written
// above (fewer than 2 dimensions or zero channels).
179 ++current_row_;
180 shapes_writer_->Write(array.shape());
181 }
182
// Declaration only; not defined inside the class body.
// NOTE(review): the statements near the end of this file that bind the
// FIELD_ID / DATA_DESC_ID / ANTENNA1 / ANTENNA2 columns and set file_exists_
// look like its definition, but the definition's header line is absent from
// this extract -- confirm.
183 void Prepare();
184
185 private:
// Copy construction and copy assignment are deleted: the column cannot be
// copied.
186 SiscoStManColumn(const SiscoStManColumn &source) = delete;
187 void operator=(const SiscoStManColumn &source) = delete;
188
189 void Reset() {
190 reader_.reset();
191 shapes_reader_.reset();
192 writer_.reset();
193 shapes_writer_.reset();
194 }
195
// Starts writing from row 0: drops any active reader/writer, creates the
// writer with the parent's file name and predict/deflate levels, and opens it
// with a freshly built header.
// NOTE(review): listing line 211 is absent from this extract; presumably it
// creates shapes_writer_ (which putArrayV dereferences) -- confirm.
196 void OpenWriter() {
197 Reset();
198 writer_.emplace(parent_.fileName(), parent_.PredictLevel(),
199 parent_.DeflateLevel());
// Header layout: kMagicSize bytes of kMagic, then 2 bytes of kVersionMajor,
// then 2 bytes of kVersionMinor; the rest of the kHeaderSize bytes stays zero.
// The version numbers are copied as raw bytes in the host's byte order.
200 char header_buffer[kHeaderSize];
201 std::fill_n(header_buffer, kHeaderSize, 0);
202 std::copy_n(kMagic, kMagicSize, &header_buffer[0]);
203 std::copy_n(reinterpret_cast<const char *>(&kVersionMajor), 2,
204 &header_buffer[kMagicSize]);
205 std::copy_n(reinterpret_cast<const char *>(&kVersionMinor), 2,
206 &header_buffer[kMagicSize + 2]);
207 std::span<const std::byte> header(
208 reinterpret_cast<const std::byte *>(header_buffer), kHeaderSize);
209 writer_->Open(header);
210
212
// Restart row and baseline bookkeeping.
213 current_row_ = 0;
214 baseline_ids_.clear();
215 baseline_count_ = 0;
216 }
217
// Starts reading from row 0: drops any active reader/writer, creates the
// reader for the parent's file, reads and checks the header, and resets the
// row and baseline bookkeeping.
// Throws std::runtime_error when the major version in the file differs from
// kVersionMajor.
// NOTE(review): listing lines 223, 240, 249-251 and 253 are absent from this
// extract (the end of the `header` span construction, a statement after the
// version check, statements before the final loop, and that loop's body) --
// confirm against the real header.
218 void OpenReader() {
219 Reset();
220 reader_.emplace(parent_.fileName());
221 char header_buffer[kHeaderSize];
222 std::span<std::byte> header(reinterpret_cast<std::byte *>(header_buffer),
224 reader_->Open(header);
// Unpack the header: kMagicSize bytes of magic tag, then 2 bytes each for the
// major and minor version, copied as raw bytes.
225 char magic_tag[kMagicSize];
226 short version_major;
227 short version_minor;
228 std::copy_n(&header_buffer[0], kMagicSize, magic_tag);
229 std::copy_n(&header_buffer[kMagicSize], 2,
230 reinterpret_cast<char *>(&version_major));
231 std::copy_n(&header_buffer[kMagicSize + 2], 2,
232 reinterpret_cast<char *>(&version_minor));
// NOTE(review): magic_tag and version_minor are filled in, but no check of
// either is visible in this extract -- confirm whether the magic is validated.
233 if (version_major != kVersionMajor) {
234 throw std::runtime_error(
235 "The file on disk is written as a Sisco version " +
236 std::to_string(version_major) +
237 " file, whereas this Casacore version supports only version " +
238 std::to_string(kVersionMajor));
239 }
241
242 current_row_ = 0;
243 baseline_ids_.clear();
244 baseline_count_ = 0;
245 // Always request half of the requests that fit in the buffer of
246 // SiscoReader, so that SiscoReader can preprocess requests using multiple
247 // threads. Every time a row is read/skipped, another row is requested.
248 shape_buffer_.resize(reader_->GetRequestBufferSize() / 2);
252 for (size_t i = 0; i != shape_buffer_.size(); ++i) {
254 }
255 }
256
259 if (!shapes_reader_->Eof() && shape.size() >= 2) {
260 const int field_id = field_id_column_(current_shape_reading_row_);
261 const int data_desc_id = data_desc_id_column_(current_shape_reading_row_);
262 const int antenna1 = antenna1_column_(current_shape_reading_row_);
263 const int antenna2 = antenna2_column_(current_shape_reading_row_);
264 const int n_polarizations = shape[0];
265 const int n_channels = shape[1];
266 for (int polarization = 0; polarization != n_polarizations;
267 ++polarization) {
268 const size_t baseline_id = GetBaselineId(
269 field_id, data_desc_id, antenna1, antenna2, polarization);
270 reader_->Request(baseline_id, n_channels);
271 }
272 }
276 }
277
// Returns the id associated with the combination (field_id, data_desc_id,
// antenna1, antenna2, polarization). A combination that is not yet in
// baseline_ids_ is inserted with the current value of baseline_count_.
// NOTE(review): listing line 286 is absent from this extract; presumably it
// increments baseline_count_ after the insertion -- confirm.
278 size_t GetBaselineId(int field_id, int data_desc_id, int antenna1,
279 int antenna2, int polarization) {
280 const std::array<int, 5> baseline{field_id, data_desc_id, antenna1,
281 antenna2, polarization};
282 std::map<std::array<int, 5>, size_t>::const_iterator iterator =
283 baseline_ids_.find(baseline);
284 if (iterator == baseline_ids_.end()) {
285 iterator = baseline_ids_.emplace(baseline, baseline_count_).first;
287 }
288 return iterator->second;
289 }
290
291 std::string ShapesFilename() const {
292 return parent_.fileName() + kShapesExtension;
293 }
294
296 shapes_writer_->Write(IPosition{0, 0});
297 ++current_row_;
298 }
299
// Advances the reader past the current row without delivering its data: one
// result per polarization is fetched into buffer_ and discarded, then
// current_row_ is incremented.
// NOTE(review): listing lines 301-302 and 314 are absent from this extract, so
// the declaration of `shape` and one statement before the row increment are
// not visible here -- confirm against the real header.
300 void SkipRow() {
303 if (shape.size() >= 2) {
304 const int n_polarizations = shape[0];
305 const int n_channels = shape[1];
306 if (n_channels) {
307 buffer_.resize(n_channels);
308 for (int polarization = 0; polarization != n_polarizations;
309 ++polarization) {
310 reader_->GetNextResult(buffer_);
311 }
312 }
313 }
315 ++current_row_;
316 }
317
// Size in bytes of the header buffer passed to Open() of the writer/reader.
318 static constexpr size_t kHeaderSize = 20;
// Tag copied to the start of the header; kMagicSize bytes of it are used.
319 static constexpr char kMagic[] = "Sisco\0\0\0";
320 static constexpr size_t kMagicSize = 8;
// Versions stored as 2 bytes each right after the magic tag. OpenReader
// rejects a file whose major version differs from kVersionMajor.
321 static constexpr uint16_t kVersionMajor = 2;
322 static constexpr uint16_t kVersionMinor = 0;
// Suffix appended to the parent's file name to name the shapes file.
323 static constexpr char kShapesExtension[] = "-shapes";
324
329
// NOTE(review): several member declarations used by the methods above
// (e.g. parent_, current_row_, baseline_count_, the ScalarColumn members) are
// absent from this extract.
// Data writer/reader: created by OpenWriter()/OpenReader(), destroyed by
// Reset().
331 std::optional<sisco::SiscoWriter> writer_;
332 std::optional<sisco::SiscoReader> reader_;
// Writer/reader for the file that stores the per-row shapes.
333 std::optional<ShapesFileWriter> shapes_writer_;
334 std::optional<ShapesFileReader> shapes_reader_;
335 // A circular buffer to store the already read shapes
336 std::vector<IPosition> shape_buffer_;
// Scratch buffer holding the channel values of one polarization of one row.
341 std::vector<std::complex<float>> buffer_;
// Maps {field_id, data_desc_id, antenna1, antenna2, polarization} to the id
// returned by GetBaselineId().
342 std::map<std::array<int, 5>, size_t> baseline_ids_;
// True when both the data file and the shapes file were found on disk
// (assigned near the end of this file).
344 bool file_exists_ = false;
345};
346
347} // namespace casacore
348
349#include "SiscoStMan.h"
350
351namespace casacore {
352
354 Table &table = parent_.table();
355 field_id_column_ = ScalarColumn<int>(table, "FIELD_ID");
356 data_desc_id_column_ = ScalarColumn<int>(table, "DATA_DESC_ID");
357 antenna1_column_ = ScalarColumn<int>(table, "ANTENNA1");
358 antenna2_column_ = ScalarColumn<int>(table, "ANTENNA2");
359 file_exists_ = std::filesystem::exists(parent_.fileName()) &&
360 std::filesystem::exists(ShapesFilename());
361}
362
363} // namespace casacore
364
365#endif
Non-templated base class for templated Array class.
Definition ArrayBase.h:71
static constexpr uint16_t kVersionMinor
std::optional< ShapesFileWriter > shapes_writer_
static constexpr char kMagic[]
std::vector< IPosition > shape_buffer_
A circular buffer to store the already read shapes.
std::optional< sisco::SiscoReader > reader_
SiscoStManColumn(SiscoStMan &parent, DataType dtype)
Constructor, to be overloaded by subclass.
void setShapeColumn(const IPosition &shape) final
Set the dimensions of values in this column.
void putArrayV(rownr_t row, const ArrayBase &dataPtr) final
Write values into a particular row.
IPosition shape(unsigned row) final
bool canChangeShape() const final
Can the data manager handle changing the shape of an existing array?
static constexpr size_t kMagicSize
IPosition shape(rownr_t row) final
Get the dimensions of the values in a particular row.
bool isShapeDefined(rownr_t row) final
Is the value shape defined in the given row?
std::optional< ShapesFileReader > shapes_reader_
bool isWritable() const final
Whether this column is writable.
std::map< std::array< int, 5 >, size_t > baseline_ids_
SiscoStManColumn(const SiscoStManColumn &source)=delete
ScalarColumn< int > antenna2_column_
void operator=(const SiscoStManColumn &source)=delete
void setShape(rownr_t, const IPosition &) final
Set the shape of an (variable-shaped) array in the given row.
void setShape(unsigned, const IPosition &) final
size_t GetBaselineId(int field_id, int data_desc_id, int antenna1, int antenna2, int polarization)
void getArrayV(rownr_t row, ArrayBase &dataPtr) final
Read the values for a particular row.
ScalarColumn< int > data_desc_id_column_
static constexpr uint16_t kVersionMajor
static constexpr char kShapesExtension[]
ScalarColumn< int > antenna1_column_
ScalarColumn< int > field_id_column_
std::optional< sisco::SiscoWriter > writer_
bool isShapeDefined(unsigned) final
std::string ShapesFilename() const
static constexpr size_t kHeaderSize
std::vector< std::complex< float > > buffer_
The Stokes I storage manager behaves like a full set of (4) polarizations but only stores the Stokes ...
Definition SiscoStMan.h:24
StManColumn(int dataType)
Default constructor.
Definition StManColumn.h:78
this file contains all the compiler specific defines
Definition mainpage.dox:28
TYPE * array
the allocated array
Definition hdu.h:491
uInt64 rownr_t
Define the type of a row number in a table.
Definition aipsxtype.h:44