Commit 264b2dbd authored by Jérémie Dudouet's avatar Jérémie Dudouet
Browse files

Adding Histogramer actors

parent 55182d05
......@@ -6,6 +6,7 @@ project(consumers)
MESSAGE("[AGAPRO] + ${PROJECT_NAME}")
add_subdirectory(Basic)
add_subdirectory(Histogramer)
if (ROOT_FOUND)
add_subdirectory(TreeBuilder)
......
/***************************************************************************
* Copyright (C) 2018 by Jeremie Dudouet *
* jeremie.dudouet(AT)csnsm.in2p3.fr *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the *
* Free Software Foundation, Inc., *
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
***************************************************************************/
#ifndef AGAPRO_Histogramer
#include "AGAPRO_Histogramer.h"
#endif
#include <iostream>
#include <iomanip>
#include <fstream>
#include <sstream>
#include <cmath>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include "PSAFilter.h"
#include "AgataKeyFactory.h"
#include "AgataFrameFactory.h"
#include "adetParams.h"
#include "commonDefs.h"
using namespace ADF;
using namespace AGAPRO;
string Histogramer::gMotherClass = "Histogramer";
Histogramer::Histogramer() :
NarvalConsumer()
{
}
Histogramer::~Histogramer()
{
// in principle not needed ... just in case reset it has not been called by narval
UInt_t error = 0u; process_reset(&error) ;
}
void Histogramer::process_config(const Char_t *directory_path, UInt_t *error_code)
{
*error_code = 0;
cout << gMotherClass + "::process_config() called with directory_path = " << directory_path << endl;
// first init narval stuff
NarvalInterface::process_config(directory_path, error_code);
if( *error_code )
return;
}
UInt_t Histogramer::ProcessBlock(ADF::FrameBlock &inBlock)
{
// attach the input/output buffer to the FrameIO system
fFrameIO.Attach(&inBlock, 0x0);
// start the processing
UInt_t error_code = 0;
UInt_t nevs = 0;
while ( fFrameIO.Notify() )
{
// fill local variables with data from the input
error_code = SetInput();
if( error_code == 1)
{
LOCK_COUT;
Log.SetProcessMethod("ProcessBlock");
Log << error << " During : SetInput()" << dolog;
break;
}
else
{
Process();
}
nevs++;
fReadEvts++;
}
fFrameIO.Detach(&inBlock, 0x0);
cServer.Exec(timestamp, nevs);
LOCK_COUT;
cServer.Prompt(crystal_id, (int)nevs, UInt_t(inBlock.GetSize()), true );
return error_code;
}
void Histogramer::process_reset (UInt_t *error_code)
{
*error_code = 0;
Log.ClearMessage();
Log.SetProcessMethod("process_reset");
fFrameIO.Print(Log());
cout << " #################################### " << endl;
cout << " Number of read events " << fReadEvts << endl;
cout << " #################################### " << endl;
fReadEvts = 0;
//cServer.Reset();
Log << dolog;
}
void Histogramer::process_start (UInt_t *error_code)
{
Log.GetProcessName() = "Histogramer";
Log.SetProcessMethod("process_start");
cServer.Start(gActualClass);
*error_code = 0;
Log << dolog;
}
void Histogramer::process_stop (UInt_t *error_code)
{
std::cout << "-Histogramer::process_stop called with GetPID() " << GetPID() << std::endl;
*error_code = 0;
cServer.Finish();
}
std::string Histogramer::GetCrystalNameFromID(const Int_t id)
{
Int_t ClusterNbr = (Int_t)id/3;
string Letter[3] = {"A","B","C"};
ostringstream ss;
ss<<setw(2)<<setfill('0')<<ClusterNbr<<Letter[id%3];
return ss.str();
}
UShort_t Histogramer::GetCrystalIDFromName(const string name)
{
string name_tmp(name);
ostringstream ss;
ss<<name_tmp[2];
string Letter = ss.str();
name_tmp.pop_back();
Int_t id = atoi(name_tmp.data())*3;
if(Letter == "B")
id += 1;
else if(Letter == "C")
id += 2;
return id;
}
std::string Histogramer::GetCoreNameFromID(const Int_t id)
{
string name;
if(id==0)
name="HG";
else if(id==1)
name="LG";
else
name="Id_unknown";
return name;
}
std::string Histogramer::GetSegmentNameFromID(const Int_t id)
{
string name;
if(id<=36){
Int_t SegNr = (Int_t)id%6+1;
string Letter[6] = {"A","B","C","D","E","F"};
ostringstream ss;
ss<<Letter[id/6]<<SegNr;
name = ss.str();
}
else
name="Id_unknown";
return name;
}
Hist1F *Histogramer::NewHist1F(std::string name, std::string title, std::string xtitle, std::string gruname, int nbins, float min, float max, ActorMap *map_ptr, Int_t PortNumber)
{
Log.ClearMessage();
Log.SetProcessMethod("NewHist1F");
fFrameIO.Print(Log());
if(nbins>MAX_TH1BINS)
{
Log.SetLevel(LogMessage::kError);
Log << name << ": Cannot define more than " << MAX_TH1BINS << " in TH1F axis" << dolog;
return nullptr;
}
if(map_ptr->N_TH1_Entries == MAX_TH1HISTS)
{
Log.SetLevel(LogMessage::kError);
Log << "Cannot allocate more than " << MAX_TH1HISTS << " TH1F, in the shared memory" << dolog;
return nullptr;
}
Hist1F *histptr = &map_ptr->hist1f_list[map_ptr->N_TH1_Entries];
strcpy(histptr->name,name.data());
strcpy(histptr->title,title.data());
strcpy(histptr->gruname,gruname.data());
histptr->spec_number = PortNumber*10000+1000 + map_ptr->N_TH1_Entries; //2 first digits: actor ; 3 next: crystal id ; 1next: (1 -> TH1 ; 2 -> TH2) ; 3 next, spectrum id
histptr->nevts = 0;
histptr->xbinmin=min;
histptr->xbinmax=max;
histptr->xnbins = nbins;
strcpy(histptr->xtitle,xtitle.data());
memset(histptr->bins, 0, sizeof(histptr->bins));
map_ptr->N_TH1_Entries++;
return histptr;
}
Hist2F *Histogramer::NewHist2F(std::string name, std::string title, std::string xtitle, std::string ytitle, std::string gruname, int nbinsx, float xmin, float xmax, int nbinsy, float ymin, float ymax, ActorMap *map_ptr, Int_t PortNumber)
{
Log.ClearMessage();
Log.SetProcessMethod("NewHist2F");
fFrameIO.Print(Log());
Hist2F *histptr = nullptr;
if(nbinsx>MAX_TH2BINS || nbinsy>MAX_TH2BINS)
{
Log.SetLevel(LogMessage::kError);
Log << name << ": Cannot define more than " << MAX_TH2BINS << " in TH2F axis" << dolog;
return nullptr;
}
if(map_ptr->N_TH2_Entries == MAX_TH2HISTS)
{
Log.SetLevel(LogMessage::kError);
Log << "Cannot allocate more than " << MAX_TH2HISTS << " TH2F, in the shared memory" << dolog;
return nullptr;
}
histptr = &map_ptr->hist2f_list[map_ptr->N_TH2_Entries];
strcpy(histptr->name,name.data());
strcpy(histptr->title,title.data());
strcpy(histptr->gruname,gruname.data());
histptr->spec_number = PortNumber*10000+2000 + map_ptr->N_TH2_Entries; //2 first digits: actor ; 3 next: crystal id ; 1next: (1 -> TH1 ; 2 -> TH2) ; 3 next, spectrum id
histptr->nevts = 0;
histptr->xnbins = nbinsx;
histptr->xbinmin= xmin;
histptr->xbinmax= xmax;
strcpy(histptr->xtitle,xtitle.data());
histptr->ynbins = nbinsy;
histptr->ybinmin= ymin;
histptr->ybinmax= ymax;
strcpy(histptr->ytitle,ytitle.data());
memset(histptr->bins, 0, sizeof(histptr->bins));
map_ptr->N_TH2_Entries++;
return histptr;
}
void Histogramer::ResetHist1F(Hist1F *hist)
{
memset(hist->bins, 0, sizeof(hist->bins));
hist->nevts = 0;
}
void Histogramer::FillHist1F(Hist1F *hist, Float_t val, Float_t weight)
{
// //Check if a reset has been done from the client
// Float_t Integral=0;
// for(int i=0 ; i<hist->nbins+2 ; i++)
// Integral+=hist->bins[i];
// if(Integral==0)
// hist->nevts = 0;
Int_t bin = 1 + Int_t(hist->xnbins*(val-hist->xbinmin)/(hist->xbinmax-hist->xbinmin));
//check underflows and overflows
if(bin<0) bin=0;
else if(bin>hist->xnbins) bin=hist->xnbins+1;
hist->bins[bin] += weight;
hist->nevts++;
// cout<<"val = "<<val<<" ; bin "<<bin<<" filled : "<<hist->bins[bin]<<" ; nevts = "<<hist->nevts<<endl;
}
void Histogramer::FillHist2F(Hist2F *hist, Float_t valx, Float_t valy, Float_t weight)
{
Int_t binx = 1 + Int_t(hist->xnbins*(valx-hist->xbinmin)/(hist->xbinmax-hist->xbinmin));
if(binx<0) binx=0;
else if(binx>hist->xnbins) binx=hist->xnbins+1;
Int_t biny = 1 + Int_t(hist->ynbins*(valy-hist->ybinmin)/(hist->ybinmax-hist->ybinmin));
if(biny<0) biny=0;
else if(biny>hist->ynbins) biny=hist->ynbins+1;
Int_t bin = biny*(hist->xnbins+2) + binx;
hist->bins[bin] += weight;
hist->nevts++;
// cout<<hist->nevts<<" "<<bin<<" "<< binx << " " << valx << " " << biny << " " << valy <<" "<<hist->bins[bin]<<endl;
}
ActorMap *Histogramer::CreateNewSMz(const char *name, Int_t Size, Int_t PortNumber)
{
int fd;
ostringstream SMZ;
ActorMap *map_ptr = nullptr;
int result;
bool SMZ_exists = TestSMZ(name);
if(SMZ_exists == false)
{
SMZ.str("");
SMZ << "curl http://localhost:5678/SMZs --data-binary '{";
SMZ << "\"name\":\""<< name <<"\", ";
SMZ << "\"size\":" << Size << ", ";
SMZ << "\"unit\":\"Bytes\"}'";
SMZ << " -H \"Content-Type: application/json\" -X POST ";
// cout<<SMZ.str()<<endl;
result = system(SMZ.str().data());
cout<<endl;
// create a new Transport Layer
SMZ.str("");
Float_t RefreshTimeInSec = 2.0;
SMZ << "curl http://localhost:8765/TLs --data-binary '{";
SMZ << "\"name\":\"source_" << name << "\", ";
SMZ << "\"buffer_name\":\"" << name << "\", ";
SMZ << "\"kind\":\"server\",";
SMZ << "\"tl_kind\":\"tl_plugin\", ";
SMZ << "\"size\":" << Size << ", ";
SMZ << "\"unit\":\"Bytes\", ";
SMZ << "\"plugin_name\":\"source_smz_updater\",";
SMZ << "\"period\":" << std::fixed << std::setprecision(1) << RefreshTimeInSec << ", ";
SMZ << "\"port\":" << PortNumber << ", ";
SMZ << "\"log_level\":\"debug\"}'";
SMZ << " -H \"Content-Type: application/json\" -X POST ";
// cout<<SMZ.str()<<endl;
result = system(SMZ.str().data());
cout<<endl;
// Connect a new Transport Layer to the SMZ
SMZ.str("");
SMZ << "curl http://localhost:8765/TLs/" << "source_" << name <<" --data-binary '{";
SMZ << "\"name\":\"source_" << name << "\", ";
SMZ << "\"action\":\"connect\"}'";
SMZ << " -H \"Content-Type: application/json\" -X PUT ";
// cout<<SMZ.str()<<endl;
result = system(SMZ.str().data());
cout<<endl;
}
else
{
cout<<"SMZ "<<name<<" found ==> Loaded !"<<endl<<endl;
}
//define the new actor active map
fd = shm_open(name, O_RDWR, 0666);
if (fd == -1)
{
fprintf(stderr, "Open failed for hist %s :%s\n",name, strerror(errno));
exit(EXIT_FAILURE);
}
//alocate the size of the shared memory zone
if (ftruncate(fd, sizeof(ActorMap)) == -1)
{
fprintf(stderr, "truncate failed for hist %s :%s\n",name, strerror(errno));
exit(EXIT_FAILURE);
}
//map the data on a ActorMap pointer
map_ptr = (ActorMap*) mmap(NULL, sizeof(ActorMap), PROT_WRITE, MAP_SHARED, fd, 0);
if (map_ptr == MAP_FAILED)
{
fprintf(stderr, "Map failed for hist %s :%s\n",name,strerror(errno));
exit(EXIT_FAILURE);
}
map_ptr->N_TH1_Entries = 0;
map_ptr->N_TH2_Entries = 0;
return map_ptr;
}
bool Histogramer::TestSMZ(const char *name)
{
bool SMZ_found = false;
char buffer[128];
string result = "";
FILE* pipe = popen("curl -s http://localhost:5678/SMZs | jq .", "r");
while (!feof(pipe))
{
if (fgets(buffer, 128, pipe) != NULL)
result += buffer;
}
pclose(pipe);
if(result.find(name) != string::npos )
SMZ_found = true;
return SMZ_found;
}
/***************************************************************************
* Copyright (C) 2018 by Jeremie Dudouet *
* jeremie.dudouet(AT)csnsm.in2p3.fr *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the *
* Free Software Foundation, Inc., *
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
***************************************************************************/
#ifndef _Histogramer
#define _Histogramer
#include "NarvalInterface.h"
#include "CrystalFrame.h"
#include "Trigger.h"
#include "CycleServer.h"
using namespace std;
//! Post preprocessing histogramer
/*!
*/
namespace AGAPRO {
#define MAX_TH1BINS 4096
#define MAX_TH2BINS 1024
#define MAX_TH1HISTS 79
#define MAX_TH2HISTS 1
#define MAX_CHAR_SIZE 200
struct Hist
{
Int_t nevts = 0;
Int_t spec_number = 0;
Float_t xbinmin = -1;
Float_t xbinmax = -1;
Int_t xnbins = -1;
char name[MAX_CHAR_SIZE];
char title[MAX_CHAR_SIZE];
char xtitle[MAX_CHAR_SIZE];
char gruname[MAX_CHAR_SIZE];
};
struct Hist1F : Hist
{
Float_t bins[(MAX_TH1BINS+2)]; // +2 for underflow and overflow bins
};
struct Hist2F : Hist
{
Float_t ybinmin = -1;
Float_t ybinmax = -1;
Int_t ynbins = -1;
Float_t bins[(MAX_TH2BINS+2)*(MAX_TH2BINS+2)]; // +2 for underflow and overflow bins
char ytitle[MAX_CHAR_SIZE];
};
struct ActorMap
{
int N_TH1_Entries = 0; // number of 1d histograms
int N_TH2_Entries = 0; // number of 2d histograms
Hist1F hist1f_list[MAX_TH1HISTS];
Hist2F hist2f_list[MAX_TH2HISTS];
};
struct binning_TH1{
Int_t NBins;
Float_t Min;
Float_t Max;
};
struct binning_TH2{
Int_t NBinsX;
Float_t XMin;
Float_t XMax;
Int_t NBinsY;
Float_t YMin;
Float_t YMax;
};
class Histogramer : public ADF::NarvalConsumer
{
public:
static const int kNCC = ADF::CrystalInterface::kNbCores;
static const int kNSG = ADF::CrystalInterface::kNbSegments;
protected:
CycleServer cServer; // manager of cyclic operations
string fOdirPrefix;
Int_t fReadEvts = 0;
UInt_t evnumber; // from last SetInput
ULong64_t timestamp;
Int_t fBasePortNumber;
public:
static std::string gMotherClass; // Static string containing the name of the base class
std::string gActualClass; // Static string to choose daughter class at run time
UShort_t crystal_id; // Static string to define the crystal id
std::string crystal_name; // Static string to define the crystal name
//Level 1 : list of actors
//level 2 : list of observable per actor
vector < vector< void * > > fListOfHists;
public:
Histogramer();
virtual ~Histogramer();
static void process_config(const Char_t *, UInt_t *); //! to init globals (static) from a directory
virtual void process_initialise ( UInt_t *error_code ){return;} //! Constructor implementation
virtual void process_reset ( UInt_t *error_code ); //! Destructor implementation
virtual UInt_t ProcessBlock(ADF::FrameBlock &);
virtual UInt_t Process(){return 0;}
virtual void process_start (UInt_t *error_code);
virtual void process_stop (UInt_t *error_code);
virtual Int_t SetInput(){return 0;} //! to init your local variables with the ones from the buffers
static std::string GetCrystalNameFromID(const Int_t id);
static UShort_t GetCrystalIDFromName(const string name);
static std::string GetSegmentNameFromID(const Int_t id);
static std::string GetCoreNameFromID(const Int_t id);