📣 An issue occured with the embedded container registry on October 25 2021, between 10:30 and 12:10 (UTC+2). Any persisting issues should be reported to CC-IN2P3 Support. 🐛

Commit c7537277 authored by Pierre Aubert's avatar Pierre Aubert
Browse files

Add socket manager push pull test ok

parent 80fcd70e
Pipeline #98049 passed with stages
in 7 minutes and 49 seconds
......@@ -2,5 +2,6 @@ project(Phoenix)
cmake_minimum_required(VERSION 2.8)
add_subdirectory(TEST_ZMQ_PUSH_PULL)
add_subdirectory(TEST_PZMQ_SOCKET_MANAGER_PUSH_PULL)
project(Phoenix)
cmake_minimum_required(VERSION 2.8)
add_executable(test_pzmq_socket_manager_push_pull main.cpp)
target_link_libraries(test_pzmq_socket_manager_push_pull phoenix_zmq ${ZMQ_LIBRARY} pthread)
add_test(NAME TestPZmqSocketManagerPushPull
COMMAND ${CHECK_OUTPUT_FILE} ${CMAKE_CURRENT_BINARY_DIR}/test_pzmq_socket_manager_push_pull
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
/***************************************
Auteur : Pierre Aubert
Mail : aubertp7@gmail.com
Licence : CeCILL-C
****************************************/
#include <unistd.h>
#include <fstream>
#include <iostream>
#include <thread>
#include "PZmqSocketManager.h"
#define CONNECTION_PORT 3390
using namespace std;
///Send messages on socket
/** @param nbMessage : number of message to be sent
*/
void threadSendMessage(size_t nbMessage){
cout << "threadSendMessage : send message" << endl;
PZmqSocketManager<std::string> managerServer;
managerServer.addServerSocket("Alice", CONNECTION_PORT, ZMQ_PUSH);
for(size_t i(0lu); i < nbMessage; ++i){
zmq::message_t message(sizeof(size_t));
memcpy(message.data(), &i, sizeof(size_t));
cout << "message n°"<<i<<", Send value " << i << ", message size = " << message.size() << endl;
zmq::send_result_t isSent = managerServer.send("Alice", message, zmq::send_flags::none); //ZMQ_DONTWAIT
if(!isSent){
std::cerr << "threadSendMessage : cannot send message" << std::endl;
return;
}
usleep(500000);
}
cout << "threadSendMessage : stop" << endl;
}
///Recieved messages on socket
/** @param[out] b : true on success, false otherwise
* @param address : address of the socket
* @param threadId : id of the thread
* @param nbMessage : number of message to be read
*/
void threadRecievedMessage(bool & b, const std::string & address, size_t threadId, size_t nbMessage){
b = true;
cout << "threadRecievedMessage "<<threadId<<" : read message to '"<<address<<"'" << endl;
//Init the connection with host
PZmqSocketManager<std::string> managerClient;
managerClient.addClientSocket("Bob", address, CONNECTION_PORT, ZMQ_PULL);
size_t i(0lu);
while(i < nbMessage){
usleep(100000);
cout << "threadRecievedMessage : wait for message" << endl;
zmq::message_t message(sizeof(size_t));
zmq::recv_result_t isRecieved = managerClient.recv("Bob", message, zmq::recv_flags::dontwait); //ZMQ_DONTWAIT
if(!isRecieved){continue;}
size_t value(0lu);
memcpy(&value, message.data(), sizeof(size_t));
cout << "Recv("<<threadId<<") message n°"<<i<<", value = " << value << ", message size = " << message.size() << endl;
if(value != i){
b = false;
}
++i;
}
cout << "threadRecievedMessage : stop" << endl;
}
///Launch the process
/** @return 0 on success, -1 otherwise
*/
int launchProcess(){
size_t nbMessage(10lu);
std::string hostName("localhost");
bool okThread1(true);
std::thread thrSend(threadSendMessage, nbMessage);
std::thread thrRecv0(threadRecievedMessage, std::ref(okThread1), hostName, 0lu, nbMessage);
// std::thread thrRecv1(threadRecievedMessage, hostName, 1lu, nbMessage);
thrRecv0.join();
thrSend.join();
// thrRecv1.join();
std::cout << "launchProcess : okThread1 : " << okThread1 << std::endl;
bool b(okThread1);
return b - 1;
}
int main(int argc, char **argv){
return launchProcess();
}
......@@ -27,6 +27,11 @@ class PZmqSocketManager{
void clear();
zmq::send_result_t send(const T & name, zmq::message_t& msg, zmq::send_flags flags = zmq::send_flags::none);
zmq::recv_result_t recv(const T & name, zmq::message_t& msg, zmq::recv_flags flags = zmq::recv_flags::none);
zmq::socket_t* getSocket(const T & name);
private:
void initialisationPZmqSocketManager();
......
......@@ -66,7 +66,7 @@ bool PZmqSocketManager<T>::addServerSocket(const T & name, size_t port, int type
template<typename T>
void PZmqSocketManager<T>::removeSocket(const T & name){
typename std::map<T, zmq::socket_t*>::iterator it(p_mapZmqSocket.find(name));
if(it != p_mapZmqSocket){
if(it != p_mapZmqSocket.end()){
pzmq_closeServerSocket(it->second);
p_mapZmqSocket.erase(it);
}
......@@ -81,6 +81,52 @@ void PZmqSocketManager<T>::clear(){
p_mapZmqSocket.clear();
}
///Send message on the given socket
/** @param name : name of the socket to be used
* @param msg : message to be sent
* @param flags : flags to be used to send the message (none, dontwait, sndmore, etc)
* @return result of the sending
*/
template<typename T>
zmq::send_result_t PZmqSocketManager<T>::send(const T & name, zmq::message_t& msg, zmq::send_flags flags){
zmq::socket_t* socket = getSocket(name);
if(socket != NULL){
return socket->send(msg, flags);
}else{
return false;
}
}
///Recieve message from the given socket
/** @param name : name of the socket to be used
* @param msg : message to be sent
* @param flags : flags to be used to send the message (none, dontwait, sndmore, etc)
* @return result of the sending
*/
template<typename T>
zmq::recv_result_t PZmqSocketManager<T>::recv(const T & name, zmq::message_t& msg, zmq::recv_flags flags){
zmq::socket_t* socket = getSocket(name);
if(socket != NULL){
return socket->recv(msg, flags);
}else{
return false;
}
}
///Get a socket by name (or key)
/** @param name : of the socket to be used
* @return pointer to the found socket, or NULL if the socket does not exist
*/
template<typename T>
zmq::socket_t* PZmqSocketManager<T>::getSocket(const T & name){
typename std::map<T, zmq::socket_t*>::iterator it(p_mapZmqSocket.find(name));
if(it != p_mapZmqSocket.end()){
return it->second;
}else{
return NULL;
}
}
///Initialisation function of the class PZmqSocketManager
template<typename T>
void PZmqSocketManager<T>::initialisationPZmqSocketManager(){
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment