Docker-in-Docker (DinD) capabilities of public runners deactivated. More info

Commit adc75fdc authored by Jérémie Dudouet's avatar Jérémie Dudouet
Browse files

Merge branch 'TSFilter' into 'preprod'

Adding the TSFilter actor

See merge request IPNL_GAMMA/narval_emulator!77
parents bd7cd715 3be9f986
......@@ -64,6 +64,7 @@ FILTER TrackingFilterOFT
FILTER TrackingFilterMGT
FILTER BasicATSB
FILTER MFMtoADF
FILTER TSFilter
CONSUMER BasicAFC
CONSUMER TreeBuilder
CONSUMER None pseudo-actor to close a chain without output
......@@ -116,6 +117,8 @@ DISPATCHER BuilderName pseudo actor to connect the output of a chain to an i
#include "GANPRO_MFMtoADF.h" // 37 370
#endif
#include "AGAPRO_TSFilter.h" // 38 380
// 4 CONSUMER
// None // 40 400 fake, used as last of a chain
#include "BasicAFC.h" // 41 410 no mother-daughter model
......@@ -886,6 +889,10 @@ void listKeysAndExit()
delete pf7;
#endif
AGAPRO::TSFilter *pf8 = new AGAPRO::TSFilter;
pf8->GetParameters("", true);
delete pf8;
exit(EXIT_SUCCESS);
}
......@@ -1129,6 +1136,13 @@ bool topologyConfNew()
pActor->libItem = 370;
}
#endif
else if(pActor->libName == "TSFilter") {
cout << "TSFilter ..." << endl;
AGAPRO::TSFilter::process_config(pActor->libConf.c_str(), &error_config);
IF_ERRORCONFIG(error_config);
pActor->nrvPointer = new AGAPRO::TSFilter;
pActor->libItem = 380;
}
ELSE_UNKNOWNCLASS(nc);
break;
case CONSUMER: //// 4
......
......@@ -11,8 +11,9 @@ TstampFilter::TstampFilter()
}
TstampFilter::TstampFilter(const std::string& tstampFile, int tolerance, int id)
TstampFilter::TstampFilter(const std::string& tstampFile, int tolerance, int id, bool binary)
{
fBinaryFormat = binary;
Reset();
Initialize(tstampFile, tolerance, id);
}
......@@ -48,7 +49,16 @@ bool TstampFilter::Initialize(std::string tstampFile, int tolerance, int id)
fTstampFile = tstampFile;
//fTstampFile = GetConfPath() + fTstampFile;
if( (fTstampFILE = fopen(fTstampFile.c_str(),"r")) == NULL) {
cout<<fBinaryFormat<<endl;
if(fBinaryFormat) {
if( (fTstampFILE = fopen(fTstampFile.c_str(),"rb")) == NULL) {
cout << "Error opening " << fTstampFile << endl;
//*error_code = 133;
//Log << dolog;
return false;
}
}
else if( (fTstampFILE = fopen(fTstampFile.c_str(),"r")) == NULL) {
cout << "Error opening " << fTstampFile << endl;
//*error_code = 133;
//Log << dolog;
......@@ -58,7 +68,9 @@ bool TstampFilter::Initialize(std::string tstampFile, int tolerance, int id)
fTimestamp = 0x0FFFFFFFFFFFFFFFULL;
fMyId = id;
if(fMyId < 0) { // only tstamp present in file
int nn = fscanf(fTstampFILE, "%lld", &fTimestamp);
int nn;
if(fBinaryFormat) nn = fread (&fTimestamp,1,sizeof (fTimestamp),fTstampFILE)/sizeof(fTimestamp);
else nn = fscanf(fTstampFILE, "%lld", &fTimestamp);
if(nn != 1) {
cout << "Error reading " << fTstampFile << endl;
//*error_code = 134;
......@@ -71,7 +83,12 @@ bool TstampFilter::Initialize(std::string tstampFile, int tolerance, int id)
id = -1;
int nnr = 0;
while(true) {
int nn = fscanf(fTstampFILE, "%d %lld", & id, &fTimestamp);
int nn;
if(fBinaryFormat) {
nn = fread (&fTimestamp,1,sizeof (fTimestamp),fTstampFILE)/sizeof(fTimestamp);
nn += fread (&id,1,sizeof (id),fTstampFILE)/sizeof(id);
}
else nn = fscanf(fTstampFILE, "%d %lld", & id, &fTimestamp);
if(nn != 2) {
cout << "Error reading " << fTstampFile << endl;
//*error_code = 134;
......@@ -97,8 +114,10 @@ void TstampFilter::NextTstamp()
if(fMyId < 0) {
// timestamp only
while(true) {
int nn = fscanf(fTstampFILE, "%lld", &fTimestamp);
if(nn != 1) {
int nn;
if(fBinaryFormat) nn = fread (&fTimestamp,1,sizeof (fTimestamp),fTstampFILE)/sizeof(fTimestamp);
else nn = fscanf(fTstampFILE, "%lld", &fTimestamp);
if(nn != 1) {
cout << "Error reading " << fTstampFile << endl;
fTimestamp = 0xFFFFFFFFFFFFFFFFULL;
return;
......@@ -114,8 +133,13 @@ void TstampFilter::NextTstamp()
int id = -1;
while(true) {
while(true) {
int nn = fscanf(fTstampFILE, "%d %lld", &id, &fTimestamp);
if(nn != 2) {
int nn;
if(fBinaryFormat) {
nn = fread (&fTimestamp,1,sizeof (fTimestamp),fTstampFILE)/sizeof(fTimestamp);
nn += fread (&id,1,sizeof (id),fTstampFILE)/sizeof(id);
}
else nn = fscanf(fTstampFILE, "%d %lld", & id, &fTimestamp);
if(nn != 2) {
cout << "Error reading " << fTstampFile << endl;
fTimestamp = 0xFFFFFFFFFFFFFFFFULL;
return;
......
......@@ -21,8 +21,10 @@ public:
UInt_t fCount;
//UInt_t fStats[1000];
Bool_t fBinaryFormat = false;
TstampFilter();
TstampFilter(const std::string& tstampFile, int tolerance, int id = -1);
TstampFilter(const std::string& tstampFile, int tolerance, int id = -1, bool binary = false);
virtual ~TstampFilter();
void Reset();
bool Initialize(std::string tstampFile, int tolerance, int id = -1);
......
/* ADF file timestamp filtering class. J. Dudouet & J. Ljungvall 2020
* based on implementation by Olivier Stezowski.
*
*/
#include <iomanip>
#include <iostream>
#include <fstream>
#include <sstream>
#include <cmath>
#include <cstdlib>
#include "AGAPRO_TSFilter.h"
#include "Misc.h"
#include "ConfReader.h"
using namespace AGAPRO;
string TSFilter::gMotherClass = "TSFilter";
string TSFilter::gActualClass;
TSFilter::TSFilter()
{
}
TSFilter::~TSFilter()
{
}
void TSFilter::process_config ( const Char_t *directory_path, UInt_t *error_code )
{
*error_code = 0;
cout << gMotherClass + "::process_config() called with directory_path = " << directory_path << endl;
// first init narval stuff
NarvalInterface::process_config(directory_path, error_code);
if(*error_code) return;
// Get name of daughter class from configuration
// directory_path/gMotherClass.conf
int rv = getKeyFromFile(GetGlobalConfPath() + gMotherClass + ".conf", "ActualClass", gActualClass);
if(rv == 2) *error_code = 102;
// Fatal error because the configuration file MUST be present
}
void TSFilter::process_initialise ( UInt_t *error_code )
{
Log.ClearMessage();
Log.GetProcessName() = gMotherClass;
Log.SetProcessMethod("process_initialise");
Log.SetPID ( GetPID() );
*error_code = GetParameters(GetConfPath() + gMotherClass + ".conf");
if(*error_code) {
Log << dolog;
return;
}
UInt_t rerr = 0;
cout << endl;
if(fTstampFilterFile.size()) {
fTstampFilter = new TstampFilter(GetConfPath()+fTstampFilterFile, fTstampFilterTolerance, fTstampFilterId, fBinary);
if( !fTstampFilter->IsReady() ) {
*error_code = 200 + rerr;
return;
}
}
// create universal trigger
fTrigger = new AgataFrameTrigger("ADF");
fTrigger->Add(FactoryItem("Agata","agata",Version(4,0)), FactoryItem("Agata","agata",Version(0,0)));
fTrigger->Universal(true);
fFrameIn = fTrigger->GetInputSharedFP();
fFrameOut = fTrigger->GetInputSharedFP();
fFrameOut = fTrigger->SetOutputFrame(fFrameOut->GetFrame());
// the trigger is registered
if( !fFrameIO.Register(fTrigger) )
rerr |= 1;
// version-specific initialization
*error_code = AlgoInitialise();
// state set to kIdle so that the data can be treated
fFrameIO.SetStatus(BaseFrameIO::kIdle);
cServer.SetCommandFile(GetConfPath() + gMotherClass + ".live");
cServer.Start(gMotherClass);
Log << dolog;
}
UInt_t TSFilter::GetParameters(const std::string& confFile,
Bool_t doList)
{
ConfReader conf;
conf.Add("ActualClass", "name of daughter class"
, &gActualClass);
conf.Add("TstampFile", "File_with_timestams_to_selct Width_of_selection, ID of crystal (-1 if not present in file)"
, &fTstampFilterFile, &fTstampFilterTolerance, &fTstampFilterId);
conf.Add("Binary", "To be activated if the input ts file is written in binary format"
, &fBinary);
conf.Add("Verbose", "verbosity of printouts"
, &fVerbose);
if(doList) {conf.Show(gMotherClass); return 0;}
int rv = conf.Read(gMotherClass, confFile);
if(rv)
return rv; // Read() error
return rv;
}
UInt_t TSFilter::ProcessBlock (ADF::FrameBlock &inBlock, ADF::FrameBlock &outBlock)
{
// start the processing
UInt_t error_code = 0;
UInt_t nevs = 0;
// attach the input/output buffer to the FrameIO system
fFrameIO.Attach(&inBlock, &outBlock);
while ( fFrameIO.Notify() ) {
Frame *frame_in = fFrameIn->GetFrame();
message_type = ((AgataKey *)frame_in->GetKey())->GetMessage();
timestamp = ((AgataKey *)frame_in->GetKey())->GetTimeStamp();
if((message_type&0xFF00FF00)!=0xFA000200) {
if(!fTstampFilter->CheckTstamp(timestamp)) {
continue;
}
}
fFrameOut->GetFrame()->Copy(fFrameIn->GetFrame());
// ok, so send the produced frame to the ouput
if( !fFrameIO.Record() ) {
error_code = 4;
LOCK_COUT;
Log.SetProcessMethod("ProcessBlock");
Log << error << " During : Record()" << dolog;
break;
}
nevs++;
}
fFrameIO.Detach(&inBlock, &outBlock);
cServer.Exec(timestamp, nevs);
LOCK_COUT;
cServer.Prompt(0, (int)nevs, UInt_t(outBlock.GetSize()), true);
return error_code;
// // process the input
// Long64_t at_in=0,at_out=0;
// while(1) {
// if(at_in>=inBlock.GetSize() || at_out>=outBlock.GetMaxSize()) break;
// int frame_size = *((int*)(inBlock.GetBufferIO()->GetAddress()+at_in));
// if(frame_size==0) break;
// unsigned int message_type = *((unsigned int*)(inBlock.GetBufferIO()->GetAddress()+at_in+4));
// // apply mask 0xFF00FF00 to be sure any conf (0xFA000200) frames are not taken into acount
// if((message_type&0xFF00FF00)!=0xFA000200){
// ULong64_t timestamp = *((ULong64_t*)(inBlock.GetBufferIO()->GetAddress()+at_in+12));
// if(fTstampFilter) {
// if(!fTstampFilter->CheckTstamp(timestamp)) {
// at_in+=frame_size;
// continue;
// }
// }
// }
// std::copy((char*)(inBlock.GetBufferIO()->GetAddress()+at_in),
// (char*)(inBlock.GetBufferIO()->GetAddress()+at_in+frame_size),
// (char*)(outBlock.GetBufferIO()->GetAddress()+at_out));
// at_in+=frame_size;
// at_out+=frame_size;
// outBlock.GetBufferIO()->SetOffset(at_out);
// }
// nevs++;
// cServer.Exec(timestamp, nevs);
// LOCK_COUT;
// cServer.Prompt(0, (int)nevs, UInt_t(outBlock.GetSize()), true);
// return error_code;
}
void TSFilter::process_reset ( UInt_t *error_code )
{
*error_code = 0;
Log.ClearMessage();
Log.SetProcessMethod("process_reset");
fFrameIO.Print ( Log() );
Log << dolog;
//cServer.Reset();
}
void TSFilter::process_start ( UInt_t *error_code )
{
Log.GetProcessName() = "TSFilter";
Log.SetProcessMethod("process_start");
//Log << info << " Start the inner loop " << dolog;
cServer.Start(gMotherClass);
*error_code = 0;
Log << dolog;
}
void TSFilter::process_stop ( UInt_t *error_code )
{
cout << 0
<< "-TSFilter::process_stop called with GetPID() "
<< GetPID() << endl;
*error_code = 0;
cServer.Finish();
}
#ifndef TSFilter_H
#define TSFilter_H
/* Timestamp filter for DF files, J. Ljungvall 2020
* based on implementation by Olivier Stezowski.
*
*/
#include <ctime>
#include "NarvalInterface.h"
#include "CrystalFrame.h"
#include "Trigger.h"
#include "AgataKeyFactory.h"
#include "AgataFrameFactory.h"
#include "commonDefs.h"
#include "CycleServer.h"
#include "TstampFilter.h"
using namespace std;
using namespace ADF;
namespace AGAPRO {
//!Base class for preprocessing filters
class TSFilter : public ADF::NarvalFilter
{
public:
private:
protected:
ULong64_t timestamp = 0;
ULong64_t timestampOld = 0;
unsigned int message_type =0;
std::string fTstampFilterFile = "";
UInt_t fTstampFilterTolerance = 0;
Int_t fTstampFilterId = -1;
TstampFilter *fTstampFilter = nullptr;
//! input and ouput frames
ADF::SharedFP *fFrameIn = nullptr;
ADF::SharedFP *fFrameOut = nullptr;
//! Trigger for the input frame
ADF::AgataFrameTrigger *fTrigger = nullptr;
bool fUseMultiHist;
CycleServer cServer; // manager of cyclic operations
std::string fOdirPrefix = "";
Bool_t fVerbose = false;
Bool_t fBinary = false;
Int_t SetInput(); //! to init your local variables with the ones from the buffers
Int_t SetOutput(); //! to copy the result of the algrithm into the frame through ADFObjects
public:
static std::string gMotherClass; // Static string containing the name of the base class
static std::string gActualClass; // Static string to choose daughter class at run time
TSFilter();
virtual ~TSFilter();
static void process_config(const Char_t *, UInt_t *); //! to init globals (static) from a directory
virtual void process_initialise ( UInt_t *error_code ); //! Constructor implementation
virtual void process_reset ( UInt_t *error_code ); //! Destructor implementation
virtual UInt_t ProcessBlock(ADF::FrameBlock &, ADF::FrameBlock &);
virtual void process_start (UInt_t *error_code);
virtual void process_stop (UInt_t *error_code);
public:
UInt_t GetParameters(const std::string& confFile, Bool_t doList=false);
protected:
virtual Int_t AlgoInitialise() {return 0;}
};
}
#endif // TSFilter_H
#
# Just call the right Cmakefile depending of the building type
#
if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/build/T${AGAPRO_BUILDING_TYPE}/CMakeLists.txt")
include(build/T${AGAPRO_BUILDING_TYPE}/CMakeLists.txt)
else()
message(AUTHOR_WARNING "Building type ${AGAPRO_BUILDING_TYPE} unknown in ${CMAKE_CURRENT_SOURCE_DIR}/CMakeLists.txt")
endif()
# CMake for module of AGAPRO
# O. Stezowski
cmake_minimum_required (VERSION 2.6)
#
project (TSFilter)
#
MESSAGE("[AGAPRO] + ${PROJECT_NAME}")
#
# internal and external dependencies
#
set( EXTRA_INTERNAL_LIBRARIES AgaProCommon )
set( EXTRA_EXTERNAL_LIBRARIES ${ADF_LIBRARY} )
#
# headers / sources
file(GLOB headers ${PROJECT_SOURCE_DIR}/*.h)
file(GLOB sources ${PROJECT_SOURCE_DIR}/*.cpp)
# Remove if any
if ( NOT DO_NARVAL_INTERFACE )
list(REMOVE_ITEM sources ${PROJECT_SOURCE_DIR}/libAGAPRO_TSFilter.cpp)
endif()
if ( HAS_SSE )
ADD_DEFINITIONS(-msse4)
MESSAGE("[AGAPRO] SSE FOUND ")
endif()
# add to includes
#
include_directories(${AGAPRO_INCLUDE_DIRS})
#----------------------------------------------------------------------------
#
add_library(AGAPRO_TSFilter SHARED ${sources})
#
target_link_libraries(AGAPRO_TSFilter ${EXTRA_EXTERNAL_LIBRARIES} ${EXTRA_INTERNAL_LIBRARIES})
#
set_target_properties(AGAPRO_TSFilter PROPERTIES ${AGAPRO_LIBRARY_PROPERTIES})
#
install(TARGETS AGAPRO_TSFilter RUNTIME DESTINATION bin
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
COMPONENT libraries)
install(FILES ${headers} DESTINATION include COMPONENT headers )
#
# copy headers file in a single directory
# because cmake2.6 does not provide COPY in file, do it another way
if( ${CMAKE_VERSION} VERSION_GREATER "2.8.0" )
file(COPY ${headers} DESTINATION ${CMAKE_BINARY_DIR}/include)
else()
if ( EXISTS ${CMAKE_BINARY_DIR}/include )
else()
execute_process(COMMAND ${CMAKE_COMMAND} -E make_directory ${CMAKE_BINARY_DIR}/include/ )
endif()
foreach ( f ${headers} )
execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${f} ${CMAKE_BINARY_DIR}/include/ )
endforeach()
endif()
# =============================================================================
# ++++++++++++++++++++++++++++ Globals definitions ++++++++++++++++++++++++++++
#
# Modern CMake for module of AGAPRO
# [O. Stezowski]
#
# Effective cmake required
cmake_minimum_required (VERSION 3.5)
#
project (TSFilter)
#
message(STATUS " + ${PROJECT_NAME}")
#
# ---------------------------- Globals definitions ----------------------------
# =============================================================================
# =============================================================================
# ++++++++++++++++++++++++++++ Targets definitions ++++++++++++++++++++++++++++
#
# Main target is to produce a shared library
add_library(${PROJECT_NAME} SHARED
AGAPRO_TSFilter.cpp
libAGAPTO_TSFilter.cpp
)
#
# define public headers to be installed
set_target_properties(${PROJECT_NAME} PROPERTIES PUBLIC_HEADER "AGAPRO_TSFilter.h")
#
# alias for full tree
add_library(AGAPRO::${PROJECT_NAME} ALIAS ${PROJECT_NAME})
#
# directories for includes
target_include_directories( ${PROJECT_NAME}
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>
)
target_link_libraries(${PROJECT_NAME}
ADF
AGAPRO::AgaProCommon
)
#
# C++11 required for thread support / true since it had a config file by cmake
target_compile_features(${PROJECT_NAME} PUBLIC cxx_constexpr)
target_compile_definitions(${PROJECT_NAME} PUBLIC HAS_AGAPROCONFIG)
#target_compile_options(${PROJECT_NAME} PRIVATE -msse4)
set_target_properties(${PROJECT_NAME} PROPERTIES PREFIX libAGAPRO_)
#
# ---------------------------- Targets definitions ----------------------------
# =============================================================================
# =============================================================================
# ++++++++++++++++++++++++++ Install local targets ++++++++++++++++++++++++++++
#
install(TARGETS ${PROJECT_NAME}
EXPORT ${PROJECT_NAME}Targets
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
INCLUDES DESTINATION include
PUBLIC_HEADER DESTINATION include
)
#
# INSTALL CMAKE TARGETS
#
install(EXPORT ${PROJECT_NAME}Targets
FILE
${PROJECT_NAME}Targets.cmake
NAMESPACE
AGAPRO::
DESTINATION
lib/cmake/${PROJECT_NAME}
)
#
# Add this target to the full list of AGAPRO targets
#
set_property(GLOBAL APPEND PROPERTY LIST_OF_AGAPRO_TARGETS ${PROJECT_NAME})
#
# --------------------------- Install local targets ---------------------------
# =============================================================================
#include "AGAPRO_TSFilter.h"
using namespace AGAPRO;
extern "C" {
TSFilter *process_register(UInt_t *error_code)
{
if( (TSFilter::gActualClass.size() == 0) ||
(TSFilter::gActualClass == "basic") ||
(TSFilter::gActualClass == "TSFilter") ) {
std::cout << "\nTSFilter::gActualClass == \"TSFilter base class\"\n";
return new TSFilter();
}
else {
std