📣 An issue occured with the embedded container registry on October 25 2021, between 10:30 and 12:10 (UTC+2). Any persisting issues should be reported to CC-IN2P3 Support. 🐛

Commit bedcdf93 authored by Pierre Aubert's avatar Pierre Aubert
Browse files

Add pub/sub test

parent 7f91bb81
Pipeline #98081 passed with stages
in 4 minutes and 34 seconds
......@@ -4,4 +4,5 @@ cmake_minimum_required(VERSION 2.8)
add_subdirectory(TEST_ZMQ_PUSH_PULL)
add_subdirectory(TEST_PZMQ_SOCKET_MANAGER_PUSH_PULL)
add_subdirectory(TEST_PZMQ_SOCKET_MANAGER_DATA_PUSH_PULL)
add_subdirectory(TEST_ZMQ_PUB_SUB)
project(Phoenix)
cmake_minimum_required(VERSION 2.8)
add_executable(test_pzmq_pub_sub main.cpp)
target_link_libraries(test_pzmq_pub_sub phoenix_zmq data_stream ${ZMQ_LIBRARY} pthread)
add_test(NAME TestPZMQPubSub
COMMAND ${CHECK_OUTPUT_FILE} ${CMAKE_CURRENT_BINARY_DIR}/test_pzmq_pub_sub
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
/***************************************
Auteur : Pierre Aubert
Mail : aubertp7@gmail.com
Licence : CeCILL-C
****************************************/
#include <unistd.h>
#include <fstream>
#include <iostream>
#include <thread>
#include "phoenix_zmq.h"
#define CONNECTION_PORT 3390
using namespace std;
///Send messages on socket
/** @param nbMessage : number of message to be sent
*/
void threadSendMessage(size_t nbMessage){
cout << "threadSendMessage : send message" << endl;
//Init the connection with host
zmq::context_t context(1);
zmq::socket_t* emiter = pzmq_createServerSocket(context, ZMQ_PUB, CONNECTION_PORT);
for(size_t i(0lu); i < nbMessage; ++i){
usleep(500000);
zmq::message_t message(sizeof(size_t));
memcpy(message.data(), &i, sizeof(size_t));
cout << "message n°"<<i<<", Send value " << i << ", message size = " << message.size() << endl;
zmq::send_result_t isSent = emiter->send(message, zmq::send_flags::none); //ZMQ_DONTWAIT
if(!isSent){
std::cerr << "threadSendMessage : cannot send message" << std::endl;
return;
}
}
cout << "threadSendMessage : stop" << endl;
pzmq_closeServerSocket(emiter);
}
///Recieved messages on socket
/** @param[out] b : true on success, false otherwise
* @param address : address of the socket
* @param threadId : id of the thread
* @param nbMessage : number of message to be read
*/
void threadRecievedMessage(bool & b, const std::string & address, size_t threadId, size_t nbMessage){
b = true;
cout << "threadRecievedMessage "<<threadId<<" : read message to '"<<address<<"'" << endl;
//Init the connection with host
zmq::context_t context(1);
zmq::socket_t* reciever = pzmq_createClientSocket(context, ZMQ_SUB, address, CONNECTION_PORT);
size_t i(0lu);
while(i < nbMessage){
usleep(100000);
cout << "threadRecievedMessage : wait for message" << endl;
zmq::message_t message(sizeof(size_t));
zmq::recv_result_t isRecieved = reciever->recv(message, zmq::recv_flags::dontwait); //ZMQ_DONTWAIT
if(!isRecieved){continue;}
size_t value(0lu);
memcpy(&value, message.data(), sizeof(size_t));
cout << "Recv("<<threadId<<") message n°"<<i<<", value = " << value << ", message size = " << message.size() << endl;
if(value != i){
b = false;
}
++i;
}
cout << "threadRecievedMessage : stop" << endl;
pzmq_closeServerSocket(reciever);
}
///Launch the process
/** @return 0 on success, -1 otherwise
*/
int launchProcess(){
size_t nbMessage(10lu);
std::string hostName("localhost");
bool okThread1(true);
std::thread thrRecv0(threadRecievedMessage, std::ref(okThread1), hostName, 0lu, nbMessage);
// std::thread thrRecv1(threadRecievedMessage, hostName, 1lu, nbMessage);
usleep(500000);
std::thread thrSend(threadSendMessage, nbMessage);
thrRecv0.join();
thrSend.join();
// thrRecv1.join();
std::cout << "launchProcess : okThread1 : " << okThread1 << std::endl;
bool b(okThread1);
return b - 1;
}
int main(int argc, char **argv){
return launchProcess();
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment