// // This is femul (previously named WinCtest), a Narval emulator written as a standalone program // to help developing and debugging the AGATA data acquisition and analysis (replay) software. // // The development started in ~2008, inspired by Joa's C_Test.c (which uses the .so libraries via dlopen, dlsym,...) // and has evolved towards an almost complete emulation of the Narval environment. // The main difference/limitation with the Narval implementation is the use of a single process on a single computer. // To speed-up execution in multi-core computers it is possible to run parallel threads // - in the PSAFilter library using boost:thread or std::future if the compiler upports the c++11 standard // - in the driver program (using boost::thread or std::thread) (if #define WCT_THREADED in ChainLocker.h). // // All libraries used so far in Agata Demonstrator experiments and analyses are available, using the same source code; // indeed, a large part of that code has been developed and/or tested by various contributors using this emulator. // // A C++ implementation of the EventBuilder replaces that of Narval (which is written in Ada). // // The program is supposed to be multi-platform and has been tested in Windows, [Cygwin??] and GNU/Linux. // // Dino Bazzacco 2008-2014 // #include #include #include #include #include #include #include #include #include #if OS_TYPE != OS_WINDOWS # include #endif #define FEMBUF_MINSIZE 1024 // Minimum size (bytes) of the data buffers connecting the actors #define FEMBUF_DEFSIZE (10*1024*1024) // Default size (bytes) of the data buffers connecting the actors #define FEMBUF_MAXSIZE (30*1024*1024) // Maximum size (bytes) of the data buffers connecting the actors #include "commonDefs.h" #include "Misc.h" #include "ChainOfActors.h" #include "ChainLocker.h" // WCT_THREADED is defined in ChainLocker.h //enum EActor { NONE, BUILDER, PRODUCER, FILTER, CONSUMER, DISPATCHER }; // THIS IS NOW DEFINED IN ChainOfActors.h std::string NActor[] = {"NONE ", "BUILDER ", "PRODUCER ", "FILTER ", "CONSUMER ", "DISPATCHER"}; /********************************************************* The processing chains can have the following actors BUILDER EventBuilder (can be derived from NarvalInterface as a PRODUCER) PRODUCER BasicAFP PRODUCER CrystalProducerATCA PRODUCER AncillaryProducerTCP FILTER PreprocessingFilterPSA FILTER PSAFilterGridSearch FILTER PostPSAFilter FILTER AncillaryFilterVME FILTER AncillaryFilterATCA FILTER GlobalFilter FILTER TrackingFilterOFT FILTER TrackingFilterMGT CONSUMER BasicAFC CONSUMER None pseudo-actor to close a chain without output DISPATCHER BuilderName pseudo actor to connect the output of a chain to an input queue of a BUILDER *********************************************************/ // implemented actors // type subtype identifier // // 1 BUILDER #include "EventBuilder.h" // 10 100 // 2 PRODUCER #include "BasicAFP.h" // 20 200 no mother-daughter model #include "CrystalProducer.h" // 21 210 #include "CrystalProducerATCA.h" // 211 #include "AncillaryProducerTCP.h" // 22 220 no mother-daughter model // 3 FILTER #include "PreprocessingFilter.h" // 30 300 #include "PreprocessingFilterPSA.h" // 301 #include "PSAFilter.h" // 31 310 #include "PSAFilterGridSearch.h" // 311 #include "AncillaryFilter.h" // 33 330 must find out why this cannot be included after TrackingFilterMGT.h #include "AncillaryFilterVME.h" // 331 #include "AncillaryFilterATCA.h" // 332 #include "TrackingFilter.h" // 32 320 #include "TrackingFilterOFT.h" // 321 #include "TrackingFilterMGT.h" // 322 #include "PostPSAFilter.h" // 34 340 #ifdef ENABLE_GLOBAL_FILTER #include "GlobalFilter.h" // 35 350 #endif // 4 CONSUMER // None // 40 400 fake, used as last of a chain #include "BasicAFC.h" // 41 410 no mother-daughter model // 5 DISPATCHER femul mechanism to connect to an EventBuilder // BuilderName // 50 destination_chain #ifdef WCT_THREADED std::vector chThread; // pointers of the threads, needed to be able to join them thrLib::thread *ctrlThread; // pointer of the control thread for cyclic actions bool stopControl = false; // to force the control thread to exit bool runThreaded = true; // set it to false (from the command line) to run the unthreaded main loop #else bool runThreaded = false; // will of course run unthreaded #endif #if defined(COUT_LOCKED) thrLib::mutex ioMutex; // mutex for LOCK_COUT #endif // Replacement of Xavier's variable to ask producers to return from process_block even // if the buffer is not yet full. This is just to avoid complaints from the compiler, not // an implementation of the time-out mechanism (which is not very meaningful in off-line). unsigned int library_timeout = 0; // to control verbosity of the LogCollector (in topologyConfNew) #include "CentralLog.h" #include "LogCollector.h" int adfVerbose = 0; // 0=LogMessage::kError 1=LogMessage::kWarning 3=LogMessage::kInfo std::string topologyFile; std::string confPrefix("Conf/"); std::string StopFile("StopFile"); // presence of this file in cwd ends the analysis in an ordered way std::string KillFile("KillFile"); // presence of this file in cwd ends the analysis immediately int maxTurn = 0; // if using threads, the analysis will run for approximately maxTurn seconds int theTurn = 0; // counter of turns int nSIGINT = 0; // count number of SIGINT (CTRL_C) bool stopSig = false; // used to check StopFile bool killSig = false; // used to check KillFile bool listKeys = false; // rudimentary help on the keywords accepted by the various actors double wTstart; double cTstart; time_t tTstart; using namespace std; using namespace ADF; // the specific analysis is recorded here vector theChains; // this part used only during setup struct builder_t { builder_t(std::string name) : pEventBuilder(NULL), theName(name), numInputs(0), indChain(-1) {} EventBuilder *pEventBuilder; std::string theName; int numInputs; int indChain; }; vector theBuilders; void showWhat(); void getparams(int, char **); void numparams(char *, int); void listKeysAndExit(); // call GetParameters(doList=true) and exit bool checkADFpath(); // check presence of ADF.conf int eventLoop1(); // non-threaded int eventLoop2(); // threaded bool topologyRead(std::string); // reads the topology file void topologyPrint(int nn = 0); // listing of the topology bool topologyConfNew(); // process_config+new bool topologyInit(); // process_initialise bool topologyStart(); // process_start bool topologyStop(); // process_stop bool topologyDelete(); // call the destructors of the created objects void threadWorker(int nn); // managment of PRODUCER/BUILDER chains in the threaded version void threadControl(); // the control thread for cyclic printouts and tests bool CheckFile(std::string checkFile, bool remove); // check presence of checkFile and delete it void catch_int(int /*sig_num*/) { nSIGINT++; stopSig = true; if(nSIGINT > 1) killSig = true; if(nSIGINT > 2) // give up immediately if multiple CTRL_C exit(EXIT_FAILURE); signal(SIGINT, catch_int); } int main(int argc, char **argv) { cout << endl; cout << "######################################" << endl; cout << "# Using Narval actors without Narval #" << endl; cout << "######################################" << endl; showWhat(); getparams(argc, argv); if(listKeys) listKeysAndExit(); bool ok; ok = checkADFpath(); if(!ok) exit(EXIT_FAILURE); ok = topologyRead(topologyFile); if(!ok) exit(EXIT_FAILURE); topologyPrint(); ADF::CrystalInterface::kDefaultLength = defTraceLengthRaw; // Because of the static variables used to register and configure the actors in the Narval environment, // a flat emulator that creates multiple instances of the same class in the same process, has 2 options: // 1) call process_config immediately before creating each of the objects so that the constructors can // move the globals to local storage (essentially gConfPath --> fConfPath) // 2) create the objects and call process_config and process_initialise in sequence for all of them; in this // case it is process_initialise that moves gConfPath to fConfPath // In both cases it is not possible to call process_config for all actors and then process_initialise. // The present version follows 1), which is similar to the way of doing things in Narval // Be aware that to change the way of doing things one has to (slightly) modify the actors. // process_config + new ok = topologyConfNew(); if(!ok) exit(EXIT_FAILURE); topologyPrint(); ok = topologyInit(); if(!ok) exit(EXIT_FAILURE); ok = topologyStart(); if(!ok) exit(EXIT_FAILURE); // CNTRL_C can be used to stop execution signal(SIGINT, catch_int); printf("\nAnalysis can be terminated by typing CTRL_C (or kill -2 PID) \n"); //#define DO_IT_AGAIN_LOOP // enable this symbol to test the stop/start loop in main() #ifdef DO_IT_AGAIN_LOOP AGAIN_LOOP: #endif ////////////////////////////////////////////////////////// /////////////// the execution loop(s) //////////////////// ////////////////////////////////////////////////////////// wTstart = get_wall_time(); cTstart = get_cpu_time(); time(&tTstart); string sstart = ctime(&tTstart); sstart = sstart.substr(0, sstart.size()-1); cout << "\n********** START time : " << sstart << endl; CheckFile(StopFile, true); // remove if present CheckFile(KillFile, true); // remove if present nSIGINT = 0; stopSig = false; killSig = false; theTurn = 0; ////////////////// EVENT LOOP ///////////////////////// if(!runThreaded) eventLoop1(); else eventLoop2(); /////////////////////////////////////////////////////// ////////////////////////////////////////////////////////// //////////// end of execution loop /////////////////////// ////////////////////////////////////////////////////////// // Statistics of chains cout << endl; cout << "chain" << setw(12) << "Started" << setw(12) << "Counted" << setw(12) << "DoSleep" << endl; for( size_t ich = 0; ich < theChains.size(); ich++) { cout << setw( 5) << theChains[ich]->myIndex; cout << setw(12) << theChains[ich]->statStarted; cout << setw(12) << theChains[ich]->statCounted; cout << setw(12) << theChains[ich]->statDoSleep; cout << endl; } ok = topologyStop(); if(!ok) exit(EXIT_FAILURE); { double wTstop = get_wall_time(); double cTstop = get_cpu_time(); time_t tTstop; time(&tTstop); double wstot = wTstop - wTstart; double cstot = cTstop - cTstart; string sstop = ctime(&tTstop); sstop = sstop.substr(0, sstop.size()-1); LOCK_COUT; cout << endl << endl; cout << "********** STOP time : " << sstop << endl; cout << "********** " << fixed << setprecision(1) << wstot << " elapsed seconds (" << fixed << setprecision(1) << cstot << " CPU seconds)" << endl; cout << endl; } #ifdef DO_IT_AGAIN_LOOP ok = topologyStart(); if(!ok) exit(EXIT_FAILURE); goto AGAIN_LOOP; #endif topologyDelete(); return 1; } void showWhat() { cout << endl; #if OS_TYPE == OS_WINDOWS cout << "OS is Windows " << endl; #elif OS_TYPE == OS_LINUX cout << "OS is Linux " << endl; #elif OS_TYPE == OS_APPLE cout << "OS is Apple " << endl; #elif OS_TYPE == OS_CYGWIN cout << "OS is Cygwin " << endl; #endif #if THR_TYPE == THR_BOOST cout << "Using boost " << BOOST_VERSION/100000 << "_" << (BOOST_VERSION/100)%1000 << "_" << BOOST_VERSION%100 << endl; #elif THR_TYPE == THR_STD cout << "Using std threads" << endl; #else cout << "Not using threads" << endl; #endif #if defined(TRF_ROOTTREE) || defined(GLF_ROOTTREE) || defined(ANF_ROOTTREE) cout << "Using root " << ROOT_RELEASE << endl; #else cout << "Not using root" << endl; #endif string sadf = ADF::GetPathToConfFile(); sadf += "/ADF.conf"; if( !fileExists(sadf) ) { cout << endl; cout << "File not found: " << sadf << endl; cout << "This file is required to properly initialize the adf library. Please check" << endl; cout << "the definition of ADFPATHCONFFILE in AgataSoftware/include/standalone/Env.h" << endl; exit(EXIT_FAILURE); } else { cout << "Library ADF is " << sadf << endl; } } void getparams(int argc, char *argv[]) { char *cmd = NULL; int ok; if(argc < 2 ) { ok = -1; } else if(!strcmp(argv[1], "-h")|| !strcmp(argv[1], "--help")) { ok = -1; } else if(!strcmp(argv[1], "-k") || !strcmp(argv[1], "--keys")) { listKeys = true; return; } else { topologyFile = argv[1]; ok = (argc > 1) ? 1 : 0; } int argn = 2; while(ok == 1 && argn < argc) { cmd = argv[argn++]; ok = 1; if (!strcmp(cmd, "-h") || !strcmp(cmd, "--help")) { ok = -1; } else if(!strcmp(cmd, "-k") || !strcmp(cmd, "--keys")) { listKeys = true; return; } else if(!strcmp(cmd, "-conf")) { numparams(cmd, argc-argn-1); confPrefix = argv[argn++]; forceTailSlash(confPrefix); } else if(!strcmp(cmd, "-turns")) { numparams(cmd, argc-argn-1); maxTurn = abs(atoi(argv[argn++])); } else if(!strcmp(cmd, "-nothr") || !strcmp(cmd, "-nothreads")) { runThreaded = false; } else if(!strcmp(cmd, "-vadf")) { numparams(cmd, argc-argn-1); adfVerbose = abs(atoi(argv[argn++])); } else { ok = 0; } } if(ok < 1) { if(ok == 0 && argc > 2) cout << "\nInvalid switch " << argv[argn-1] << endl; cout << "\nUsage: " << argv[0] << " TopologyFile [OPTION]..." << endl; cout << " -h print this list and exit" << endl ; cout << " -k print the list of keywords accepted by the actors and exit" << endl ; cout << " -conf conf_dir name of configuration directory [Conf]" << endl ; cout << " -turns nn limit analysis to nn turns (unlimited if <=0) [0]" << endl ; #ifdef WCT_THREADED cout << " -nothr run the non-threaded event loop" << endl ; #endif cout << " -vadf nn adf LogMessage level (0=kError, 1=kWarning, 2=kInfo, 3=kOut)" << endl ; cout << " " << endl ; if(argc > 1 || ok < 0) exit(EXIT_FAILURE); } // echo the command line time_t ltime; time(<ime); cout << endl; cout << "# " << ctime(<ime); cout << "#"; for(int nn = 0; nn < argc; nn++) cout << " " << argv[nn]; cout << endl << endl; cout << "Topology file " << topologyFile << endl; cout << "Configuration directory " << confPrefix << endl; if(maxTurn > 0) cout << "Analysis limited to " << maxTurn << (runThreaded ? " seconds" : " turns") << endl; #ifdef WCT_THREADED if(!runThreaded) cout << "Non-threaded main loop " << endl; #endif } void numparams(char * cmd, int nn) { if(nn >= 0) return; nn = -nn; printf("error decoding command %s ==> %d parameter(s) missing \n", cmd, nn); exit(EXIT_FAILURE); } bool checkADFpath() { bool found = false; string adfpath; if(getenv("ADF_CONF_PATH")) { adfpath = getenv("ADF_CONF_PATH"); forceTailSlash(adfpath); adfpath += "ADF.conf"; found = fileExists(adfpath); } else { cout << "\nThe symbol ADF_CONF_PATH is not set" << endl; cout << "Looking for ADF.conf in cwd ..."; adfpath = "./ADF.conf"; found = fileExists(adfpath); if(found) cout << " found"; } if(found) cout << "\nThe ADF configuration file is " << adfpath << endl; else cout << "\nERROR: The ADF configuration file ADF.conf was not found" << endl; return found; } // read topology file and classify actors as: // EActor::BUILDER // EActor::PRODUCER // EActor::FILTER // EActor::CONSUMER // EActor::DISPATCHER bool topologyRead(string topo) { if(topo.size() < 1) { cout << "Topology file is missing" << endl; return false; } ifstream ifs(topo.c_str()); if( !ifs.good() ) { cout << "Could not open topology file " << topo << endl; return false; } cout << endl << "Reading topology from --> " << topo << endl; bool loopState = false; size_t loopIndValue = 0; size_t loopIndActor = 0; string loopSymbol; vector loopValues; vector loopActors; //LOOP CRY 1R 1G 1B 2R 2G 2B 3R 3G 3B 4R 4G 4B // Chain 3 CRY // Producer BasicAFP 1000000 // Filter PostPSAFilter // Dispatcher EventBuilder //ENDLOOP // //Chain 3 Ancillary //Producer AncillaryProducer 5000000 //Filter AncillaryFilter //Dispatcher EventMerger // //Chain 2 Global/ //Builder EventBuilder 10000000 //Dispatcher EventMerger // //Chain 3 Global/ //Builder EventMerger 5000000 //Filter TrackingFilter //Consumer BasicAFC string line, keyw, data; while(true) { // read new line or get it from stored loop statements loopState &= loopIndValue < loopValues.size(); if(loopState) { line = loopActors.at(loopIndActor++); if(loopIndActor >= loopActors.size()) { loopIndValue++; loopIndActor = 0; } } else { if(!getline(ifs, line)) break; } if(!stringSplit(line, keyw, data)) continue; // empty or comment lines if( stringEq(keyw, "Loop") ) { cout << line << endl; string values, value1, value2; if(!stringSplit(data, loopSymbol, values)) { cout << "Format is: Loop Name Value1 Value2 ..." << endl; return false; } // reset vectors loopValues.clear(); loopActors.clear(); // tokenize values while(stringSplit(values, value1, value2)) { loopValues.push_back(value1); values = value2; } // read and store until EndLoop bool hasEndLoop = false; while(getline(ifs, line)) { if(!stringSplit(line, keyw, data)) continue; // empty or comment lines cout << line << endl; if( stringEq(keyw, "EndLoop") || stringEq(keyw, "End_Loop") ) { hasEndLoop = true; break; } loopActors.push_back(line); } if(!hasEndLoop) { cout << "Missing ENDLOOP statement" << endl; return false; } loopIndValue = 0; loopIndActor = 0; loopState = true; //cout << endl; continue; } if( stringEq(keyw, "Chain") ) { cout << line; string count, conf; if(!stringSplit(data, count, conf)) { cout << "\nFormat is: Chain numActors confDir" << endl; return false; } if(loopState && conf==loopSymbol) { // replace conf with the next entry in loopNames conf = loopValues[loopIndValue]; cout << " ==> " << conf; } cout << endl; int nacts = atoi(count.c_str()); if(nacts < 2) { cout << "There should be at least 2 actors per chain" << endl; return false; } ChainOfActors * pChain = new ChainOfActors( (int)theChains.size() ); string lconf, lsize; stringSplit(conf, lconf, lsize); pChain->confDir = confPrefix+lconf; for(int nn = 0; nn < nacts; ) { actor theActor(nn); // read new line or get it from stored loop statements if(loopState) { line = loopActors[loopIndActor++]; if(loopIndActor >= loopActors.size()) { loopIndValue++; loopIndActor = 0; } } else { if(!getline(ifs, line)) { cout << "Error reading line " << nn << " of chain " << theChains.size() << endl; return false; } } if(!stringSplit(line, keyw, data)) continue; // empty or comment lines cout << line << endl; string lname, lmore; stringSplit(data, lname, lmore); // isolate the extra parts after the name of the library theActor.libName = lname; theActor.libMore = lmore; theActor.libConf = pChain->confDir; theActor.sizeOut = FEMBUF_DEFSIZE; // size set to default value if( stringEq(keyw, "BUILDER") ) { if(nn) { cout << "BUILDER can only be the first actor in a chain" << endl; return false; } theActor.libType = BUILDER; if(lmore.size() > 0) theActor.sizeOut = atoi(lmore.c_str()); if(theActor.sizeOut < FEMBUF_MINSIZE) theActor.sizeOut = FEMBUF_MINSIZE; // not smaller than this if(theActor.sizeOut > FEMBUF_MAXSIZE) theActor.sizeOut = FEMBUF_MAXSIZE; // not bigger than this } else if( stringEq(keyw, "PRODUCER") ) { if(nn) { cout << "PRODUCER can only be the first actor in a chain" << endl; return false; } theActor.libType = PRODUCER; if(lmore.size() > 0) theActor.sizeOut = atoi(lmore.c_str()); if(theActor.sizeOut < FEMBUF_MINSIZE) theActor.sizeOut = FEMBUF_MINSIZE; // not smaller than this if(theActor.sizeOut > FEMBUF_MAXSIZE) theActor.sizeOut = FEMBUF_MAXSIZE; // not bigger than this } else if( stringEq(keyw, "FILTER") ) { if(!nn) { cout << "FILTER cannot be the first actor in a chain" << endl; return false; } theActor.libType = FILTER; } else if( stringEq(keyw, "CONSUMER") ) { if(!nn) { cout << "CONSUMER cannot be the first actor in a chain" << endl; return false; } theActor.libType = CONSUMER; if(lmore.size() > 0) theActor.libConf += lmore; } else if( stringEq(keyw, "DISPATCHER") ) { if(nn != nacts-1) { cout << "DISPATCHER can only be the last actor in a chain" << endl; return false; } theActor.libType = DISPATCHER; if(lmore.size() > 0) theActor.libConf += lmore; } else { cout << "Bad keyword in " << data << endl; cout << "Valid keywords are: Builder Producer Filter Consumer Dispatcher" << endl; return false; } theActor.libItem = -1; theActor.nrvPointer = NULL; theActor.evbPointer = NULL; pChain->theActors.push_back(theActor); pChain->ioBuffsSize = max(pChain->ioBuffsSize, theActor.sizeOut); pChain->numActors++; nn++; // actor was read correctly } if(pChain->theActors[nacts-1].libType != CONSUMER && pChain->theActors[nacts-1].libType != DISPATCHER) { cout << "The last item of each chain must be CONSUMER or DISPATCHER" << endl; return false; } pChain->myIndex = (int)theChains.size(); theChains.push_back(pChain); } else { cout << " Invalid keyword: " << keyw << " (here, the only valid one is: Chain)" << endl; return false; } } for(unsigned int nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; for(int na = 0; na < pChain->numActors; na++) { actor *pActor = &pChain->theActors[na]; if(na) pActor->sizeOut = pChain->ioBuffsSize; // in this way it is possible to have a smaller size for producer #if 0 // Tests on the size of the ioBuffers, done in the attempt to avoid buffer overflow in the filters // No more needed because ChainOfActors::Run() gives only 3/4 of the actual size to PRODUCERs and BUILDERs switch (pActor->libType) { case BUILDER : // This test is due to the fact that after a Builder there is often a TrackingFilter // which can produce an ouput buffer size larger than its input pActor->sizeOut = min(pActor->sizeOut, (pChain->ioBuffsSize*8)/10); break; case PRODUCER : // PreprocessingFilter could also produce an ouput larger than the input, but // CrystalProducer already takes care of filling its output buffer only by ~80% // So, keep the value from the topology, if given, or the default FEMBUF_DEFSIZE pActor->sizeOut = min(pActor->sizeOut, (pChain->ioBuffsSize*8)/10); break; } #endif } // loop on actors } // loop on chains return true; } // simple printout of the topology void topologyPrint(int nchains) { if(nchains < 1 || nchains > (int)theChains.size()) nchains = (int)theChains.size(); cout << endl; for(int nc = 0; nc < nchains; nc++) { ChainOfActors * pChain = theChains[nc]; cout << setw(2) << nc << " "; //<< setw(2) << pChain->numActors; cout << " " << pChain->confDir << endl; for(int na = 0; na < pChain->numActors; na++) { actor *pActor = &pChain->theActors[na]; cout << " " << setw( 3) << na; cout << " " << NActor[pActor->libType]; cout << " " << setw( 3) << pActor->libItem; cout << " " << setw(25) << left << pActor->libName << right; if(pActor->libType == DISPATCHER || pActor->libName == "None") { cout << endl; continue; } cout << " " << setw(10) << pActor->sizeOut; if(pActor->libType == BUILDER) cout << " " << pActor->evbPointer; else cout << " " << pActor->nrvPointer; cout << " " << pActor->libConf; cout << endl; } } } // Call GetParameters(doList=true) for the relevant classes to get the list of accepted keys. void listKeysAndExit() { // get rid of adf messages CentralLog::theCentralLog()->GetCurrentLogCollector()->SetLevelFiltering(LogMessage::kError, (unsigned short)(0)); // The first object is not deleted because, when this function is called at the very start of the program, // the next "new" ends-up in a crash. // Apparently AgataConfAgent::gTheGlobalAgent is not reset to NULL by the destructor AncillaryProducerTCP *pa1 = new AncillaryProducerTCP; pa1->GetParameters("", true); //delete pa1; AncillaryFilter *pa2 = new AncillaryFilter; pa2->GetParameters("", true); delete pa2; CrystalProducer *pp1 = new CrystalProducer; pp1->GetParameters("", true); delete pp1; PreprocessingFilter *pf1 = new PreprocessingFilterPSA; pf1->GetParameters("", true); delete pf1; PSAFilter *pf2 = new PSAFilterGridSearch; pf2->GetParameters("", true); delete pf2; // pf2 = new PSAFilterFips; //pf2->GetParameters("", true); //delete pf2; PostPSAFilter *pf3 = new PostPSAFilter; pf3->GetParameters("", true); delete pf3; EventBuilder *pe1 = new EventBuilder("EventBuilder"); pe1->GetParameters("", true); delete pe1; #ifdef ENABLE_GLOBAL_FILTER GlobalFilter *pf4 = new GlobalFilter; pf4->GetParameters("", true); delete pf4; #endif TrackingFilter *pf5 = new TrackingFilter; pf5->GetParameters("", true); delete pf5; exit(EXIT_SUCCESS); } #if 1 // process_config + generation of objects // should change the behaviour to be more compliant to the adf/narval definition // 1) the topology file should only contain the name of the mother class // 2) the actual class should be obtained by the static method MotherClass::process_config(...) // 3) the object should be created after checking which daughter was given // See the example for TrackingFilter // actualClass from configuration bool topologyConfNew() { // Reduce verbosity of LogCollector //CentralLog::theCentralLog()->GetCurrentLogCollector()->SetLevelFiltering(LogMessage::kError, LogMessage::kError); //CentralLog::theCentralLog()->GetCurrentLogCollector()->SetLevelFiltering(LogMessage::kError, LogMessage::kWarning); CentralLog::theCentralLog()->GetCurrentLogCollector()->SetLevelFiltering(LogMessage::kError, (unsigned short)(adfVerbose)); #define IF_ERRORCONFIG(error_config) if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} #define ELSE_UNKNOWNCLASS(nc) else {cout << "Unknown class [ " << nc << " " << nc << " ] " << pActor->libName << endl; return false; } int numChains = (int)theChains.size(); UInt_t error_config = 0; for(int nc = 0; nc < numChains; nc++) { ChainOfActors * pChain = theChains[nc]; cout << "\n-- Calling process_config and instantiating objects for chain " << nc << endl; for(int na = 0; na < pChain->numActors; na++) { cout << "Actor " << nc << " " << na << " ---> "; actor *pActor = &pChain->theActors[na]; bool printmsg = true; switch (pChain->theActors[na].libType) { case BUILDER: //// 1 // 10 for(size_t ib = 0; ib < theBuilders.size(); ib++) { if(pActor->libName == theBuilders[ib]->theName) { cout << pActor->libName << " ..." << endl; if(theBuilders[ib]->indChain >= 0 ) { cout << "There can be at most one " << pActor->libName << endl; return false; } if(theBuilders[ib]->numInputs < 1 ) { cout << pActor->libName << " must have at least one input" << endl; return false; } pActor->libItem = 0; //EventBuilder::process_config(pActor->libConf.c_str(), &error_config); // see comment in EventBuilder::process_config() EventBuilder::process_config((pActor->libConf+"/"+pActor->libName+".conf").data(), &error_config); IF_ERRORCONFIG(error_config); pActor->libName = EventBuilder::gActualClass; pActor->evbPointer = new EventBuilder(pActor->libName, theBuilders[ib]->numInputs); #ifdef EVB_INHERITS_FROM_NARVAL_PRODUCER pActor->nrvPointer = pActor->evbPointer; #endif pActor->libItem = 100; theBuilders[ib]->pEventBuilder = pActor->evbPointer; theBuilders[ib]->indChain = nc; break; } else { if(ib == theBuilders.size()-1) { // This builder has never (or not yet) been invoked by a DISPATCHER cout << "Unknown class [ " << nc << " " << nc << " ] " << pActor->libName << endl; return false; } } } break; case PRODUCER: //// 2 // 20 if(pActor->libName == "BasicAFP") { // no mother-daughter model cout << "BasicAFP ..." << endl; BasicAFP::process_config(pActor->libConf.c_str(), &error_config); IF_ERRORCONFIG(error_config); pActor->nrvPointer = new BasicAFP(nc); pActor->libItem = 200; } // 21 else if( pActor->libName == "CrystalProducer" || pActor->libName == "CrystalProducerATCA" ) { cout << "CrystalProducer ..." << endl; CrystalProducer::process_config(pActor->libConf.c_str(), &error_config); IF_ERRORCONFIG(error_config); pActor->libName = CrystalProducer::gActualClass.empty() ? "CrystalProducer" : CrystalProducer::gActualClass; if(pActor->libName == "CrystalProducer") { pActor->nrvPointer = new CrystalProducer; pActor->libItem = 210; } else if(pActor->libName == "CrystalProducerATCA") { pActor->nrvPointer = new CrystalProducerATCA; pActor->libItem = 211; } ELSE_UNKNOWNCLASS(nc); } // 22 else if(pActor->libName == "AncillaryProducerTCP") { // not necessarily derived from ADF::NarvalProducer cout << "AncillaryProducerTCP ..." << endl; AncillaryProducerTCP::process_config(pActor->libConf.c_str(), &error_config); IF_ERRORCONFIG(error_config); pActor->nrvPointer = new AncillaryProducerTCP; pActor->libItem = 220; } ELSE_UNKNOWNCLASS(nc); break; case FILTER: //// 3 // 30 if( pActor->libName == "PreprocessingFilter" || pActor->libName == "PreprocessingFilterPSA") { cout << "PreprocessingFilter ..." << endl; PreprocessingFilter::process_config(pActor->libConf.c_str(), &error_config); IF_ERRORCONFIG(error_config); pActor->libName = PreprocessingFilter::gActualClass.empty() ? "PreprocessingFilter" : PreprocessingFilter::gActualClass; if(pActor->libName == "PreprocessingFilter") { pActor->nrvPointer = new PreprocessingFilter; pActor->libItem = 300; } else if(pActor->libName == "PreprocessingFilterPSA") { pActor->nrvPointer = new PreprocessingFilterPSA; pActor->libItem = 301; } ELSE_UNKNOWNCLASS(nc); } // 31 else if( pActor->libName == "PSAFilter" || pActor->libName == "PSAFilterGridSearch") { cout << "PSAFilter ..." << endl; PSAFilter::process_config(pActor->libConf.c_str(), &error_config); IF_ERRORCONFIG(error_config); pActor->libName = PSAFilter::gActualClass.empty() ? "PSAFilter" : PSAFilter::gActualClass; if(pActor->libName == "PSAFilter") { pActor->nrvPointer = new PSAFilter; pActor->libItem = 310; } else if(pActor->libName == "PSAFilterGridSearch") { pActor->nrvPointer = new PSAFilterGridSearch; pActor->libItem = 311; } } // 32 else if( pActor->libName == "TrackingFilter" || pActor->libName == "TrackingFilterOFT" || pActor->libName == "TrackingFilterMGT" ) { cout << "TrackingFilter ..." << endl; TrackingFilter::process_config(pActor->libConf.c_str(), &error_config); IF_ERRORCONFIG(error_config); pActor->libName = TrackingFilter::gActualClass.empty() ? "TrackingFilter" : TrackingFilter::gActualClass; // allow also to use the mother class ?? if(pActor->libName == "TrackingFilter") { pActor->nrvPointer = new TrackingFilter; pActor->libItem = 320; } else if(pActor->libName == "TrackingFilterOFT") { pActor->nrvPointer = new TrackingFilterOFT; pActor->libItem = 321; } else if(pActor->libName == "TrackingFilterMGT") { pActor->nrvPointer = new TrackingFilterMGT; pActor->libItem = 322; } ELSE_UNKNOWNCLASS(nc); } // 33 else if( pActor->libName == "AncillaryFilter" || pActor->libName == "AncillaryFilterVME" || pActor->libName == "AncillaryFilterATCA") { cout << "AncillaryFilter ..." << endl; AncillaryFilter::process_config(pActor->libConf.c_str(), &error_config); IF_ERRORCONFIG(error_config); pActor->libName = AncillaryFilter::gActualClass.empty() ? "AncillaryFilter" : AncillaryFilter::gActualClass; if(pActor->libName == "AncillaryFilter") { pActor->nrvPointer = new AncillaryFilter; pActor->libItem = 330; } else if(pActor->libName == "AncillaryFilterVME") { pActor->nrvPointer = new AncillaryFilterVME; pActor->libItem = 331; } else if(pActor->libName == "AncillaryFilterATCA") { pActor->nrvPointer = new AncillaryFilterATCA; pActor->libItem = 332; } ELSE_UNKNOWNCLASS(nc); } // 34 else if(pActor->libName == "PostPSAFilter") { cout << "PostPSAFilter ..." << endl; PostPSAFilter::process_config(pActor->libConf.c_str(), &error_config); IF_ERRORCONFIG(error_config); pActor->libName = PostPSAFilter::gActualClass.empty() ? "PostPSAFilter" : PostPSAFilter::gActualClass; if(pActor->libName == "PostPSAFilter") { pActor->nrvPointer = new PostPSAFilter; pActor->libItem = 340; } ELSE_UNKNOWNCLASS(nc); } #ifdef ENABLE_GLOBAL_FILTER // 35 else if(pActor->libName == "GlobalFilter") { cout << "GlobalFilter ..." << endl; GlobalFilter::process_config(pActor->libConf.c_str(), &error_config); IF_ERRORCONFIG(error_config); pActor->libName = GlobalFilter::gActualClass.empty() ? "GlobalFilter" : GlobalFilter::gActualClass; if(pActor->libName == "GlobalFilter") { pActor->nrvPointer = new GlobalFilter; pActor->libItem = 350; } ELSE_UNKNOWNCLASS(nc); } #endif ELSE_UNKNOWNCLASS(nc); break; case CONSUMER: //// 4 // 40 if(pActor->libName == "None" || pActor->libName == "none") { // not a narval actor cout << "None ---" << endl; pActor->libName = "None"; pActor->nrvPointer = NULL; pActor->libItem = 410; printmsg = false; } // 41 else if(pActor->libName == "BasicAFC") { // no mother-daughter model cout << "BasicAFC ..." << endl; BasicAFC::process_config(pActor->libConf.c_str(), &error_config); IF_ERRORCONFIG(error_config); pActor->nrvPointer = new BasicAFC(nc); pActor->libItem = 400; } ELSE_UNKNOWNCLASS(nc); break; case DISPATCHER: //// 5 // not a narval actor { // check if destination already present int indPresent = -1; for(size_t ib = 0; ib < theBuilders.size(); ib++) { if(pActor->libName == theBuilders[ib]->theName) { indPresent = int(ib); break; } } if(indPresent < 0) { // add a new one builder_t *abuilder = new builder_t(pActor->libName); theBuilders.push_back(abuilder); } } for(size_t ib = 0; ib < theBuilders.size(); ib++) { if(pActor->libName == theBuilders[ib]->theName) { cout << theBuilders[ib]->theName << " ---" << endl; pActor->nrvPointer = NULL; pActor->libItem = Int_t(ib); pChain->indQueue = theBuilders[ib]->numInputs; theBuilders[ib]->numInputs++; printmsg = false; break; } } break; default: cout << "Unknown type for chain " << nc << " actor " << na << endl; return false; } if(printmsg) cout << "Actor " << nc << " " << na << " done" << endl; } pChain->thisLocker = new ChainLocker(nc, runThreaded); // instantiate the (possibly fake) lock for this chain cout << "-- Chain " << nc << " done" << endl; } for(size_t ib = 0; ib < theBuilders.size(); ib++) { if(theBuilders[ib]->indChain < 0) { cout << "Class " << theBuilders[ib]->theName << " has not been instantiated" << endl; return false; } } // Mark presence of a BUILDER and/or a DISPATCHER for(size_t nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; pChain->hasBuilder = false; pChain->hasDispatcher = false; for(int na = 0; na < pChain->numActors; na++) { actor *pActor = &pChain->theActors[na]; if(pActor->libType == BUILDER) { pChain->hasBuilder = true; } if(pActor->libType == DISPATCHER) { pChain->hasDispatcher = true; } } } // Link the dispatchers to their builder chains for(size_t nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; if(pChain->hasDispatcher) { // look for its builder actor *pActor = &pChain->theActors[pChain->numActors-1]; for(size_t ib = 0; ib < theBuilders.size(); ib++) { if( pActor->libName == theBuilders[ib]->theName ) { pChain->indBuilder = theBuilders[ib]->indChain; pChain->ptrBuilder = theBuilders[ib]->pEventBuilder; pActor->evbPointer = theBuilders[ib]->pEventBuilder; pChain->pchBuilder = theChains[pChain->indBuilder]; pActor->libItem = theBuilders[ib]->indChain; } } } } // link the builders to their dispatching chains for(size_t nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; if(pChain->hasBuilder) { for(size_t ic = 0; ic < theChains.size(); ic++) { if(ic != nc) { ChainOfActors * iChain = theChains[ic]; if(iChain->hasDispatcher && iChain->indBuilder==nc) pChain->pchProducers.push_back(iChain); } } } } cout << "\nAll requested objects have been instantiated" << endl; return true; } #else // Actual class from Topology bool topologyConfNew() { // Reduce verbosity of LogCollector //CentralLog::theCentralLog()->GetCurrentLogCollector()->SetLevelFiltering(LogMessage::kError, LogMessage::kError); //CentralLog::theCentralLog()->GetCurrentLogCollector()->SetLevelFiltering(LogMessage::kError, LogMessage::kWarning); CentralLog::theCentralLog()->GetCurrentLogCollector()->SetLevelFiltering(LogMessage::kError, (unsigned short)(adfVerbose)); int numChains = (int)theChains.size(); UInt_t error_config = 0; for(int nc = 0; nc < numChains; nc++) { ChainOfActors * pChain = theChains[nc]; cout << "\n Calling process_config and instantiating objects for chain " << nc << endl; for(int na = 0; na < pChain->numActors; na++) { cout << "Actor " << nc << " " << na << " ---> "; actor *pActor = &pChain->theActors[na]; bool printmsg = true; switch (pChain->theActors[na].libType) { case BUILDER: for(size_t ib = 0; ib < theBuilders.size(); ib++) { if(pActor->libName == theBuilders[ib]->theName) { cout << pActor->libName << " ..." << endl; if(theBuilders[ib]->indChain >= 0 ) { cout << "There can be at most one " << pActor->libName << endl; return false; } if(theBuilders[ib]->numInputs < 1 ) { cout << pActor->libName << " must have at least one input" << endl; return false; } pActor->libItem = 0; //EventBuilder::process_config(pActor->libConf.c_str(), &error_config); // see comment in EventBuilder::process_config() EventBuilder::process_config((pActor->libConf+"/"+pActor->libName+".conf").data(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->evbPointer = new EventBuilder(pActor->libName, theBuilders[ib]->numInputs); #ifdef EVB_INHERITS_FROM_NARVAL_PRODUCER pActor->nrvPointer = pActor->evbPointer; #endif theBuilders[ib]->pEventBuilder = pActor->evbPointer; theBuilders[ib]->indChain = nc; break; } else { if(ib == theBuilders.size()-1) { // This builder has never (or not yet) been invoked by a DISPATCHER cout << "Unknown class [ " << nc << " " << nc << " ] " << pActor->libName << endl; return false; } } } break; case PRODUCER: if(pActor->libName == "BasicAFP") { cout << "BasicAFP ..." << endl; BasicAFP::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new BasicAFP(nc); pActor->libItem = 0; } else if(pActor->libName == "CrystalProducerATCA") { cout << "CrystalProducerATCA ..." << endl; CrystalProducer::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new CrystalProducerATCA; pActor->libItem = 1; } else if(pActor->libName == "AncillaryProducerTCP") { cout << "AncillaryProducerTCP ..." << endl; AncillaryProducerTCP::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new AncillaryProducerTCP; pActor->libItem = 2; } else { cout << "Unknown class [ " << nc << " " << nc << " ] " << pActor->libName << endl; return false; } break; case FILTER: if(pActor->libName == "PreprocessingFilterPSA") { cout << "PreprocessingFilterPSA ..." << endl; PreprocessingFilter::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new PreprocessingFilterPSA; pActor->libItem = 0; } else if(pActor->libName == "PSAFilterGridSearch") { cout << "PSAFilterGridSearch ..." << endl; PSAFilter::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new PSAFilterGridSearch; pActor->libItem = 1; } else if(pActor->libName == "TrackingFilterOFT") { cout << "TrackingFilterOFT ..." << endl; TrackingFilter::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new TrackingFilterOFT; pActor->libItem = 2; } else if(pActor->libName == "AncillaryFilterVME") { cout << "AncillaryFilterVME ..." << endl; AncillaryFilter::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new AncillaryFilterVME; pActor->libItem = 3; } else if(pActor->libName == "AncillaryFilterATCA") { cout << "AncillaryFilterATCA ..." << endl; AncillaryFilter::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new AncillaryFilterATCA; pActor->libItem = 4; } else if(pActor->libName == "TrackingFilterMGT") { cout << "TrackingFilterMGT ..." << endl; TrackingFilter::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new TrackingFilterMGT; pActor->libItem = 5; } else if(pActor->libName == "PostPSAFilter") { cout << "PostPSAFilter ..." << endl; PostPSAFilter::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new PostPSAFilter; pActor->libItem = 6; } #ifdef ENABLE_GLOBAL_FILTER else if(pActor->libName == "GlobalFilter") { cout << "GlobalFilter ..." << endl; GlobalFilter::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new GlobalFilter; pActor->libItem = 6; } else { cout << "Unknown class [ " << nc << " " << nc << " ] " << pActor->libName << endl; return false; } #endif break; case CONSUMER: if(pActor->libName == "None" || pActor->libName == "none") { cout << "None ---" << endl; pActor->libName = "None"; pActor->nrvPointer = NULL; pActor->libItem = 0; printmsg = false; } else if(pActor->libName == "BasicAFC") { cout << "BasicAFC ..." << endl; BasicAFC::process_config(pActor->libConf.c_str(), &error_config); if(error_config) {cout << pActor->libName << "::process_config failed" << endl; return false;} pActor->nrvPointer = new BasicAFC(nc); pActor->libItem = 1; } else { cout << "Unknown class [ " << nc << " " << nc << " ] " << pActor->libName << endl; return false; } break; case DISPATCHER: { // check if destination already present int indPresent = -1; for(size_t ib = 0; ib < theBuilders.size(); ib++) { if(pActor->libName == theBuilders[ib]->theName) { indPresent = int(ib); break; } } if(indPresent < 0) { // add a new one builder_t *abuilder = new builder_t(pActor->libName); theBuilders.push_back(abuilder); } } for(size_t ib = 0; ib < theBuilders.size(); ib++) { if(pActor->libName == theBuilders[ib]->theName) { cout << theBuilders[ib]->theName << " ---" << endl; pActor->nrvPointer = NULL; pActor->libItem = Int_t(ib); pChain->indQueue = theBuilders[ib]->numInputs; theBuilders[ib]->numInputs++; printmsg = false; break; } } break; default: cout << "Unknown type for chain " << nc << " actor " << na << endl; return false; } if(printmsg) cout << "Actor " << nc << " " << na << " done" << endl; } pChain->thisLocker = new ChainLocker(nc, runThreaded); // instantiate the (possibly fake) lock for this chain cout << "Chain " << nc << " done" << endl; } for(size_t ib = 0; ib < theBuilders.size(); ib++) { if(theBuilders[ib]->indChain < 0) { cout << "Class " << theBuilders[ib]->theName << " has not been instantiated" << endl; return false; } } // Mark presence of a BUILDER and/or a DISPATCHER for(size_t nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; pChain->hasBuilder = false; pChain->hasDispatcher = false; for(int na = 0; na < pChain->numActors; na++) { actor *pActor = &pChain->theActors[na]; if(pActor->libType == BUILDER) { pChain->hasBuilder = true; } if(pActor->libType == DISPATCHER) { pChain->hasDispatcher = true; } } } // Link the dispatchers to their builder chains for(size_t nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; if(pChain->hasDispatcher) { // look for its builder actor *pActor = &pChain->theActors[pChain->numActors-1]; for(size_t ib = 0; ib < theBuilders.size(); ib++) { if( pActor->libName == theBuilders[ib]->theName ) { pChain->indBuilder = theBuilders[ib]->indChain; pChain->ptrBuilder = theBuilders[ib]->pEventBuilder; pActor->evbPointer = theBuilders[ib]->pEventBuilder; pChain->pchBuilder = theChains[pChain->indBuilder];; } } } } // link the builders to their dispatching chains for(size_t nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; if(pChain->hasBuilder) { for(size_t ic = 0; ic < theChains.size(); ic++) { if(ic != nc) { ChainOfActors * iChain = theChains[ic]; if(iChain->hasDispatcher && iChain->indBuilder==nc) pChain->pchProducers.push_back(iChain); } } } } cout << "\nAll requested objects have been instantiated" << endl; return true; } #endif // process_initialise bool topologyInit() { unsigned int error_config; unsigned int error_init; for(unsigned int nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; cout << "\n INITIALIZING CHAIN " << nc << endl; bool printmsg = true; for(int na = 0; na < pChain->numActors; na++) { error_config = 0; error_init = 0; actor *pActor = &pChain->theActors[na]; if(!pActor->nrvPointer && !pActor->evbPointer) continue; printmsg = true; switch (pActor->libType) { case BUILDER : if(!pActor->evbPointer) { error_config = 100; break; } pActor->evbPointer->process_initialise(&error_init); // add info about the feeding chains; for(size_t np = 0; np < pChain->pchProducers.size(); np++) { pActor->evbPointer->setChain(int(np), pChain->pchProducers[np]->thisLocker); } break; case PRODUCER : case FILTER : if(!pActor->nrvPointer) { error_config = 100; break; } pActor->nrvPointer->process_initialise(&error_init); break; case CONSUMER : if(pActor->nrvPointer) { // initialize only real consumer; do nothing for None pActor->nrvPointer->process_initialise(&error_init); } else printmsg = false; break; case DISPATCHER : printmsg = false; break; // Unknown default: cout << "Unknown type for chain " << nc << " actor " << na << endl; return false; } if(error_config) { cout << "Bad pointer for "<< pActor->libName << " in chain " << nc << endl; return false; } if(error_init) { cout << "Could not initialise "<< pActor->libName << " for chain " << nc << " ==> Returned error is " << error_init << endl; return false; } if(printmsg) cout << endl << pActor->libName << "->process_initialise ... done." << endl; } pChain->ioBuffs[0] = new char [pChain->ioBuffsSize]; pChain->ioBuffs[1] = new char [pChain->ioBuffsSize]; memset(pChain->ioBuffs[0], 0, pChain->ioBuffsSize*sizeof(char)); memset(pChain->ioBuffs[1], 0, pChain->ioBuffsSize*sizeof(char)); } return true; } bool topologyStart() { UInt_t error_code; uint32_t e_code; for(size_t nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; cout << "\n STARTING CHAIN " << nc << endl; pChain->runThreaded = runThreaded; bool printmsg = true; for(int na = 0; na < pChain->numActors; na++) { error_code = 0; e_code = 0; actor *pActor = &pChain->theActors[na]; if(!pActor->nrvPointer && !pActor->evbPointer) continue; printmsg = true; switch (pActor->libType) { case BUILDER : pActor->evbPointer->process_start(&error_code); break; case PRODUCER : case FILTER : case CONSUMER : if(pActor->nrvPointer) pActor->nrvPointer->process_start(&error_code); else printmsg = false; break; case DISPATCHER : break; default: cout << "Unknown type for chain " << nc << " actor " << na << endl; return false; } if(error_code) { cout << "Could not start "<< pActor->libName << " for chain " << nc << " ==> Returned error is " << error_code << endl; return false; } else { if(printmsg) cout << pActor->libName << "->process_start ... done." << endl; } } // loop on the actors of a chain } // loop on the chains return true; } bool topologyStop() { UInt_t error_code; uint32_t e_code; for(size_t nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; cout << "\n FINISHING CHAIN " << nc << endl; bool printmsg = true; for(int na = 0; na < pChain->numActors; na++) { error_code = 0; e_code = 0; actor *pActor = &pChain->theActors[na]; if(!pActor->nrvPointer && !pActor->evbPointer) continue; printmsg = true; switch (pActor->libType) { case BUILDER : pActor->evbPointer->process_stop(&error_code); break; case PRODUCER : case FILTER : case CONSUMER : if(pActor->nrvPointer) pActor->nrvPointer->process_stop(&error_code); else printmsg = false; break; case DISPATCHER : printmsg = false; break; default: cout << "Unknown type for chain " << nc << " actor " << na << endl; return false; } if(error_code) { cout << "Could not stop "<< pActor->libName << " for chain " << nc << " ==> Returned error is " << error_code << endl; return false; } else { if(printmsg) cout << pActor->libName << "->process_stop ... done." << endl; } } } return true; } bool topologyDelete() { for(size_t nc = 0; nc < theChains.size(); nc++) { ChainOfActors * pChain = theChains[nc]; cout << "\n DELETING CHAIN " << nc << endl; for(int na = 0; na < pChain->numActors; na++) { actor *pActor = &pChain->theActors[na]; if(!pActor->nrvPointer && !pActor->evbPointer) continue; bool printmsg = true; switch (pActor->libType) { case BUILDER : delete pActor->evbPointer; break; case PRODUCER : case FILTER : case CONSUMER : if(pActor->nrvPointer) { delete pActor->nrvPointer; pActor->nrvPointer = NULL; } else printmsg = false; break; case DISPATCHER : printmsg = false; break; default: cout << "Unknown type for chain " << nc << " actor " << na << endl; return false; } if(printmsg) cout << pActor->libName << " delete ... done." << endl; } } return true; } // The non threaded version calls the chains in turn until all of them are done int eventLoop1() { int numChains = (int)theChains.size(); for(; ; theTurn++) { int stot = (int)(get_wall_time() - wTstart); { LOCK_COUT; cout << "\n---Turn " << theTurn << " ( " << stot << " s )" << endl; } int nvalid = 0; for(int ich = 0; ich < numChains; ich++) { ChainOfActors *pChain = theChains[ich]; if(!pChain->isClosed) { nvalid++; pChain->Run(); // normal call to a PRODUCER/BUILDER chain } if(!pChain->myStatus) { stopSig = killSig = true; // exit immediately if the chain returned an error state. break; } } // for(int ich = 0; ich < numChains; ich++) { if(killSig || stopSig) break; if(nvalid==0) { { LOCK_COUT; DEBUG_LINE_N << " ===> GOING TO STOP due to NO QUEUE CALLED " << endl; } stopSig = true; break; } if(maxTurn > 0 && theTurn >= (maxTurn-1) ) { { LOCK_COUT; DEBUG_LINE_N << " ===> GOING TO STOP due to TURNMAX REACHED " << endl; } stopSig = true; break; } if(CheckFile(StopFile, true)) { { LOCK_COUT; DEBUG_LINE_N << " ===> GOING TO STOP due to detection of " << StopFile << endl; } stopSig = true; break; } if(CheckFile(KillFile, true)) { { LOCK_COUT; DEBUG_LINE_N << " ===> GOING TO STOP due to detection of " << KillFile << endl; } killSig = true; break; } } // for(; stopSig < 1; theTurn++) { if(killSig) return 0; { LOCK_COUT; cout << "\n---Ending all PRODUCER chains\n" << endl; } for(int ich = 0; ich < numChains ; ich++) { ChainOfActors * pChain = theChains[ich]; if(!pChain->hasBuilder) { pChain->hasEnded = true; } } { LOCK_COUT; cout << "---Waiting PRODUCER chains to close\n" << endl; } int nvalid = numChains; while(nvalid) { nvalid = 0; for(int ich = 0; ich < numChains; ich++) { ChainOfActors *pChain = theChains[ich]; if(!pChain->isClosed) { nvalid++; pChain->Run(); // normal call to a PRODUCER/BUILDER chain if(!pChain->myStatus) { killSig = true; // exit immediately if the chain returned an error state. return 0; } } } } if(killSig) return 0; { LOCK_COUT; cout << "\n---Waiting BUILDER threads to close\n" << endl; } bool allClosed = false; while(!allClosed) { allClosed = true; for(int ich = 0; ich < numChains ; ich++) { ChainOfActors * pChain = theChains[ich]; if(pChain->hasBuilder) { allClosed &= pChain->isClosed; } } if(!allClosed) { if(CheckFile(KillFile, true)) { killSig = true; return 0; } } } return 0; } bool CheckFile(std::string checkFile, bool remove) { FILE *fp = fopen(checkFile.c_str(), "r"); if(!fp) return false; fclose(fp); if(remove) { #if OS_TYPE == OS_WINDOWS _unlink(checkFile.c_str()); #else unlink(checkFile.c_str()); #endif LOCK_COUT; cout << "\n---" << checkFile << " detected and removed" << endl; } else { LOCK_COUT; cout << "\n---" << checkFile << " detected" << endl; } return true; } #ifndef WCT_THREADED // Defined to make the compiler happy but not called anyway int eventLoop2() { return 0; } #else // Launch all PRODUCER and BUILDER chains in parallel threads // Launch also a control thread, to take care of cyclic actions int eventLoop2() { if(!runThreaded) { DEBUG_LINE_N << " cannot be called if running unthreaded" << endl; return -1; } cout << endl; int numChains = (int)theChains.size(); // launch the threads of all worker chains for(int ich = 0; ich < numChains ; ich++) { chThread.push_back( new thrLib::thread(&threadWorker, ich) ); cout << "STARTED CHAIN " << ich << endl; } cout << endl; // launch a thread for cyclic actions, like checking the presence of StopFile or KillFile stopControl = false; ctrlThread = new thrLib::thread(&threadControl); // join() should be replaced with timed_join(...) to avoid blocking forever in case of // never-exiting threads. The control thread could take up the job of checking this case // join all PRODUCER and BUILDER threads for(int ich = 0; ich < numChains ; ich++) { chThread[ich]->join(); cout << "JOINED CHAIN " << ich << endl; } stopControl = true; // force also the control thread to exit if that has not already happened ctrlThread->join(); cout << "JOINED Control Thread " << endl; return 1; } void threadWorker(int ich) { ChainOfActors *thisChain = theChains[ich]; for(int myTurn = 0; ; myTurn++) { thrSleep(20); thisChain->isActive = true; thisChain->Run(); thisChain->isActive = false; if(!thisChain->myStatus) thisChain->isClosed = true; if(thisChain->isClosed) break; if(killSig) break; } { LOCK_COUT; cout << "---Exiting thread " << setw(2) << ich << endl; } } void threadControl() { int numChains = (int)theChains.size(); while(true) { if(stopControl) { { LOCK_COUT; DEBUG_LINE_N << " ===> GOING TO STOP due to stopControl" << endl; } return; } // execute every ~1 second thrSleep(1000); theTurn++; int stot = (int)(get_wall_time() - wTstart); { LOCK_COUT; cout << "\n---Time " << theTurn << " ( " << stot << " s )" << endl; } if(maxTurn > 0 && theTurn >= (maxTurn-1)) { { LOCK_COUT; DEBUG_LINE_N << " ===> GOING TO STOP due to TIMEMAX REACHED in the control thread " << endl; } stopSig = true; } else { // In Unix the ^C is not captured by catch_int() as defined in main() <== removed // This functionality is replaced by checking the presence of a file named // StopFile (for ordered close-up) or KillFile (for immediate stop) if(CheckFile(StopFile, true)) { stopSig = true; } if(CheckFile(KillFile, true)) { killSig = true; } } if(stopSig || killSig) break; } // while(true) if(killSig) { { LOCK_COUT; cout << "\n---Exiting the control thread\n" << endl; } return; } if(stopSig) { { LOCK_COUT; cout << "\n---Ending all PRODUCER threads\n" << endl; } for(int ich = 0; ich < numChains ; ich++) { ChainOfActors * pChain = theChains[ich]; if(!pChain->hasBuilder) { pChain->hasEnded = true; } } { LOCK_COUT; cout << "---Waiting PRODUCER threads to close\n" << endl; } bool allClosed = false; while(!allClosed) { allClosed = true; for(int ich = 0; ich < numChains ; ich++) { ChainOfActors * pChain = theChains[ich]; if(!pChain->hasBuilder) { allClosed &= pChain->isClosed; } } if(!allClosed) { thrSleep(200); if(CheckFile(KillFile, true)) { killSig = true; break; } } } if(!killSig) { { LOCK_COUT; cout << "\n---Waiting BUILDER threads to close\n" << endl; } allClosed = false; while(!allClosed) { allClosed = true; for(int ich = 0; ich < numChains ; ich++) { ChainOfActors * pChain = theChains[ich]; if(pChain->hasBuilder) { allClosed &= pChain->isClosed; } } if(!allClosed) { thrSleep(200); if(CheckFile(KillFile, true)) { killSig = true; break; } } } } } { LOCK_COUT; cout << "\n---Exiting the control thread\n" << endl; } } #endif // WCT_THREADED