/*
 * Reviewer notes for the first part of this file (comments only, no code changed).
 *
 * What is visible in this part:
 *  - the definitions of the static strings PSAFilter::gMotherClass ("PSAFilter") and
 *    PSAFilter::gActualClass, and the file-scope constant preTrigger (10);
 *  - PSAFilter::PSAFilter(): initialises fFrameCrystal and fFramePSA to NULL and fTrigger
 *    with "data:ccrystal", takes fConfPath from GetConfPath(), sets the defaults
 *    (fTraceLengthPSA = 80, fWriteNumTraces = 0, fPsaMinSegMult = 1,
 *    fPsaMaxSegMult = CrystalInterface::kNbSegments, fPsaSegCenter = false,
 *    fVerbose = false, mCount = 0), puts fInblock in ConfAgent::kRead and fOublock in
 *    ConfAgent::kWrite mode, and resets numHits and theHits[] for all TCOUNT*TMODULO
 *    entries of DD;
 *  - the tail of process_initialise(): links the "CrystalID"/"CrystalStatus" items of the
 *    crystal frame and the "CrystalID", "CrystalStatus", "CoreE0", "CoreE1", "CoreT0",
 *    "CoreT1" items of the PSA frame to crystal_id, crystal_status, CoreE0, CoreE1, CoreT0
 *    and CoreT1; registers fTrigger with fFrameIO; on any failure prints the error and
 *    returns with error_code set to 100 + rerr (rerr bits seen here: 8 in the else-branch after
 *    the crystal-frame code, 4 when the PSA output frame is missing, 1 when Register()
 *    fails); otherwise sets fFrameIO to BaseFrameIO::kIdle, allocates and zeroes one
 *    Float_t[fTraceLengthPSA] trace per segment and per core for every DD slot, clears
 *    the energies, times and hits, stores the result of AlgoSpecificInitialise() in
 *    error_code and, when TCOUNT > 1, starts one boost::thread running tProcess per slot
 *    with Slot[slot].State = 0.
 *
 * NOTE(review): in this copy the original line breaks are gone and the text between
 * '<' and '>' has been lost in several places: the names of the system #include files,
 * the body of the destructor and the beginning of process_initialise() (everything
 * between "for(int nn=0; nn" and "(frame);"). Restore those parts from version control
 * before changing any code here.
 * NOTE(review): the result of dynamic_cast to APSAFrame is dereferenced without a NULL
 * check, and the threads are started even when AlgoSpecificInitialise() reports an
 * error - confirm that both are intended.
 */
/* PSA-actor base class, J. Ljungvall 2008, based on implementation by Olivier Stezowski*/ #include "PSAFilter.h" #include "AgataKeyFactory.h" #include "AgataFrameFactory.h" #include #include #include #include #include #include "misc.h" std::string PSAFilter::gMotherClass = "PSAFilter"; std::string PSAFilter::gActualClass; using namespace std; using namespace ADF; const int preTrigger = 10; PSAFilter::PSAFilter() : fFrameCrystal(NULL), fFramePSA(NULL), fTrigger("data:ccrystal") { fConfPath = GetConfPath(); fOdirPrefix.clear(); fBasisFile.clear(); fTraceLengthPSA = 80; fWriteNumTraces = 0; fPsaMinSegMult = 1; fPsaMaxSegMult = CrystalInterface::kNbSegments; fPsaSegCenter = false; fVerbose = false; mCount = 0; fInblock.SetModeIO(ConfAgent::kRead); fOublock.SetModeIO(ConfAgent::kWrite); for(int slot = 0; slot < TCOUNT*TMODULO; slot++) { DD[slot].numHits = 0; for(UInt_t nh = 0; nh < ADF::CrystalInterface::kNbSegments; nh++) { DD[slot].theHits[nh].Reset(); } } } PSAFilter::~PSAFilter() { for(int slot = 0; slot < TCOUNT*TMODULO; slot++) { for(int nn=0; nn (frame); GObject *glob = fFrameCrystal->Data()->Global(); glob->LinkItem("CrystalID", &crystal_id); glob->LinkItem("CrystalStatus", &crystal_status); } else rerr |= 8; ADF::CrystalInterface::kDefaultLength = defaultLength; // restore "default" length of traces frame = fTrigger.SetOutputFrame("Agata", "data:psa", GetConfAgent()); if(frame) { // to link some named item with the local variables. 
fFramePSA = dynamic_cast< APSAFrame * > (frame); GObject *glob = fFramePSA->Data()->Global(); glob->LinkItem("CrystalID", &crystal_id); glob->LinkItem("CrystalStatus", &crystal_status); glob->LinkItem("CoreE0", &CoreE0); glob->LinkItem("CoreE1", &CoreE1); glob->LinkItem("CoreT0", &CoreT0); glob->LinkItem("CoreT1", &CoreT1); //glob->LinkItem("PSAStatus", &PsaStatus); //glob->LinkItem("Spare", &PsaStatus); //glob->Link("Anonymous", &myAno); //bool bb = glob->IsFullyLinked(); } else rerr |= 4; // the trigger is registered if( !fFrameIO.Register(&fTrigger) ) rerr |= 1; if(rerr) { cout << "Trigger definition error " << rerr << " in PSAFilter::process_initialise()" << endl; *error_code = 100 + rerr; return; } // OComment // by default fFrameIO is in the kUndefined mode ... if the algo has been // properly initialised, it should set the state to kIdle so that the data could be treated fFrameIO.SetStatus(BaseFrameIO::kIdle); for(int slot = 0; slot < TCOUNT*TMODULO; slot++) { for(int nn = 0; nn < CrystalInterface::kNbSegments; nn++) { DD[slot].fTracesSG[nn] = new Float_t[fTraceLengthPSA]; memset(DD[slot].fTracesSG[nn], 0, sizeof(Float_t)*fTraceLengthPSA); DD[slot].SegE[nn] = 0; } for(int nn = 0; nn < CrystalInterface::kNbCores; nn++) { DD[slot].fTracesCC[nn] = new Float_t[fTraceLengthPSA]; memset(DD[slot].fTracesCC[nn], 0, sizeof(Float_t)*fTraceLengthPSA); DD[slot].CoreE[nn] = DD[slot].CoreT[nn] = 0; } DD[slot].numHits = 0; for(UInt_t nh = 0; nh < ADF::CrystalInterface::kNbSegments; nh++) { DD[slot].theHits[nh].Reset(); } } // version-specific initializations *error_code = AlgoSpecificInitialise(); #if TCOUNT > 1 // launch the threads of the local chains cout << "Grid Search called using " << TCOUNT << " THREADS with blocks of " << TMODULO << " EVENTS" << endl; for(int slot = 0; slot < TCOUNT ; slot++) { Slot[slot].State = 0; // start in the non-running state Slot[slot].Thread = new boost::thread(tProcess(this, slot)); cout << " LAUNCHED THREAD tProcess" << endl; } #endif } 
void PSAFilter::GetParameters(UInt_t *error_code) { *error_code = 0; string configFileName = fConfPath + gMotherClass + ".conf"; ifstream configfile( configFileName.c_str() ); if(!configfile.good()) { cout << "Error opening " << configFileName << endl; *error_code = 102; return; } cout << endl << gMotherClass + "::getParams() reading from --> " << configFileName << endl; string line, keyw, data; bool ok = true; while(getline(configfile, line)) { if(!stringSplit(line, " \t", keyw, data)) continue; // empty or comment lines cout << line; ok = false; if(keyw == "ActualClass") { ok = data.size() > 0; gActualClass = data; } else if(keyw == "WriteDataPrefix") { ok = data.size() > 0; fOdirPrefix = data; forceTailSlash(fOdirPrefix); } else if(keyw == "BasisFile") { ok = data.size() > 0; fBasisFile = data; } else if (keyw == "Verbose" ) { fVerbose = true; ok = true; } else if(keyw == "TraceLengthPSA" || keyw == "TraceLength") { ok = 1 == sscanf(data.c_str(), "%d", &fTraceLengthPSA); } else if(keyw == "WriteTraces") { ok = 1 == sscanf(data.c_str(), "%d", &fWriteNumTraces); } else if(keyw == "SegmentFoldGate") { ok = 2 == sscanf(data.c_str(), "%d %d", &fPsaMinSegMult, &fPsaMaxSegMult); } else if(keyw == "PlaceAtSegCenter") { fPsaSegCenter = true; ok = true; } else { cout << " --> ignored"; ok = true; } if(!ok) { cout << " --> missing argument(s)" << endl; *error_code = 103; return; } else { cout << endl; } } } Int_t PSAFilter::SetInput(int slot) { // loads the values from the frame into the ADFObject attached to the frame fFrameCrystal->Read(); PsaData *pD = DD + slot; pD->crystal_id = crystal_id; pD->crystal_status = crystal_status; for(int isg=0; isgData()->GetSegment(isg); pD->SegE[isg] = (float)seg->GetE(); seg->GetSignal()->Get(pD->fTracesSG[isg], fTraceLengthPSA); } for(int icc=0; iccData()->GetCore(icc); pD->CoreE[icc] = (float)core->GetE(); pD->CoreT[icc] = (float)core->GetT(); core->GetSignal()->Get(pD->fTracesCC[icc], fTraceLengthPSA); } pD->evnumber = ((AgataKey 
*)fFrameCrystal->GetKey())->GetEventNumber(); pD->timestamp = ((AgataKey *)fFrameCrystal->GetKey())->GetTimeStamp(); pD->numHits = 0; return 0; } Int_t PSAFilter::SetOutput(int slot) { fFramePSA->Reset(); PsaData *pD = DD + slot; if(pD->numHits < 0) return 0; for(UInt_t nh = 0; nh < pD->numHits; nh++) { ADF::PSAHit *hit = (PSAHit*)fFramePSA->Data()->NewHit(); hit->SetHit(pD->theHits[nh]); } ((AgataKey *)fFramePSA->GetKey())->SetEventNumber(pD->evnumber); ((AgataKey *)fFramePSA->GetKey())->SetTimeStamp(pD->timestamp); crystal_id = pD->crystal_id; crystal_status = pD->crystal_status; CoreE0 = pD->CoreE[0]; CoreE1 = pD->CoreE[1]; CoreT0 = pD->CoreT[0]; CoreT1 = pD->CoreT[1]; //myAno.Set(crystal_id, 0); //myAno.fRealSize = sizeof(crystal_id); //myAno.Set(pD->timestamp, myAno.fRealSize); //myAno.fRealSize += sizeof(pD->timestamp); UInt_t nwritten = fFramePSA->Write(); return nwritten ? 0 : 1; } Int_t PSAFilter::Process(int slot) { Int_t result = 0; cout << "WARNING!! Empty Process()" << endl << flush; return result; } void PSAFilter::process_block( void *input_buffer, UInt_t size_of_input_buffer, void *output_buffer, UInt_t size_of_output_buffer, UInt_t *used_size_of_output_buffer, UInt_t *error_code) { fInblock.SetBlock((Char_t *)input_buffer,size_of_input_buffer); fOublock.SetBlock((Char_t *)output_buffer,size_of_output_buffer); *error_code = PSAFilter::ProcessBlock(fInblock, fOublock); *used_size_of_output_buffer = fOublock.GetSize(); } #if TCOUNT > 1 // this is the threaded version UInt_t PSAFilter::ProcessBlock (ADF::FrameBlock &in, ADF::FrameBlock &out) { // attach the input/output buffer to the FrameIO system fFrameIO.Attach(&in,&out); // start the processing UInt_t error_code = 0; UInt_t nevs = mCount; if(mCount == 0) mRate.reset(); bool eoi = false; int slot = -1; int evcI = 0; int evcO = 0; while (true) { slot = (slot + 1) % TCOUNT; CSlot *pslot = Slot + slot; if(pslot->State) { // if this slot has been activated, wait until it has finished its job 
if(pslot->State != 2) { // not yet done boost::mutex::scoped_lock lk(pslot->Mutex); while(pslot->State != 2) { pslot->Condition.wait(lk); } } // pass content of the old slot to the output for(int snn = 0; snn < pslot->Count; snn++) { int sslot = slot*TMODULO+snn; if(DD[sslot].retValue) { //chState[slot] = 0; ??? error_code = 2; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : Process()" << dolog; break; } if(DD[sslot].numHits >0) { // spectra and traces PostProcess(sslot); // fill the output buffer if( SetOutput(sslot) ) { //chState[slot] = 0; ??? error_code = 3; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : SetOutput()" << dolog; break; } // ok, so send the produced frame to the ouput if( !fFrameIO.Record() ) { //chState[slot] = 0; ??? error_code = 4; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : Record()" << dolog; break; } evcO++; } DD[sslot].numHits = 0; } } if(error_code) { //chState[slot] = 0; // ?? break; } // fill this slot with new data from the input pslot->Count = 0; int snn = 0; for( ; snn < TMODULO; snn++) { if(!fFrameIO.Notify() ) { eoi = true; break; } int sslot = slot*TMODULO+snn; if( SetInput(sslot) ) { error_code = 1; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : SetInput()" << dolog; break; } DD[sslot].retValue = 0; evcI++; } pslot->Count = snn; if(snn == 0) break; // tell the thread to proceed { boost::mutex::scoped_lock lk(pslot->Mutex); pslot->State = 1; } pslot->Condition.notify_one(); if(eoi) break; } // Wait for the launched threads to finish // At this point the status of fFrameIO is BaseFrameIO::kIdle and we should no more write on it. 
// However, the output from the still running threads is saved by forcing it to BaseFrameIO::kRunning // It would be more efficient to synchronize the unfinished threads at the next call of ProcessBlock // but we do it here so as to keep the correspondence between input and output buffers for(int nrs = 0; nrs < TCOUNT; nrs++) { slot = (slot + 1) % TCOUNT; CSlot *pslot = Slot + slot; if(pslot->State) { // if this slot has been activated, wait that it has finished its job if(pslot->State != 2) { // not yet done boost::mutex::scoped_lock lk(pslot->Mutex); while(pslot->State != 2) { pslot->Condition.wait(lk); } } // pass content of the old slot to the output for(int snn = 0; snn < pslot->Count; snn++) { int sslot = slot*TMODULO+snn; if(DD[sslot].retValue ) { //chState[slot] = 0; ??? error_code = 2; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : Process()" << dolog; break; } if(DD[sslot].numHits >0 ) { // spectra and traces PostProcess(sslot); // fill the output buffer if( SetOutput(sslot) ) { //chState[slot] = 0; ??? error_code = 3; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : SetOutput()" << dolog; break; } // Reopen fFrameIO by forcing it to BaseFrameIO::kRunning if (fFrameIO.GetStatus() == BaseFrameIO::kIdle ) fFrameIO.SetStatus(BaseFrameIO::kRunning); if( !fFrameIO.Record() ) { //chState[slot] = 0; ??? error_code = 4; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : Record()" << dolog; break; } evcO++; } DD[sslot].numHits = 0; } { boost::mutex::scoped_lock lk(pslot->Mutex); pslot->State = 0; } } if(error_code) { //chState[slot] = 0; // ?? 
break; } } fFrameIO.Detach(&in, &out); nevs = mCount - nevs; LOCK_COUT; cout << setw(2) << crystal_id << setw(24) << left << "-PSAFilter:" << right << " " << setw(5) << nevs << " evts (" << setw(8) << fOublock.GetSize() << ")" << " Tot = " << setw(7) << mCount << " " << setw(4) << mRate.rate(mCount) << "/s" << endl; return error_code; } #else // TCOUNT > 1 // this is the non-threaded version UInt_t PSAFilter::ProcessBlock (ADF::FrameBlock &in, ADF::FrameBlock &out) { // attach the input/output buffer to the FrameIO system fFrameIO.Attach(&in,&out); // start the processing UInt_t error_code = 0; UInt_t nevs = mCount; if(mCount == 0) mRate.reset(); while ( fFrameIO.Notify() ) { // fill local variables with data from the input if( SetInput() ) { error_code = 1; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : SetInput()" << dolog; break; } // process the input buffer if( Process() ) { error_code = 2; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : Process()" << dolog; break; } if(DD[0].numHits < 1) continue; // spectra and traces PostProcess(); // fill the output buffer if( SetOutput() ) { error_code = 3; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : SetOutput()" << dolog; break; } // ok, so send the produced frame to the ouput if( !fFrameIO.Record() ) { error_code = 4; LOCK_COUT; Log.GetProcessMethod() = "ProcessBlock"; Log << error << " During : Record()" << dolog; break; } } fFrameIO.Detach(&in, &out); nevs = mCount - nevs; LOCK_COUT; cout << setw(2) << crystal_id << setw(24) << left << "-PSAFilter:" << right << " " << setw(5) << nevs << " evts (" << setw(8) << fOublock.GetSize() << ")" << " Tot = " << setw(7) << mCount << " " << setw(4) << mRate.rate(mCount) << "/s" << endl; return error_code; } #endif // TCOUNT == 1 void PSAFilter::process_reset (UInt_t *error_code) { *error_code = 0; Log.ClearMessage(); Log.GetProcessMethod() = "process_reset"; fFrameIO.Print(Log()); // 
connect to the frameIO // if ( fNarvalIO ) // { delete fNarvalIO; fNarvalIO = NULL; } // // to get the input/output frameIO // if ( fFrameCrystal ) // { delete fFrameCrystal; fFrameCrystal = NULL; } // if ( fFramePSA ) // { delete fFramePSA; fFramePSA = NULL; } // if ( fTrigger ) // { delete fTrigger; fTrigger = NULL; } Log << dolog; hGroup.write(mCount, true); mCount = 0; } void PSAFilter::process_start (UInt_t *error_code) { Log.GetProcessName() = "PSAFilter"; Log.GetProcessMethod() = "process_start"; Log << info << " Start the inner loop " << dolog; *error_code = 0; } void PSAFilter::process_stop (UInt_t *error_code) { cout << "\nPSAFilter::process_stop called with GetPID() " << GetPID() << endl; *error_code = 0; hGroup.write(mCount, true); mCount = 0; } void PSAFilter::process_pause (UInt_t *error_code) { cout << "\nPSAFilter::process_pause called with GetPID() " << GetPID() << endl; *error_code = 0; } void PSAFilter::process_resume (UInt_t *error_code) { cout << "\nPSAFilter::process_resume called with GetPID() " << GetPID() << endl; *error_code = 0; } /* void PSAFilter::process_unload (UInt_terror_code) { cout << "PSAFilter::process_unload called with GetPID()" << GetPID() << endl; *error_code = 0; } */ #if TCOUNT > 1 void tProcess::operator()() { while(true) { // verify if can proceed if(pslot->State != 1) { boost::mutex::scoped_lock lk(pslot->Mutex); while(pslot->State != 1 ) { pslot->Condition.wait(lk); } } // do the job for(int nnn = 0; nnn < pslot->Count; nnn++) { int sslot = slot*TMODULO + nnn; int retval = ppsa->Process(sslot); ppsa->DD[sslot].retValue = retval; } // notify job done { boost::mutex::scoped_lock lk(pslot->Mutex); pslot->State = 2; } pslot->Condition.notify_one(); } } #endif // TCOUNT > 1