Skip to content
Snippets Groups Projects
merger_spy.cpp 10.5 KiB
Newer Older
Diego Gruyer's avatar
Diego Gruyer committed
//Created by KVClassFactory on Sat Jun 14 15:18:15 2014
//Author: gruyer,,,

#include "merger_spy.h"

#include "TThread.h"
#include "math.h"
#include "Riostream.h"
#include "TMath.h"

#include "TF1.h"
#include "TRandom3.h"
#include "TH1F.h"
#include "TProfile.h"

#include <iostream>
#include <fstream>
#include <string>
#include <sstream>

#include "zmq.hpp"

////////////////////////////////////////////////////////////////////////////////
// BEGIN_HTML <!--
/* -->
<h2>merger_spy</h2>
<h4>merger_spy...</h4>
<!-- */
// --> END_HTML
////////////////////////////////////////////////////////////////////////////////

//ClassImp(merger_spy)


void merger_spy::fill_protobuf(myprotobuf::merger_info& minfo)
{
   auto hf = minfo.add_histo_frames();
   hf->set_name("Merger histos");

   for(int ii=0; ii<10; ii++)
   {
      TH1* dist = (TH1*) histos.At(ii);
      auto hh = hf->add_histos();
      hh->set_name(dist->GetTitle());
//      hh->set_labelx("time (ns)");
      for(int ib=1; ib<dist->GetXaxis()->GetNbins()+1; ib++)
      {
         auto pp = hh->add_points();
         pp->set_x(dist->GetXaxis()->GetBinCenter(ib));
         pp->set_y(dist->GetBinContent(ib));
      }
   }
}

merger_spy::merger_spy(int send_delay, bool simu)
{
   fReader = new MFMBufferReader;
   fDt = send_delay;
   fSimuMode = simu;
   zmqin = zmqout = false;

   nev_indra = nev_fazia = nev_merged = nbuffer = nev_faziasolo = nev_indrasolo = 0;
   fazia_ts_errors = indra_ts_errors = random_errors = 0;

   h_delta_timestamp = new TH1F("DT","INDRA_TS-FAZIA_TS (us)",3000,-3,3);
   h_dt_lastmerge = new TH1F("DT2","Time between merge events (ms)",1000,0,500);
   h_dt_lastmerge2 = new TH1F("DT3","Time between merge events (us)",10000,0,5000);
   h_dt_indra = new TH1F("DT_INDRA","Time between INDRA events (ns)",10000,0,5e+05);
   h_dt_indra_ms = new TH1F("DT_INDRA_MS","Time between INDRA events (ms)",10000,0,500);
   h_dt_fazia = new TH1F("DT_FAZIA","Time between FAZIA events (ns)",10000,0,5e+05);
//   h_events = new TH1F("M_EVENTS","Number of events INDRA/FAZIA/MERGED",3,0,3);
   h_events = new TProfile("M_EVENTS","Number of events INDRA/FAZIA/MERGED",3,0,3);
   h_dt_fazia_indra_faziaseul = new TH1F("h_dt_fazia_indra_faziaseul","FAZIAts-lastINDRAts (FAZIA seul) [ms]", 500,0,.5);
   h_dt_fazia_indra_merge = new TH1F("h_dt_fazia_indra_merge","FAZIAts-lastINDRAts (merge event) [ms]",500,0,.5);
   h_mult_merge=new TH1F("h_mult_merge","Nb frames in merge frames",5,.5,5.5);
   //   h_mult_merge_qui=new TH2F("h_mult_merge_qui","Nb frames INDRA vs. Nb frames FAZIA",5,.5,5.5,5,.5,5.5);
   h_coinc_freq_avg_100=new TH1F("h_coinc_freq_avg_100","Coincidence frequency [Hz] (avg.)", 500,0,100);
   h_coinc_freq_avg_500=new TH1F("h_coinc_freq_avg_500","Coincidence frequency [Hz] (avg.)", 1000,0,500);
   h_coinc_freq_avg_1000=new TH1F("h_coinc_freq_avg_1000","Coincidence frequency [Hz] (avg.)", 2000,0,1000);

   histos.Add(h_delta_timestamp);
   histos.Add(h_dt_lastmerge);
   histos.Add(h_dt_lastmerge2);
   histos.Add(h_dt_indra);
   histos.Add(h_dt_indra_ms);
   histos.Add(h_dt_fazia);
   histos.Add(h_events);
   histos.Add(h_dt_fazia_indra_faziaseul);
   histos.Add(h_dt_fazia_indra_merge);
   histos.Add(h_mult_merge);
   histos.Add(h_coinc_freq_avg_100);
   histos.Add(h_coinc_freq_avg_500);
   histos.Add(h_coinc_freq_avg_1000);
}

