//Created by KVClassFactory on Sat Jun 14 15:18:15 2014 //Author: gruyer,,, #include "merger_spy.h" #include "TThread.h" #include "math.h" #include "Riostream.h" #include "TMath.h" #include "TF1.h" #include "TRandom3.h" #include "TH1F.h" #include "TProfile.h" #include <iostream> #include <fstream> #include <string> #include <sstream> #include "zmq.hpp" //////////////////////////////////////////////////////////////////////////////// // BEGIN_HTML <!-- /* --> <h2>merger_spy</h2> <h4>merger_spy...</h4> <!-- */ // --> END_HTML //////////////////////////////////////////////////////////////////////////////// //ClassImp(merger_spy) void merger_spy::fill_protobuf(myprotobuf::merger_info& minfo) { auto hf = minfo.add_histo_frames(); hf->set_name("Merger histos"); for(int ii=0; ii<10; ii++) { TH1* dist = (TH1*) histos.At(ii); auto hh = hf->add_histos(); hh->set_name(dist->GetTitle()); // hh->set_labelx("time (ns)"); for(int ib=1; ib<dist->GetXaxis()->GetNbins()+1; ib++) { auto pp = hh->add_points(); pp->set_x(dist->GetXaxis()->GetBinCenter(ib)); pp->set_y(dist->GetBinContent(ib)); } } } merger_spy::merger_spy(int send_delay, bool simu) { fReader = new MFMBufferReader; fDt = send_delay; fSimuMode = simu; zmqin = zmqout = false; nev_indra = nev_fazia = nev_merged = nbuffer = nev_faziasolo = nev_indrasolo = 0; fazia_ts_errors = indra_ts_errors = random_errors = 0; h_delta_timestamp = new TH1F("DT","INDRA_TS-FAZIA_TS (us)",3000,-3,3); h_dt_lastmerge = new TH1F("DT2","Time between merge events (ms)",1000,0,500); h_dt_lastmerge2 = new TH1F("DT3","Time between merge events (us)",10000,0,5000); h_dt_indra = new TH1F("DT_INDRA","Time between INDRA events (ns)",10000,0,5e+05); h_dt_indra_ms = new TH1F("DT_INDRA_MS","Time between INDRA events (ms)",10000,0,500); h_dt_fazia = new TH1F("DT_FAZIA","Time between FAZIA events (ns)",10000,0,5e+05); // h_events = new TH1F("M_EVENTS","Number of events INDRA/FAZIA/MERGED",3,0,3); h_events = new TProfile("M_EVENTS","Number of events INDRA/FAZIA/MERGED",3,0,3); h_dt_fazia_indra_faziaseul = new TH1F("h_dt_fazia_indra_faziaseul","FAZIAts-lastINDRAts (FAZIA seul) [ms]", 500,0,.5); h_dt_fazia_indra_merge = new TH1F("h_dt_fazia_indra_merge","FAZIAts-lastINDRAts (merge event) [ms]",500,0,.5); h_mult_merge=new TH1F("h_mult_merge","Nb frames in merge frames",5,.5,5.5); // h_mult_merge_qui=new TH2F("h_mult_merge_qui","Nb frames INDRA vs. Nb frames FAZIA",5,.5,5.5,5,.5,5.5); h_coinc_freq_avg_100=new TH1F("h_coinc_freq_avg_100","Coincidence frequency [Hz] (avg.)", 500,0,100); h_coinc_freq_avg_500=new TH1F("h_coinc_freq_avg_500","Coincidence frequency [Hz] (avg.)", 1000,0,500); h_coinc_freq_avg_1000=new TH1F("h_coinc_freq_avg_1000","Coincidence frequency [Hz] (avg.)", 2000,0,1000); histos.Add(h_delta_timestamp); histos.Add(h_dt_lastmerge); histos.Add(h_dt_lastmerge2); histos.Add(h_dt_indra); histos.Add(h_dt_indra_ms); histos.Add(h_dt_fazia); histos.Add(h_events); histos.Add(h_dt_fazia_indra_faziaseul); histos.Add(h_dt_fazia_indra_merge); histos.Add(h_mult_merge); histos.Add(h_coinc_freq_avg_100); histos.Add(h_coinc_freq_avg_500); histos.Add(h_coinc_freq_avg_1000); } //________________________________________________________________ void merger_spy::Start() { if(zmqin&&zmqout){ if(!fSimuMode){ fThread = new TThread(Process,this); fThread->Run(); } fTimer.SetObject(this); fTimer.Start(fDt*1000); } } //________________________________________________________________ void merger_spy::Stop() { SafeDelete(fThread); fTimer.Stop(); } void merger_spy::treat_indra_frame(const MFMCommonFrame& frame) { lastINDRAtimestamp = indra_ts; indra_ts = frame.GetTimeStamp(); double dt = ((Long64_t)(indra_ts-lastINDRAtimestamp))*10.; if(dt==0||indra_ts==0) indra_ts_errors++;//cout << "bad FAZIA timestamp: " << fazia_ts << endl; if(lastINDRAtimestamp>0.) h_dt_indra->Fill(dt); // h_events->Fill(0.5); nev_indra++; nev_indrasolo++; } void merger_spy::treat_fazia_frame(const MFMCommonFrame& frame) { lastFAZIAtimestamp = fazia_ts; fazia_ts = frame.GetTimeStamp(); double dt = ((Long64_t)(fazia_ts-lastFAZIAtimestamp))*10.; if(dt==0||fazia_ts==0xffffffffffff) fazia_ts_errors++;//cout << "bad FAZIA timestamp: " << fazia_ts << endl; if(lastFAZIAtimestamp>0.&&dt>0.) h_dt_fazia->Fill(dt); // h_events->Fill(1.5); nev_fazia++; nev_faziasolo++; } int merger_spy::handle_single_mfmframe(const MFMCommonFrame& frame) { if(frame.GetFrameType() == MFM_MESYTEC_MDPP_FRAME_TYPE) { /*case INDRA MFMFaziaFrame: */ treat_indra_frame(frame); return INDRA; } else if(frame.GetFrameType() == MFM_FAZIA_FRAME_TYPE) { /*case FAZIA MFMFaziaFrame: */ treat_fazia_frame(frame); return FAZIA; } else cout << frame.GetFrameType() << endl; } void merger_spy::handle_merged_mfmframe(const MFMMergeFrameManager& mergeframe) { int who=0; h_mult_merge->Fill(mergeframe.GetNumberOfFrames()); while (mergeframe.ReadNextFrame()) { who += handle_single_mfmframe(mergeframe.GetFrameRead()); } if(who==INDRA+FAZIA) { merged = true; nev_merged++; nev_faziasolo--; nev_indrasolo--; } else if(who==INDRA){} } void* merger_spy::TreatEvent(MFMBufferReader* reader) { while(reader->ReadNextFrame()) { merged=false; if(reader->IsFrameReadMerge()) { handle_merged_mfmframe(reader->GetMergeManager()); } else { handle_single_mfmframe(reader->GetFrameRead()); } if(merged) { // h_events->Fill(2.5); h_delta_timestamp->Fill(((Long64_t)(indra_ts-fazia_ts))/100.); // h_mult_merge_qui->Fill(inmerge_fazia,inmerge_indra); nmergedforfreq++; if(lastmergeTSforfreq==0) lastmergeTSforfreq=lasttimestamp; if(nmergedforfreq>100){ double dt=((Long64_t)(lasttimestamp-lastmergeTSforfreq))*1.e-8; if(dt > 0){ h_coinc_freq_avg_100->Fill(nmergedforfreq/dt); h_coinc_freq_avg_500->Fill(nmergedforfreq/dt); h_coinc_freq_avg_1000->Fill(nmergedforfreq/dt); nmergedforfreq=0; lastmergeTSforfreq=lasttimestamp; } } } if(lastINDRAtimestamp && fazia_ts){ if(merged) h_dt_fazia_indra_merge->Fill(((Long64_t)(fazia_ts-lastINDRAtimestamp))*1.e-5); else h_dt_fazia_indra_faziaseul->Fill(((Long64_t)(fazia_ts-lastINDRAtimestamp))*1.e-5); } } return nullptr; } Bool_t merger_spy::HandleTimer(TTimer*) { TThread::Lock(); fTimeStamp.Set(); Double_t time = fTimeStamp.Convert(); int ntot = nev_indrasolo + nev_faziasolo + nev_merged; if(ntot){ h_events->Fill(0.5,100.*nev_indrasolo/ntot); h_events->Fill(1.5,100.*nev_faziasolo/ntot); h_events->Fill(2.5,100.*nev_merged/ntot); } cout << Form("[%s] publishing new merger-frame (%.1lf buffers received/s)",fTimeStamp.AsString(),nbuffer/(time-fStartTime)); if(ntot) cout << Form(" : %d-%d-%d",(int)100.*nev_indrasolo/ntot,(int)100.*nev_faziasolo/ntot,(int)100.*nev_merged/ntot); cout << endl; nev_indrasolo = nev_faziasolo = 0; myprotobuf::merger_info minfo; // h_events->SetBinContent(0,100.*nev_indra/ntot); // h_events->SetBinContent(1,100.*nev_fazia/ntot); // h_events->SetBinContent(2,100.*nev_merged/ntot); fill_protobuf(minfo); auto rf = minfo.add_rate_frames(); rf->set_name("Rates"); double val = 0; auto hh = rf->add_rates(); hh->set_name("Merged"); if(fSimuMode) {val = gRandom->Rndm()*10+70; if(gRandom->Rndm()<.1) val=0;} else{val = nev_merged/(time-fStartTime); nev_merged=0;} hh->set_value(val); hh = rf->add_rates(); hh->set_name("Indra"); if(fSimuMode) {val = gRandom->Rndm()*10+70; if(gRandom->Rndm()<.1) val=0;} else{val = nev_indra/(time-fStartTime); nev_indra=0;} hh->set_value(val); hh = rf->add_rates(); hh->set_name("Fazia"); if(fSimuMode) {val = gRandom->Rndm()*10+70; if(gRandom->Rndm()<.1) val=0;} else{val = nev_fazia/(time-fStartTime); nev_fazia=0;} hh->set_value(val); auto loginfo = minfo.add_logs(); loginfo->set_value(Form("new merger-frame (%d buffers)",nbuffer)); loginfo->set_type(myprotobuf::log::fDebug); nbuffer = 0; if(indra_ts_errors){ loginfo = minfo.add_logs(); loginfo->set_value(Form("%d bad timestamps in INDRA",indra_ts_errors)); loginfo->set_type(myprotobuf::log::fWarning); indra_ts_errors = 0; cout << "indra errors " << indra_ts_errors << endl; } if(fazia_ts_errors){ loginfo = minfo.add_logs(); loginfo->set_value(Form("%d bad timestamps in INDRA",fazia_ts_errors)); loginfo->set_type(myprotobuf::log::fWarning); fazia_ts_errors = 0; } std::string str; bool retval = minfo.SerializeToString(&str); fZmqOut->send(str.data(),str.size()); fStartTime = time; TThread::UnLock(); } void* merger_spy::Process(void* obj) { merger_spy* obg = (merger_spy*) obj; MFMBufferReader* reader = obg->fReader; zmq::socket_t *zmq_req = (zmq::socket_t *) obg->fZmqIn; if(zmq_req){ zmq::message_t event; cout << "starting on line processing !" << endl; while(1) { zmq_req->recv(&event); reader->SetBuffer((char*)event.data(),event.size()); obg->nbuffer++; obg->TreatEvent(reader); } } obg->Stop() ; return (void*)nullptr; } merger_spy::~merger_spy() { // Destructor if(fZmqIn) fZmqIn->close(); if(fZmqOut) fZmqOut->close(); delete fReader; } void merger_spy::ConnectZmqIn(const string& urlin) { zmq::context_t* sub_ctx = new zmq::context_t(1); fZmqIn = new zmq::socket_t(*sub_ctx, ZMQ_SUB); fZmqIn->setsockopt(ZMQ_SUBSCRIBE, "", 0); try { fZmqIn->connect(urlin.c_str()); } catch(zmq::error_t& zmq_err) { std::cout << "ERROR: connection to '" << urlin << "' failed - " << zmq_err.what() << std::endl; return; } zmqin = true; } void merger_spy::ConnectZmqOut(const string& urlout) { zmq::context_t* pub_ctx = new zmq::context_t(1); fZmqOut = new zmq::socket_t(*pub_ctx, ZMQ_PUB); int linger = 1; fZmqOut->setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); try { fZmqOut->bind(urlout.c_str()); } catch(zmq::error_t& zmq_err) { std::cout << "ERROR: connection to '" << urlout << "' failed - " << zmq_err.what() << std::endl; return; } zmqout = true; } void merger_spy::Run() { Start(); }