1 #ifndef COMMANDCONNECTION_HPP
2 #define COMMANDCONNECTION_HPP
50 #include <condition_variable>
58 #include "al/io/al_Socket.hpp"
59 #include "al/types/al_SingleRWRingBuffer.hpp"
60 #include "al/types/al_ValueSource.hpp"
66 Message(uint8_t *message,
size_t length) : mData(message), mSize(length) {}
67 std::string getString() {
68 size_t startIndex = mReadIndex;
69 while (mReadIndex < mSize && mData[mReadIndex] != 0x00) {
73 if (mReadIndex < mSize && mReadIndex != startIndex) {
74 s.resize(mReadIndex - startIndex);
75 s.assign((
const char *)&mData[startIndex]);
81 bool empty() {
return mSize == 0 || mReadIndex == mSize; }
84 uint8_t val = mData[mReadIndex];
89 template <
typename DataType> DataType get() {
91 memcpy(&val, &mData[mReadIndex],
sizeof(DataType));
92 mReadIndex +=
sizeof(DataType);
96 uint32_t getUint32() {
98 memcpy(&val, &mData[mReadIndex], 4);
103 std::vector<std::string> getVectorString() {
104 std::vector<std::string> vs;
105 uint8_t count = mData[mReadIndex];
107 for (uint8_t i = 0; i < count; i++) {
108 vs.push_back(getString());
115 if (mReadIndex < mSize && mSize > 0) {
116 s.resize(mSize - mReadIndex);
117 strncpy((
char *)s.data(), (
char *)&mData[mReadIndex], mSize - mReadIndex);
118 std::cout << s << std::endl;
122 uint8_t *data() {
return mData + mReadIndex; }
123 size_t size() {
return mSize; }
124 size_t remainingBytes() {
return mSize - mReadIndex; }
126 void pushReadIndex(
size_t numBytes) {
127 mReadIndex += numBytes;
128 if (mReadIndex > mSize) {
129 std::cerr <<
"Message pushed too far" << std::endl;
132 void setReadIndex(
size_t numBytes) { mReadIndex = numBytes; }
137 size_t mReadIndex{0};
150 COMMAND_LAST_INTERNAL = 32,
153 virtual bool start(uint16_t port,
const char *addr) = 0;
156 virtual bool processIncomingMessage(
Message &message,
Socket *src) {
157 auto command = message.getByte();
158 if (command == PONG) {
160 std::cout << __FILE__ <<
"Got pong for " << src->
address() <<
":"
161 << src->
port() << std::endl;
163 }
else if (command == GOODBYE) {
164 std::cerr << __FILE__ <<
"Goodbye not implemented" << std::endl;
165 }
else if (command == HANDSHAKE) {
166 std::cerr << __FILE__ <<
"Unexpected handshake received" << std::endl;
185 virtual void onReceiveError() {}
187 void setVerbose(
bool verbose) { mVerbose = verbose; }
190 virtual void onConnection(Socket *newConnection){};
192 uint16_t mVersion = 0,
195 typedef enum { SERVER, CLIENT, NONE } BarrierState;
196 BarrierState mState{BarrierState::NONE};
197 std::mutex mConnectionsLock;
199 bool mRunning{
false};
200 std::vector<std::unique_ptr<std::thread>> mConnectionThreads;
201 std::vector<std::unique_ptr<std::thread>> mDataThreads;
202 std::vector<std::shared_ptr<al::Socket>>
204 std::vector<std::pair<uint16_t, uint16_t>> mConnectionVersions;
206 bool mVerbose{
false};
211 bool start(uint16_t serverPort = 34450,
212 const char *serverAddr =
"localhost")
override;
213 void stop()
override;
229 std::vector<float>
ping(
double timeoutSecs = 1.0);
231 size_t connectionCount();
233 std::vector<std::pair<std::string, uint16_t>> connections();
247 std::unique_ptr<std::thread> mBootstrapServerThread;
249 uint16_t mPortOffset = 12000;
254 bool start(uint16_t serverPort = 34450,
255 const char *serverAddr =
"localhost")
override;
260 bool isConnected() {
return mRunning && mSocket.
opened(); }
263 void clientHandlePing(
Socket &client);
266 uint16_t mPortOffset = 12000;
bool sendMessage(uint8_t *message, size_t length, Socket *dst=nullptr, al::ValueSource *src=nullptr) override
sendMessage
virtual bool sendMessage(uint8_t *message, size_t length, Socket *dst=nullptr, ValueSource *src=nullptr)
sendMessage
std::vector< float > ping(double timeoutSecs=1.0)
Wait for timeoutSecs seconds for a reply from all connections.
bool sendMessage(uint8_t *message, size_t length, Socket *dst=nullptr, ValueSource *src=nullptr) override
sendMessage
uint16_t waitForConnections(uint16_t connectionCount, double timeout=60.0)
Block until connectionCount connections are established.
const std::string & address() const
Get IP address string.
bool opened() const
Returns whether socket is open.
uint16_t port() const
Get port number.