casacore
Loading...
Searching...
No Matches
Lane.h
Go to the documentation of this file.
1#ifndef CASACORE_AOCOMMON_LANE_11_H_
2#define CASACORE_AOCOMMON_LANE_11_H_
3
4#include <condition_variable>
5#include <cstring>
6#include <deque>
7#include <mutex>
8
17
18//#define LANE_DEBUG_MODE
19
20#ifdef LANE_DEBUG_MODE
21#include <cmath>
22#include <iostream>
23#include <sstream>
24#include <string>
25#endif
26
28
29#ifdef LANE_DEBUG_MODE
30#define set_lane_debug_name(lane, str) (lane).setDebugName(str)
31#define LANE_REGISTER_DEBUG_INFO registerDebugInfo()
32#define LANE_REGISTER_DEBUG_WRITE_WAIT registerDebugWriteWait()
33#define LANE_REGISTER_DEBUG_READ_WAIT registerDebugReadWait()
34#define LANE_REPORT_DEBUG_INFO reportDebugInfo()
35
36#else
37
38#define set_lane_debug_name(lane, str)
39#define LANE_REGISTER_DEBUG_INFO
40#define LANE_REGISTER_DEBUG_WRITE_WAIT
41#define LANE_REGISTER_DEBUG_READ_WAIT
42#define LANE_REPORT_DEBUG_INFO
43
44#endif
45
99template <typename Tp>
100class Lane {
101 public:
103 typedef std::size_t size_type;
104
106 typedef Tp value_type;
107
116 Lane() noexcept
117 : _buffer(nullptr),
118 _capacity(0),
122
128 explicit Lane(size_t capacity)
129 : _buffer(new Tp[capacity]),
134
135 Lane(const Lane<Tp>& source) = delete;
136
142 Lane(Lane<Tp>&& source) noexcept
143 : _buffer(nullptr),
144 _capacity(0),
148 swap(source);
149 }
150
156 delete[] _buffer;
157 }
158
159 Lane<Tp>& operator=(const Lane<Tp>& source) = delete;
160
167 Lane<Tp>& operator=(Lane<Tp>&& source) noexcept {
168 swap(source);
169 return *this;
170 }
171
176 void swap(Lane<Tp>& other) noexcept {
177 std::swap(_buffer, other._buffer);
178 std::swap(_capacity, other._capacity);
179 std::swap(_write_position, other._write_position);
180 std::swap(_free_write_space, other._free_write_space);
181 std::swap(_status, other._status);
182 }
183
// Clear the contents and reset the state of the Lane.
// The write position is put back at the start of the buffer.
// NOTE(review): listing lines 193-194 are absent from this copy, so any
// further state that is reset is not visible here — check the real header.
191 void clear() noexcept {
192 _write_position = 0;
195 }
196
// Write a single element by copy-assigning it into the buffer.
//
// Holds the Lane mutex for the whole call. The element is stored only when
// the status is still status_normal once the loop below has finished;
// otherwise the call returns without storing anything.
// NOTE(review): listing lines 208, 211-212 and 216-217 are absent from this
// copy. Presumably the loop body waits for free write space and the write
// position / free-space counters are updated after the store — confirm
// against the real header.
206 void write(const value_type& element) {
207 std::unique_lock<std::mutex> lock(_mutex);
209
// Loop for as long as the buffer is full and the status is unchanged.
210 while (_free_write_space == 0 && _status == status_normal) {
213 }
214 if (_status == status_normal) {
215 _buffer[_write_position] = element;
218 // Now that there is less free write space, there is more free read
219 // space and thus readers may continue.
220 _reading_possible_condition.notify_all();
221 }
222 }
223
// Write a single element by constructing it from the given arguments.
//
// A temporary value_type is built from the forwarded arguments and then
// assigned into the buffer slot at the write position; the element is not
// constructed in place. Holds the Lane mutex for the whole call and stores
// nothing when the status is no longer status_normal after the loop.
// NOTE(review): listing lines 238, 241-242 and 247-248 are absent from this
// copy; the loop body and the counter updates are not visible — confirm
// against the real header.
235 template <typename... Args>
236 void emplace(Args&&... args) {
237 std::unique_lock<std::mutex> lock(_mutex);
239
// Loop for as long as the buffer is full and the status is unchanged.
240 while (_free_write_space == 0 && _status == status_normal) {
243 }
244
245 if (_status == status_normal) {
246 _buffer[_write_position] = value_type(std::forward<Args>(args)...);
249 // Now that there is less free write space, there is more free read
250 // space and thus readers can possibly continue.
251 _reading_possible_condition.notify_all();
252 }
253 }
254
// Write a single element by moving it in.
//
// Same shape as the copying overload, except that the element is
// move-assigned into the buffer slot. Holds the Lane mutex for the whole
// call and stores nothing when the status is no longer status_normal after
// the loop.
// NOTE(review): listing lines 265, 268-269 and 273-274 are absent from this
// copy; the loop body and the counter updates are not visible — confirm
// against the real header.
263 void write(value_type&& element) {
264 std::unique_lock<std::mutex> lock(_mutex);
266
// Loop for as long as the buffer is full and the status is unchanged.
267 while (_free_write_space == 0 && _status == status_normal) {
270 }
271 if (_status == status_normal) {
272 _buffer[_write_position] = std::move(element);
275 // Now that there is less free write space, there is more free read
276 // space and thus readers can possibly continue.
277 _reading_possible_condition.notify_all();
278 }
279 }
280
281 void write(const value_type* elements, size_t n) {
282 write_generic(elements, n);
283 }
284
285 void move_write(value_type* elements, size_t n) {
286 write_generic(elements, n);
287 }
288
// Read a single element by moving it out of the buffer into `destination`.
//
// Holds the Lane mutex for the whole call.
// Returns false when nothing is readable once the loop below has finished;
// returns true after an element has been moved into `destination` and the
// writers have been notified.
// NOTE(review): listing lines 291, 293-294 and 300 are absent from this
// copy. Presumably the loop body waits for data and the free write space is
// increased after the move — confirm against the real header.
289 bool read(value_type& destination) {
290 std::unique_lock<std::mutex> lock(_mutex);
// Loop for as long as nothing is readable and the status is unchanged.
292 while (free_read_space() == 0 && _status == status_normal) {
295 }
296 if (free_read_space() == 0)
297 return false;
298 else {
299 destination = std::move(_buffer[read_position()]);
301 // Now that there is more free write space, writers can possibly continue.
302 _writing_possible_condition.notify_all();
303 return true;
304 }
305 }
306
// Read up to `n` elements into the array starting at `destinations`.
//
// Holds the Lane mutex for the whole call. First takes whatever is readable
// right away (at most n), then keeps taking further chunks while elements
// are still wanted and the status is status_normal.
// Returns the number of elements actually read (n minus what was left).
// NOTE(review): listing lines 311 and 322-323 are absent from this copy;
// the body of the do/while loop is not visible — presumably it waits for
// data. Confirm against the real header.
307 size_t read(value_type* destinations, size_t n) {
308 size_t n_left = n;
309
310 std::unique_lock<std::mutex> lock(_mutex);
312
// Initial chunk: the smaller of what is readable and what was asked for.
313 size_t free_space = free_read_space();
314 size_t read_size = free_space > n ? n : free_space;
315 immediate_read(destinations, read_size);
316 n_left -= read_size;
317
318 while (n_left != 0 && _status == status_normal) {
// Advance past the elements delivered by the previous chunk.
319 destinations += read_size;
320
321 do {
324 } while (free_read_space() == 0 && _status == status_normal);
325
326 free_space = free_read_space();
327 read_size = free_space > n_left ? n_left : free_space;
328 immediate_read(destinations, read_size);
329 n_left -= read_size;
330 }
331 return n - n_left;
332 }
333
// This method does the same thing as read(buffer, n) but discards the data.
//
// Holds the Lane mutex for the whole call. Has the same chunked structure as
// the array read, calling immediate_discard() instead of immediate_read().
// Returns the number of elements actually discarded (n minus what was left).
// NOTE(review): listing lines 343 and 352-353 are absent from this copy;
// the body of the do/while loop is not visible — presumably it waits for
// data. Confirm against the real header.
339 size_t discard(size_t n) {
340 size_t n_left = n;
341
342 std::unique_lock<std::mutex> lock(_mutex);
344
// Initial chunk: the smaller of what is readable and what was asked for.
345 size_t free_space = free_read_space();
346 size_t read_size = free_space > n ? n : free_space;
347 immediate_discard(read_size);
348 n_left -= read_size;
349
350 while (n_left != 0 && _status == status_normal) {
351 do {
354 } while (free_read_space() == 0 && _status == status_normal);
355
356 free_space = free_read_space();
357 read_size = free_space > n_left ? n_left : free_space;
358 immediate_discard(read_size);
359 n_left -= read_size;
360 }
361 return n - n_left;
362 }
363
// Signal the end of writing: while holding the Lane mutex, wake every thread
// waiting on either the writing or the reading condition variable.
// NOTE(review): listing lines 366-367 are absent from this copy. Presumably
// the status is switched to status_end there (is_end() tests for that
// value) — confirm against the real header.
364 void write_end() {
365 std::lock_guard<std::mutex> lock(_mutex);
368 _writing_possible_condition.notify_all();
369 _reading_possible_condition.notify_all();
370 }
371
372 size_t capacity() const noexcept { return _capacity; }
373
374 size_t size() const {
375 std::lock_guard<std::mutex> lock(_mutex);
377 }
378
379 bool empty() const {
380 std::lock_guard<std::mutex> lock(_mutex);
382 }
383
388 bool is_end() const {
389 std::lock_guard<std::mutex> lock(_mutex);
390 return _status == status_end;
391 }
392
396 bool is_end_and_empty() const {
397 std::lock_guard<std::mutex> lock(_mutex);
399 }
400
// Change the capacity of the Lane.
//
// Allocates a fresh array of new_capacity elements and deletes the old one;
// no elements are copied across. The write position is reset to 0 and the
// whole new buffer is marked as free write space.
// No mutex is taken in the code visible here.
// NOTE(review): listing line 412 is absent from this copy; whatever it
// resets is not visible — confirm against the real header.
405 void resize(size_t new_capacity) {
// Allocate first, so that a throwing `new` leaves the old buffer in place.
406 Tp* new_buffer = new Tp[new_capacity];
407 delete[] _buffer;
408 _buffer = new_buffer;
409 _capacity = new_capacity;
410 _write_position = 0;
411 _free_write_space = new_capacity;
413 }
414
419 std::unique_lock<std::mutex> lock(_mutex);
420 while (_capacity != _free_write_space) {
422 }
423 }
424
425#ifdef LANE_DEBUG_MODE
432 void setDebugName(const std::string& nameStr) { _debugName = nameStr; }
433#endif
434 private:
436
437 size_t _capacity;
438
440
442
444
445 mutable std::mutex _mutex;
446
447 std::condition_variable _writing_possible_condition,
449
450 size_t read_position() const noexcept {
452 }
453
454 size_t free_read_space() const noexcept {
456 }
457
// Shared implementation behind write(const value_type*, n) and
// move_write(value_type*, n).
//
// Holds the Lane mutex for the whole call. Nothing is written when the
// status is not status_normal on entry. Otherwise as much as fits is written
// right away, followed by further chunks while elements remain and the
// status is still status_normal.
// NOTE(review): listing lines 462 and 473-474 are absent from this copy;
// the body of the do/while loop is not visible — presumably it waits for
// free write space. Confirm against the real header.
458 // This is a template to allow const and non-const (to be able to move)
459 template <typename T>
460 void write_generic(T* elements, size_t n) {
461 std::unique_lock<std::mutex> lock(_mutex);
463
464 if (_status == status_normal) {
// Initial chunk: the smaller of the free write space and what was given.
465 size_t write_size = _free_write_space > n ? n : _free_write_space;
466 immediate_write(elements, write_size);
467 n -= write_size;
468
469 while (n != 0 && _status == status_normal) {
// Advance past the elements consumed by the previous chunk.
470 elements += write_size;
471
472 do {
475 } while (_free_write_space == 0 && _status == status_normal);
476
477 write_size = _free_write_space > n ? n : _free_write_space;
478 immediate_write(elements, write_size);
479 n -= write_size;
480 }
481 }
482 }
483
// Store n elements into the buffer starting at the write position, without
// taking the mutex or waiting (write_generic() calls this while holding the
// lock). Does nothing when n is 0; otherwise readers are notified at the end.
// With a pointer-to-const T the std::move below binds to const elements.
// NOTE(review): listing lines 501 and 507 are absent from this copy.
// Presumably the write position is wrapped between the two loops and the
// free write space is reduced afterwards — confirm against the real header.
484 // This is a template to allow const and non-const (to be able to move)
485 template <typename T>
486 void immediate_write(T* elements, size_t n) noexcept {
487 // Split the writing in two ranges if needed. The first range fits in
488 // [_write_position, _capacity), the second range in [0, end). By doing
489 // so, we only have to calculate the modulo in the write position once.
490 if (n > 0) {
// nPart is the number of elements that fit before the end of the buffer.
491 size_t nPart;
492 if (_write_position + n > _capacity) {
493 nPart = _capacity - _write_position;
494 } else {
495 nPart = n;
496 }
497 for (size_t i = 0; i < nPart; ++i, ++_write_position) {
498 _buffer[_write_position] = std::move(elements[i]);
499 }
500
502
// Remaining elements, if any, form the second range.
503 for (size_t i = nPart; i < n; ++i, ++_write_position) {
504 _buffer[_write_position] = std::move(elements[i]);
505 }
506
508
509 // Now that there is less free write space, there is more free read
510 // space and thus readers may continue.
511 _reading_possible_condition.notify_all();
512 }
513 }
514
// Move n elements out of the buffer into `elements`, starting at
// read_position(), without taking the mutex or waiting (the array read()
// calls this while holding the lock). Does nothing when n is 0; otherwise
// writers are notified at the end.
// NOTE(review): listing line 536 is absent from this copy. Presumably the
// free write space is increased there — confirm against the real header.
515 void immediate_read(value_type* elements, size_t n) noexcept {
516 // As with write, split in two ranges if needed. The first range fits in
517 // [read_position(), _capacity), the second range in [0, end).
518 if (n > 0) {
// nPart is the number of elements available before the end of the buffer.
519 size_t nPart;
520 size_t position = read_position();
521 if (position + n > _capacity) {
522 nPart = _capacity - position;
523 } else {
524 nPart = n;
525 }
526 for (size_t i = 0; i < nPart; ++i, ++position) {
527 elements[i] = std::move(_buffer[position]);
528 }
529
// Wrap to the start of the buffer when the first range ran up to its end.
530 position = position % _capacity;
531
532 for (size_t i = nPart; i < n; ++i, ++position) {
533 elements[i] = std::move(_buffer[position]);
534 }
535
537
538 // Now that there is more free write space, writers can possibly continue.
539 _writing_possible_condition.notify_all();
540 }
541 }
542
// Counterpart of immediate_read() used by discard(): no element is copied
// or moved out. Does nothing when n is 0; otherwise writers are notified.
// NOTE(review): listing line 545 is absent from this copy. Presumably the
// free write space is increased by n there — confirm against the real header.
543 void immediate_discard(size_t n) noexcept {
544 if (n > 0) {
546
547 // Now that there is more free write space, writers can possibly continue.
548 _writing_possible_condition.notify_all();
549 }
550 }
551
552#ifdef LANE_DEBUG_MODE
553 void registerDebugInfo() noexcept {
554 _debugSummedSize += _capacity - _free_write_space;
555 _debugMeasureCount++;
556 }
557 void registerDebugReadWait() noexcept { ++_debugReadWaitCount; }
558 void registerDebugWriteWait() noexcept { ++_debugWriteWaitCount; }
559 void reportDebugInfo() {
560 if (!_debugName.empty()) {
561 std::stringstream str;
562 str << "*** Debug report for the following Lane: ***\n"
563 << "\"" << _debugName << "\"\n"
564 << "Capacity: " << _capacity << '\n'
565 << "Total read/write ops: " << _debugMeasureCount << '\n'
566 << "Average size of buffer, measured per read/write op.: "
567 << round(double(_debugSummedSize) * 100.0 / _debugMeasureCount) /
568 100.0
569 << '\n'
570 << "Number of wait events during reading: " << _debugReadWaitCount
571 << '\n'
572 << "Number of wait events during writing: " << _debugWriteWaitCount
573 << '\n';
574 std::cout << str.str();
575 }
576 }
577 std::string _debugName;
578 size_t _debugSummedSize = 0, _debugMeasureCount = 0, _debugReadWaitCount = 0,
579 _debugWriteWaitCount = 0;
580#endif
581};
582
583template <typename Tp>
585 first.swap(second);
586}
587
588} // namespace aocommon
589
590#endif // CASACORE_AOCOMMON_LANE_11_H_
#define LANE_REGISTER_DEBUG_WRITE_WAIT
Definition Lane.h:40
#define LANE_REGISTER_DEBUG_INFO
Definition Lane.h:39
#define LANE_REGISTER_DEBUG_READ_WAIT
Definition Lane.h:41
#define LANE_REPORT_DEBUG_INFO
Definition Lane.h:42
The Lane is an efficient cyclic buffer that is synchronized.
Definition Lane.h:100
std::mutex _mutex
Definition Lane.h:445
std::condition_variable _writing_possible_condition
Definition Lane.h:447
void immediate_discard(size_t n) noexcept
Definition Lane.h:543
void emplace(Args &&... args)
Write a single element by constructing it.
Definition Lane.h:236
Lane(const Lane< Tp > &source)=delete
bool is_end() const
True when write_end() was called.
Definition Lane.h:388
void move_write(value_type *elements, size_t n)
Definition Lane.h:285
~Lane()
Destructor.
Definition Lane.h:154
void clear() noexcept
Clear the contents and reset the state of the Lane.
Definition Lane.h:191
void swap(Lane< Tp > &other) noexcept
Swap the contents of this Lane with another.
Definition Lane.h:176
Lane< Tp > & operator=(Lane< Tp > &&source) noexcept
Move assignment.
Definition Lane.h:167
void wait_for_empty()
Wait until this Lane is empty.
Definition Lane.h:418
Lane< Tp > & operator=(const Lane< Tp > &source)=delete
void resize(size_t new_capacity)
Change the capacity of the Lane.
Definition Lane.h:405
void write(value_type &&element)
Write a single element by moving it in.
Definition Lane.h:263
void immediate_write(T *elements, size_t n) noexcept
This is a template to allow const and non-const (to be able to move).
Definition Lane.h:486
Lane(Lane< Tp > &&source) noexcept
Move construct a Lane.
Definition Lane.h:142
size_t capacity() const noexcept
Definition Lane.h:372
Tp value_type
Type of elements stored in the Lane.
Definition Lane.h:106
size_t size() const
Definition Lane.h:374
void write(const value_type *elements, size_t n)
Definition Lane.h:281
enum casacore::aocommon::Lane::@165250377254016164334045006074227223211343376364 _status
size_t discard(size_t n)
This method does the same thing as read(buffer, n) but discards the data.
Definition Lane.h:339
std::size_t size_type
Integer type used to store size types.
Definition Lane.h:103
size_t read_position() const noexcept
Definition Lane.h:450
void write_generic(T *elements, size_t n)
This is a template to allow const and non-const (to be able to move).
Definition Lane.h:460
bool empty() const
Definition Lane.h:379
void write(const value_type &element)
Write a single element.
Definition Lane.h:206
size_t read(value_type *destinations, size_t n)
Definition Lane.h:307
std::condition_variable _reading_possible_condition
Definition Lane.h:448
Lane(size_t capacity)
Construct a Lane with the given capacity.
Definition Lane.h:128
size_t free_read_space() const noexcept
Definition Lane.h:454
bool is_end_and_empty() const
True when write_end() and the lane does not contain items.
Definition Lane.h:396
void immediate_read(value_type *elements, size_t n) noexcept
Definition Lane.h:515
bool read(value_type &destination)
Definition Lane.h:289
Lane() noexcept
Construct a Lane with zero elements.
Definition Lane.h:116
struct Node * first
Definition malloc.h:328
void swap(aocommon::Lane< Tp > &first, aocommon::Lane< Tp > &second) noexcept
Definition Lane.h:584
LatticeExprNode round(const LatticeExprNode &expr)