35#ifndef PARALLEL_TRONCONNEUSE_HPP
36#define PARALLEL_TRONCONNEUSE_HPP
38#include "../my_config.h"
48#include <libthreadar/libthreadar.hpp>
64 enum class tronco_flags { normal = 0, stop = 1, eof = 2, die = 3, data_error = 4, exception_below = 5, exception_worker = 6, exception_error = 7 };
104 std::unique_ptr<crypto_module> & ptr);
189 enum class thread_status { running, suspended, dead };
212 std::deque<std::unique_ptr<crypto_segment> > lus_data;
213 std::deque<signed int> lus_flags;
219 std::unique_ptr<crypto_segment> tempo_write;
224 std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > scatter;
225 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > gather;
226 std::shared_ptr<libthreadar::barrier> waiter;
227 std::shared_ptr<heap<crypto_segment> > tas;
231 std::deque<std::unique_ptr<crypto_worker> > travailleur;
232 std::unique_ptr<read_below> crypto_reader;
233 std::unique_ptr<write_below> crypto_writer;
308 static U_I get_ratelier_size(U_I num_worker) {
return num_worker + num_worker/2; };
309 static U_I get_heap_size(U_I num_worker);
319 class read_below:
public libthreadar::thread
322 read_below(
const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & to_workers,
323 const std::shared_ptr<libthreadar::barrier> & waiter,
325 U_I clear_block_size,
326 generic_file* encrypted_side,
327 const std::shared_ptr<heap<crypto_segment> > xtas,
331 ~read_below() {
if(ptr) tas->put(std::move(ptr)); cancel(); join(); };
343 void set_initial_shift(
const infinint & x) { initial_shift = x; };
351 void set_pos(
const infinint & pos) { skip_to = pos; };
371 const infinint & get_clear_flow_start()
const {
return clear_flow_start; };
380 const infinint & get_pos_in_flow()
const {
return pos_in_flow; };
383 void read_ahead_up_to(
const infinint & offset);
387 virtual void inherited_run()
override;
390 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > workers;
391 std::shared_ptr<libthreadar::barrier> waiting;
394 generic_file* encrypted;
395 archive_version version;
396 std::shared_ptr<heap<crypto_segment> > tas;
397 infinint initial_shift;
400 std::unique_ptr<crypto_segment> ptr;
403 infinint read_ahead_offset;
404 libthreadar::mutex ra_cntrl;
409 U_I encrypted_buf_size;
415 infinint clear_flow_start;
416 infinint pos_in_flow;
419 infinint get_ready_for_new_offset();
423 void position_clear2crypt(
const infinint & pos,
424 infinint & file_buf_start,
425 infinint & clear_buf_start,
426 infinint & pos_in_buf,
427 infinint & block_num);
430 void check_read_ahead();
440 class write_below:
public libthreadar::thread
443 write_below(
const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & from_workers,
444 const std::shared_ptr<libthreadar::barrier> & waiter,
446 generic_file* encrypted_side,
447 const std::shared_ptr<heap<crypto_segment> > xtas);
449 ~write_below() { cancel(); join(); };
451 bool exception_pending()
const {
return error; };
452 const infinint & get_error_block()
const {
return error_block; };
455 virtual void inherited_run()
override;
458 std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > workers;
459 std::shared_ptr<libthreadar::barrier> waiting;
462 generic_file* encrypted;
463 std::shared_ptr<heap<crypto_segment> > tas;
465 infinint error_block;
466 std::deque<std::unique_ptr<crypto_segment> >ones;
467 std::deque<signed int> flags;
480 class crypto_worker:
public libthreadar::thread
483 crypto_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
484 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_side,
485 std::shared_ptr<libthreadar::barrier> waiter,
486 std::unique_ptr<crypto_module> && ptr,
489 virtual ~crypto_worker() { cancel(); join(); };
492 virtual void inherited_run()
override;
495 enum class status { fine, inform, sent };
497 std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
498 std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
499 std::shared_ptr<libthreadar::barrier> waiting;
500 std::unique_ptr<crypto_module> crypto;
502 std::unique_ptr<crypto_segment> ptr;
class archive_version that rules which archive format to follow
class archive_version manages the version of the archive format
this is the interface class from which all other data transfer classes inherit
bool is_terminated() const
void sync_write()
write any pending data
the arbitrary large positive integer class
this is a partial implementation of the generic_file interface to cypher/decypher data block by block...
bool send_read_order(tronco_flags order, const infinint &for_offset=0)
send and order to subthreads and gather acks from them
void go_read()
wake up threads in read mode when necessary
virtual void inherited_terminate() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual void inherited_write(const char *a, U_I size) override
inherited from generic_file
virtual U_32 get_clear_block_size() const override
returns the block size given to constructor
void run_threads()
reset the interthread datastructure and launch the threads
virtual void inherited_sync_write() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual infinint get_position() const override
inherited from generic_file
void join_threads()
wait for threads to finish and eventually rethrow their exceptions in current thread
virtual void write_end_of_file() override
in write_only mode indicate that end of file is reached
virtual bool skippable(skippability direction, const infinint &amount) override
inherited from generic_file
bool purge_unack_stop_order(const infinint &pos=0)
removing the ignore_stop_acks pending on the pipe
virtual void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) override
parallel_tronconneuse(parallel_tronconneuse &&ref)=default
move constructor
std::unique_ptr< crypto_module > crypto
the crypto module use to cipher / uncipher block of data
tronco_flags purge_ratelier_from_next_order(infinint pos=0)
purge the ratelier from the next order which is provided as returned value
virtual bool skip_to_eof() override
inherited from generic_file
archive_version reading_ver
archive format we follow
parallel_tronconneuse(U_I workers, U_32 block_size, generic_file &encrypted_side, const archive_version &reading_ver, std::unique_ptr< crypto_module > &ptr)
This is the constructor.
virtual bool skip(const infinint &pos) override
inherited from generic_file
~parallel_tronconneuse() noexcept
destructor
parallel_tronconneuse & operator=(const parallel_tronconneuse &ref)=delete
assignment operator
virtual bool skip_relative(S_I x) override
inherited from generic_file
virtual void inherited_truncate(const infinint &pos) override
this prorected inherited method is now private for inherited classed of tronconneuse
virtual U_I inherited_read(char *a, U_I size) override
this protected inherited method is now private for inherited classes of tronconneuse
bool check_bytes_to_skip
whether to check for bytes to skip
void send_write_order(tronco_flags order)
send order in write mode
parallel_tronconneuse(const parallel_tronconneuse &ref)=delete
copy constructor
U_I ignore_stop_acks
how much stop ack still to be read (aborted stop order context)
infinint initial_shift
the offset in the "encrypted" below layer at which starts the encrypted data
U_32 clear_block_size
size of a clear block
virtual void inherited_flush_read() override
this protected inherited method is now private for inherited classes of tronconneuse
virtual bool truncatable(const infinint &pos) const override
inherited from generic_file
U_I num_workers
number of worker threads
bool find_offset_in_lus_data(const infinint &pos)
flush lus_data/lus_flags up to requested pos offset to be found or all data has been removed
void stop_threads()
end threads taking into account the fact they may be suspended on the barrier
thread_status t_status
wehther child thread are waiting us on the barrier
void read_refill()
fill lus_data/lus_flags from ratelier_gather if these are empty
virtual void set_initial_shift(const infinint &x) override
this method to modify the initial shift. This overrides the constructor "no_initial_shift" of the con...
virtual void inherited_read_ahead(const infinint &amount) override
this protected inherited method is now private for inherited classes of tronconneuse
infinint current_position
current position for the upper layer perspective (modified by skip*, inherited_read/write,...
void join_workers_only()
call by join_threads() below just code simplification around exception handling
per block cryptography implementation
defines unit block of information ciphered as once
infinint(* trailing_clear_data_callback)(generic_file &below, const archive_version &reading_ver)
the trailing_clear_data_callback call back is a mean by which the upper layer cat tell when encrypted...
tronco_flags
status flags used between parallel_tronconneuse and its sub-threads
heap data structure (relying on FIFO)
switch module to limitint (32 ou 64 bits integers) or infinint
@ error
neither big nor little endian! (libdar cannot run on such system)
libdar namespace encapsulate all libdar symbols
defines common interface for tronconneuse and parallel_tronconneuse