Allolib  1.0
C++ Components For Interactive Multimedia
al_CommandConnection.hpp
1 #ifndef COMMANDCONNECTION_HPP
2 #define COMMANDCONNECTION_HPP
3 
4 /* Allocore --
5  Multimedia / virtual environment application class library
6 
7  Copyright (C) 2009. AlloSphere Research Group, Media Arts & Technology,
8  UCSB. Copyright (C) 2012. The Regents of the University of California. All
9  rights reserved.
10 
11  Redistribution and use in source and binary forms, with or without
12  modification, are permitted provided that the following conditions are
13  met:
14 
15  Redistributions of source code must retain the above copyright
16  notice, this list of conditions and the following disclaimer.
17 
18  Redistributions in binary form must reproduce the above
19  copyright notice, this list of conditions and the following disclaimer in the
20  documentation and/or other materials provided with the
21  distribution.
22 
23  Neither the name of the University of California nor the names
24  of its contributors may be used to endorse or promote products derived from
25  this software without specific prior written permission.
26 
27  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
28  IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
29  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30  PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
31  CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
32  EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
33  PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
34  OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
35  WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
36  OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
37  ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38 
39  File description:
40  Byte-buffer message reader (Message) and the base, server-side and
41  client-side classes for exchanging command messages over sockets.
42 
43  File author(s):
44  Lance Putnam, 2010, putnam.lance@gmail.com
45  Graham Wakefield, 2010, grrrwaaa@gmail.com
46  Keehong Youn, 2017, younkeehong@gmail.com
47 */
48 
49 #include <cinttypes>
50 #include <condition_variable>
#include <cstring>
51 #include <iostream>
52 #include <map>
53 #include <memory>
54 #include <mutex>
#include <string>
55 #include <thread>
56 #include <vector>
57 
58 #include "al/io/al_Socket.hpp"
59 #include "al/types/al_SingleRWRingBuffer.hpp"
60 #include "al/types/al_ValueSource.hpp"
61 
62 namespace al {
63 
/// Sequential reader over a caller-owned byte buffer.
///
/// A Message never copies or frees the bytes it is given; it only keeps a
/// read index that the get*() calls advance. The read index is always kept
/// within [0, size()], so a truncated or malformed buffer produces zero/empty
/// values (plus a note on std::cerr) instead of out-of-bounds reads.
class Message {
public:
  /// @param message First byte of the buffer to read from (not owned).
  /// @param length  Number of readable bytes starting at @p message.
  Message(uint8_t *message, size_t length) : mData(message), mSize(length) {}

  /// Reads a null-terminated string starting at the read index.
  ///
  /// On success the read index ends up just past the terminator. If the
  /// buffer ends before a terminator is found, the remaining bytes are
  /// consumed, an empty string is returned and the read index stops at
  /// size() (it never moves past the end).
  std::string getString() {
    const size_t startIndex = mReadIndex;
    while (mReadIndex < mSize && mData[mReadIndex] != 0x00) {
      mReadIndex++;
    }
    std::string s;
    if (mReadIndex < mSize) {
      // Terminator found: copy exactly the bytes in front of it.
      s.assign(reinterpret_cast<const char *>(mData + startIndex),
               mReadIndex - startIndex);
      mReadIndex++; // skip final null
    }
    return s;
  }

  /// @return true when no unread bytes remain.
  bool empty() const { return mReadIndex >= mSize; }

  /// Reads one byte; returns 0 if the buffer is exhausted.
  uint8_t getByte() { return get<uint8_t>(); }

  /// Reads sizeof(DataType) bytes in host byte order and returns them as a
  /// DataType (which must be trivially copyable).
  ///
  /// If fewer bytes remain, nothing is copied: a value-initialized DataType
  /// is returned and the rest of the buffer is marked as consumed.
  template <typename DataType> DataType get() {
    DataType val{};
    if (!ensureAvailable(sizeof(DataType))) {
      return val;
    }
    std::memcpy(&val, mData + mReadIndex, sizeof(DataType));
    mReadIndex += sizeof(DataType);
    return val;
  }

  /// Reads a 32-bit unsigned integer in host byte order; 0 on a short buffer.
  uint32_t getUint32() { return get<uint32_t>(); }

  /// Reads a one-byte element count followed by that many null-terminated
  /// strings. The result always has `count` entries; entries that could not
  /// be read because the buffer ran out are empty strings.
  std::vector<std::string> getVectorString() {
    const uint8_t count = getByte();
    std::vector<std::string> vs;
    vs.reserve(count);
    for (uint8_t i = 0; i < count; i++) {
      vs.push_back(getString());
    }
    return vs;
  }

  /// Prints the unread bytes as text to std::cout, stopping at the first
  /// null byte (or at the end of the buffer). Prints nothing when no unread
  /// bytes remain. Does not move the read index.
  void print() const {
    const size_t remaining = remainingBytes();
    if (remaining == 0) {
      return;
    }
    const char *text = reinterpret_cast<const char *>(mData + mReadIndex);
    const void *terminator = std::memchr(text, 0, remaining);
    const size_t length =
        terminator
            ? static_cast<size_t>(static_cast<const char *>(terminator) - text)
            : remaining;
    std::cout << std::string(text, length) << std::endl;
  }

  /// @return Pointer to the next unread byte (one past the end when empty()).
  uint8_t *data() const { return mData + mReadIndex; }
  /// @return Total size of the buffer in bytes, independent of the read index.
  size_t size() const { return mSize; }
  /// @return Number of bytes between the read index and the end of the buffer.
  size_t remainingBytes() const { return mSize - mReadIndex; }

  /// Advances the read index by @p numBytes. Advancing beyond the end is
  /// reported on std::cerr and the read index is clamped to size().
  void pushReadIndex(size_t numBytes) {
    if (numBytes > remainingBytes()) {
      std::cerr << "Message pushed too far" << std::endl;
      mReadIndex = mSize;
      return;
    }
    mReadIndex += numBytes;
  }

  /// Moves the read index to the absolute offset @p numBytes. Offsets beyond
  /// the end are reported on std::cerr and clamped to size().
  void setReadIndex(size_t numBytes) {
    if (numBytes > mSize) {
      std::cerr << "Message read index out of range" << std::endl;
      numBytes = mSize;
    }
    mReadIndex = numBytes;
  }

private:
  /// Checks that @p numBytes can be read. On failure the shortfall is
  /// reported and the remaining bytes are marked consumed, so loops of the
  /// form `while (!msg.empty())` terminate on truncated input.
  bool ensureAvailable(size_t numBytes) {
    if (numBytes <= remainingBytes()) {
      return true;
    }
    std::cerr << "Message read past end of data" << std::endl;
    mReadIndex = mSize;
    return false;
  }

  uint8_t *mData;      // Caller-owned buffer; never freed here.
  const size_t mSize;  // Number of valid bytes at mData.
  size_t mReadIndex{0}; // Invariant: mReadIndex <= mSize.
};
139 
141 public:
142  typedef enum {
143  HANDSHAKE = 1,
144  HANDSHAKE_ACK,
145  GOODBYE,
146  GOODBYE_ACK,
147  PING,
148  PONG,
149  COMMAND_QUIT,
150  COMMAND_LAST_INTERNAL = 32,
151  } InternalCommands;
152 
  /// Starts the connection. Pure virtual: each concrete subclass provides
  /// its own implementation.
  /// @param port Port number passed to the implementation.
  /// @param addr Address string passed to the implementation.
  /// @return Status flag. NOTE(review): presumably true on success - confirm.
153  virtual bool start(uint16_t port, const char *addr) = 0;
  /// Stops the connection. Declared here without a body.
154  virtual void stop();
155 
156  virtual bool processIncomingMessage(Message &message, Socket *src) {
157  auto command = message.getByte();
158  if (command == PONG) {
159  if (mVerbose) {
160  std::cout << __FILE__ << "Got pong for " << src->address() << ":"
161  << src->port() << std::endl;
162  }
163  } else if (command == GOODBYE) {
164  std::cerr << __FILE__ << "Goodbye not implemented" << std::endl;
165  } else if (command == HANDSHAKE) {
166  std::cerr << __FILE__ << "Unexpected handshake received" << std::endl;
167  }
168  return false;
169  };
170 
180  virtual bool sendMessage(uint8_t *message, size_t length,
181  Socket *dst = nullptr, ValueSource *src = nullptr) {
182  return true;
183  }
184 
  /// Overridable hook with an empty default body.
  /// NOTE(review): presumably invoked when receiving data fails - confirm
  /// against the implementation file.
185  virtual void onReceiveError() {}
186 
  /// Enables or disables verbose logging (for example the pong message
  /// printed by processIncomingMessage()).
187  void setVerbose(bool verbose) { mVerbose = verbose; }
188 
189 protected:
  /// Overridable hook with an empty default body; receives the socket of a
  /// new connection.
190  virtual void onConnection(Socket *newConnection){};
191 
192  uint16_t mVersion = 0,
193  mRevision = 0; // Subclasses must set these to ensure compatibility
194 
  // Role of this object; starts out as NONE.
195  typedef enum { SERVER, CLIENT, NONE } BarrierState;
196  BarrierState mState{BarrierState::NONE};
  // NOTE(review): presumably guards the connection containers below - confirm.
197  std::mutex mConnectionsLock;
198 
  // Run flag; isConnected() on the client side requires it to be true.
199  bool mRunning{false};
200  std::vector<std::unique_ptr<std::thread>> mConnectionThreads;
201  std::vector<std::unique_ptr<std::thread>> mDataThreads;
202  std::vector<std::shared_ptr<al::Socket>>
203  mServerConnections; // Only available on server.
  // NOTE(review): presumably one (version, revision) pair per connection -
  // confirm.
204  std::vector<std::pair<uint16_t, uint16_t>> mConnectionVersions;
205  al::Socket mSocket; // Bootstrap socket for server, main socket for client.
  // When true, informational messages are written to std::cout.
206  bool mVerbose{false};
207 };
208 
210 public:
  /// Starts this end of the connection. Defaults: port 34450 on "localhost".
  /// @return Status flag. NOTE(review): presumably true on success - confirm.
211  bool start(uint16_t serverPort = 34450,
212  const char *serverAddr = "localhost") override;
  /// Stops this end of the connection (overrides the base-class stop()).
213  void stop() override;
214 
  /// Block until connectionCount connections are established.
  /// @param connectionCount Number of connections to wait for.
  /// @param timeout Maximum wait. NOTE(review): presumably seconds - confirm.
  /// @return NOTE(review): presumably the number of connections established
  ///         when the call returns - confirm.
219  uint16_t waitForConnections(uint16_t connectionCount, double timeout = 60.0);
220 
  /// Wait for timeoutSecs for reply from all.
  /// @return One float per result. NOTE(review): presumably a round-trip
  ///         time per connection - confirm units and ordering.
229  std::vector<float> ping(double timeoutSecs = 1.0);
230 
  /// @return Number of connections.
231  size_t connectionCount();
232 
  /// @return One (string, uint16_t) pair per connection.
  ///         NOTE(review): presumably (address, port) - confirm.
233  std::vector<std::pair<std::string, uint16_t>> connections();
  /// Sends @p length bytes starting at @p message.
  /// NOTE(review): the handling of a null @p dst and the role of @p src are
  /// defined by the implementation file - confirm there.
242  bool sendMessage(uint8_t *message, size_t length, Socket *dst = nullptr,
243  ValueSource *src = nullptr) override;
244 
245 protected:
246 private:
  // Thread handle; NOTE(review): presumably runs the bootstrap accept loop -
  // confirm.
247  std::unique_ptr<std::thread> mBootstrapServerThread;
248 
  // NOTE(review): purpose of this offset is not visible in this header.
249  uint16_t mPortOffset = 12000;
250 };
251 
253 public:
  /// Starts this end of the connection. Defaults: port 34450 on "localhost".
  /// @return Status flag. NOTE(review): presumably true on success - confirm.
254  bool start(uint16_t serverPort = 34450,
255  const char *serverAddr = "localhost") override;
256 
  /// Sends @p length bytes starting at @p message.
  /// NOTE(review): the handling of @p dst and @p src is defined by the
  /// implementation file - confirm there.
257  bool sendMessage(uint8_t *message, size_t length, Socket *dst = nullptr,
258  al::ValueSource *src = nullptr) override;
259 
260  bool isConnected() { return mRunning && mSocket.opened(); }
261 
262 protected:
  /// Ping handler operating on the given socket. Declared here without a
  /// body. NOTE(review): presumably answers a PING with a PONG - confirm.
263  void clientHandlePing(Socket &client);
264 
265 private:
  // NOTE(review): purpose of this offset is not visible in this header.
266  uint16_t mPortOffset = 12000;
267 };
268 
269 // bool NetworkBarrier::pingClients(double timeoutSecs) {
270 // if (mState != BarrierState::SERVER) {
271 // std::cerr << "ERROR: Should not call ping from server" << std::endl;
272 // return false;
273 // }
274 
275 // bool allResponded = true;
276 
277 // mConnectionsLock.lock();
278 // for (auto listener : mServerConnections) {
279 // std::cout << "pinging " << listener->address() << ":" <<
280 // listener->port()
281 // << std::endl;
282 // auto startTime = al_steady_time();
283 // unsigned char message[8] = {0, 0, 0, 0, 0, 0, 0, 0};
284 
285 // message[0] = 1 << COMMAND_PING;
286 // listener->send((const char *)message, 8);
287 // size_t bytes = 0;
288 // auto previousTimeout = listener->timeout();
289 // listener->timeout(timeoutSecs);
290 // bytes = listener->recv((char *)message, 8);
291 
292 // auto endTime = al_steady_time();
293 // if (bytes == 8) {
294 // std::cout << "Reply from " << listener->address() << ":"
295 // << listener->port() << " in " << (endTime - startTime) *
296 // 1000.0
297 // << " ms" << std::endl;
298 // } else {
299 // std::cout << "No response from: " << listener->address() << ":"
300 // << listener->port() << std::endl;
301 // }
302 // listener->timeout(previousTimeout);
303 // }
304 
305 // mConnectionsLock.unlock();
306 
307 // return allResponded;
308 //}
309 
310 // bool NetworkBarrier::trigger(uint32_t id, double waitTimeoutSecs) {
311 // if (mState == BarrierState::SERVER) {
312 // mConnectionsLock.lock();
313 // for (auto listener : mServerConnections) {
314 // // std::cout << "synchronizing " << listener->address() <<
315 // ":"
316 // // << listener->port() << std::endl;
317 // auto startTime = al_steady_time();
318 // unsigned char message[8] = {0, 0, 0, 0, 0, 0, 0, 0};
319 
320 // message[0] = 1 << COMMAND_TRIGGER;
321 
322 // auto b = Convert::to_bytes(id);
323 // message[1] = b[0];
324 // message[2] = b[1];
325 // message[3] = b[2];
326 // message[4] = b[3];
327 // listener->send((const char *)message, 8);
328 // // size_t bytes = 0;
329 // // auto previousTimeout = listener->timeout();
330 // // listener->timeout(waitTimeoutSecs);
331 // // bytes = listener->recv((char *)message, 8);
332 
333 // // auto endTime = al_steady_time();
334 // // if (bytes == 8) {
335 // // uint32_t id;
336 // // Convert::from_bytes((const uint8_t *)&message[1], id);
337 
338 // // // std::cout << "Synced: " << listener->address()
339 // << ":"
340 // // // << listener->port() << " id " << id
341 // <<
342 // // std::endl;
343 // // } else {
344 // // std::cout << "No response from: " << listener->address()
345 // << ":"
346 // // << listener->port() << std::endl;
347 // // }
348 // // listener->timeout(previousTimeout);
349 // }
350 
351 // mConnectionsLock.unlock();
352 
353 // } else if (mState == BarrierState::CLIENT) {
354 // // std::cout << "sync lock" << std::endl;
355 // if (mClientMessageLock.find(id) == mClientMessageLock.end()) {
356 // mClientMessageLock[id] = {std::make_unique<std::mutex>(),
357 // std::make_unique<std::condition_variable>()};
358 // }
359 // auto &lock = mClientMessageLock[id].first;
360 // auto &conditionVar = mClientMessageLock[id].second;
361 // std::unique_lock<std::mutex> lk(*lock);
362 // conditionVar->wait(lk);
363 // // std::cout << "sync continue" << std::endl;
364 // }
365 // return false;
366 //}
367 
368 // bool NetworkBarrier::synchronize(uint32_t id, double waitTimeoutSecs) {
369 // if (mState == BarrierState::SERVER) {
370 // mConnectionsLock.lock();
371 // unsigned char message[8] = {0, 0, 0, 0, 0, 0, 0, 0};
372 // // Notify all
373 // for (auto listener : mServerConnections) {
374 // // std::cout << "synchronizing " << listener->address() <<
375 // ":"
376 // // << listener->port() << std::endl;
377 // // auto startTime = al_steady_time();
378 
379 // message[0] = 1 << COMMAND_SYNC_REQ;
380 
381 // auto b = Convert::to_bytes(id);
382 // message[1] = b[0];
383 // message[2] = b[1];
384 // message[3] = b[2];
385 // message[4] = b[3];
386 // auto previousTimeout = listener->timeout();
387 // size_t bytes = listener->send((const char *)message, 8);
388 // listener->timeout(previousTimeout);
389 // if (bytes != 8) {
390 // std::cerr << "Error sending to " << listener->address() << ":"
391 // << listener->port() << std::endl;
392 // }
393 // }
394 
395 // // Get reply from all
396 // for (auto listener : mServerConnections) {
397 // size_t bytes = 0;
398 // auto previousTimeout = listener->timeout();
399 // listener->timeout(waitTimeoutSecs);
400 // bytes = listener->recv((char *)message, 8);
401 
402 // // auto endTime = al_steady_time();
403 // if (bytes == 8 && message[0] == 1 << COMMAND_SYNC_ACK) {
404 // uint32_t ackid;
405 // Convert::from_bytes((const uint8_t *)&message[1], ackid);
406 // if (ackid == id) {
407 // std::cout << "Synced: " << listener->address() << ":"
408 // << listener->port() << " id " << id <<
409 // std::endl;
410 // } else {
411 // std::cerr << "Unexpected ack id" << std::endl;
412 // }
413 
414 // } else {
415 // std::cout << "No response from: " << listener->address() <<
416 // ":"
417 // << listener->port() << std::endl;
418 // }
419 // listener->timeout(previousTimeout);
420 // }
421 // // Unlock all
422 // for (auto listener : mServerConnections) {
423 // // std::cout << "synchronizing " << listener->address() <<
424 // ":"
425 // // << listener->port() << std::endl;
426 // // auto startTime = al_steady_time();
427 
428 // message[0] = 1 << COMMAND_BARRIER_UNLOCK;
429 
430 // auto b = Convert::to_bytes(id);
431 // message[1] = b[0];
432 // message[2] = b[1];
433 // message[3] = b[2];
434 // message[4] = b[3];
435 // auto previousTimeout = listener->timeout();
436 // size_t bytes = listener->send((const char *)message, 8);
437 // listener->timeout(previousTimeout);
438 // if (bytes != 8) {
439 // std::cerr << "Error sending to " << listener->address() << ":"
440 // << listener->port() << std::endl;
441 // }
442 // }
443 
444 // mConnectionsLock.unlock();
445 
446 // } else if (mState == BarrierState::CLIENT) {
447 // // std::cout << "sync lock" << std::endl;
448 // if (mRunning) {
449 // if (mClientMessageLock.find(id) == mClientMessageLock.end()) {
450 
451 // mClientMessageLock[id] = {std::make_unique<std::mutex>(),
452 // std::make_unique<std::condition_variable>()};
453 // }
454 // auto &lock = mClientMessageLock[id].first;
455 // auto &conditionVar = mClientMessageLock[id].second;
456 // std::unique_lock<std::mutex> lk(*lock);
457 // if (waitTimeoutSecs > 0) {
458 // conditionVar->wait_for(
459 // lk, std::chrono::milliseconds((int)(waitTimeoutSecs *
460 // 1000.0)));
461 // } else {
462 // conditionVar->wait(lk);
463 // }
464 // }
465 // // std::cout << "sync continue" << std::endl;
466 // }
467 // return false;
468 //}
469 
470 } // namespace al
471 
472 #endif // COMMANDCONNECTION_HPP
bool sendMessage(uint8_t *message, size_t length, Socket *dst=nullptr, al::ValueSource *src=nullptr) override
sendMessage
virtual bool sendMessage(uint8_t *message, size_t length, Socket *dst=nullptr, ValueSource *src=nullptr)
sendMessage
std::vector< float > ping(double timeoutSecs=1.0)
Wait for timeoutSecs for reply from all.
bool sendMessage(uint8_t *message, size_t length, Socket *dst=nullptr, ValueSource *src=nullptr) override
sendMessage
uint16_t waitForConnections(uint16_t connectionCount, double timeout=60.0)
Block until connectionCount connections are established.
const std::string & address() const
Get IP address string.
bool opened() const
Returns whether socket is open.
uint16_t port() const
Get port number.
Definition: al_App.hpp:23