EventQueue.cpp 6.33 KB
Newer Older
dino's avatar
dino committed
1
#include <iostream>
2
#include <algorithm>
dino's avatar
dino committed
3

4 5 6 7 8
#include "EventQueue.h"
#include "commonDefs.h"

using namespace std;

9 10 11 12
#define TSTDIFF_LM_TYPE 0   // list mode of timestamp differences produced in: 0-addData(...) or 1-moveOut(...)
#define EXCLUDE_TST_ZERO    // discard events with timestamp=0; count them as errors
//#define CHECK_DETAILS       // DEBUG 

13
const bool discardOutOfOrderEvents = true;  // if true, doReorderEvents is redundant
14
const bool doReorderEvents = true; // fix timestamp inversion events while they are inserted into the queue
15

16 17 18 19 20
UInt_t EventQueue::addData(UInt_t * i4dat, UInt_t n4dat, Int_t tstCorr)
{
  if(EodInserted) // no data can be inserted on a closed queue
    return 0;     // in which case the data is simply discarded

21 22 23 24 25 26 27
#ifdef CHECK_DETAILS
  UInt_t wrong = 0;
  UInt_t count1 = countEvents();
#endif
      
  // if not enough room for the new events
  // shift existing eventss to beginning of buffer
28 29 30 31 32 33 34 35 36 37 38
  if(indNext+n4dat >= validSize) {
    if(indFirst && indNext) {
      endCount++;
      int mm = 0;
      for(UInt_t nn = indFirst; nn < indNext; nn++) {
        data[mm++] = data[nn];
      }
      indLast -= indFirst;
      indFirst = 0;
      indNext  = mm;
    }
39 40 41 42 43
#ifdef CHECK_DETAILS
    UInt_t count2 = countEvents();
    if(count2 != count1)
      wrong++;
#endif
44 45
  }

46
  // take new events
47 48 49 50
  UInt_t i4Add = 0;
  UInt_t added = 0;
  while(n4dat > 0) {
    int i4len = i4dat[0]/4;
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84

    // check if frame is valid
    if( i4dat[1] != inpKey ) {
      if( i4dat[1]!=kMetaSync && i4dat[1]!=kMetaEof)
        badKeys++;              // counted as errors if not meta:sync or meta:eof
      i4dat += i4len;           // remove event from input
      n4dat -= i4len;
      continue;
    }

    ULong64_t tstNew = getTstamp(i4dat + 3);

#ifdef EXCLUDE_TST_ZERO
    // check timestamp
    if( tstNew == 0 ) {
      errCount++;             // counted as errors
      i4dat += i4len;         // remove event from input
      n4dat -= i4len;
      continue;
    }
#endif

    // check if enough space to copy event
    if(indNext+i4len >= validSize) {
      break;                  // input queue full
    }

#if TSTDIFF_LM_TYPE == 0
    if(sTstDiff)
      addTstDiff(tstNew);     // LM produced with original time stamp value
#endif

    tstNew += tstCorr;        // apply timestamp correction

85 86

    if (tstNew < tstLast) {
87
      errCount++;             // timestamp inversions counted as errors
88 89 90 91 92 93
      if (discardOutOfOrderEvents) {
        i4dat += i4len;       // remove this event from the input buffer
        n4dat -= i4len;
        continue;
      }
    }
94

95
    if (tstNew >= tstLast || numEvts == 0 || !doReorderEvents) {
96
      // don't care, or no inversion, or queue is empty
97 98
      memcpy(data+indNext, i4dat, i4len*sizeof(UInt_t));
      if(tstCorr)
99 100
        addTstamp(data + indNext + 3, tstCorr);
      indLast = indNext;
101 102
    }
    else {
103 104 105 106 107 108 109
      // insert the event into the right place, of course only respect to the events that are still in the input queue
      UInt_t indInsert = insertPoint(tstNew);
      memmove(data+indInsert+i4len, data+indInsert, (indNext-indInsert)*sizeof(UInt_t)); // overlapping regions
      memcpy(data+indInsert, i4dat, i4len*sizeof(UInt_t));
      if(tstCorr)
        addTstamp(data + indInsert + 3, tstCorr);
      indLast += i4len;
110
    }
111 112 113 114 115 116 117 118

    indNext = indLast + data[indLast]/4;        // updated after each insert
    tstLast = getTstamp(data + indLast + 3);    //  ""
    i4Add  += i4len;
    numEvts++;
    added++;

    i4dat += i4len;   // remove this event from the input buffer
119 120
    n4dat -= i4len;

121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
  } // while(n4dat > 0)

#ifdef CHECK_DETAILS
  UInt_t count3 = countEvents();
  if(count3 != count1+added)
    wrong++;
#endif

  if(numEvts > 0) {   // with EXCLUDE_TST_ZERO the input queue could end up being empty 
    maxInputSize = max(maxInputSize, i4Add);
    maxQueueSize = max(maxQueueSize, indNext-indFirst);
   
    tstFirst = getTstamp(data + indFirst + 3);  // in principle this is already in place
    tstLast  = getTstamp(data + indLast  + 3);  // ""
    //evnFirst = getEvnumb(data + indFirst + 2);
    //evnLast  = getEvnumb(data + indLast  + 2);

    if(tstVeryFirst==0)
      tstVeryFirst = tstFirst;
    if(tstLast != 0xFFFFFFFFFFFFFFFFULL)
      tstVeryLast  = tstLast;
  }
143

144 145 146 147 148
  return n4dat;   // how much is still in the input buffer
}

