/* PSA-actor base class, J. Ljungvall 2008,
 * based on implementation by Olivier Stezowski
 *
 * Modified and maintained by
 *   D.Mengoni
 *   D.Bazzacco
 *   F.Recchia
 */

#include "PSAFilter.h"
#include "AgataKeyFactory.h"
#include "AgataFrameFactory.h"

// NOTE(review): the names of the five system headers were lost in the copy of
// this file that was reviewed; the list below is what the code in this file
// demonstrably uses (sscanf, memset, ifstream, cout, string) -- confirm.
#include <cstdio>
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>

#include "adetParams.h"
#include "misc.h"

std::string PSAFilter::gMotherClass = "PSAFilter";
std::string PSAFilter::gActualClass;

using namespace std;
using namespace ADF;

const int preTrigger = 10;

// Sets every configuration member to its default value.
// The slot array (theSlots) is NOT allocated here: that happens in
// process_initialise(), once the configuration file has been read.
PSAFilter::PSAFilter() :
  fFrameCrystal(NULL),
  fFramePSA(NULL),
  fTrigger("data:ccrystal")
{
  fOdirPrefix.clear();
  fBasisFile.clear();
  fPsaXtalkFile.clear();

  fTraceLengthPSA = defTraceLengthPSA;
  fWriteNumTraces = 0;
  fCoreEnerMin    = 10.f;
  fCoreEnerMax    = 50000.f;
  fPsaMinSegMult  = 1;
  fPsaMaxSegMult  = CrystalInterface::kNbSegments;
  fVerbose        = false;
  fWritePsaHits   = false;
  fEnergyGain     = 1.f;
  fDeadSegment    = -1;   // no one
  memset(fTauSlice, 0, sizeof(fTauSlice));
  fTauGiven       = false;

  theSlots = NULL;
#ifdef PSA_THREADED
  fPsaCount  = TCOUNT;
  fPsaModulo = TMODULO;
  theBlocks  = NULL;
  fPsaSlots  = fPsaCount*fPsaModulo;
#else
  fPsaCount  = 0;
  fPsaModulo = 1;
  fPsaSlots  = 1;
#endif

  fUseAdaptive  = true;
  fTryTwoHits   = false;
  fCoarseOnly   = false;
  fPsaSegCenter = false;
  fFullSearch   = false;
  fFitTZero     = true;
  fDistMetric   = -1;     // uninitialized !!
  fUseMultiHist = true;
  fPsaMatrXYZR  = false;

  //fBlockIn.SetModeIO(ConfAgent::kRead);
  //fBlockOut.SetModeIO(ConfAgent::kWrite);

  // not used since very long time
#ifdef PSA_FromGRU
  PSAxy_Proje = new TH2F("PSAxy_Proje","PSAxy_Proje",128,-50,50,128,-50,50);
  PSAxz_Proje = new TH2F("PSAxz_Proje","PSAxz_Proje",128,-50,50,128,0,100);
  PSAyz_Proje = new TH2F("PSAyz_Proje","PSAyz_Proje",128,-50,50,128,0,100);

  PSASpectraDB   = new GSpectra();
  PSANetworkRoot = new GNetServerRoot (PSASpectraDB); // GRU-v8
  Int_t port = 9094;
  PSANetworkRoot->SetPort(port);
  PSASpectraDB->SetfDefaultHostPort(port);
  PSASpectraDB->SetfDefaultHostName((char*)gSystem->HostName());
  PSASpectraDB->AddSpectrum (PSAxy_Proje);
  PSASpectraDB->AddSpectrum (PSAxz_Proje);
  PSASpectraDB->AddSpectrum (PSAyz_Proje);
  PSANetworkRoot->StartGNetServer(false); //UNCOMMENT HERE TO START THE GRU SERVER
#endif // PSA_FromGRU
}

// Calls Destroy() on every slot that process_initialise() created.
PSAFilter::~PSAFilter()
{
  // theSlots is still NULL when process_initialise() was never called (or
  // returned before the allocation) while fPsaSlots is already >= 1 from the
  // constructor: without this guard the loop dereferences a null pointer.
  if(theSlots) {
    for(Int_t slot = 0; slot < fPsaSlots; slot++) {
      theSlots[slot].Destroy();
    }
    // NOTE(review): the array itself (and theBlocks / the worker threads in
    // the threaded build) is never released; confirm what PsaSlot::Destroy()
    // and the PsaSlot destructor do before adding a delete [] here.
  }

#ifdef PSA_FromGRU
  delete PSAxy_Proje;
  delete PSAxz_Proje;
  delete PSAyz_Proje;
#endif
}

// Static configuration step.
//   directory_path  configuration directory, forwarded to NarvalInterface::process_config
//   error_code      set to 0 on success, to the NarvalInterface error if that step fails,
//                   or to 102 when getKeyFromFile() returns 2 for <gMotherClass>.conf
// Reads the key "ActualClass" from the configuration file into gActualClass.
void PSAFilter::process_config(const Char_t *directory_path, UInt_t *error_code)
{
  *error_code = 0;

  cout << gMotherClass + "::process_config() called with directory_path = " << directory_path << endl;

  // first init narval stuff
  NarvalInterface::process_config(directory_path, error_code);
  if( *error_code )
    return;

  // Get name of daughter class from configuration directory_path/gMotherClass.conf
  int rv = getKeyFromFile(GetGlobalConfPath() + gMotherClass + ".conf", "ActualClass", gActualClass);
  if(rv == 2)
    *error_code = 102;    // Fatal error because the configuration file MUST be present
}

// Per-instance initialisation: reads the parameters, defines the trigger
// (input frame data:ccrystal, output frame data:psa), allocates the slots,
// calls the algorithm-specific initialisation and, in the threaded build,
// launches the worker threads.
//   error_code  0 on success; the GetParameters() error; 100 + bitmask when the
//               trigger cannot be defined (8 = input frame, 4 = output frame,
//               1 = registration); otherwise the value of AlgoSpecificInitialise().
void PSAFilter::process_initialise (UInt_t *error_code)
{
  Log.ClearMessage();
  Log.GetProcessName() = gMotherClass;
  Log.SetProcessMethod("process_initialise");
  Log.SetPID(GetPID());

  GetParameters(error_code);
  if(*error_code) {
    Log << dolog;
    return;
  }

  // to get the input/output frames
  if( fFrameCrystal ) {
    delete fFrameCrystal;
    fFrameCrystal = NULL;
  }

  UInt_t rerr = 0;
  cout << endl;

  // The trace length used when the input frame is created is taken from
  // kDefaultLength, so it is temporarily replaced by the PSA trace length.
  UInt_t defaultLength = ADF::CrystalInterface::kDefaultLength;
  ADF::CrystalInterface::kDefaultLength = fTraceLengthPSA;   // set "default" length of traces to 60//80
  fFrameCrystal = fTrigger.Add("Agata", "data:ccrystal");    // discard input data
  if( fFrameCrystal ) {
    // link named items with local variables.
    // NOTE(review): template argument reconstructed (stripped in the reviewed copy).
    GObject *glob = GetDataPointer<CrystalInterface>(fFrameCrystal)->Global();
    glob->LinkItem("CrystalID",     &crystal_id);
    glob->LinkItem("CrystalStatus", &crystal_status);
  }
  else
    rerr |= 8;
  ADF::CrystalInterface::kDefaultLength = defaultLength;     // restore "default" length of traces

  fFramePSA = fTrigger.SetOutputFrame("Agata", "data:psa");
  if(fFramePSA) {
    // link named items with local variables.
    // NOTE(review): template argument reconstructed (stripped in the reviewed copy).
    GObject *glob = GetDataPointer<PSAInterface>(fFramePSA)->Global();
    glob->LinkItem("CrystalID",     &crystal_id);
    glob->LinkItem("CrystalStatus", &crystal_status);
    glob->LinkItem("CoreE0",        &CoreE0);
    glob->LinkItem("CoreE1",        &CoreE1);
    glob->LinkItem("CoreT0",        &CoreT0);
    glob->LinkItem("CoreT1",        &CoreT1);
    //glob->Link("Anonymous", &myAno);
    //bool bb = glob->IsFullyLinked();
  }
  else
    rerr |= 4;

  // the trigger is registered
  if( !fFrameIO.Register(&fTrigger) )
    rerr |= 1;

  if(rerr) {
    cout << "Trigger definition error " << rerr << " in PSAFilter::process_initialise()" << endl;
    *error_code = 100 + rerr;
    Log << dolog;
    return;
  }

  // state set to kIdle so that the data can be treated
  fFrameIO.SetStatus(BaseFrameIO::kIdle);

  // generate the interface to the grid search algorithm
  if(fPsaCount < 1 || fPsaModulo < 1 || fPsaSegCenter) {
    // no threads requested (or segment-center mode): a single slot is used
    fPsaCount  = 0;
    fPsaModulo = 1;
    fPsaSlots  = 1;
  }
  else {
    fPsaSlots = fPsaCount*fPsaModulo;
  }
  theSlots = new PsaSlot[fPsaSlots];
  for(Int_t slot = 0; slot < fPsaSlots; slot++) {
    theSlots[slot].InitInput(fTraceLengthPSA);
  }

  // version-specific initializations
  // NOTE(review): a non-zero result is reported through error_code, but the
  // command server and the threads below are started anyway.
  *error_code = AlgoSpecificInitialise();

  cServer.SetCommandFile(GetConfPath() + gMotherClass + ".live");
#ifdef PSA_MULTIHIST
  cServer.SetHistGroup(&hGroup);
#endif //PSA_MULTIHIST
  cServer.Start(gMotherClass);

#ifdef PSA_THREADED
  if(fPsaCount > 0) {
    // launch the threads of the local chains
    cout << "Grid Search performed using " << fPsaCount << " THREADS with blocks of " << fPsaModulo << " EVENTS" << endl;
    theBlocks = new PsaBlock[fPsaCount];
    for(Int_t slot = 0; slot < fPsaCount ; slot++) {
      theBlocks[slot].State  = 0;                // start in the non-running state
      theBlocks[slot].First  = slot*fPsaModulo;  // the first slot of this thread
      theBlocks[slot].Count  = 0;                // empty
      theBlocks[slot].Thread = new boost::thread(PsaThread(this, slot));
      cout << " LAUNCHED THREAD PsaThread [" << slot << "] " << theBlocks[slot].Thread << endl;
    }
  }
#endif

  Log << dolog;
}

// Parses <GetConfPath()><gMotherClass>.conf, one "keyword value(s)" per line.
// Every non-empty line is echoed to cout; unknown keywords are reported as
// " --> ignored" and accepted.
//   error_code  0 on success, 102 if the file cannot be opened,
//               103 as soon as a known keyword has missing/unparsable arguments.
void PSAFilter::GetParameters(UInt_t *error_code)
{
  *error_code = 0;

  string configFileName = GetConfPath() + gMotherClass + ".conf";
  ifstream configfile( configFileName.c_str() );
  if(!configfile.good()) {
    cout << endl << "Error opening " << configFileName << endl;
    *error_code = 102;
    return;
  }

  cout << endl << gMotherClass + "::GetParameters() reading from --> " << configFileName << endl;

  string line, keyw, data;
  bool ok = true;
  while(getline(configfile, line)) {
    if(!stringSplit(line, " \t", keyw, data))
      continue;   // empty or comment lines

    cout << line;
    ok = false;

    if( stringEq(keyw, "ActualClass") ) {
      ok = data.size() > 0;
      gActualClass = data;
    }
    else if( stringEq(keyw, "SaveDataDir") || stringEq(keyw, "WriteDataDir") || stringEq(keyw, "WriteDataPrefix") ) {
      ok = data.size() > 0;
      fOdirPrefix = data;
      forceTailSlash(fOdirPrefix);
    }
    else if( stringEq(keyw, "BasisFile") ) {
      ok = data.size() > 0;
      fBasisFile = data;
    }
    else if( stringEq(keyw, "XtalkFile") ) {
      ok = data.size() > 0;
      fPsaXtalkFile = data;
    }
    else if( stringEq(keyw, "NoMultiHist") || stringEq(keyw, "NoLocalSpectra") ) {
      fUseMultiHist = false;
      ok = true;
    }
    else if( stringEq(keyw, "MatrXYZR") || stringEq(keyw, "XYZRmatr") ) {
      fPsaMatrXYZR = true;
      ok = true;
    }
    else if( stringEq(keyw, "Verbose") ) {
      fVerbose = true;
      ok = true;
    }
    else if( stringEq(keyw, "DistanceMetric") ) {
      ok = 1 == sscanf(data.c_str(), "%f", &fDistMetric);
    }
    else if( stringEq(keyw, "GridSearch") || stringEq(keyw, "GridSearchType") ) {
      if( stringEq(data, "Full") ) {
        fUseAdaptive = false; fTryTwoHits = false; fCoarseOnly = false; fPsaSegCenter = false; fFullSearch = true;
      }
      else if( stringEq(data, "Adaptive") ) {
        fUseAdaptive = true;  fTryTwoHits = false; fCoarseOnly = false; fPsaSegCenter = false; fFullSearch = false;
      }
      else if( stringEq(data, "AdaptiveTwoHits") ) {
        fUseAdaptive = true;  fTryTwoHits = true ; fCoarseOnly = false; fPsaSegCenter = false; fFullSearch = false;
      }
      else if(stringEq(data, "CoarseOnly") ) {
        fUseAdaptive = true;  fTryTwoHits = false; fCoarseOnly = true;  fPsaSegCenter = false; fFullSearch = false;
      }
      //else if(stringEq(data, "CoarseOnlyTwoHits") ) {
      //  fUseAdaptive = true;  fTryTwoHits = true ; fCoarseOnly = true;  fPsaSegCenter = false; fFullSearch = false;
      //}
      else if( stringEq(data, "Center") || stringEq(data, "SegCenter") ) {
        fUseAdaptive = false; fTryTwoHits = false; fCoarseOnly = false; fPsaSegCenter = true;  fFullSearch = false;
      }
      else {
        fUseAdaptive = true;  fTryTwoHits = false; fCoarseOnly = false; fPsaSegCenter = false; fFullSearch = false; // defaults to Adaptive
      }
      ok = true;
    }
    else if( stringEq(keyw, "NoFitTZero") ) {
      fFitTZero = false;
      ok = true;
    }
    else if( stringEq(keyw, "Threads") ) {
#ifdef PSA_THREADED
      // Both values are required. (An unconditional "ok = true" used to follow
      // the #endif and discarded this check, so a malformed line was accepted.)
      ok = 2 == sscanf(data.c_str(), "%d %d", &fPsaCount, &fPsaModulo);
#else
      cout << " --> PSA_THREADED NOT DEFINED --> ignored" << endl;
      ok = true;
#endif
    }
    //else if( stringEq(keyw, "TraceLengthPSA") || stringEq(keyw, "TraceLength") ) {
    //  ok = 1 == sscanf(data.c_str(), "%d", &fTraceLengthPSA);
    //}
    else if( stringEq(keyw, "WriteTraces") ) {
      ok = 1 == sscanf(data.c_str(), "%d", &fWriteNumTraces);
    }
    else if( stringEq(keyw, "WritePsaHits") ) {
      fWritePsaHits = true;
      ok = true;
    }
    else if( stringEq(keyw, "EnergyGain") ) {
      ok = 1 == sscanf(data.c_str(), "%f", &fEnergyGain);
    }
    else if( stringEq(keyw, "CoreEnergyGate") ) {
      ok = 2 == sscanf(data.c_str(), "%f %f", &fCoreEnerMin, &fCoreEnerMax);
    }
    else if( stringEq(keyw, "SegmentFoldGate") ) {
      ok = 2 == sscanf(data.c_str(), "%d %d", &fPsaMinSegMult, &fPsaMaxSegMult);
    }
    else if( stringEq(keyw, "DeadSegment") ) {
      ok = 1 == sscanf(data.c_str(), "%d", &fDeadSegment);
      if(fDeadSegment >= CrystalInterface::kNbSegments)
        fDeadSegment = -1;   // out of range: treated as "no dead segment"
    }
    // this should be taken from the general calibration file of the detector, PreprocessingFilterPSA.conf
    else if( stringEq(keyw, "TauSlice") ) {
      ok = 7 == sscanf(data.c_str(), "%f %f %f %f %f %f %f",
                       fTauSlice, fTauSlice+1, fTauSlice+2, fTauSlice+3, fTauSlice+4, fTauSlice+5, fTauSlice+6);
      fTauGiven = ok;
    }
    else {
      cout << " --> ignored";
      ok = true;
    }

    if(!ok) {
      cout << " --> missing argument(s)" << endl;
      *error_code = 103;
      return;
    }
    cout << endl;
  }
}

// Copies the current input frame (energies, times, traces, key information)
// into theSlots[slot] and clears the slot's per-event results.
// Always returns 0.
Int_t PSAFilter::SetInput(int slot)
{
  Frame *frame_in = fFrameCrystal->GetFrame();
  // NOTE(review): template argument reconstructed (stripped in the reviewed copy).
  CrystalInterface *cdata = GetDataPointer<CrystalInterface>(frame_in);

  // loads the values from the frame into the ADFObject attached to the frame
  frame_in->Read();

  PsaSlot *pSlot = theSlots + slot;

  pSlot->crystal_id     = crystal_id;
  pSlot->crystal_status = crystal_status;

  // NOTE(review): the headers of the two loops below (bounds and the
  // GeSegment / GeCore declarations) were stripped in the reviewed copy and
  // are reconstructed from the surrounding code and the ADF API -- confirm.
  for(UShort_t isg=0; isg<CrystalInterface::kNbSegments; isg++) {
    GeSegment *seg = cdata->GetSegment(isg);
    pSlot->SegE[isg] = (float)seg->GetE();
    seg->GetSignal()->Get(pSlot->fTracesSG[isg], fTraceLengthPSA);
  }

  for(UShort_t icc=0; icc<CrystalInterface::kNbCores; icc++) {
    GeCore *core = cdata->GetCore(icc);
    pSlot->CoreE[icc] = (float)core->GetE();
    pSlot->CoreT[icc] = (float)core->GetT();
    core->GetSignal()->Get(pSlot->fTracesCC[icc], fTraceLengthPSA);
  }

  pSlot->evnumber  = ((AgataKey *)frame_in->GetKey())->GetEventNumber();
  pSlot->timestamp = ((AgataKey *)frame_in->GetKey())->GetTimeStamp();

  pSlot->numNetSegs = 0;
  pSlot->shiftT0    = 0;
  pSlot->tCycles    = 0;

  return 0;
}

// Builds the output data:psa frame from the results stored in theSlots[slot].
// One PSAHit is produced per net-charge segment, plus a second one when the
// search found a second interaction point (bestPoint2 >= 0).
// Returns 0 when nothing had to be written (numNetSegs < 1) or when the frame
// was written, 1 when Frame::Write() reported 0 bytes.
Int_t PSAFilter::SetOutput(int slot)
{
  Frame *frame_out = fFramePSA->GetFrame();
  // NOTE(review): template argument reconstructed (stripped in the reviewed copy).
  PSAInterface *psadata = GetDataPointer<PSAInterface>(frame_out);
  frame_out->Reset();

  PsaSlot *pSlot = theSlots + slot;
  if(pSlot->numNetSegs < 1)
    return 0;

  // Restore the identification of THIS event before it is used to tag the
  // hits: in the threaded version the members hold the values of the most
  // recently read input frame, not necessarily those of this slot.
  crystal_id     = pSlot->crystal_id;
  crystal_status = pSlot->crystal_status;

  // there should be a way to mark two hits in the same segment
  // e.g. using the PSAHit::fstatus
  PsaOut_t * pOut = pSlot->PsaOut;
  for(UInt_t nh = 0; nh < pSlot->numNetSegs; nh++, pOut++) {
    // first (or only) interaction point: fraction bestFactor of the segment energy
    ADF::PSAHit *pHit = (PSAHit*)psadata->NewHit();
    pHit->Reset();
    pHit->SetE(pOut->enerSG*pOut->bestFactor);
    pHit->SetXYZ(pOut->fx1, pOut->fy1, pOut->fz1);
    pHit->SetT(pOut->dTns);
    pHit->SetTrapCCe(pOut->eE_CC1);
    pHit->SetTrapCCh(pOut->hE_CC1);
    pHit->SetTrapSGe(pOut->eE_SG1);
    pHit->SetTrapSGh(pOut->hE_SG1);
    pHit->SetID(pOut->netChargeSeg,0);
    pHit->SetID(crystal_id,1);
    if(pOut->bestPoint2 >= 0) {
      // second interaction point in the same segment: the remaining energy
      ADF::PSAHit *pHit2 = (PSAHit*)psadata->NewHit();
      pHit2->Reset();
      pHit2->SetE(pOut->enerSG*(1.f-pOut->bestFactor));
      pHit2->SetXYZ(pOut->fx2, pOut->fy2, pOut->fz2);
      pHit2->SetT(pOut->dTns);
      pHit2->SetTrapCCe(pOut->eE_CC2);
      pHit2->SetTrapCCh(pOut->hE_CC2);
      pHit2->SetTrapSGe(pOut->eE_SG2);
      pHit2->SetTrapSGh(pOut->hE_SG2);
      pHit2->SetID(pOut->netChargeSeg, 0);
      pHit2->SetID(crystal_id,1);
    }
  }

  ((AgataKey *)frame_out->GetKey())->SetEventNumber(pSlot->evnumber);
  ((AgataKey *)frame_out->GetKey())->SetTimeStamp(pSlot->timestamp);

  CoreE0 = pSlot->CoreE[0];
  CoreE1 = pSlot->CoreE[1];
  CoreT0 = pSlot->CoreT[0];
  CoreT1 = pSlot->CoreT[1];

#define ADDPSATIME
#ifdef ADDPSATIME
  // Add the T0 correction as determined by the PSA.
  // Using a large number of samples (nsamples>20) in PsaFilterGridSearch::FitT0AfterPSA to determine Tzero,
  // Tgamma-gamma becomes ~10% worse than using only the timing done in PreprocessingFilterPSA.
  // With (nsamples=10) Tgamma-gamma improves by ~10% but the peaks start getting tails.
  // A reasonable compromise seems to be (nsamples=15), but the Tgamma-gamma improvement is rather modest.
  // This matter is not completely clarified and one can decide to ignore the correction by commenting the #define ADDPSATIME.
  CoreT0 += pSlot->shiftT0;
  CoreT1 += pSlot->shiftT0;
#endif

  //myAno.Set(crystal_id, 0);
  //myAno.fRealSize = sizeof(crystal_id);
  //myAno.Set(pSlot->timestamp, myAno.fRealSize);
  //myAno.fRealSize += sizeof(pSlot->timestamp);

  UInt_t bytes_out = frame_out->Write();

  return bytes_out ? 0 : 1;
}

// Base-class placeholder of the actual pulse-shape analysis: does nothing
// and returns 0. The callers in this file treat a non-zero return as an error.
Int_t PSAFilter::Process(int slot, int nslots)
{
  Int_t result = 0;
  //cout << "WARNING!! Empty Process()" << endl << flush;
  return result;
}

//void PSAFilter::process_block( void *input_buffer,  UInt_t size_of_input_buffer,
//                               void *output_buffer, UInt_t size_of_output_buffer,
//                               UInt_t *used_size_of_output_buffer,
//                               UInt_t *error_code)
//{
//  fBlockIn.SetBlock((Char_t *)input_buffer,size_of_input_buffer);
//  fBlockOut.SetBlock((Char_t *)output_buffer,size_of_output_buffer);
//
//  *error_code = PSAFilter::ProcessBlock(fBlockIn, fBlockOut);
//  *used_size_of_output_buffer = UInt_t(fBlockOut.GetSize());
//}

// Processes one input block into one output block, using the threaded
// implementation when it is compiled in and fPsaCount > 0.
// Returns 0 on success or the error code of the selected implementation.
UInt_t PSAFilter::ProcessBlock (ADF::FrameBlock &in, ADF::FrameBlock &out)
{
#ifdef PSA_THREADED
  if(fPsaCount > 0)
    return ProcessBlockThreads(in, out);
  else
#endif
    return ProcessBlockNoThreads(in, out);
}

// In the non-threaded version, the processing is done one event at a time.
// Returns 0 on success, otherwise the step that failed:
//   1 = SetInput(), 2 = Process(), 3 = SetOutput(), 4 = fFrameIO.Record().
UInt_t PSAFilter::ProcessBlockNoThreads(ADF::FrameBlock &inBlock, ADF::FrameBlock &outBlock)
{
  // attach the input/output buffer to the FrameIO system
  fFrameIO.Attach(&inBlock, &outBlock);

  // start the processing
  UInt_t error_code = 0;
  UInt_t nevs = cServer.GetCounts();

  while ( fFrameIO.Notify() ) {

    // fill local variables with data from the input
    if( SetInput() ) {
      error_code = 1;
      LOCK_COUT;
      Log.SetProcessMethod("ProcessBlock");
      Log << error << " During : SetInput()" << dolog;
      break;
    }

    // process the input buffer
    if( Process() ) {
      error_code = 2;
      LOCK_COUT;
      Log.SetProcessMethod("ProcessBlock");
      Log << error << " During : Process()" << dolog;
      break;
    }

    // events without any net-charge segment produce no output frame
    if(theSlots[0].numNetSegs < 1)
      continue;

    // spectra and traces
    PostProcess();

    // fill the output buffer
    if( SetOutput() ) {
      error_code = 3;
      LOCK_COUT;
      Log.SetProcessMethod("ProcessBlock");
      Log << error << " During : SetOutput()" << dolog;
      break;
    }

    // ok, so send the produced frame to the output
    if( !fFrameIO.Record() ) {
      error_code = 4;
      LOCK_COUT;
      Log.SetProcessMethod("ProcessBlock");
      Log << error << " During : Record()" << dolog;
      break;
    }
  }

  fFrameIO.Detach(&inBlock, &outBlock);

  nevs = cServer.GetCounts() - nevs;
  LOCK_COUT;
  cServer.Prompt(crystal_id, nevs, UInt_t(outBlock.GetSize()) );

  return error_code;
}

#ifdef PSA_THREADED

// In the threaded version, fPsaCount blocks of fPsaModulo events are processed in parallel.
// The blocks are used (writing-to, processing and reading-from) in a ring-counter mode.
// Block states as used here and in PsaThread::operator():
//   0 = free, 1 = filled and handed to its thread, 2 = processed by its thread.
// Returns 0 on success, otherwise the step that failed:
//   1 = SetInput(), 2 = Process() (slot retValue != 0), 3 = SetOutput(), 4 = fFrameIO.Record().
UInt_t PSAFilter::ProcessBlockThreads(ADF::FrameBlock &inBlock, ADF::FrameBlock &outBlock)
{
  // attach the input/output buffer to the FrameIO system
  fFrameIO.Attach(&inBlock, &outBlock);

  // start the processing
  UInt_t error_code = 0;
  UInt_t nevs = cServer.GetCounts();

  int  block = -1;     // index of the block being used
  int  evInp = 0;      // just for inspection during debug
  int  evOut = 0;
  bool endOfInput = false;

  while (!endOfInput) {
    block = (block + 1) % fPsaCount;
    PsaBlock *pBlock = theBlocks + block;
    bool blockHasData = false;

    { // Lock and check if the block has been activated (pBlock->State > 0)
      // In case, wait for its thread to finish the job (pBlock->State == 2)
      boost::mutex::scoped_lock slock(pBlock->Mutex);
      if(pBlock->State > 0) {
        blockHasData = true;
        while(pBlock->State != 2) {   // but not yet done
          pBlock->Condition.wait(slock);
        }
      }
      pBlock->State = 0;  // having the lock we can safely set it to free
    }

    // Now the block is ours
    if(blockHasData) {
      // pass content of the block to the output
      for(UInt_t evn = 0; evn < pBlock->Count; evn++) {
        int slot = block*fPsaModulo+evn;
        if(theSlots[slot].retValue) {
          //chState[block] = 0; ???
          error_code = 2;
          LOCK_COUT;
          Log.SetProcessMethod("ProcessBlock");
          Log << error << " During : Process()" << dolog;
          break;
        }
        if(theSlots[slot].numNetSegs >0) {
          // spectra and traces
          PostProcess(slot);
          // fill the output buffer
          if( SetOutput(slot) ) {
            //chState[block] = 0; ???
            error_code = 3;
            LOCK_COUT;
            Log.SetProcessMethod("ProcessBlock");
            Log << error << " During : SetOutput()" << dolog;
            break;
          }
          // ok, so send the produced frame to the output
          if( !fFrameIO.Record() ) {
            //chState[block] = 0; ???
            error_code = 4;
            LOCK_COUT;
            Log.SetProcessMethod("ProcessBlock");
            Log << error << " During : Record()" << dolog;
            break;
          }
          evOut++;
        }
        theSlots[slot].numNetSegs = 0;
      }
    }

    if(error_code) {
      //chState[block] = 0; // ??
      break;
    }

    // fill this block with new data from the input
    pBlock->Count = 0;
    Int_t evn = 0;
    for( ; evn < fPsaModulo; evn++) {
      if(!fFrameIO.Notify() ) {
        endOfInput = true;
        break;
      }
      int slot = block*fPsaModulo+evn;
      if( SetInput(slot) ) {
        error_code = 1;
        LOCK_COUT;
        Log.SetProcessMethod("ProcessBlock");
        Log << error << " During : SetInput()" << dolog;
        break;
      }
      theSlots[slot].retValue = 0;
      evInp++;
    }
    if(evn < 1)
      break;      // no more data or error

    pBlock->Count = evn;
    { // Lock and mark the block as ready to be processed
      boost::mutex::scoped_lock slock(pBlock->Mutex);
      pBlock->State = 1;
    }
    // tell the thread to proceed
    pBlock->Condition.notify_one();
  } // while(!endOfInput)

  // Wait for the launched threads to finish
  // At this point the status of fFrameIO is BaseFrameIO::kIdle and we should no more write on it.
  // However, the output from the still-running threads is saved by forcing it to BaseFrameIO::kRunning
  // It would be more efficient to synchronize the unfinished threads at the next call of ProcessBlock
  // but we do it here so as to keep the correspondence between input and output buffers
  for(Int_t nrs = 0; nrs < fPsaCount; nrs++) {
    block = (block + 1) % fPsaCount;
    PsaBlock *pBlock = theBlocks + block;
    bool blockHasData = false;

    { // Lock and check if the block has been activated (pBlock->State > 0)
      // In case, wait for its thread to finish the job (pBlock->State == 2)
      boost::mutex::scoped_lock slock(pBlock->Mutex);
      if(pBlock->State) {
        blockHasData = true;
        while(pBlock->State != 2) {   // but not yet done
          pBlock->Condition.wait(slock);
        }
      }
      pBlock->State = 0;  // having the lock we can safely set it to free
    }

    // Now the block is ours
    if(blockHasData) {
      // pass content of the block to the output
      for(UInt_t evn = 0; evn < pBlock->Count; evn++) {
        int slot = block*fPsaModulo+evn;
        if(theSlots[slot].retValue ) {
          //chState[block] = 0; ???
          error_code = 2;
          LOCK_COUT;
          Log.SetProcessMethod("ProcessBlock");
          Log << error << " During : Process()" << dolog;
          break;
        }
        if(theSlots[slot].numNetSegs >0 ) {
          // spectra and traces
          PostProcess(slot);
          // fill the output buffer
          if( SetOutput(slot) ) {
            //chState[block] = 0; ???
            error_code = 3;
            LOCK_COUT;
            Log.SetProcessMethod("ProcessBlock");
            Log << error << " During : SetOutput()" << dolog;
            break;
          }
          // Reopen fFrameIO by forcing it to BaseFrameIO::kRunning
          if (fFrameIO.GetStatus() == BaseFrameIO::kIdle )
            fFrameIO.SetStatus(BaseFrameIO::kRunning);
          if( !fFrameIO.Record() ) {
            //chState[block] = 0; ???
            error_code = 4;
            LOCK_COUT;
            Log.SetProcessMethod("ProcessBlock");
            Log << error << " During : Record()" << dolog;
            break;
          }
          evOut++;
        }
        theSlots[slot].numNetSegs = 0;
      }
      pBlock->Count = 0;
    }
    if(error_code) {
      //chState[block] = 0; // ??
      break;
    }
  }

  fFrameIO.Detach(&inBlock, &outBlock);

  nevs = cServer.GetCounts() - nevs;
  LOCK_COUT;
  cServer.Prompt(crystal_id, nevs, UInt_t(outBlock.GetSize()) );

  return error_code;
}

#endif    // #ifdef PSA_THREADED

// Actions performed on a processed event before it is written out:
// the algorithm-specific hook, the optional GRU projections and the
// command-server call. Does nothing for events without net-charge segments.
// Always returns 0.
Int_t PSAFilter::PostProcess(int slot)
{
  PsaSlot *pSlot = theSlots + slot;

  if(pSlot->numNetSegs < 1)
    return 0;

  // give the Algorithm the possibility to perform its own thread-safe PostProcess actions
  AlgoSpecificPostProcess(slot);

#ifdef PSA_FromGRU
  // pOut was used here without a declaration, which does not compile when
  // PSA_FromGRU is defined.
  PsaOut_t *pOut = pSlot->PsaOut;
  for(UInt_t nh = 0; nh < pSlot->numNetSegs; nh++, pOut++) {
    float fx = pOut->fx1;
    float fy = pOut->fy1;
    float fz = pOut->fz1;
    PSAxy_Proje -> Fill(fx,fy);
    PSAxz_Proje -> Fill(fx,fz);
    PSAyz_Proje -> Fill(fy,fz);
    if(pOut->bestPoint2 >= 0) {
      float fx2 = pOut->fx2;
      float fy2 = pOut->fy2;
      float fz2 = pOut->fz2;
      PSAxy_Proje -> Fill(fx2,fy2);
      PSAxz_Proje -> Fill(fx2,fz2);
      PSAyz_Proje -> Fill(fy2,fz2);
    }
  }
#endif

  cServer.Exec(pSlot->timestamp);

  return 0;
}

// Prints the FrameIO status to the log and resets the command server.
// error_code is always set to 0.
void PSAFilter::process_reset (UInt_t *error_code)
{
  *error_code = 0;
  Log.ClearMessage();
  Log.SetProcessMethod("process_reset");
  fFrameIO.Print(Log());
  cServer.Reset();
  Log << dolog;
}

// Starts the command server. error_code is always set to 0.
void PSAFilter::process_start (UInt_t *error_code)
{
  Log.GetProcessName() = "PSAFilter";
  Log.SetProcessMethod("process_start");
  cServer.Start(gMotherClass);
  *error_code = 0;
  Log << dolog;
}

// Tells the command server to finish. error_code is always set to 0.
void PSAFilter::process_stop (UInt_t *error_code)
{
  cout << crystal_id << "-PSAFilter::process_stop called with GetPID() " << GetPID() << endl;
  *error_code = 0;
  cServer.Finish();
}

#ifdef PSA_THREADED

// Body of a worker thread: waits until its block has been filled (State == 1),
// processes the events of the block and marks it as done (State == 2).
// The loop never terminates by itself.
void PsaThread::operator()()
{
  while(true) {
    { // lock and verify if this thread can proceed (pBlock->State == 1)
      boost::mutex::scoped_lock slock(pBlock->Mutex);
      while(pBlock->State != 1 ) {
        pBlock->Condition.wait(slock);
      }
    }

    // Do the job:
    // two ways of calling PSAFilterGridSearch::Process(...)
#if 0
    // one event at a time
    for(int evn = 0; evn < pBlock->Count; evn++) {
      int slot = pBlock->First + evn;
      int retval = pPsa->Process(slot, 1);
      pPsa->theSlots[slot].retValue = retval;
    }
#else
    // only once, exploiting its internal event loop
    int retval = pPsa->Process(pBlock->First, pBlock->Count);
    if(retval) {
      // The status of the block-wise call used to be dropped, so a failing
      // Process() was never seen by ProcessBlockThreads(), which only looks at
      // the retValue of the slots. Flag the events of this block so that the
      // error is reported as in the non-threaded version.
      // NOTE(review): assumes a non-zero return always means failure, as in
      // ProcessBlockNoThreads() -- confirm against the derived Process().
      for(int evn = 0; evn < pBlock->Count; evn++) {
        pPsa->theSlots[pBlock->First + evn].retValue = retval;
      }
    }
#endif

    { // Lock and set status as processing finished
      boost::mutex::scoped_lock slock(pBlock->Mutex);
      pBlock->State = 2;
    }
    // notify it
    pBlock->Condition.notify_one();
  }
}

#endif    // #ifdef PSA_THREADED