XRootD
XrdThrottleFileSystemConfig.cc
Go to the documentation of this file.
1 
2 #include <fcntl.h>
3 
4 #include "XrdSys/XrdSysPlugin.hh"
5 #include "XrdOuc/XrdOuca2x.hh"
6 #include "XrdOuc/XrdOucEnv.hh"
7 #include "XrdOuc/XrdOucStream.hh"
8 
11 
12 using namespace XrdThrottle;
13 
14 #define OFS_NAME "libXrdOfs.so"
15 
16 /*
17  * Note nothing in this file is thread-safe.
18  */
19 
20 static XrdSfsFileSystem *
21 LoadFS(const std::string &fslib, XrdSysError &eDest, const std::string &config_file){
22  // Load the library
23  XrdSysPlugin ofsLib(&eDest, fslib.c_str(), "fslib", NULL);
24  XrdSfsFileSystem *fs;
25  if (fslib == OFS_NAME)
26  {
28  XrdSysLogger *lp,
29  const char *configfn,
30  XrdOucEnv *EnvInfo);
31 
32  if (!(fs = XrdSfsGetDefaultFileSystem(0, eDest.logger(), config_file.c_str(), 0)))
33  {
34  eDest.Emsg("Config", "Unable to load OFS filesystem.");
35  }
36  }
37  else
38  {
39  XrdSfsFileSystem *(*ep)(XrdSfsFileSystem *, XrdSysLogger *, const char *);
40  if (!(ep = (XrdSfsFileSystem *(*)(XrdSfsFileSystem *,XrdSysLogger *,const char *))
41  ofsLib.getPlugin("XrdSfsGetFileSystem")))
42  return NULL;
43  if (!(fs = (*ep)(0, eDest.logger(), config_file.c_str())))
44  {
45  eDest.Emsg("Config", "Unable to create file system object via", fslib.c_str());
46  return NULL;
47  }
48  }
49  ofsLib.Persist();
50 
51  return fs;
52 }
53 
54 namespace XrdThrottle {
57  XrdSysLogger *lp,
58  const char *configfn,
59  XrdOucEnv *envP)
60 {
61  FileSystem* fs = NULL;
62  FileSystem::Initialize(fs, native_fs, lp, configfn, envP);
63  return fs;
64 }
65 }
66 
67 // Export the symbol necessary for this to be dynamically loaded.
68 extern "C" {
71  XrdSysLogger *lp,
72  const char *configfn)
73 {
74  return XrdSfsGetFileSystem_Internal(native_fs, lp, configfn, nullptr);
75 }
76 
79  XrdSysLogger *lp,
80  const char *configfn,
81  XrdOucEnv *envP)
82 {
83  return XrdSfsGetFileSystem_Internal(native_fs, lp, configfn, envP);
84 }
85 }
86 
89 
90 FileSystem* FileSystem::m_instance = 0;
91 
92 FileSystem::FileSystem()
93  : m_eroute(0), m_trace(&m_eroute), m_sfs_ptr(0), m_initialized(false), m_throttle(&m_eroute, &m_trace)
94 {
95  myVersion = &XrdVERSIONINFOVAR(XrdSfsGetFileSystem);
96 }
97 
98 FileSystem::~FileSystem() {}
99 
100 void
101 FileSystem::Initialize(FileSystem *&fs,
102  XrdSfsFileSystem *native_fs,
103  XrdSysLogger *lp,
104  const char *configfn,
105  XrdOucEnv *envP)
106 {
107  fs = NULL;
108  if (m_instance == NULL && !(m_instance = new FileSystem()))
109  {
110  return;
111  }
112  fs = m_instance;
113  if (!fs->m_initialized)
114  {
115  fs->m_config_file = configfn;
116  fs->m_eroute.logger(lp);
117  fs->m_eroute.Say("Initializing a Throttled file system.");
118  if (fs->Configure(fs->m_eroute, native_fs, envP))
119  {
120  fs->m_eroute.Say("Initialization of throttled file system failed.");
121  fs = NULL;
122  return;
123  }
124  fs->m_throttle.Init();
125  fs->m_initialized = true;
126  }
127 }
128 
129 #define TS_Xeq(key, func) NoGo = (strcmp(key, var) == 0) ? func(Config) : 0
130 int
131 FileSystem::Configure(XrdSysError & log, XrdSfsFileSystem *native_fs, XrdOucEnv *envP)
132 {
133  XrdOucEnv myEnv;
134  XrdOucStream Config(&m_eroute, getenv("XRDINSTANCE"), &myEnv, "(Throttle Config)> ");
135  int cfgFD;
136  if (m_config_file.length() == 0)
137  {
138  log.Say("No filename specified.");
139  return 1;
140  }
141  if ((cfgFD = open(m_config_file.c_str(), O_RDONLY)) < 0)
142  {
143  log.Emsg("Config", errno, "Unable to open configuration file", m_config_file.c_str());
144  return 1;
145  }
146  Config.Attach(cfgFD);
147  static const char *cvec[] = { "*** throttle (ofs) plugin config:", 0 };
148  Config.Capture(cvec);
149 
150  std::string fslib = OFS_NAME;
151 
152  char *var, *val;
153  int NoGo = 0;
154  while( (var = Config.GetMyFirstWord()) )
155  {
156  if (strcmp("throttle.fslib", var) == 0)
157  {
158  val = Config.GetWord();
159  if (!val || !val[0]) {log.Emsg("Config", "fslib not specified."); continue;}
160  fslib = val;
161  }
162  TS_Xeq("throttle.max_open_files", xmaxopen);
163  TS_Xeq("throttle.max_active_connections", xmaxconn);
164  TS_Xeq("throttle.throttle", xthrottle);
165  TS_Xeq("throttle.loadshed", xloadshed);
166  TS_Xeq("throttle.trace", xtrace);
167  if (NoGo)
168  {
169  log.Emsg("Config", "Throttle configuration failed.");
170  }
171  }
172 
173  // Load the filesystem object.
174  m_sfs_ptr = native_fs ? native_fs : LoadFS(fslib, m_eroute, m_config_file);
175  if (!m_sfs_ptr) return 1;
176 
177  // Overwrite the environment variable saying that throttling is the fslib.
178  XrdOucEnv::Export("XRDOFSLIB", fslib.c_str());
179 
180  if (envP)
181  {
182  auto gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("Throttle.gStream*"));
183  log.Say("Config", "Throttle g-stream has", gstream ? "" : " NOT", " been configured via xrootd.mongstream directive");
184  m_throttle.SetMonitor(gstream);
185  }
186 
187  // The Feature function is not a virtual but implemented by the base class to
188  // look at a protected member. Thus, to forward the call, we need to copy
189  // from the underlying filesystem
190  FeatureSet = m_sfs_ptr->Features();
191  return 0;
192 }
193 
194 /******************************************************************************/
195 /* x m a x o p e n */
196 /******************************************************************************/
197 
198 /* Function: xmaxopen
199 
200  Purpose: Parse the directive: throttle.max_open_files <limit>
201 
202  <limit> maximum number of open file handles for a unique entity.
203 
204  Output: 0 upon success or !0 upon failure.
205 */
206 int
207 FileSystem::xmaxopen(XrdOucStream &Config)
208 {
209  auto val = Config.GetWord();
210  if (!val || val[0] == '\0')
211  {m_eroute.Emsg("Config", "Max open files not specified! Example usage: throttle.max_open_files 16000");}
212  long long max_open = -1;
213  if (XrdOuca2x::a2sz(m_eroute, "max open files value", val, &max_open, 1)) return 1;
214 
215  m_throttle.SetMaxOpen(max_open);
216  return 0;
217 }
218 
219 
220 /******************************************************************************/
221 /* x m a x c o n n */
222 /******************************************************************************/
223 
224 /* Function: xmaxconn
225 
226  Purpose: Parse the directive: throttle.max_active_connections <limit>
227 
228  <limit> maximum number of connections with at least one open file for a given entity
229 
230  Output: 0 upon success or !0 upon failure.
231 */
232 int
233 FileSystem::xmaxconn(XrdOucStream &Config)
234 {
235  auto val = Config.GetWord();
236  if (!val || val[0] == '\0')
237  {m_eroute.Emsg("Config", "Max active cconnections not specified! Example usage: throttle.max_active_connections 4000");}
238  long long max_conn = -1;
239  if (XrdOuca2x::a2sz(m_eroute, "max active connections value", val, &max_conn, 1)) return 1;
240 
241  m_throttle.SetMaxConns(max_conn);
242  return 0;
243 }
244 
245 
246 /******************************************************************************/
247 /* x t h r o t t l e */
248 /******************************************************************************/
249 
250 /* Function: xthrottle
251 
252  Purpose: To parse the directive: throttle [data <drate>] [iops <irate>] [concurrency <climit>] [interval <rint>]
253 
254  <drate> maximum bytes per second through the server.
255  <irate> maximum IOPS per second through the server.
256  <climit> maximum number of concurrent IO connections.
257  <rint> minimum interval in milliseconds between throttle re-computing.
258 
259  Output: 0 upon success or !0 upon failure.
260 */
261 int
262 FileSystem::xthrottle(XrdOucStream &Config)
263 {
264  long long drate = -1, irate = -1, rint = 1000, climit = -1;
265  char *val;
266 
267  while ((val = Config.GetWord()))
268  {
269  if (strcmp("data", val) == 0)
270  {
271  if (!(val = Config.GetWord()))
272  {m_eroute.Emsg("Config", "data throttle limit not specified."); return 1;}
273  if (XrdOuca2x::a2sz(m_eroute,"data throttle value",val,&drate,1)) return 1;
274  }
275  else if (strcmp("iops", val) == 0)
276  {
277  if (!(val = Config.GetWord()))
278  {m_eroute.Emsg("Config", "IOPS throttle limit not specified."); return 1;}
279  if (XrdOuca2x::a2sz(m_eroute,"IOPS throttle value",val,&irate,1)) return 1;
280  }
281  else if (strcmp("rint", val) == 0)
282  {
283  if (!(val = Config.GetWord()))
284  {m_eroute.Emsg("Config", "recompute interval not specified."); return 1;}
285  if (XrdOuca2x::a2sp(m_eroute,"recompute interval value",val,&rint,10)) return 1;
286  }
287  else if (strcmp("concurrency", val) == 0)
288  {
289  if (!(val = Config.GetWord()))
290  {m_eroute.Emsg("Config", "Concurrency limit not specified."); return 1;}
291  if (XrdOuca2x::a2sz(m_eroute,"Concurrency limit value",val,&climit,1)) return 1;
292  }
293  else
294  {
295  m_eroute.Emsg("Config", "Warning - unknown throttle option specified", val, ".");
296  }
297  }
298 
299  m_throttle.SetThrottles(drate, irate, climit, static_cast<float>(rint)/1000.0);
300  return 0;
301 }
302 
303 /******************************************************************************/
304 /* x l o a d s h e d */
305 /******************************************************************************/
306 
307 /* Function: xloadshed
308 
309  Purpose: To parse the directive: loadshed host <hostname> [port <port>] [frequency <freq>]
310 
311  <hostname> hostname of server to shed load to. Required
312  <port> port of server to shed load to. Defaults to 1094
313  <freq> A value from 1 to 100 specifying how often to shed load
314  (1 = 1% chance; 100 = 100% chance; defaults to 10).
315 
316  Output: 0 upon success or !0 upon failure.
317 */
318 int FileSystem::xloadshed(XrdOucStream &Config)
319 {
320  long long port = 0, freq = 0;
321  char *val;
322  std::string hostname;
323 
324  while ((val = Config.GetWord()))
325  {
326  if (strcmp("host", val) == 0)
327  {
328  if (!(val = Config.GetWord()))
329  {m_eroute.Emsg("Config", "loadshed hostname not specified."); return 1;}
330  hostname = val;
331  }
332  else if (strcmp("port", val) == 0)
333  {
334  if (!(val = Config.GetWord()))
335  {m_eroute.Emsg("Config", "Port number not specified."); return 1;}
336  if (XrdOuca2x::a2sz(m_eroute,"Port number",val,&port,1, 65536)) return 1;
337  }
338  else if (strcmp("frequency", val) == 0)
339  {
340  if (!(val = Config.GetWord()))
341  {m_eroute.Emsg("Config", "Loadshed frequency not specified."); return 1;}
342  if (XrdOuca2x::a2sz(m_eroute,"Loadshed frequency",val,&freq,1,100)) return 1;
343  }
344  else
345  {
346  m_eroute.Emsg("Config", "Warning - unknown loadshed option specified", val, ".");
347  }
348  }
349 
350  if (hostname.empty())
351  {
352  m_eroute.Emsg("Config", "must specify hostname for loadshed parameter.");
353  return 1;
354  }
355 
356  m_throttle.SetLoadShed(hostname, port, freq);
357  return 0;
358 }
359 
360 /******************************************************************************/
361 /* x t r a c e */
362 /******************************************************************************/
363 
364 /* Function: xtrace
365 
366  Purpose: To parse the directive: trace <events>
367 
368  <events> the blank separated list of events to trace. Trace
369  directives are cummalative.
370 
371  Output: 0 upon success or 1 upon failure.
372 */
373 
374 int FileSystem::xtrace(XrdOucStream &Config)
375 {
376  char *val;
377  static const struct traceopts {const char *opname; int opval;} tropts[] =
378  {
379  {"all", TRACE_ALL},
380  {"off", TRACE_NONE},
381  {"none", TRACE_NONE},
382  {"debug", TRACE_DEBUG},
383  {"iops", TRACE_IOPS},
384  {"bandwidth", TRACE_BANDWIDTH},
385  {"ioload", TRACE_IOLOAD},
386  {"files", TRACE_FILES},
387  {"connections",TRACE_CONNS},
388  };
389  int i, neg, trval = 0, numopts = sizeof(tropts)/sizeof(struct traceopts);
390 
391  if (!(val = Config.GetWord()))
392  {
393  m_eroute.Emsg("Config", "trace option not specified");
394  return 1;
395  }
396  while (val)
397  {
398  if (!strcmp(val, "off"))
399  {
400  trval = 0;
401  }
402  else
403  {
404  if ((neg = (val[0] == '-' && val[1])))
405  {
406  val++;
407  }
408  for (i = 0; i < numopts; i++)
409  {
410  if (!strcmp(val, tropts[i].opname))
411  {
412  if (neg)
413  {
414  if (tropts[i].opval) trval &= ~tropts[i].opval;
415  else trval = TRACE_ALL;
416  }
417  else if (tropts[i].opval) trval |= tropts[i].opval;
418  else trval = TRACE_NONE;
419  break;
420  }
421  }
422  if (i >= numopts)
423  {
424  m_eroute.Say("Config warning: ignoring invalid trace option '", val, "'.");
425  }
426  }
427  val = Config.GetWord();
428  }
429  m_trace.What = trval;
430  return 0;
431 }
432 
static XrdSysError eDest(0,"crypto_")
XrdSfsFileSystem * XrdSfsGetDefaultFileSystem(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *EnvInfo)
Definition: XrdOfsFS.cc:49
int open(const char *path, int oflag,...)
static XrdSfsFileSystem * LoadFS(const std::string &fslib, XrdSysError &eDest, const std::string &config_file)
#define OFS_NAME
XrdSfsFileSystem * XrdSfsGetFileSystem2(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP)
#define TS_Xeq(key, func)
XrdSfsFileSystem * XrdSfsGetFileSystem(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn)
XrdVERSIONINFO(XrdSfsGetFileSystem, FileSystem)
#define TRACE_IOLOAD
#define TRACE_BANDWIDTH
#define TRACE_FILES
#define TRACE_CONNS
#define TRACE_IOPS
#define TRACE_NONE
Definition: XrdTrace.hh:34
#define TRACE_DEBUG
Definition: XrdTrace.hh:36
#define TRACE_ALL
Definition: XrdTrace.hh:35
static int Export(const char *Var, const char *Val)
Definition: XrdOucEnv.cc:170
void * GetPtr(const char *varname)
Definition: XrdOucEnv.cc:263
static int a2sp(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:213
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition: XrdOuca2x.cc:257
uint64_t FeatureSet
Adjust features at initialization.
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Definition: XrdSysError.cc:141
XrdSysLogger * logger(XrdSysLogger *lp=0)
Definition: XrdSysError.hh:141
void * getPlugin(const char *pname, int optional=0)
void * Persist()
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMonitor(XrdXrootdGStream *gstream)
void SetMaxConns(unsigned long max_conns)
XrdVersionInfo myVersion
XrdCmsConfig Config
XrdOucEnv * envP
Definition: XrdPss.cc:109
XrdSfsFileSystem * XrdSfsGetFileSystem_Internal(XrdSfsFileSystem *native_fs, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP)