Allolib  1.0
C++ Components For Interactive Multimedia
al_StateDistributionDomain.hpp
1 #ifndef STATEDISTRIBUTIONDOMAIN_H
2 #define STATEDISTRIBUTIONDOMAIN_H
3 
4 #include <cassert>
5 #include <cstring>
6 #include <functional>
7 #include <iostream>
8 #include <memory>
9 #include <mutex>
10 #include <stack>
11 #include <vector>
12 
13 #include "al/app/al_SimulationDomain.hpp"
14 #include "al/protocol/al_OSC.hpp"
15 #include "al/spatial/al_Pose.hpp"
16 
17 namespace al {
18 
19 struct DefaultState {
20  Pose pose;
21 };
22 
23 template <class TSharedState> class StateReceiveDomain;
24 
25 template <class TSharedState> class StateSendDomain;
26 
27 template <class TSharedState> class StateSimulationDomain;
28 
36 template <class TSharedState = DefaultState>
37 class StateDistributionDomain : public StateSimulationDomain<TSharedState> {
38 public:
39  std::shared_ptr<StateSendDomain<TSharedState>>
40  addStateSender(std::string id = "",
41  std::shared_ptr<TSharedState> statePtr = nullptr);
42 
43  std::shared_ptr<StateReceiveDomain<TSharedState>>
44  addStateReceiver(std::string id = "",
45  std::shared_ptr<TSharedState> statePtr = nullptr);
46 
47  bool isSender() { return mIsSender; }
48 
49  void disconnect() {
50  for (auto sendrecv : mSendRecvDomains) {
51  this->removeSubDomain(sendrecv);
52  }
53  }
54 
55 protected:
56  bool mIsSender{false};
57 
58  std::vector<std::shared_ptr<SynchronousDomain>> mSendRecvDomains;
59 
60 private:
61 };
62 
63 template <class TSharedState = DefaultState>
65 public:
66  bool init(ComputationDomain *parent = nullptr) override;
67 
68  bool tick() override {
69  tickSubdomains(true);
70 
71  assert(mState); // State must have been set at this point
72  mRecvLock.lock();
73  if (newMessages > 0) {
74  mQueuedStates = newMessages;
75  std::memcpy(mState.get(), buf.get(), sizeof(TSharedState));
76  newMessages = 0;
77  }
78  mRecvLock.unlock();
79  tickSubdomains(false);
80  return true;
81  }
82 
83  bool cleanup(ComputationDomain *parent = nullptr) override {
84  cleanupSubdomains(true);
85  mRecv = nullptr;
86  mState = nullptr;
87 
88  // std::cerr << "Not using Cuttlebone. Ignoring" << std::endl;
89  cleanupSubdomains(false);
90  return true;
91  }
92 
93  void configure(uint16_t port = 10100, std::string id = "state",
94  std::string address = "0.0.0.0", uint16_t packetSize = 1400) {
95  mPort = port;
96  mId = id;
97  mAddress = address;
98  mPacketSize = packetSize;
99  }
100 
101  std::shared_ptr<TSharedState> state() { return mState; }
102 
103  void setStatePointer(std::shared_ptr<TSharedState> ptr) { mState = ptr; }
104 
105  void lockState() { mRecvLock.lock(); }
106  void unlockState() { mRecvLock.unlock(); }
107  int newStates() { return mQueuedStates; }
108 
109  std::string id() const { return mId; }
110 
111  void setId(const std::string &id) { mId = id; }
112 
113 protected:
114  std::shared_ptr<TSharedState> mState;
115  int mQueuedStates{1};
116  std::string mAddress{"localhost"};
117  uint16_t mPort = 10100;
118  uint16_t mPacketSize = 1400;
119 
120 private:
121  std::string mId;
122 
123  class Handler : public osc::PacketHandler {
124  public:
125  StateReceiveDomain *mOscDomain;
126  void onMessage(osc::Message &m) override {
127  // m.print();
128  if (m.addressPattern() == "/_state" && m.typeTags() == "sb") {
129  std::string id;
130  m >> id;
131  if (id == mOscDomain->mId) {
132  osc::Blob inBlob;
133  m >> inBlob;
134  if (sizeof(TSharedState) == inBlob.size) {
135  mOscDomain->mRecvLock.lock();
136  memcpy(mOscDomain->buf.get(), inBlob.data, sizeof(TSharedState));
137  mOscDomain->newMessages++;
138  mOscDomain->mRecvLock.unlock();
139  } else {
140  std::cerr << "ERROR: received state size mismatch" << std::endl;
141  }
142  }
143  }
144  }
145  } mHandler;
146 
147  std::unique_ptr<unsigned char[]> buf;
148 
149  uint16_t newMessages = 0;
150  std::mutex mRecvLock;
151  std::unique_ptr<osc::Recv> mRecv;
152 };
153 
154 template <class TSharedState>
156  initializeSubdomains(true);
157  assert(parent != nullptr);
158 
159  buf = std::make_unique<unsigned char[]>(sizeof(TSharedState));
160  mRecv = std::make_unique<osc::Recv>();
161  if (!mRecv || !mRecv->open(mPort, mAddress.c_str())) {
162  std::cerr << "Error opening server" << std::endl;
163  return false;
164  }
165  mHandler.mOscDomain = this;
166  mRecv->handler(mHandler);
167  if (!mRecv->start()) {
168  std::cerr << "Failed to start receiver. " << std::endl;
169  return false;
170  }
171 
172  std::cout << "Opened " << mAddress << ":" << mPort << std::endl;
173  initializeSubdomains(false);
174  return true;
175 }
176 
177 template <class TSharedState = DefaultState>
179 public:
180  bool init(ComputationDomain *parent = nullptr) override {
181  initializeSubdomains(true);
182 
183  initializeSubdomains(false);
184  return true;
185  }
186 
187  bool tick() override {
188  tickSubdomains(true);
189 
190  assert(mState); // State must have been set at this point
191  // assert(mSend);
192 
193  // osc::Blob b(&mState, sizeof(mState));
194  // mSend->send("/_state", b);
195 
196  mStateLock.lock();
197  osc::Blob b(mState.get(), sizeof(TSharedState));
198  osc::Send s(mPort, mAddress.c_str());
199  // std::cout << mAddress << ":" << mPort << std::endl;
200  s.send("/_state", mId, b);
201 
202  mStateLock.unlock();
203 
204  tickSubdomains(false);
205  return true;
206  }
207 
208  bool cleanup(ComputationDomain *parent = nullptr) override {
209  cleanupSubdomains(true);
210  mState = nullptr;
211  // std::cerr << "Not using Cuttlebone. Ignoring" << std::endl;
212  cleanupSubdomains(false);
213  return true;
214  }
215 
216  void configure(uint16_t port, std::string id = "state",
217  std::string address = "localhost",
218  uint16_t packetSize = 1400) {
219  mPort = port;
220  mId = id;
221  mAddress = address;
222  mPacketSize = packetSize;
223  }
224 
225  std::shared_ptr<TSharedState> state() { return mState; }
226 
227  // void lockState() { mStateLock.lock(); }
228  // void unlockState() {mStateLock.unlock();}
229 
230  void setStatePointer(std::shared_ptr<TSharedState> ptr) {
231  mStateLock.lock();
232  mState = ptr;
233  mStateLock.unlock();
234  }
235 
236  int newStates() { return mQueuedStates; }
237 
238  std::string id() const { return mId; }
239 
240  void setId(const std::string &id) { mId = id; }
241 
242  void setAddress(std::string address) { mAddress = address; };
243 
244 protected:
245  std::shared_ptr<TSharedState> mState;
246  std::mutex mStateLock;
247  int mQueuedStates{0};
248  uint16_t mPort = 10100;
249  std::string mAddress{"localhost"};
250  uint16_t mPacketSize = 1400;
251 
252 private:
253  std::unique_ptr<osc::Send> mSend;
254 
255  std::string mId = "";
256 };
257 
258 template <class TSharedState>
259 std::shared_ptr<StateSendDomain<TSharedState>>
260 StateDistributionDomain<TSharedState>::addStateSender(
261  std::string id, std::shared_ptr<TSharedState> statePtr) {
262  auto newDomain =
263  this->template newSubDomain<StateSendDomain<TSharedState>>(false);
264  newDomain->setId(id);
265  newDomain->setStatePointer(statePtr);
266  mSendRecvDomains.push_back(newDomain);
267  return newDomain;
268 }
269 
270 template <class TSharedState>
271 std::shared_ptr<StateReceiveDomain<TSharedState>>
272 StateDistributionDomain<TSharedState>::addStateReceiver(
273  std::string id, std::shared_ptr<TSharedState> statePtr) {
274  auto newDomain =
275  this->template newSubDomain<StateReceiveDomain<TSharedState>>(true);
276  newDomain->setId(id);
277  newDomain->setStatePointer(statePtr);
278  mSendRecvDomains.push_back(newDomain);
279  return newDomain;
280 }
281 
282 } // namespace al
283 
284 #endif // STATEDISTRIBUTIONDOMAIN_H
ComputationDomain class.
bool cleanupSubdomains(bool pre=false)
cleanup subdomains
bool initializeSubdomains(bool pre=false)
initializeSubdomains should be called within the domain's initialization function
void removeSubDomain(std::shared_ptr< SynchronousDomain > subDomain)
Remove a subdomain.
bool tickSubdomains(bool pre=false)
execute subdomains
A local coordinate frame.
Definition: al_Pose.hpp:63
Domain for distributing state for a simulation domain.
bool cleanup(ComputationDomain *parent=nullptr) override
cleanup
bool tick() override
Execute a pass of the domain.
bool init(ComputationDomain *parent=nullptr) override
initialize
bool init(ComputationDomain *parent=nullptr) override
initialize
bool tick() override
Execute a pass of the domain.
bool cleanup(ComputationDomain *parent=nullptr) override
cleanup
size_t send()
Send and clear current packet contents.
Definition: al_App.hpp:23