XRootD
XrdHttpTpcState.cc
Go to the documentation of this file.
1 
2 #include <algorithm>
3 #include <sstream>
4 #include <stdexcept>
5 
6 #include "XrdVersion.hh"
9 
10 #include <curl/curl.h>
11 
12 #include "XrdHttpTpcState.hh"
13 #include "XrdHttpTpcStream.hh"
14 
15 using namespace TPC;
16 
17 
19  if (m_headers) {
20  curl_slist_free_all(m_headers);
21  m_headers = NULL;
22  if (m_curl) {curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);}
23  }
24 }
25 
26 
27 void State::Move(State &other)
28 {
29  m_push = other.m_push;
30  m_recv_status_line = other.m_recv_status_line;
31  m_recv_all_headers = other.m_recv_all_headers;
32  m_offset = other.m_offset;
33  m_start_offset = other.m_start_offset;
34  m_status_code = other.m_status_code;
35  m_content_length = other.m_content_length;
36  m_push_length = other.m_push_length;
37  m_stream = other.m_stream;
38  m_curl = other.m_curl;
39  m_headers = other.m_headers;
40  m_headers_copy = other.m_headers_copy;
41  m_resp_protocol = other.m_resp_protocol;
42  m_is_transfer_state = other.m_is_transfer_state;
43  curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this);
44  if (m_is_transfer_state) {
45  if (m_push) {
46  curl_easy_setopt(m_curl, CURLOPT_READDATA, this);
47  } else {
48  curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this);
49  }
50  }
51  tpcForwardCreds = other.tpcForwardCreds;
52  other.m_headers_copy.clear();
53  other.m_curl = NULL;
54  other.m_headers = NULL;
55  other.m_stream = NULL;
56 }
57 
58 
59 bool State::InstallHandlers(CURL *curl) {
60  curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XrdVERSION);
61  curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB);
62  curl_easy_setopt(curl, CURLOPT_HEADERDATA, this);
63  if(m_is_transfer_state) {
64  if (m_push) {
65  curl_easy_setopt(curl, CURLOPT_UPLOAD, 1);
66  curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB);
67  curl_easy_setopt(curl, CURLOPT_READDATA, this);
68  struct stat buf;
69  if (SFS_OK == m_stream->Stat(&buf)) {
70  m_push_length = buf.st_size;
71  curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size);
72  }
73  } else {
74  curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB);
75  curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
76  }
77  }
78  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
79  if(tpcForwardCreds) {
80  curl_easy_setopt(curl,CURLOPT_UNRESTRICTED_AUTH,1L);
81  }
82 
83  // Only use low-speed limits with libcurl v7.38 or later.
84  // Older versions have poor transfer performance, corrected in curl commit cacdc27f.
85  curl_version_info_data *curl_ver = curl_version_info(CURLVERSION_NOW);
86  if (curl_ver->age > 0 && curl_ver->version_num >= 0x072600) {
87  // Require a minimum speed from the transfer: 2 minute average must at least 10KB/s
88  curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, 2*60);
89  curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, 10*1024);
90  }
91  return true;
92 }
93 
102  struct curl_slist *list = NULL;
103  for (std::map<std::string, std::string>::const_iterator hdr_iter = req.headers.begin();
104  hdr_iter != req.headers.end();
105  hdr_iter++) {
106  if (!strcasecmp(hdr_iter->first.c_str(),"copy-header")) {
107  list = curl_slist_append(list, hdr_iter->second.c_str());
108  m_headers_copy.emplace_back(hdr_iter->second);
109  }
110  // Note: len("TransferHeader") == 14
111  if (!strncasecmp(hdr_iter->first.c_str(),"transferheader",14)) {
112  std::stringstream ss;
113  ss << hdr_iter->first.substr(14) << ": " << hdr_iter->second;
114  list = curl_slist_append(list, ss.str().c_str());
115  m_headers_copy.emplace_back(ss.str());
116  }
117  }
118 
119  if (m_is_transfer_state && m_push && m_push_length > 0) {
120  // On libcurl 8.5.0 - 8.9.1, we've observed bugs causing failures whenever
121  // `Expect: 100-continue` is not used. Older versions of libcurl unconditionally
122  // set `Expect` whenever PUT is used (likely an older bug). To workaround the issue,
123  // we force `Expect` to be set, triggering the older libcurl behavior.
124  // See: https://github.com/xrootd/xrootd/issues/2470
125  // See: https://github.com/curl/curl/issues/17004
126  list = curl_slist_append(list, "Expect: 100-continue");
127  }
128 
129  if (list != NULL) {
130  curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, list);
131  m_headers = list;
132  }
133 }
134 
136  m_offset = 0;
137  m_status_code = -1;
138  m_content_length = -1;
139  m_push_length = -1;
140  m_recv_all_headers = false;
141  m_recv_status_line = false;
142 }
143 
144 size_t State::HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata)
145 {
146  State *obj = static_cast<State*>(userdata);
147  std::string header(buffer, size*nitems);
148  return obj->Header(header);
149 }
150 
151 int State::Header(const std::string &header) {
152  //printf("Received remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str());
153  if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect?
154  m_recv_all_headers = false;
155  m_recv_status_line = false;
156  }
157  if (!m_recv_status_line) {
158  std::stringstream ss(header);
159  std::string item;
160  if (!std::getline(ss, item, ' ')) return 0;
161  m_resp_protocol = item;
162  //printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str());
163  if (!std::getline(ss, item, ' ')) return 0;
164  try {
165  m_status_code = std::stol(item);
166  } catch (...) {
167  return 0;
168  }
169  m_recv_status_line = true;
170  } else if (header.size() == 0 || header == "\n" || header == "\r\n") {
171  m_recv_all_headers = true;
172  }
173  else if (header != "\r\n") {
174  // Parse the header
175  std::size_t found = header.find(":");
176  if (found != std::string::npos) {
177  std::string header_name = header.substr(0, found);
178  std::transform(header_name.begin(), header_name.end(), header_name.begin(), ::tolower);
179  std::string header_value = header.substr(found+1);
180  if (header_name == "content-length")
181  {
182  try {
183  m_content_length = std::stoll(header_value);
184  } catch (...) {
185  // Header unparseable -- not a great sign, fail request.
186  //printf("Content-length header unparseable\n");
187  return 0;
188  }
189  }
190  } else {
191  // Non-empty header that isn't the status line, but no ':' present --
192  // malformed request?
193  //printf("Malformed header: %s\n", header.c_str());
194  return 0;
195  }
196  }
197  return header.size();
198 }
199 
200 size_t State::WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) {
201  State *obj = static_cast<State*>(userdata);
202  if (obj->GetStatusCode() < 0) {
203  return 0;
204  } // malformed request - got body before headers.
205  if (obj->GetStatusCode() >= 400) {
206  obj->m_error_buf += std::string(static_cast<char*>(buffer),
207  std::min(static_cast<size_t>(1024), size*nitems));
208  // Record error messages until we hit a KB; at that point, fail out.
209  if (obj->m_error_buf.size() >= 1024)
210  return 0;
211  else
212  return size*nitems;
213  } // Status indicates failure.
214  return obj->Write(static_cast<char*>(buffer), size*nitems);
215 }
216 
217 ssize_t State::Write(char *buffer, size_t size) {
218  ssize_t retval = m_stream->Write(m_start_offset + m_offset, buffer, size, false);
219  if (retval == SFS_ERROR) {
220  m_error_buf = m_stream->GetErrorMessage();
221  m_error_code = 1;
222  return -1;
223  }
224  m_offset += retval;
225  return retval;
226 }
227 
229  if (m_push) {
230  return 0;
231  }
232 
233  ssize_t retval = m_stream->Write(m_start_offset + m_offset, 0, 0, true);
234  if (retval == SFS_ERROR) {
235  m_error_buf = m_stream->GetErrorMessage();
236  m_error_code = 2;
237  return -1;
238  }
239  m_offset += retval;
240  return retval;
241 }
242 
243 size_t State::ReadCB(void *buffer, size_t size, size_t nitems, void *userdata) {
244  State *obj = static_cast<State*>(userdata);
245  if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers.
246  if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure.
247  return obj->Read(static_cast<char*>(buffer), size*nitems);
248 }
249 
250 int State::Read(char *buffer, size_t size) {
251  int retval = m_stream->Read(m_start_offset + m_offset, buffer, size);
252  if (retval == SFS_ERROR) {
253  return -1;
254  }
255  m_offset += retval;
256  //printf("Read a total of %ld bytes.\n", m_offset);
257  return retval;
258 }
259 
261  CURL *curl = curl_easy_duphandle(m_curl);
262  if (!curl) {
263  throw std::runtime_error("Failed to duplicate existing curl handle.");
264  }
265 
266  State *state = new State(0, *m_stream, curl, m_push, tpcForwardCreds);
267 
268  if (m_headers) {
269  state->m_headers_copy.reserve(m_headers_copy.size());
270  for (std::vector<std::string>::const_iterator header_iter = m_headers_copy.begin();
271  header_iter != m_headers_copy.end();
272  header_iter++) {
273  state->m_headers = curl_slist_append(state->m_headers, header_iter->c_str());
274  state->m_headers_copy.push_back(*header_iter);
275  }
276  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, NULL);
277  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, state->m_headers);
278  }
279 
280  return state;
281 }
282 
283 void State::SetTransferParameters(off_t offset, size_t size) {
284  m_start_offset = offset;
285  m_offset = 0;
286  m_content_length = size;
287  std::stringstream ss;
288  ss << offset << "-" << (offset+size-1);
289  curl_easy_setopt(m_curl, CURLOPT_RANGE, ss.str().c_str());
290 }
291 
293 {
294  return m_stream->AvailableBuffers();
295 }
296 
297 void State::DumpBuffers() const
298 {
299  m_stream->DumpBuffers();
300 }
301 
303 {
304  if (!m_stream->Finalize()) {
305  m_error_buf = m_stream->GetErrorMessage();
306  m_error_code = 3;
307  return false;
308  }
309  return true;
310 }
311 
313 {
314  // CURLINFO_PRIMARY_PORT is only defined for 7.21.0 or later; on older
315  // library versions, simply omit this information.
316 #if LIBCURL_VERSION_NUM >= 0x071500
317  char *curl_ip = NULL;
318  CURLcode rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_IP, &curl_ip);
319  if ((rc != CURLE_OK) || !curl_ip) {
320  return "";
321  }
322  long curl_port = 0;
323  rc = curl_easy_getinfo(m_curl, CURLINFO_PRIMARY_PORT, &curl_port);
324  if ((rc != CURLE_OK) || !curl_port) {
325  return "";
326  }
327  std::stringstream ss;
328  // libcurl returns IPv6 addresses of the form:
329  // 2600:900:6:1301:5054:ff:fe0b:9cba:8000
330  // However the HTTP-TPC spec says to use the form
331  // [2600:900:6:1301:5054:ff:fe0b:9cba]:8000
332  // Hence, we add '[' and ']' whenever a ':' is seen.
333  if (NULL == strchr(curl_ip, ':'))
334  ss << "tcp:" << curl_ip << ":" << curl_port;
335  else
336  ss << "tcp:[" << curl_ip << "]:" << curl_port;
337  return ss.str();
338 #else
339  return "";
340 #endif
341 }
void CURL
int stat(const char *path, struct stat *buf)
void getline(uchar *buff, int blen)
#define SFS_ERROR
#define SFS_OK
State * Duplicate()
void Move(State &other)
int GetStatusCode() const
void DumpBuffers() const
void ResetAfterRequest()
void SetTransferParameters(off_t offset, size_t size)
std::string GetConnectionDescription()
void SetupHeaders(XrdHttpExtReq &req)
int AvailableBuffers() const
int Read(off_t offset, char *buffer, size_t size)
ssize_t Write(off_t offset, const char *buffer, size_t size, bool force)
void DumpBuffers() const
std::string GetErrorMessage() const
size_t AvailableBuffers() const
int Stat(struct stat *)
std::map< std::string, std::string > & headers
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, uint16_t timeout=0)
Factory for creating WriteImpl objects.
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.