Commit 7dc31c32 authored by Guillaume Baulieu's avatar Guillaume Baulieu
Browse files

The redis cluster address and port is given in the configuration file

parent 992f8ae0
......@@ -45,7 +45,9 @@ fCurrentFileNumber(0u),
fMaxSize(kMaxInt_t),
fMyID(id),
fVertex(nullptr),
dp("134.158.136.19",7000),
dp(unique_ptr<DataPool>(nullptr)),
fRedis_server("unknown"),
fRedis_port(-1),
future_is_running(true),
buffer_map(),
fFramePSAIn(nullptr),
......@@ -87,7 +89,11 @@ void RedisAFC::process_config (const Char_t *directory_path, UInt_t *error_code)
void RedisAFC::retrieve_buffer(){
while(future_is_running){
Optional< pair<string, string> > psa_key = dp.blpop("res_list",5.0f);
if(!dp){
std::this_thread::sleep_for(std::chrono::milliseconds(100));
continue;
}
Optional< pair<string, string> > psa_key = dp->blpop("res_list",5.0f);
if(psa_key){
string strID = (*psa_key).second;
const unsigned long * pID = reinterpret_cast<const unsigned long *>(strID.data());
......@@ -97,9 +103,9 @@ void RedisAFC::retrieve_buffer(){
ss2<<*pID;
vector<int> dim;
buffer_mutex.lock();
dp.get_matrice(ss.str(),dim,buffer_map[ss2.str()],false);
dp->get_matrice(ss.str(),dim,buffer_map[ss2.str()],false);
buffer_mutex.unlock();
dp.del(ss.str());
dp->del(ss.str());
}
}
}
......@@ -298,19 +304,23 @@ void RedisAFC::process_initialise (UInt_t *error_code)
std::ifstream filein(conffile.data());
if ( filein.is_open() == true ) {
std::string pathforfiles, basename;
UInt_t starting_number;
filein >> pathforfiles >> basename >> starting_number;
std::string pathforfiles, basename, redis_server;
UInt_t starting_number, redis_port;
filein >> pathforfiles >> basename >> starting_number >> redis_server >> redis_port;
if ( filein.good() ) {
fPath = pathforfiles;
fBaseForName = basename;
fCurrentFileNumber = starting_number;
fRedis_server = redis_server;
fRedis_port = redis_port;
// GetFrameIO().GetConfAgent()->GetRunAgent()->SetSubRun(starting_number);
}
filein.close();
}
dp = unique_ptr<DataPool>(new DataPool(fRedis_server, fRedis_port));
// open the first file
if ( !NewFile() )
*error_code = 1;
......
......@@ -71,7 +71,9 @@ private:
std::string fCurrentFileName;
std::string fCurrentName;
// Object used for REDIS communications
DataPool dp;
unique_ptr<DataPool> dp;
std::string fRedis_server;
Int_t fRedis_port;
mutex buffer_mutex;
bool future_is_running;
future<void> buffer_reader_future;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment