📣 An issue occured with the embedded container registry on October 25 2021, between 10:30 and 12:10 (UTC+2). Any persisting issues should be reported to CC-IN2P3 Support. 🐛

Commit d02cd555 authored by Pierre Aubert's avatar Pierre Aubert
Browse files

Add method to send and recieve data directly

parent c7537277
Pipeline #98065 passed with stages
in 8 minutes and 7 seconds
......@@ -3,5 +3,5 @@ cmake_minimum_required(VERSION 2.8)
add_subdirectory(TEST_ZMQ_PUSH_PULL)
add_subdirectory(TEST_PZMQ_SOCKET_MANAGER_PUSH_PULL)
add_subdirectory(TEST_PZMQ_SOCKET_MANAGER_DATA_PUSH_PULL)
project(Phoenix)
cmake_minimum_required(VERSION 2.8)
add_executable(test_pzmq_socket_manager_data_push_pull main.cpp)
target_link_libraries(test_pzmq_socket_manager_data_push_pull phoenix_zmq data_stream ${ZMQ_LIBRARY} pthread)
add_test(NAME TestPZmqSocketManagerDataPushPull
COMMAND ${CHECK_OUTPUT_FILE} ${CMAKE_CURRENT_BINARY_DIR}/test_pzmq_socket_manager_data_push_pull
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
/***************************************
Auteur : Pierre Aubert
Mail : aubertp7@gmail.com
Licence : CeCILL-C
****************************************/
#include <unistd.h>
#include <fstream>
#include <iostream>
#include <thread>
#include "PZmqSocketManager.h"
#define CONNECTION_PORT 3390
using namespace std;
///Send messages on socket
/** @param nbMessage : number of message to be sent
*/
void threadSendMessage(size_t nbMessage){
cout << "threadSendMessage : send message" << endl;
PZmqSocketManager<std::string> managerServer;
managerServer.addServerSocket("Alice", CONNECTION_PORT, ZMQ_PUSH);
for(size_t i(0lu); i < nbMessage; ++i){
cout << "message n°"<<i<<", Send value " << i << endl;
bool isSent = managerServer.send("Alice", i, zmq::send_flags::none); //ZMQ_DONTWAIT
if(!isSent){
std::cerr << "threadSendMessage : cannot send message" << std::endl;
return;
}
usleep(500000);
}
cout << "threadSendMessage : stop" << endl;
}
///Recieved messages on socket
/** @param[out] b : true on success, false otherwise
* @param address : address of the socket
* @param threadId : id of the thread
* @param nbMessage : number of message to be read
*/
void threadRecievedMessage(bool & b, const std::string & address, size_t threadId, size_t nbMessage){
b = true;
cout << "threadRecievedMessage "<<threadId<<" : read message to '"<<address<<"'" << endl;
//Init the connection with host
PZmqSocketManager<std::string> managerClient;
managerClient.addClientSocket("Bob", address, CONNECTION_PORT, ZMQ_PULL);
size_t i(0lu);
while(i < nbMessage){
usleep(100000);
cout << "threadRecievedMessage : wait for message" << endl;
size_t value(0lu);
bool isRecieved = managerClient.recv("Bob", value, zmq::recv_flags::dontwait); //ZMQ_DONTWAIT
if(!isRecieved){continue;}
cout << "Recv("<<threadId<<") message n°"<<i<<", value = " << value << endl;
if(value != i){
b = false;
}
++i;
}
cout << "threadRecievedMessage : stop" << endl;
}
///Launch the process
/** @return 0 on success, -1 otherwise
*/
int launchProcess(){
size_t nbMessage(10lu);
std::string hostName("localhost");
bool okThread1(true);
std::thread thrSend(threadSendMessage, nbMessage);
std::thread thrRecv0(threadRecievedMessage, std::ref(okThread1), hostName, 0lu, nbMessage);
// std::thread thrRecv1(threadRecievedMessage, hostName, 1lu, nbMessage);
thrRecv0.join();
thrSend.join();
// thrRecv1.join();
std::cout << "launchProcess : okThread1 : " << okThread1 << std::endl;
bool b(okThread1);
return b - 1;
}
int main(int argc, char **argv){
return launchProcess();
}
......@@ -2,7 +2,7 @@ project(Phoenix)
cmake_minimum_required(VERSION 2.8)
add_executable(test_pzmq_socket_manager_push_pull main.cpp)
target_link_libraries(test_pzmq_socket_manager_push_pull phoenix_zmq ${ZMQ_LIBRARY} pthread)
target_link_libraries(test_pzmq_socket_manager_push_pull phoenix_zmq data_stream ${ZMQ_LIBRARY} pthread)
add_test(NAME TestPZmqSocketManagerPushPull
COMMAND ${CHECK_OUTPUT_FILE} ${CMAKE_CURRENT_BINARY_DIR}/test_pzmq_socket_manager_push_pull
......
......@@ -30,7 +30,7 @@ void threadSendMessage(size_t nbMessage){
memcpy(message.data(), &i, sizeof(size_t));
cout << "message n°"<<i<<", Send value " << i << ", message size = " << message.size() << endl;
zmq::send_result_t isSent = managerServer.send("Alice", message, zmq::send_flags::none); //ZMQ_DONTWAIT
bool isSent = managerServer.send("Alice", message, zmq::send_flags::none); //ZMQ_DONTWAIT
if(!isSent){
std::cerr << "threadSendMessage : cannot send message" << std::endl;
return;
......@@ -59,7 +59,7 @@ void threadRecievedMessage(bool & b, const std::string & address, size_t threadI
cout << "threadRecievedMessage : wait for message" << endl;
zmq::message_t message(sizeof(size_t));
zmq::recv_result_t isRecieved = managerClient.recv("Bob", message, zmq::recv_flags::dontwait); //ZMQ_DONTWAIT
bool isRecieved = managerClient.recv("Bob", message, zmq::recv_flags::dontwait); //ZMQ_DONTWAIT
if(!isRecieved){continue;}
size_t value(0lu);
memcpy(&value, message.data(), sizeof(size_t));
......
......@@ -2,7 +2,7 @@ project(Phoenix)
cmake_minimum_required(VERSION 2.8)
add_executable(test_pzmq_push_pull main.cpp)
target_link_libraries(test_pzmq_push_pull phoenix_zmq ${ZMQ_LIBRARY} pthread)
target_link_libraries(test_pzmq_push_pull phoenix_zmq data_stream ${ZMQ_LIBRARY} pthread)
add_test(NAME TestPZMQPushPull
COMMAND ${CHECK_OUTPUT_FILE} ${CMAKE_CURRENT_BINARY_DIR}/test_pzmq_push_pull
......
......@@ -8,6 +8,10 @@
#define __PZMQSOCKETMANAGER_H__
#include <map>
#include "data_size.h"
#include "data_message.h"
#include "phoenix_zmq.h"
......@@ -27,8 +31,43 @@ class PZmqSocketManager{
void clear();
zmq::send_result_t send(const T & name, zmq::message_t& msg, zmq::send_flags flags = zmq::send_flags::none);
zmq::recv_result_t recv(const T & name, zmq::message_t& msg, zmq::recv_flags flags = zmq::recv_flags::none);
bool send(const T & name, zmq::message_t& msg, zmq::send_flags flags = zmq::send_flags::none);
bool recv(const T & name, zmq::message_t& msg, zmq::recv_flags flags = zmq::recv_flags::none);
///Send message on the given socket
/** @param name : name of the socket to be used
* @param data : data to be sent
* @param flags : flags to be used to send the message (none, dontwait, sndmore, etc)
* @return true on success, false otherwise
* There is no way to use const U & data instead of U & data because of the genericness of data_size, based on data_stream
*/
template<typename U>
bool send(const T & name, U & data, zmq::send_flags flags = zmq::send_flags::none){
size_t dataSize(data_size<U>(data));
zmq::message_t msg(dataSize);
char* iter = (char*)msg.data();
if(data_message_save<char*, U>(iter, data)){ //Save the message
return send(name, msg, flags);
}else{
return false;
}
}
///Recieve message from the given socket
/** @param name : name of the socket to be used
* @param data : data to be recieved
* @param flags : flags to be used to send the message (none, dontwait, sndmore, etc)
* @return true on success, false otherwise
*/
template<typename U>
bool recv(const T & name, U & data, zmq::recv_flags flags = zmq::recv_flags::none){
zmq::message_t msg;
if(recv(name, msg, flags)){
char* iter = (char*)msg.data();
return data_message_load<U>(iter, data);
}else{
return false;
}
}
zmq::socket_t* getSocket(const T & name);
......
......@@ -85,13 +85,15 @@ void PZmqSocketManager<T>::clear(){
/** @param name : name of the socket to be used
* @param msg : message to be sent
* @param flags : flags to be used to send the message (none, dontwait, sndmore, etc)
* @return result of the sending
* @return true on success, false otherwise
*/
template<typename T>
zmq::send_result_t PZmqSocketManager<T>::send(const T & name, zmq::message_t& msg, zmq::send_flags flags){
bool PZmqSocketManager<T>::send(const T & name, zmq::message_t& msg, zmq::send_flags flags){
zmq::socket_t* socket = getSocket(name);
if(socket != NULL){
return socket->send(msg, flags);
zmq::send_result_t res = socket->send(msg, flags);
return res.has_value(); //Seems, if there is no value, there is also no error
// return (size_t)res.value() != -1lu;
}else{
return false;
}
......@@ -99,15 +101,17 @@ zmq::send_result_t PZmqSocketManager<T>::send(const T & name, zmq::message_t& ms
///Recieve message from the given socket
/** @param name : name of the socket to be used
* @param msg : message to be sent
* @param msg : message to be recieved
* @param flags : flags to be used to send the message (none, dontwait, sndmore, etc)
* @return result of the sending
* @return true on success, false otherwise
*/
template<typename T>
zmq::recv_result_t PZmqSocketManager<T>::recv(const T & name, zmq::message_t& msg, zmq::recv_flags flags){
bool PZmqSocketManager<T>::recv(const T & name, zmq::message_t& msg, zmq::recv_flags flags){
zmq::socket_t* socket = getSocket(name);
if(socket != NULL){
return socket->recv(msg, flags);
zmq::recv_result_t res = socket->recv(msg, flags);
return res.has_value(); //Seems, if there is no value, there is also no error
// return (size_t)res.value() != -1lu;
}else{
return false;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment