source: trunk/shared_serial/src/server.cpp @ 8

Last change on this file since 8 was 8, checked in by wcaarls, 10 years ago

Imported shared_serial at revision 990

File size: 8.1 KB
Line 
1#include <ros/assert.h>
2#include <shared_serial/server.h>
3#include <shared_serial/watchdog.h>
4
5SerialServerLock::SerialServerLock() :
6  socket_(0), last_socket_(0)
7{
8  pthread_mutex_init(&mutex_, NULL);
9  pthread_cond_init(&condition_, NULL);
10}
11
12void SerialServerLock::checkTimeout()
13{
14  pthread_mutex_lock(&mutex_);
15
16  struct timeval tv;
17  gettimeofday(&tv, NULL);
18 
19  // Check timeout
20  if (socket_ && (tv.tv_sec > timeout_.tv_sec ||
21                  (tv.tv_sec == timeout_.tv_sec && tv.tv_usec > timeout_.tv_usec)))
22  {
23    ROS_DEBUG_STREAM("Lock " << socket_ << " expired");
24    socket_ = 0;
25
26    pthread_cond_signal(&condition_);
27  }
28 
29  pthread_mutex_unlock(&mutex_);
30}
31
32int SerialServerLock::lock(int socket, float timeout)
33{
34  pthread_mutex_lock(&mutex_);
35
36  // Check validity
37  if (socket && socket != socket_)
38  {
39    ROS_ERROR_STREAM("Unknown lock " << socket);
40    pthread_mutex_unlock(&mutex_);
41    return -1;
42  }
43 
44  if (!socket)
45  {
46    // Wait for port to become free
47    while (socket_)
48    {
49      ROS_DEBUG_STREAM("Waiting for lock");
50      pthread_cond_wait(&condition_, &mutex_);
51    }
52   
53    // New connection
54    socket_ = last_socket_ = std::max(last_socket_+1, 1);
55   
56    ROS_DEBUG_STREAM("New lock " << socket_);
57  }
58
59  if (timeout)
60  {
61    // Set timeout
62    struct timeval tv;
63    gettimeofday(&tv, NULL);
64 
65    tv.tv_sec += (int)timeout;
66    tv.tv_usec += (int)((timeout-(int)timeout)*1000000);
67    if (tv.tv_usec > 1000000)
68    {
69      tv.tv_sec++;
70      tv.tv_usec -= 1000000;
71    }
72    timeout_ = tv;
73   
74    ROS_DEBUG_STREAM("Lock " << socket_ << " will expire after " << timeout << " seconds");
75  }
76  else
77  {
78    ROS_DEBUG_STREAM("Lock " << socket_ << " expired");
79
80    timeout_.tv_sec = timeout_.tv_usec = socket_ = 0;
81  }
82 
83  return socket_;
84}
85
86void SerialServerLock::unlock()
87{
88  if (!socket_)
89    pthread_cond_signal(&condition_);
90  pthread_mutex_unlock(&mutex_);
91}
92
93void SerialServer::advertiseServices()
94{
95  ROS_INFO("Registering services");
96
97  send_topic_ = nh_.subscribe("send", 10, &SerialServer::callbackSend, this);
98  close_topic_ = nh_.subscribe("close", 1, &SerialServer::callbackClose, this);
99  flush_topic_ = nh_.subscribe("flush", 1, &SerialServer::callbackFlush, this);
100  connect_service_ = nh_.advertiseService("connect", &SerialServer::callbackConnect, this);
101  sendto_service_ = nh_.advertiseService("sendto", &SerialServer::callbackSendTo, this);
102  recv_service_ = nh_.advertiseService("recv", &SerialServer::callbackRecv, this);
103  sendrecv_service_ = nh_.advertiseService("sendrecv", &SerialServer::callbackSendRecv, this);
104}
105
106void SerialServer::readParameters()
107{
108  std::string port_name, port_type;
109  LxSerial::PortType lx_port_type;
110  int baud_rate;
111  double watchdog_interval;
112 
113  ROS_INFO("Reading parameters");
114 
115  ROS_ASSERT(nh_.getParam("port_name", port_name));
116  ROS_ASSERT(nh_.getParam("port_type", port_type));
117  ROS_ASSERT(nh_.getParam("baud_rate", baud_rate));
118  nh_.param<double>("watchdog_interval", watchdog_interval, 1);
119
120  if (port_type == "RS232")
121    lx_port_type = LxSerial::RS232;
122  else if (port_type == "RS485_FTDI")
123    lx_port_type = LxSerial::RS485_FTDI;
124  else if (port_type == "RS485_EXAR")
125    lx_port_type = LxSerial::RS485_EXAR;
126  else if (port_type == "RS485_SMSC")
127    lx_port_type = LxSerial::RS485_SMSC;
128  else if (port_type == "TCP")
129    lx_port_type = LxSerial::TCP;
130  else
131  {
132    ROS_FATAL_STREAM("Unknown port type " << port_type);
133    ROS_BREAK();
134    return;
135  }
136
137  ROS_ASSERT(serial_port_.port_open(port_name, lx_port_type));
138  ROS_ASSERT(serial_port_.set_speed_int(baud_rate));
139  ROS_ASSERT(watchdog_.set(watchdog_interval));
140}
141
142void SerialServer::callbackSend(const shared_serial::Send::ConstPtr& msg)
143{
144  int socket = lock_.lock(msg->socket, msg->timeout);
145  if (socket < 0) return;
146   
147  unsigned char *buf = new unsigned char[msg->data.size()];
148  for (unsigned int ii=0; ii != msg->data.size(); ++ii)
149    buf[ii] = msg->data[ii];
150 
151  int n = serial_port_.port_write(buf, msg->data.size());
152  delete buf;
153 
154  if (n != (int)msg->data.size())
155  {
156    ROS_ERROR("Truncated send, flushing port");
157    serial_port_.flush_buffer();
158   
159    lock_.unlock();
160    return;
161  }
162
163  ROS_DEBUG_STREAM("Sent " << n << " bytes");
164   
165  lock_.unlock();
166  return;
167}
168
169void SerialServer::callbackClose(const shared_serial::Close::ConstPtr& msg)
170{
171  int socket = lock_.lock(msg->socket, 0);
172  if (socket < 0) return;
173
174  lock_.unlock();
175  return;
176}
177
178void SerialServer::callbackFlush(const shared_serial::Flush::ConstPtr& msg)
179{
180  int socket = lock_.lock(msg->socket, msg->timeout);
181  if (socket < 0) return;
182   
183  serial_port_.flush_buffer();
184  usleep(10);
185  serial_port_.flush_buffer();
186
187  ROS_INFO_STREAM("Flushed port");
188   
189  lock_.unlock();
190  return;
191}
192
193bool SerialServer::callbackConnect(shared_serial::Connect::Request& req, shared_serial::Connect::Response& res)
194{
195  int socket = lock_.lock(0, req.timeout);
196  if (socket < 0) return false;
197 
198  res.socket = socket;
199
200  lock_.unlock(); 
201  return true;
202}
203
204bool SerialServer::callbackSendTo(shared_serial::SendTo::Request& req, shared_serial::SendTo::Response& res)
205{
206  int socket = lock_.lock(req.socket, req.timeout);
207  if (socket < 0) return false;
208
209  unsigned char *buf = new unsigned char[req.data.size()];
210  for (unsigned int ii=0; ii != req.data.size(); ++ii)
211    buf[ii] = req.data[ii];
212   
213  int n = serial_port_.port_write(buf, req.data.size());
214  delete buf;
215 
216  if (n != (int)req.data.size())
217  {
218    ROS_ERROR("Truncated send, flushing port");
219    serial_port_.flush_buffer();
220   
221    lock_.unlock();
222    return false;
223  }
224
225  ROS_DEBUG_STREAM("Sent " << n << " bytes");
226 
227  res.socket = socket;
228
229  lock_.unlock();
230  return true;
231}
232
233bool SerialServer::callbackRecv(shared_serial::Recv::Request& req, shared_serial::Recv::Response& res)
234{
235  int socket = lock_.lock(req.socket, req.sock_timeout);
236  if (socket < 0) return false;
237
238  ROS_ASSERT(req.length <= 65536);
239
240  unsigned char *buf = new unsigned char[req.length];
241  int n = serial_port_.port_read(buf, req.length, (int)req.recv_timeout, (int)((req.recv_timeout-(int)req.recv_timeout)*1000000));
242 
243  if (n < 0)
244  {
245    ROS_ERROR_STREAM("Error " << n << " while reading serial port");
246    delete buf;
247
248    serial_port_.flush_buffer();
249    usleep(10);
250    serial_port_.flush_buffer();
251   
252    lock_.unlock();
253    return false;
254  }
255 
256  ROS_DEBUG_STREAM("Read " << n << " bytes");
257 
258  res.data.resize(n);
259  for (int ii=0; ii != n; ++ii)
260    res.data[ii] = buf[ii];
261  delete buf;
262 
263  res.socket = socket;
264   
265  lock_.unlock();
266  return true;
267}
268
269bool SerialServer::callbackSendRecv(shared_serial::SendRecv::Request& req, shared_serial::SendRecv::Response& res)
270{
271  int socket = lock_.lock(req.socket, req.sock_timeout);
272  if (socket < 0) return false;
273
274  // *** Send ***
275  unsigned char *buf = new unsigned char[req.send_data.size()];
276  for (unsigned int ii=0; ii != req.send_data.size(); ++ii)
277    buf[ii] = req.send_data[ii];
278   
279  int n = serial_port_.port_write(buf, req.send_data.size());
280  delete buf;
281 
282  if (n != (int)req.send_data.size())
283  {
284    ROS_ERROR("Truncated send, flushing port");
285    serial_port_.flush_buffer();
286   
287    lock_.unlock();
288    return false;
289  }
290
291  ROS_DEBUG_STREAM("Sent " << n << " bytes");
292
293  // *** Receive ***
294  ROS_ASSERT(req.length <= 65536);
295
296  buf = new unsigned char[req.length];
297  n = serial_port_.port_read(buf, req.length, (int)req.recv_timeout, (int)((req.recv_timeout-(int)req.recv_timeout)*1000000));
298 
299  if (n < 0)
300  {
301    ROS_ERROR("Error while reading serial port");
302    delete buf;
303
304    serial_port_.flush_buffer();
305    usleep(10);
306    serial_port_.flush_buffer();
307   
308    lock_.unlock();
309    return false;
310  }
311 
312  ROS_DEBUG_STREAM("Read " << n << " bytes");
313 
314  res.recv_data.resize(n);
315  for (int ii=0; ii != n; ++ii)
316    res.recv_data[ii] = buf[ii];
317  delete buf;
318 
319  res.socket = socket;
320   
321  lock_.unlock();   
322  return true;
323}
324
325int main(int argc, char **argv)
326{
327  ros::init(argc, argv, "shared_serial_server");
328 
329  SerialServer serialServer;
330
331  ROS_INFO("Spinning");
332 
333  ros::AsyncSpinner spinner(8);
334  spinner.start();
335 
336  ros::Rate loop_rate(10);
337  while (ros::ok())
338  {
339    serialServer.checkTimeout();
340    serialServer.kickWatchdog();
341    loop_rate.sleep();
342  }
343 
344  return 0;
345}
Note: See TracBrowser for help on using the repository browser.