UInt_t EventQueue::moveOut(UInt_t *dest)
{
149
#if TSTDIFF_LM_TYPE != 0
150 151 152 153
  if(sTstDiff)
     addTstDiff(getTstamp(data + indFirst + 3));
#endif

154 155 156 157 158 159
  //UInt_t wrong = 0;
  //UInt_t count1 = countEvents();
  UInt_t size = data[indFirst];
  if(dest)
    memcpy(dest, data+indFirst, size);
  numEvts--;
160 161
  UInt_t size4 = size/4;
  indFirst += size4;
162 163 164 165 166
  if(indFirst >= indNext) {
    indFirst = indLast = indNext = numEvts = 0;
  }
  else {
    tstFirst = getTstamp(data + indFirst + 3);
167
    //evnFirst = getEvnumb(data + indFirst + 2);
168 169 170 171
  }
  //UInt_t count2 = countEvents();
  //if(count2 != count1-1) 
  //  wrong++;
172
  return size4;
173 174
}

175 176 177 178 179 180 181 182 183 184 185 186
UInt_t EventQueue::countEvents()
{
  UInt_t nevs = 0;
  UInt_t ipos = indFirst;
  while(ipos < indNext) {
    nevs++;
    UInt_t size = data[ipos]/4;
    ipos += size;
  }
  return nevs;
}

187 188 189 190 191 192 193 194 195 196 197
UInt_t EventQueue::insertPoint(ULong64_t tstNew)
{
  UInt_t ipos = indFirst;
  while(ipos < indNext) {
    if( tstNew < getTstamp(data+ipos+3) )
      break;
    ipos += data[ipos]/4;
  }
  return ipos;
}

198 199 200 201 202
bool EventQueue::setEod()
{
  if(EodInserted)   // already closed
    return false;

203
  UInt_t i4dat[kEodKeySize] = {sizeof(i4dat), 0, 0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF};
204 205
  i4dat[1] = inpKey;

206
  validSize   = kQueueSize;                       // allow full size
207
  UInt_t rest = addData(i4dat, kEodKeySize, 0);   // i4
208
  validSize   = kQueueSize-kEodKeySize;           // back to safety
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
  //if(rest) {
  //  EodInserted = false; // just to be able to set a breakpoint
  //}
  EodInserted = true;

  return true;
}

void EventQueue::Error(int ich)
{
  errCount++;
  UInt_t evcount = 0;
  UInt_t ind = indFirst;
  while(ind < indNext) {
    evcount++;
    ind += data[ind]/4;
  } 

  LOCK_COUT;
  cout << "   " << myBuilder << "::ERROR  "  << errCount << "  Flushing " 
       << evcount << " events ( " << usedSize() << " words ) in chain " << ich << endl;
  indFirst = indLast = indNext = numEvts = 0;
}