📣 An issue occured with the embedded container registry on October 25 2021, between 10:30 and 12:10 (UTC+2). Any persisting issues should be reported to CC-IN2P3 Support. 🐛

Commit 80fcd70e authored by Pierre Aubert's avatar Pierre Aubert
Browse files

Add socket manager and update multithread client/server test

parent ba848e90
Pipeline #98043 passed with stages
in 7 minutes and 21 seconds
......@@ -25,14 +25,10 @@ void threadSendMessage(size_t nbMessage){
zmq::context_t context(1);
zmq::socket_t* emiter = pzmq_createServerSocket(context, ZMQ_PUSH, CONNECTION_PORT);
std::ofstream fs;
fs.open("threadSendMessage.txt");
for(size_t i(0lu); i < nbMessage; ++i){
zmq::message_t message(sizeof(size_t));
memcpy(message.data(), &i, sizeof(size_t));
cout << "message n°"<<i<<", Send value " << i << ", message size = " << message.size() << endl;
fs << i << endl;
zmq::send_result_t isSent = emiter->send(message, zmq::send_flags::none); //ZMQ_DONTWAIT
if(!isSent){
......@@ -41,28 +37,23 @@ void threadSendMessage(size_t nbMessage){
}
usleep(500000);
}
fs.close();
cout << "threadSendMessage : stop" << endl;
pzmq_closeServerSocket(emiter);
}
///Recieved messages on socket
/** @param address : address of the socket
/** @param[out] b : true on success, false otherwise
* @param address : address of the socket
* @param threadId : id of the thread
* @param nbMessage : number of message to be read
*/
void threadRecievedMessage(const std::string & address, size_t threadId, size_t nbMessage){
void threadRecievedMessage(bool & b, const std::string & address, size_t threadId, size_t nbMessage){
b = true;
cout << "threadRecievedMessage "<<threadId<<" : read message to '"<<address<<"'" << endl;
//Init the connection with host
zmq::context_t context(1);
zmq::socket_t* reciever = pzmq_createClientSocket(context, ZMQ_PULL, address, CONNECTION_PORT);
std::stringstream outFile;
outFile << "threadRecievedMessage_" << threadId << ".txt";
std::ofstream fs;
fs.open(outFile.str());
size_t i(0lu);
while(i < nbMessage){
usleep(100000);
......@@ -74,12 +65,15 @@ void threadRecievedMessage(const std::string & address, size_t threadId, size_t
size_t value(0lu);
memcpy(&value, message.data(), sizeof(size_t));
cout << "Recv("<<threadId<<") message n°"<<i<<", value = " << value << ", message size = " << message.size() << endl;
fs << value << endl;
if(value != i){
b = false;
}
++i;
}
fs.close();
cout << "threadRecievedMessage : stop" << endl;
pzmq_closeServerSocket(reciever);
}
///Launch the process
......@@ -89,17 +83,19 @@ int launchProcess(){
size_t nbMessage(10lu);
std::string hostName("localhost");
bool okThread1(true);
std::thread thrSend(threadSendMessage, nbMessage);
std::thread thrRecv0(threadRecievedMessage, hostName, 0lu, nbMessage);
std::thread thrRecv0(threadRecievedMessage, std::ref(okThread1), hostName, 0lu, nbMessage);
// std::thread thrRecv1(threadRecievedMessage, hostName, 1lu, nbMessage);
thrRecv0.join();
thrSend.join();
// thrRecv1.join();
return 0;
std::cout << "launchProcess : okThread1 : " << okThread1 << std::endl;
bool b(okThread1);
return b - 1;
}
......
/***************************************
Auteur : Pierre Aubert
Mail : aubertp7@gmail.com
Licence : CeCILL-C
****************************************/
#ifndef __PZMQSOCKETMANAGER_H__
#define __PZMQSOCKETMANAGER_H__
#include <map>
#include "phoenix_zmq.h"
///@brief Socket manager for zmq socket
/** The template describes the type of the key to be used to get the different sockets
*/
template<typename T>
class PZmqSocketManager{
public:
PZmqSocketManager(size_t nbThread = 1lu);
virtual ~PZmqSocketManager();
bool addClientSocket(const T & name, const std::string & address, size_t port, int type = ZMQ_PULL);
bool addServerSocket(const T & name, size_t port, int type = ZMQ_PUSH);
void removeSocket(const T & name);
void clear();
private:
void initialisationPZmqSocketManager();
///Connexion context of the manager
zmq::context_t p_context;
///Map of the zmq sockets to be used by the manager
std::map<T, zmq::socket_t*> p_mapZmqSocket;
};
#include "PZmqSocketManager_impl.h"
#endif
/***************************************
Auteur : Pierre Aubert
Mail : aubertp7@gmail.com
Licence : CeCILL-C
****************************************/
#ifndef __PZMQSOCKETMANAGER_H_IMPL__
#define __PZMQSOCKETMANAGER_H_IMPL__
#include "PZmqSocketManager.h"
///Default constructor of PZmqSocketManager
/** @param nbThread : number of thread to be used for the zmq::context
*/
template<typename T>
PZmqSocketManager<T>::PZmqSocketManager(size_t nbThread)
:p_context(nbThread)
{
initialisationPZmqSocketManager();
}
///Destructor of PZmqSocketManager
template<typename T>
PZmqSocketManager<T>::~PZmqSocketManager(){
clear();
}
///Add a client socket to the manager
/** @param name : name (key) to get the socket
* @param address : address of the server to be connected to
* @param port : port to be used
* @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
* @return true on success, false otherwise
*/
template<typename T>
bool PZmqSocketManager<T>::addClientSocket(const T & name, const std::string & address, size_t port, int type){
zmq::socket_t* socket = pzmq_createClientSocket(p_context, type, address, port);
bool b(socket != NULL);
if(b){
p_mapZmqSocket[name] = socket;
}
return b;
}
///Add a server socket to the manager
/** @param name : name (key) to get the socket
* @param port : port to be used
* @param type : type of the connection (ZMQ_PULL, ZMQ_PUSH, etc)
* @return true on success, false otherwise
*/
template<typename T>
bool PZmqSocketManager<T>::addServerSocket(const T & name, size_t port, int type){
zmq::socket_t* socket = pzmq_createServerSocket(p_context, type, port);
bool b(socket != NULL);
if(b){
p_mapZmqSocket[name] = socket;
}
return b;
}
///Remove the given socket
/** @param name : name (key) of the socket to be removed
*/
template<typename T>
void PZmqSocketManager<T>::removeSocket(const T & name){
typename std::map<T, zmq::socket_t*>::iterator it(p_mapZmqSocket.find(name));
if(it != p_mapZmqSocket){
pzmq_closeServerSocket(it->second);
p_mapZmqSocket.erase(it);
}
}
///Clear the map of socket
template<typename T>
void PZmqSocketManager<T>::clear(){
for(typename std::map<T, zmq::socket_t*>::iterator it(p_mapZmqSocket.begin()); it != p_mapZmqSocket.end(); ++it){
pzmq_closeServerSocket(it->second);
}
p_mapZmqSocket.clear();
}
///Initialisation function of the class PZmqSocketManager
template<typename T>
void PZmqSocketManager<T>::initialisationPZmqSocketManager(){
}
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment