Docker-in-Docker (DinD) capabilities of public runners deactivated. More info

Commit 33554dd8 authored by dino's avatar dino
Browse files

My latest version, not fully tested. Commit done to have it on the repository...

My latest version, not fully tested. Commit done to have it on the repository for others to check recovery

git-svn-id: svn://gal-serv.lnl.infn.it/agata/trunk/narval_emulator@1286 170316e4-aea8-4b27-aad4-0380ec0519c9
parent dfae9289
......@@ -13,13 +13,13 @@ set( CMAKE_MODULE_PATH "." )
################################################################
# Set the off-line version of the actors (no NRV_ONLINE for femul)
#ADD_DEFINITIONS( -DNRV_TYPE=NRV_OFFLINE )
ADD_DEFINITIONS( -DNRV_TYPE=NRV_ONLINE )
ADD_DEFINITIONS( -DNRV_TYPE=NRV_OFFLINE )
#ADD_DEFINITIONS( -DNRV_TYPE=NRV_ONLINE )
################################################################
# Select the threads library as THR_NONE, THR_BOOST or THR_STD
if( 0 ) # 0 to select from command line ( e.g. cmake . -DTHR_TYPE=THR_STD )
set( THR_TYPE THR_BOOST ) # default
if( 1 ) # 0 to select from comm line ( e.g. cmake . -DTHR_TYPE=THR_STD )
set( THR_TYPE THR_STD ) # default
endif()
if( NOT DEFINED THR_TYPE ) # in case default not set and nothing given from command line
......@@ -62,13 +62,27 @@ endif()
################################################################
# Enable root
if( 1 ) # 0 to disable
if( 0 ) # 0 to disable
ADD_DEFINITIONS(-DTRF_ROOTTREE)
FIND_PACKAGE(ROOT)
MESSAGE("Root Dirs: ${ROOT_INCLUDE_DIR} ${ROOT_LIBRARY_DIR}")
MESSAGE("Root Libs: ${ROOT_LIBRARIES}")
endif()
SET(USE_PRISMA 0) # 1 to enable PRISMA
if(USE_PRISMA)
MESSAGE("Using PRISMA")
else()
MESSAGE("NOT Using PRISMA")
endif()
SET(USE_SKSTREAM 0) # 1 to enable skstream (VME readout at LNL)
if(USE_SKSTREAM ) #AND (strequal NRV_TYPE NRV_ONLINE)
MESSAGE("Using skstream")
else()
MESSAGE("NOT Using skstream")
endif()
###################################
###################################
......@@ -94,10 +108,14 @@ ${ACTORS}/filters/Global
${ACTORS}/filters/Tracking ${ACTORS}/filters/Tracking/includeOFT ${ACTORS}/filters/Tracking/includeMGT
${ACTORS}/builders
${AGADIR}/include
${PRISMA}/include
if(USE_PRISMA)
${PRISMA}/include
#endif()
${Boost_INCLUDE_DIR}
${ROOT_INCLUDE_DIR}
##/usr/include/skstream-0.3 /usr/lib/skstream-0.3/include # only for NRV_TYPE=NRV_ONLINE which we don't use in femu;
if(USE_SKSTREAM ) # AND (strequal NRV_TYPE NRV_ONLINE)
/usr/include/skstream-0.3 /usr/lib/skstream-0.3/include # only for NRV_TYPE=NRV_ONLINE, which we don't use in femul
endif()
)
SET(actors_SRCS
......@@ -185,26 +203,28 @@ ${ADFDIR}/BasicAFC.cpp
${ADFDIR}/BasicAFP.cpp
)
SET(PRISMA_SRCS
${PRISMA}/src/prismaManager.cc
${PRISMA}/src/banGate.cc
${PRISMA}/src/banManager.cc
${PRISMA}/src/calCoef.cc
${PRISMA}/src/calManager.cc
${PRISMA}/src/CGaspBuffer.cc
${PRISMA}/src/ConfigurationFile.cc
${PRISMA}/src/detData.cc
${PRISMA}/src/fastSolver.cc
${PRISMA}/src/fullEvent.cc
${PRISMA}/src/massCalculator.cc
${PRISMA}/src/prismaIonCh.cc
${PRISMA}/src/prismaMcp.cc
${PRISMA}/src/prismaPPAC.cc
${PRISMA}/src/prismaSide.cc
${PRISMA}/src/RandomGenerator.cc
${PRISMA}/src/Vectors.cc
${PRISMA}/src/zedCalculator.cc
)
if(USE_PRISMA)
SET(PRISMA_SRCS
${PRISMA}/src/prismaManager.cc
${PRISMA}/src/banGate.cc
${PRISMA}/src/banManager.cc
${PRISMA}/src/calCoef.cc
${PRISMA}/src/calManager.cc
${PRISMA}/src/CGaspBuffer.cc
${PRISMA}/src/ConfigurationFile.cc
${PRISMA}/src/detData.cc
${PRISMA}/src/fastSolver.cc
${PRISMA}/src/fullEvent.cc
${PRISMA}/src/massCalculator.cc
${PRISMA}/src/prismaIonCh.cc
${PRISMA}/src/prismaMcp.cc
${PRISMA}/src/prismaPPAC.cc
${PRISMA}/src/prismaSide.cc
${PRISMA}/src/RandomGenerator.cc
${PRISMA}/src/Vectors.cc
${PRISMA}/src/zedCalculator.cc
)
endif()
SET(femul_SRCS ${FEMDIR}/femul.cpp ${FEMDIR}/ChainOfActors.cpp ${adf_SRCS} ${actors_SRCS} ${PRISMA_SRCS} )
......
......@@ -31,6 +31,12 @@ using namespace ADF;
// flag and returns (allowing the other chains to be run in turn). At the next run of this chain the
// flag is tested and the Dispatcher is called again; this mechanism is repeated until successful
// exchange and only then the chain executes the main body.
//
// 150521
// Introduced a mechanism to dinamically enlarge the output buffers of the filters to be at least
// OBG times larger than the used size of their input buffer.
// The largest values of OBG is 2.86, needed by AncillaryFilterVME for the case of PRISMA data.
// For AGATA crystals the output buffer of PreprocessingFILTER needs a factor of ~1.3
void ChainOfActors::Run()
{
......@@ -48,13 +54,13 @@ void ChainOfActors::Run()
if(hasDispatcher && dispatchFlag) {
Dispatch();
if(dispatchFlag)
return; // not yet done
return; // not yet done
}
// check EOD condition
if(hasEnded) {
DispatchEOD(); // if successful, sets isClosed
return; // in any case, there is nothing more to do
DispatchEOD(); // if successful, sets isClosed
return; // in any case, there is nothing more to do
}
// the dispatch buffer is free and the chain can proceed
......@@ -62,66 +68,59 @@ void ChainOfActors::Run()
dispatchSize = 0;
dispatchFlag = false;
void *bufinp = NULL;
void *bufout = NULL;
unsigned int size_of_input_buffer = 0;
unsigned int size_of_output_buffer = 0;
unsigned int used_of_input_buffer = 0;
unsigned int used_of_output_buffer = 0;
unsigned int error_code = 0;
int ping = 0;
int pong = (ping+1)%2;
bufinp = NULL; // no input buffer for producers
bufout = (void *)ioBuffs[pong]; // the first output buffer
int ping = 0; // input
int pong = 1; // output
bool hasError = false;
for(indActor = 0 ; indActor < numActors; indActor++) {
ptrActor = &theActors[indActor];
size_of_output_buffer = ptrActor->sizeOut;
// PRODUCERS
if(ptrActor->libType == PRODUCER) {
void *bufinp = NULL; // no input buffer for producers
void *bufout = (void *)ioBuffers[pong]; // output buffer is always [pong]
static_cast<NarvalProducer *>(ptrActor->nrvPointer)->process_block(
bufout, (size_of_output_buffer*3)/4, &used_of_output_buffer, // fill only up to 3/4
&error_code);
bufout, ptrActor->sizeOut, &used_of_output_buffer, &error_code); // producers use ptrActor->sizeOut
if(error_code)
hasEnded = true;
if(used_of_output_buffer == 0) {
thrYield(); // fake if non-threaded
break;
}
// swap buffers for the next actor
ping = pong; bufinp = (void *)ioBuffs[ping];
pong = (ping+1)%2; bufout = (void *)ioBuffs[pong];
size_of_input_buffer = used_of_output_buffer;
swap(ping, pong); // swap buffers for the next actor
size_of_input_buffer = used_of_output_buffer; // and remembber the actual size
}
// BUILDERS
else if(ptrActor->libType == BUILDER) {
void *bufinp = NULL; // builders are producers ==> no input buffer
void *bufout = (void *)ioBuffers[pong]; // output buffer is always [pong]
#ifdef EVB_INHERITS_FROM_NARVAL_PRODUCER
static_cast<NarvalProducer *>(ptrActor->nrvPointer)->process_block(
#else
ptrActor->evbPointer->process_block(
#endif
bufout, (size_of_output_buffer*3/4), &used_of_output_buffer, // fill only up to 3/4
&error_code);
bufout, ptrActor->sizeOut, &used_of_output_buffer, &error_code); // builders use ptrActor->sizeOut
if(error_code)
hasEnded = true;
if(used_of_output_buffer == 0) {
thrYield(); // fake if non-threaded
break;
}
// swap buffers for the next actor
ping = pong; bufinp = (void *)ioBuffs[ping];
pong = (ping+1)%2; bufout = (void *)ioBuffs[pong];
size_of_input_buffer = used_of_output_buffer;
swap(ping, pong); // swap buffers for the next actor
size_of_input_buffer = used_of_output_buffer; // and remembber the actual size
}
// FILTERS
else if(ptrActor->libType == FILTER) {
const int OBG = 3; // size increase of output buffer to avoid overruns.
CheckBufferSize(pong, OBG*size_of_input_buffer); // worst case is AncillaryFilterVME which needs ~2.85
void *bufinp = (void *)ioBuffers[ping]; // input buffer is always [ping]
void *bufout = (void *)ioBuffers[pong]; // input buffer is always [pong]
static_cast<NarvalFilter *>(ptrActor->nrvPointer)->process_block(
bufinp, size_of_input_buffer,
bufout, size_of_output_buffer, &used_of_output_buffer,
&error_code);
bufout, ioBufflen[pong], &used_of_output_buffer, &error_code);
dispatchFlag = (used_of_output_buffer > 0);
if(error_code) {
hasError = true;
......@@ -129,41 +128,41 @@ void ChainOfActors::Run()
}
if(used_of_output_buffer == 0)
break;
// swap buffers for the next actor
ping = pong; bufinp = (void *)ioBuffs[ping];
pong = (ping+1)%2; bufout = (void *)ioBuffs[pong];
size_of_input_buffer = used_of_output_buffer;
swap(ping, pong); // swap buffers for the next actor
size_of_input_buffer = used_of_output_buffer; // and remembber the actual size
}
// CONSUMERS
else if(ptrActor->libType == CONSUMER) {
if(!ptrActor->nrvPointer)
continue; // is a "none" fake consumer
void *bufinp = (void *)ioBuffers[ping]; // input buffer is always [ping]
void *bufout = NULL; // no output buffer for consumers
static_cast<NarvalConsumer *>(ptrActor->nrvPointer)->process_block(
bufinp, size_of_input_buffer,
&error_code);
bufinp, size_of_input_buffer, &error_code);
if(error_code) {
hasError = true;
break;
}
// CONSUMERS don't swap the buffers so as their input buffer is ready for the next actor
// CONSUMERS don't swap the buffers ==> their input buffer is ready for the next actor
}
// DISPATCHER
else if(ptrActor->libType == DISPATCHER) {
if(size_of_input_buffer > 0) {
dispatchBuff = bufinp;
dispatchBuff = (void *)ioBuffers[ping];
dispatchSize = size_of_input_buffer;
dispatchFlag = true;
Dispatch();
}
// DISPATCHERS don't swap the buffers, as they are the last in the execution
// DISPATCHERS don't swap the buffers, as they are the last in the execution chain.
// Notice that the ping/pong mechanism cannot be restarted until Dispatch has succeeded
// transfering the buffer (whichever of the two ioBuffs it is) to the exchange object
// transfering the buffer (whichever of the two ioBuffers it is) to the exchange object
break; // if present this is the last actor
}
else
// ERROR
else {
hasError = true;
break;
}
} // for(indActor = 0 ; indActor < numActors; indActor++)
if(hasError) {
......@@ -253,5 +252,29 @@ void ChainOfActors::DoSleep(int mseconds) {
//isSleeping = false;
}
#endif
}
bool ChainOfActors::ResizeBuffer(int nn, int size)
{
if (nn < 0 || nn > 1)
return false;
if (ioBuffers[nn])
delete[] ioBuffers[nn];
if (size <= 0)
size = ioBuffsSize; // use default
//size = // controlli su min max and modulo
ioBuffers[nn] = new (nothrow) char[size];
if (ioBuffers[nn] == NULL)
return false;
ioBufflen[nn] = size;
memset(ioBuffers[nn], 0, ioBufflen[nn] * sizeof(char));
return true;
}
......@@ -31,7 +31,8 @@ public:
int numActors; // number of actors in the chain
std::vector<actor> theActors; // actor: NONE; BUILDER; PRODUCER; FILTER; CONSUMER; DISPATCHER
UInt_t ioBuffsSize; // size, in bytes, of the buffers
char *ioBuffs[2]; // IO buffers. Input/Output type toggles along the execution type
char *ioBuffers[2]; // IO buffers. Input/Output type toggles along the execution type
int ioBufflen[2]; // actual size of the two buffers
bool runThreaded; // chain runs in its own thread
bool hasBuilder; // true if the first actor is a BUILDER (otherwise it is a PRODUCER)
int indBuilder; // builder-chain to which the output of this chain is connected
......@@ -66,12 +67,25 @@ public:
statStarted(0), statCounted(0)
{
ioBuffsSize = 0;
ioBuffs[0] = ioBuffs[1] = NULL;
ioBuffers[0] = ioBuffers[1] = NULL;
ioBufflen[0] = ioBufflen[1] = 0;
}
void Run();
void Dispatch();
void DispatchEOD();
void DoSleep(int mseconds = 10);
bool CheckBufferSize(int nn, int size) {
if (nn < 0 || nn > 1)
return false;
if (size <= ioBufflen[nn])
return true;
return ResizeBuffer(nn, size);
}
bool ResizeBuffer(int nn, int size = 0);
bool ResizeBuffers(int size = 0) {
return ResizeBuffer(0, size) && ResizeBuffer(1, size);
}
};
#endif // CHAINOFACTORS_H_INCLUDED
......@@ -265,7 +265,7 @@ AGAIN_LOOP:
time(&tTstart);
string sstart = ctime(&tTstart); sstart = sstart.substr(0, sstart.size()-1);
cout << "\n********** START time : " << sstart << endl;
cout << "\n********** START time: " << sstart << endl;
CheckFile(StopFile, true); // remove if present
CheckFile(KillFile, true); // remove if present
......@@ -312,7 +312,7 @@ AGAIN_LOOP:
LOCK_COUT;
cout << endl << endl;
cout << "********** STOP time : " << sstop << endl;
cout << "********** STOP time: " << sstop << endl;
cout << "********** " << fixed << setprecision(1) << wstot << " elapsed seconds ("
<< fixed << setprecision(1) << cstot << " CPU seconds)"
<< endl;
......@@ -1498,22 +1498,22 @@ bool topologyInit()
}
if(error_config) {
cout << "Bad pointer for "<< pActor->libName << " in chain " << nc << endl;
cout << "Bad pointer for " << pActor->libName << " in chain " << nc << endl;
return false;
}
if(error_init) {
cout << "Could not initialise "<< pActor->libName << " for chain " << nc << " ==> Returned error is " << error_init << endl;
cout << "Could not initialise " << pActor->libName << " for chain " << nc << " ==> Returned error is " << error_init << endl;
return false;
}
if(printmsg)
cout << endl << pActor->libName << "->process_initialise ... done." << endl;
}
pChain->ioBuffs[0] = new char [pChain->ioBuffsSize];
pChain->ioBuffs[1] = new char [pChain->ioBuffsSize];
if (!pChain->ResizeBuffers()) { // ResizeBuffers() == ResizeBuffers(0) ==> use default size
cout << "Could not intitalize ping/pong buffers for chain " << nc << endl;
return false;
}
memset(pChain->ioBuffs[0], 0, pChain->ioBuffsSize*sizeof(char));
memset(pChain->ioBuffs[1], 0, pChain->ioBuffsSize*sizeof(char));
}
return true;
......@@ -1680,7 +1680,7 @@ int eventLoop1()
for(; ; theTurn++) {
int stot = (int)(get_wall_time() - wTstart);
{ LOCK_COUT; cout << "\n---Turn " << theTurn << " ( " << stot << " s )" << endl; }
{ LOCK_COUT; cout << "\n----Turn " << theTurn << " ( " << stot << " s )" << endl; }
int nvalid = 0;
for(int ich = 0; ich < numChains; ich++) {
......@@ -1725,14 +1725,14 @@ int eventLoop1()
if(killSig)
return 0;
{ LOCK_COUT; cout << "\n---Ending all PRODUCER chains\n" << endl; }
{ LOCK_COUT; cout << "\n----Ending all PRODUCER chains\n" << endl; }
for(int ich = 0; ich < numChains ; ich++) {
ChainOfActors * pChain = theChains[ich];
if(!pChain->hasBuilder) {
pChain->hasEnded = true;
}
}
{ LOCK_COUT; cout << "---Waiting PRODUCER chains to close\n" << endl; }
{ LOCK_COUT; cout << "----Waiting PRODUCER chains to close\n" << endl; }
int nvalid = numChains;
while(nvalid) {
nvalid = 0;
......@@ -1752,7 +1752,7 @@ int eventLoop1()
if(killSig)
return 0;
{ LOCK_COUT; cout << "\n---Waiting BUILDER threads to close\n" << endl; }
{ LOCK_COUT; cout << "\n----Waiting BUILDER threads to close\n" << endl; }
bool allClosed = false;
while(!allClosed) {
allClosed = true;
......@@ -1788,11 +1788,11 @@ bool CheckFile(std::string checkFile, bool remove)
unlink(checkFile.c_str());
#endif
LOCK_COUT;
cout << "\n---" << checkFile << " detected and removed" << endl;
cout << "\n----" << checkFile << " detected and removed" << endl;
}
else {
LOCK_COUT;
cout << "\n---" << checkFile << " detected" << endl;
cout << "\n----" << checkFile << " detected" << endl;
}
return true;
}
......@@ -1864,7 +1864,7 @@ void threadWorker(int ich)
break;
}
{ LOCK_COUT; cout << "---Exiting thread " << setw(2) << ich << endl; }
{ LOCK_COUT; cout << "----Exiting thread " << setw(2) << ich << endl; }
}
void threadControl()
......@@ -1879,7 +1879,7 @@ void threadControl()
thrSleep(1000);
theTurn++;
int stot = (int)(get_wall_time() - wTstart);
{ LOCK_COUT; cout << "\n---Time " << theTurn << " ( " << stot << " s )" << endl; }
{ LOCK_COUT; cout << "\n----Time " << theTurn << " ( " << stot << " s )" << endl; }
if(maxTurn > 0 && theTurn >= (maxTurn-1)) {
{ LOCK_COUT; DEBUG_LINE_N << " ===> GOING TO STOP due to TIMEMAX REACHED in the control thread " << endl; }
......@@ -1902,19 +1902,19 @@ void threadControl()
} // while(true)
if(killSig) {
{ LOCK_COUT; cout << "\n---Exiting the control thread\n" << endl; }
{ LOCK_COUT; cout << "\n----Exiting the control thread\n" << endl; }
return;
}
if(stopSig) {
{ LOCK_COUT; cout << "\n---Ending all PRODUCER threads\n" << endl; }
{ LOCK_COUT; cout << "\n----Ending all PRODUCER threads\n" << endl; }
for(int ich = 0; ich < numChains ; ich++) {
ChainOfActors * pChain = theChains[ich];
if(!pChain->hasBuilder) {
pChain->hasEnded = true;
}
}
{ LOCK_COUT; cout << "---Waiting PRODUCER threads to close\n" << endl; }
{ LOCK_COUT; cout << "----Waiting PRODUCER threads to close\n" << endl; }
bool allClosed = false;
while(!allClosed) {
allClosed = true;
......@@ -1933,7 +1933,7 @@ void threadControl()
}
}
if(!killSig) {
{ LOCK_COUT; cout << "\n---Waiting BUILDER threads to close\n" << endl; }
{ LOCK_COUT; cout << "\n----Waiting BUILDER threads to close\n" << endl; }
allClosed = false;
while(!allClosed) {
allClosed = true;
......@@ -1953,7 +1953,7 @@ void threadControl()
}
}
}
{ LOCK_COUT; cout << "\n---Exiting the control thread\n" << endl; }
{ LOCK_COUT; cout << "\n----Exiting the control thread\n" << endl; }
}
#endif // WCT_THREADED
......
......@@ -150,7 +150,7 @@ void EventBuilder::PrintStat()
LOCK_COUT;
cout << "Statistics of " << fActualClass << " after " << nevCount << " events" << endl;
cout << " # counts good mult";
cout << " [ rest badKey shifts errors maxIsize maxQsize firstTimestamp lastTimeStamp ]" << endl;
cout << " [ rest badKey shifts errors maxIsize maxQsize firstTimestamp lastTimeStamp ]" << endl;
ULong64_t sumMult = 0;
ULong64_t totMult = 0;
for(UInt_t nn = 0; nn < nQueues; nn++) {
......@@ -164,7 +164,7 @@ void EventBuilder::PrintStat()
<< setw(6) << evQue[nn].numEvts << " "
<< setw(6) << evQue[nn].badKeys << " "
<< setw(6) << evQue[nn].endCount << " "
<< setw(6) << evQue[nn].errCount << " "
<< setw(8) << evQue[nn].errCount << " "
<< setw(8) << evQue[nn].maxInputSize << " "
<< setw(8) << evQue[nn].maxQueueSize << " "
<< setw(7) << evQue[nn].tstVeryFirst/tst1second << "."
......@@ -243,7 +243,7 @@ void EventBuilder::Process(void *obuffer, unsigned int osize, unsigned int *used
}
else {
ULong64_t dt = minTstamp - oldTstamp;
EvbSpecRate->incr(nQueues, UInt_t(dt/rQueuesStep) ); // off range placed at the border
EvbSpecRate->incr(nQueues, Int_t(dt/rQueuesStep) ); // off range placed at the border
oldTstamp = minTstamp;
}
}
......@@ -425,25 +425,26 @@ void EventBuilder::Process(void *obuffer, unsigned int osize, unsigned int *used
*error_code = 0;
LOCK_COUT;
cServer.Prompt(-1, oevCount, *used_osize, false);
cout << " tst = " << fixed << setprecision(1) << setw(8) << firstTstamp/double(tst1second);
const int chainsInPromptLine = 6;
if(fVerbose) {
if(nQueues > chainsInPromptLine) cout << "\n ";
cout << " {";
for(UInt_t nn = 0; nn < nQueues; nn++) {
cout << " " << setw(5) << evQueEvts[nn]; // before
}
cout << " |";
if(nQueues > chainsInPromptLine) cout << "\n |";
for(UInt_t nn = 0; nn < nQueues; nn++) {
cout << " " << setw(5) << evQue[nn].numEvts; // after
if (cServer.Prompt(-1, oevCount, *used_osize, false) >= 0 ) {
cout << " tst = " << fixed << setprecision(1) << setw(8) << firstTstamp / double(tst1second);
const int chainsInPromptLine = 6;
if (fVerbose) {
if (nQueues > chainsInPromptLine) cout << "\n ";
cout << " {";
for (UInt_t nn = 0; nn < nQueues; nn++) {
cout << " " << setw(5) << evQueEvts[nn]; // before
}
cout << " |";
if (nQueues > chainsInPromptLine) cout << "\n |";
for (UInt_t nn = 0; nn < nQueues; nn++) {
cout << " " << setw(5) << evQue[nn].numEvts; // after
}
cout << " }";
if (obufFull)
cout << " **";
}
cout << " }";
if(obufFull)
cout << " **";
cout << endl;
}
cout << endl;
if(oevCount > 0)
noBuild1 = noBuild2 = 0;
......@@ -528,7 +529,7 @@ void EventBuilder::CheckEvent_Tstamp()
ULong64_t dt = evQue[nn].hasData() ? evQue[nn].tstFirst-minTstamp : 0;
sTstamps->add( UInt_t(dt) );
if(dt > sumbusR) // only if off sumbus acceptance window
EvbSpecRate->incr(nn, UInt_t(dt/rQueuesStep) ); // off range put at the border
EvbSpecRate->incr(nn, Int_t(dt/rQueuesStep) ); // off range put at the border
}
sTstamps->add(UInt_t( minTstamp &0xFFFFFFFF)); // low part of head-element timestamp
sTstamps->add(UInt_t((minTstamp>>32)&0xFFFFFFFF)); // high part
......
......@@ -10,7 +10,8 @@ using namespace std;
#define EXCLUDE_TST_ZERO // discard events with timestamp=0; count them as errors
//#define CHECK_DETAILS // DEBUG
const bool doReorderEvents = true; // fix timestamp inversion events while they are inserted into the queue
const bool discardOutOfOrderEvents = true; // if true, doReorderEvents is redundant
const bool doReorderEvents = true; // fix timestamp inversion events while they are inserted into the queue
UInt_t EventQueue::addData(UInt_t * i4dat, UInt_t n4dat, Int_t tstCorr)
{
......@@ -81,11 +82,17 @@ UInt_t EventQueue::addData(UInt_t * i4dat, UInt_t n4dat, Int_t tstCorr)
tstNew += tstCorr; // apply timestamp correction
if(tstNew < tstLast)
if (tstNew < tstLast) {
errCount++; // timestamp inversions counted as errors
if (discardOutOfOrderEvents) {
i4dat += i4len; // remove this event from the input buffer
n4dat -= i4len;
continue;
}
}
if(!doReorderEvents || tstNew >= tstLast || numEvts==0) {
// don't care, or no inversion, or queue is empty
if (tstNew >= tstLast || numEvts == 0 || !doReorderEvents) {