//________________________________________________________________
void merger_spy::Start()
{
   if(zmqin&&zmqout){
      if(!fSimuMode){
         fThread = new TThread(Process,this);
         fThread->Run();
      }
      fTimer.SetObject(this);
      fTimer.Start(fDt*1000);
   }
}

//________________________________________________________________
void merger_spy::Stop()
{
   SafeDelete(fThread);
   fTimer.Stop();
}

void merger_spy::treat_indra_frame(const MFMCommonFrame& frame)
{
   lastINDRAtimestamp = indra_ts;
   indra_ts = frame.GetTimeStamp();
   double dt = ((Long64_t)(indra_ts-lastINDRAtimestamp))*10.;
   if(dt==0||indra_ts==0) indra_ts_errors++;//cout << "bad FAZIA timestamp: " << fazia_ts << endl;
   if(lastINDRAtimestamp>0.) h_dt_indra->Fill(dt);
//   h_events->Fill(0.5);
   nev_indra++;
   nev_indrasolo++;
}

void merger_spy::treat_fazia_frame(const MFMCommonFrame& frame)
{
   lastFAZIAtimestamp = fazia_ts;
   fazia_ts = frame.GetTimeStamp();
   double dt = ((Long64_t)(fazia_ts-lastFAZIAtimestamp))*10.;
   if(dt==0||fazia_ts==0xffffffffffff) fazia_ts_errors++;//cout << "bad FAZIA timestamp: " << fazia_ts << endl;
   if(lastFAZIAtimestamp>0.&&dt>0.) h_dt_fazia->Fill(dt);
//   h_events->Fill(1.5);
   nev_fazia++;
   nev_faziasolo++;
}

int merger_spy::handle_single_mfmframe(const MFMCommonFrame& frame)
{
   if(frame.GetFrameType() == MFM_MESYTEC_MDPP_FRAME_TYPE) {
      /*case INDRA MFMFaziaFrame: */
      treat_indra_frame(frame);
      return INDRA;
   }
   else if(frame.GetFrameType() == MFM_FAZIA_FRAME_TYPE) {
      /*case FAZIA MFMFaziaFrame: */
      treat_fazia_frame(frame);
      return FAZIA;
   }
   else cout << frame.GetFrameType() << endl;
}

void merger_spy::handle_merged_mfmframe(const MFMMergeFrameManager& mergeframe)
{
   int who=0;
   h_mult_merge->Fill(mergeframe.GetNumberOfFrames());
   while (mergeframe.ReadNextFrame()) {
      who += handle_single_mfmframe(mergeframe.GetFrameRead());
   }
   if(who==INDRA+FAZIA) {
      merged = true;
      nev_merged++;
      nev_faziasolo--;
      nev_indrasolo--;
   }
   else if(who==INDRA){}
}

void* merger_spy::TreatEvent(MFMBufferReader* reader)
{

   while(reader->ReadNextFrame())
   {
      merged=false;

      if(reader->IsFrameReadMerge())
      {
         handle_merged_mfmframe(reader->GetMergeManager());
      }
      else
      {
         handle_single_mfmframe(reader->GetFrameRead());
      }

      if(merged) {
//         h_events->Fill(2.5);
         h_delta_timestamp->Fill(((Long64_t)(indra_ts-fazia_ts))/100.);
         //      h_mult_merge_qui->Fill(inmerge_fazia,inmerge_indra);
         nmergedforfreq++;
         if(lastmergeTSforfreq==0) lastmergeTSforfreq=lasttimestamp;
         if(nmergedforfreq>100){
            double dt=((Long64_t)(lasttimestamp-lastmergeTSforfreq))*1.e-8;
            if(dt > 0){
               h_coinc_freq_avg_100->Fill(nmergedforfreq/dt);
               h_coinc_freq_avg_500->Fill(nmergedforfreq/dt);
               h_coinc_freq_avg_1000->Fill(nmergedforfreq/dt);
               nmergedforfreq=0;
               lastmergeTSforfreq=lasttimestamp;
            }
         }
      }
      if(lastINDRAtimestamp && fazia_ts){
         if(merged) h_dt_fazia_indra_merge->Fill(((Long64_t)(fazia_ts-lastINDRAtimestamp))*1.e-5);
         else h_dt_fazia_indra_faziaseul->Fill(((Long64_t)(fazia_ts-lastINDRAtimestamp))*1.e-5);
      }

   }
   return nullptr;
}


Bool_t merger_spy::HandleTimer(TTimer*)
{
   TThread::Lock();

   fTimeStamp.Set();
   Double_t time = fTimeStamp.Convert();



   int ntot = nev_indrasolo + nev_faziasolo + nev_merged;
   if(ntot){
   h_events->Fill(0.5,100.*nev_indrasolo/ntot);
   h_events->Fill(1.5,100.*nev_faziasolo/ntot);
   h_events->Fill(2.5,100.*nev_merged/ntot);
   }

   cout << Form("[%s] publishing new merger-frame (%.1lf buffers received/s)",fTimeStamp.AsString(),nbuffer/(time-fStartTime));
   if(ntot) cout << Form(" : %d-%d-%d",(int)100.*nev_indrasolo/ntot,(int)100.*nev_faziasolo/ntot,(int)100.*nev_merged/ntot);
   cout << endl;
   nev_indrasolo = nev_faziasolo = 0;


   myprotobuf::merger_info minfo;


//   h_events->SetBinContent(0,100.*nev_indra/ntot);
//   h_events->SetBinContent(1,100.*nev_fazia/ntot);
//   h_events->SetBinContent(2,100.*nev_merged/ntot);

   fill_protobuf(minfo);

   auto rf = minfo.add_rate_frames();
   rf->set_name("Rates");
   double val = 0;

   auto hh = rf->add_rates();
   hh->set_name("Merged");
   if(fSimuMode) {val = gRandom->Rndm()*10+70; if(gRandom->Rndm()<.1) val=0;}
   else{val = nev_merged/(time-fStartTime); nev_merged=0;}
   hh->set_value(val);

   hh = rf->add_rates();
   hh->set_name("Indra");
   if(fSimuMode) {val = gRandom->Rndm()*10+70; if(gRandom->Rndm()<.1) val=0;}
   else{val = nev_indra/(time-fStartTime); nev_indra=0;}
   hh->set_value(val);

   hh = rf->add_rates();
   hh->set_name("Fazia");
   if(fSimuMode) {val = gRandom->Rndm()*10+70; if(gRandom->Rndm()<.1) val=0;}
   else{val = nev_fazia/(time-fStartTime); nev_fazia=0;}
   hh->set_value(val);

   auto loginfo = minfo.add_logs();
   loginfo->set_value(Form("new merger-frame (%d buffers)",nbuffer));
   loginfo->set_type(myprotobuf::log::fDebug);
   nbuffer = 0;

   if(indra_ts_errors){
      loginfo = minfo.add_logs();
      loginfo->set_value(Form("%d bad timestamps in INDRA",indra_ts_errors));
      loginfo->set_type(myprotobuf::log::fWarning);
      indra_ts_errors = 0;
      cout << "indra errors " << indra_ts_errors << endl;
   }
   if(fazia_ts_errors){
      loginfo = minfo.add_logs();
      loginfo->set_value(Form("%d bad timestamps in INDRA",fazia_ts_errors));
      loginfo->set_type(myprotobuf::log::fWarning);
      fazia_ts_errors = 0;
   }


   std::string str;
   bool retval = minfo.SerializeToString(&str);
   fZmqOut->send(str.data(),str.size());

   fStartTime = time;
   TThread::UnLock();

}

void* merger_spy::Process(void* obj)
{
   merger_spy* obg = (merger_spy*) obj;
   MFMBufferReader* reader = obg->fReader;

   zmq::socket_t *zmq_req = (zmq::socket_t *) obg->fZmqIn;
   if(zmq_req){
      zmq::message_t event;

      cout << "starting on line processing !" << endl;

      while(1)
      {
         zmq_req->recv(&event);
         reader->SetBuffer((char*)event.data(),event.size());

         obg->nbuffer++;
         obg->TreatEvent(reader);
      }
   }
   obg->Stop() ;
   return (void*)nullptr;
}

merger_spy::~merger_spy()
{
   // Destructor
   if(fZmqIn) fZmqIn->close();
   if(fZmqOut) fZmqOut->close();
   delete fReader;
}

void merger_spy::ConnectZmqIn(const string& urlin)
{
   zmq::context_t* sub_ctx =  new zmq::context_t(1);
   fZmqIn =  new zmq::socket_t(*sub_ctx, ZMQ_SUB);
   fZmqIn->setsockopt(ZMQ_SUBSCRIBE, "", 0);

   try {
      fZmqIn->connect(urlin.c_str());
   } catch(zmq::error_t& zmq_err) {
      std::cout << "ERROR: connection to '" << urlin << "' failed - " << zmq_err.what() << std::endl;
      return;
   }
   zmqin = true;
}

void merger_spy::ConnectZmqOut(const string& urlout)
{
   zmq::context_t* pub_ctx = new zmq::context_t(1);
   fZmqOut = new zmq::socket_t(*pub_ctx, ZMQ_PUB);
   int linger = 1;
   fZmqOut->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));

   try {
      fZmqOut->bind(urlout.c_str());
   } catch(zmq::error_t& zmq_err) {
      std::cout << "ERROR: connection to '" << urlout << "' failed - " << zmq_err.what() << std::endl;
      return;
   }
   zmqout = true;

}

void merger_spy::Run()
{
   Start();
}