37 #include <netinet/in.h>
38 #include <sys/types.h>
83 {nodeP->Delete(
Cluster.STMutex);
86 if (!
Cluster.Drop(nodeEnt, nodeInst,
this))
delete this;
91 nodeEnt(0), nodeInst(0)
95 nodeEnt(nid), nodeInst(inst)
111 memset((
void *)NodeTab, 0,
sizeof(NodeTab));
112 memset((
void *)AltMans, (
int)
' ',
sizeof(AltMans));
121 peerMask = ~peerHost;
129 const char *theNID,
const char *theIF)
133 const char *act =
"";
137 int tmp, Slot, Free = -1, Bump1 = -1, Bump2 = -1, Bump3 = -1, aSet = 0;
139 bool SpecAlt = (Special && !(Status &
CMS_isSuper));
149 for (Slot = 0; Slot <
STMax; Slot++)
151 {
if (NodeTab[Slot]->isNode(lp, theNID, port))
break;
152 if (NodeTab[Slot]->isConn)
153 {
if (!NodeTab[Slot]->isPerm && Special)
156 if ( NodeTab[Slot]->isPerm)
157 {
if (Bump3 < 0 && Special) Bump3 = Slot;}
160 }
else if (Free < 0) Free = Slot;
165 {
if (NodeTab[Slot] && NodeTab[Slot]->isBound)
166 {
Say.
Emsg(
"Cluster", lp->
ID,
"already logged in.");
184 {
if (!(nP = AddAlt(cidP, lp, port, Status, sport, theNID, theIF)))
186 aSet = 1; Slot = nP->NodeID;
187 if (nP != NodeTab[Slot]) {Hidden =
true; act =
"Alternate ";}
194 {
if (Free >= 0) Slot = Free;
195 else {
if (Bump1 >= 0) Slot = Bump1;
196 else Slot = (Bump2 >= 0 ? Bump2 : Bump3);
199 "failed; too many subscribers.");
201 DEBUG(lp->
ID <<
" redirected; too many subscribers.");
206 if (Status &
CMS_isMan) {setAltMan(Slot, lp, sport); aSet=1;}
208 sendAList(NodeTab[Slot]->Link);
210 DEBUG(lp->
ID <<
" bumps " << NodeTab[Slot]->Ident <<
" #" <<Slot);
211 NodeTab[Slot]->Lock();
212 Remove(
"redirected", NodeTab[Slot], -1);
215 NodeTab[Slot] = nP =
new XrdCmsNode(lp, theIF, theNID, port, 0, Slot);
217 if ((cidP->
AddNode(nP, SpecAlt))) nP->cidP = cidP;
218 else {
delete nP; NodeTab[Slot] = 0;
return 0;}
227 if (!aSet && (Status &
CMS_isSuper)) setAltMan(Slot, lp, sport);
228 if (Slot > STHi) STHi = Slot;
236 nP->subsPort = sport;
245 }
else nP->
isMan |= 0x02;
249 if (nP->
isPeer) peerHost |= nP->NodeMask;
250 else peerHost &= ~nP->NodeMask;
251 peerMask = ~peerHost;
256 {
DEBUG(act <<nP->
Ident <<
" to cluster " <<nP->myNID <<
" slot "
257 <<Slot <<
'.' <<nP->Instance <<
" (nodecnt=" <<NodeCnt
281 int port,
int Status,
int sport,
282 const char *theNID,
const char *theIF)
287 int slot = cidP->Slot();
292 {
Say.
Emsg(epname, lp->ID,
"already logged in.");
299 {nP =
new XrdCmsNode(lp, theIF, theNID, port, 0, slot);
300 if (!(cidP->
AddNode(nP,
true))) {
delete nP; nP = 0;}
306 {
Say.
Emsg(epname,
"Add alternate manager", lp->ID,
307 "failed; too many subscribers.");
313 if ((pP = NodeTab[slot]) && !(pP->
isBound))
314 {setAltMan(nP->NodeID, nP->Link, sport);
334 const char *etxt =
"blacklisted.";
344 for (i = 0; i <= STHi; i++)
345 {
if ((nP = NodeTab[i]))
354 etxt =
"blacklisted; redirect unsupported.";
355 else etxt =
"blacklisted with redirect.";
357 nP->
Send((
char *)&discRequest,
sizeof(discRequest));
362 Say.
Emsg(
"Manager", nP->
Name(),
"removed from blacklist.");
375 int iovcnt,
int iotot)
385 bmask = smask & peerMask;
391 for (i = 0; i <= STHi; i++)
392 {
if ((nP = NodeTab[i]) && nP->
isNode(bmask))
396 if (nP->
Send(iod, iovcnt, iotot) < 0)
397 {unQueried |= nP->
Mask();
412 char *Data,
int Dlen)
414 struct iovec ioV[3], *iovP = &ioV[1];
421 Blen =
XrdOucPup::Pack(&iovP, Data, Temp, (Dlen ? strlen(Data)+1 : Dlen));
422 Hdr.
datalen = htons(
static_cast<unsigned short>(Blen));
426 ioV[0].iov_base = (
char *)&Hdr; ioV[0].iov_len =
sizeof(Hdr);
427 return Broadcast(smask, ioV, 3, Blen+
sizeof(Hdr));
433 void *Data,
int Dlen)
435 struct iovec ioV[2] = {{(
char *)&Hdr,
sizeof(Hdr)},
436 {(
char *)Data, (
size_t)Dlen}};
440 Hdr.
datalen = htons(
static_cast<unsigned short>(Dlen));
441 return Broadcast(smask, ioV, 2, Dlen+
sizeof(Hdr));
451 void *Data,
int Dlen)
454 static int Start = 0;
456 struct iovec ioV[2] = {{(
char *)&Hdr,
sizeof(Hdr)},
457 {(
char *)Data, (
size_t)Dlen}};
458 int i, Beg, Fin, ioTot = Dlen+
sizeof(Hdr);
462 Hdr.
datalen = htons(
static_cast<unsigned short>(Dlen));
468 Beg = Start = (Start <= STHi ? Start+1 : 0);
475 do{
for (i = Beg; i <= Fin; i++)
476 {
if ((nP = NodeTab[i]) && nP->
isNode(Who))
480 if (nP->
Send(ioV, 2, ioTot) >= 0) {nP->
unRef();
return 1;}
487 Fin = Beg-1; Beg = 0;
512 for (i = 0; i <= STHi; i++)
513 if ((nP = NodeTab[i]) && nP->
isNode(addr))
514 {smask = nP->NodeMask;
break;}
541 bool retName = (
opts & LS_IDNT) != 0;
542 bool retAny = (
opts & LS_ANY ) != 0;
543 bool retDest = retName || (
opts & LS_IPO);
549 for (i = 0; i <= STHi; i++)
550 if ((nP=NodeTab[i]) && (nP->NodeMask & mask))
553 {
if (nP->netIF.
HasDest(ifType)) ifGet = ifType;
554 else if (!retAny)
continue;
556 if (!nP->netIF.
HasDest(ifGet))
continue;
560 if (retDest) destLen = nP->netIF.
GetDest(sip->Ident, iSize,
563 else {strcpy(sip->Ident, nP->myName); destLen = nP->myNlen;}
564 if (!destLen) {
delete sip;
continue;}
566 sip->IdentLen = destLen;
567 sip->Mask = nP->NodeMask;
568 sip->Id = nP->NodeID;
569 sip->Port = nP->netIF.
Port();
570 sip->RefTotW = nP->RefTotW;
571 sip->RefTotR = nP->RefTotR;
572 sip->Shrin = nP->Shrin;
573 sip->Share = nP->Share;
607 else {
if (*(Sel.
Path.
Val+1) ==
'\0')
608 {Sel.
Vec.hf = ~0LL; Sel.
Vec.pf = Sel.
Vec.wf = 0;
617 {Sel.
Vec.hf = Sel.
Vec.pf = Sel.
Vec.wf = 0;
641 amask = pmask = pinfo.
rovec;
644 if (!(retc = SelDFS(Sel, amask, pmask, smask, 1)))
647 if (retc < 0)
return NotFound;
659 qfVec = pinfo.
rovec; Sel.
Vec.hf = 0;
660 }
else qfVec = Sel.
Vec.bf;
664 if ((!qfVec && retc >= 0) || (Sel.
Vec.hf && Sel.
InfoP)) retc = 0;
688 struct iovec ioV[] = {{(
char *)&
Usage,
sizeof(
Usage)}};
689 int ioVnum =
sizeof(ioV)/
sizeof(
struct iovec);
690 int ioVtot =
sizeof(
Usage);
698 Broadcast(allNodes, ioV, ioVnum, ioVtot);
710 int snooze_interval = 60, snooze_total = 0;
711 int rCnt = 0, wCnt = 0;
712 bool resetW, resetR, resetRW;
719 int totR = 0, totW = 0;
722 for (
int i = 0; i <= STHi; i++)
723 {
if ((nP = NodeTab[i]))
724 {totR += nP->RefTotR;
730 rCnt += (totR - SelRtot); SelRtot = totR;
731 wCnt += (totW - SelWtot); SelWtot = totW;
732 snooze_total += snooze_interval;
736 resetRW = (snooze_total >=
Config.
RefReset && (resetW || resetR));
739 if (resetR) rCnt = 0;
740 if (resetW) wCnt = 0;
779 : myMutex(mtx), myNode(node), hasLK(immed < 0),
782 myNID = node->
ID(myInst);
795 myNode->DropTime = 0;
800 if (!hasLK) myMutex->
UnLock();
802 } LockHandler(&STMutex, theNode, immed);
805 int Inst, NodeID = theNode->
ID(Inst);
812 if (LockHandler.myNID != NodeID || LockHandler.myInst != Inst)
813 {
Say.
Emsg(
"Manager", LockHandler.myIdent,
"removal aborted.");
814 DEBUG(LockHandler.myIdent <<
" node " <<NodeID <<
'.' <<Inst <<
" != "
815 << LockHandler.myNID <<
'.' <<LockHandler.myInst <<
" at entry.");
829 {theNode->
Disc(reason, 0);
836 if (!(NodeTab[NodeID] == theNode))
837 {
const char *why = (theNode->
isMan ?
"dropped as alternate."
838 :
"dropped and redirected.");
840 LockHandler.doDrop =
true;
860 if (theNode->
isMan && theNode->cidP && !(theNode->cidP->
IsSingle())
861 && (altNode = theNode->cidP->
RemNode(theNode)))
862 {
if (altNode->
isBound) NodeCnt++;
863 NodeTab[NodeID] = altNode;
868 setAltMan(altNode->NodeID, altNode->Link, altNode->subsPort);
870 LockHandler.doDrop =
true;
879 LockHandler.myNode = 0;
888 if (theNode->DropJob) theNode->DropJob->
nodeInst = Inst;
889 else theNode->DropJob =
new XrdCmsDrop(NodeID, Inst);
894 Say.
Emsg(
"Manager", theNode->
Ident,
"scheduled for removal;", reason);
895 else DEBUG(theNode->
Ident <<
" node " <<NodeID <<
'.' <<Inst);
905 bool doAll (nMask == 0);
909 if (!isLocked) STMutex.ReadLock();
914 for (
int i = 0; i <= STHi; i++)
915 {
if ((nP = NodeTab[i]) && (doAll || nP->
isNode(nMask)))
918 nP->Shrem = nP->Share;
924 if (!isLocked) STMutex.UnLock();
942 {isRW = 1; Amode =
"write";
948 else {isRW = 0; Amode =
"read"; fRD = 1;}
954 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
955 "No servers %s %s access to the file",
971 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
972 "Too many DFS %s attempts; operation terminated", Amode)+1;
980 if (!(retc = SelDFS(Sel, amask, pmask, smask, isRW)))
982 if (retc < 0)
return retc;
983 }
else if (noSel)
return 0;
984 return SelNode(Sel, pmask, smask);
1000 {pmask = amask & ~(Sel.
Vec.hf | Sel.
Vec.bf); smask = 0;
1001 if (!pmask && !Sel.
Vec.bf)
return SelFail(Sel,eNoRep);
1003 else if (Sel.
Vec.bf) pmask = smask = 0;
1004 else if (Sel.
Vec.hf)
1008 && maxBits(Sel.
Vec.hf,2))
return SelFail(Sel,eDups);
1010 != (Sel.
Vec.hf & pinfo.
rovec))
return SelFail(Sel,eROfs);
1012 if (!(pmask = Sel.
Vec.hf & amask))
return SelFail(Sel,eNoSel);
1016 {pmask = amask; smask = 0;}
1017 else if ((smask = pinfo.
ssvec & amask)) pmask = 0;
1018 else pmask = smask = 0;
1020 pmask = Sel.
Vec.hf & amask;
1022 else smask = (retc < 0 ? 0 : pinfo.
ssvec & amask);
1028 Sel.
Vec.hf = Sel.
Vec.pf = pmask = smask = 0;
1034 dowt = (!pmask && !smask);
1048 if (dowt)
return retc;
1049 }
else if (dowt && retc < 0 && !noSel)
1061 if (noSel)
return 0;
1065 if (dowt)
return Unuseable(Sel);
1073 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
1074 "Too many attempts to stage %s access to the file", Amode)+1;
1082 return SelNode(Sel, pmask, smask);
1088 int isrw,
int isMulti,
int ifWant)
1090 static const SMask_t smLow(255);
1099 if (!pmask)
return 0;
1119 {STMutex.ReadLock();
1122 : SelbyLoadR(pmask, selR));
1124 if (nP) hlen = nP->netIF.
GetName(hbuff, port, nType) + 1;
1133 do {
if (!(tmask = pmask & smLow)) Snum += 8;
1134 else {
while((tmask = tmask>>1)) Snum++;
break;}
1135 }
while((pmask = pmask >> 8));
1140 if ((nP = NodeTab[Snum]))
1141 {
if (nP->
isBad) nP = 0;
1147 else {nP->RefTotW++; nP->RefW++;}
1148 else {nP->RefTotR++; nP->RefR++;}
1155 {hlen = nP->netIF.
GetName(hbuff, port, nType) + 1;
1171 const char *etext, *Item =
"file";
1175 etext =
"Unable to create %s; it already exists.";
1178 case eROfs: etext =
"Unable to modify %s; r/o copy already exists.";
1181 case eDups: etext =
"Unable to modify %s; multiple copies exist.";
1184 case eNoRep: etext =
"Unable to replicate %s; no new sites available.";
1187 case eNoSel:
if (Sel.
Vec.hf & Sel.
nmask)
1188 {etext =
"Unable to access %s; eligible servers shunned.";
1192 {etext =
"Unable to write %s; r/w exports not found.";
1194 etext =
"Unable to access %s; it does not exist.";
1200 default: etext =
"Unable to access %s; it does not exist.";
1205 int n = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data), etext, Item);
1206 if (n < (
int)
sizeof(Sel.
Resp.Data)) Sel.
Resp.DLen = n+1;
1207 else Sel.
Resp.DLen =
sizeof(Sel.
Resp.Data);
1226 bmask = smask & peerMask;
1230 for (i = 0; i <= STHi; i++)
1232 {
if (doAll || !sData.
Total)
1258 static const char statfmt1[] =
"<stats id=\"cms\">"
1259 "<role>%s</role></stats>";
1264 if (!bfr)
return sizeof(statfmt1) + 8;
1270 if ((bln -= mlen) <= 0)
return 0;
1280 static const char statfmt0[] =
"</stats>";
1281 static const char statfmt1[] =
"<stats id=\"cmsm\">"
1282 "<role>%s</role><sel><t>%lld</t><r>%lld</r><w>%lld</w></sel>"
1284 static const char statfmt2[] =
"<stats id=\"%d\">"
1285 "<host>%s</host><role>%s</role>"
1286 "<run>%s</run><ref><r>%d</r><w>%d</w></ref>%s</stats>";
1287 static const char statfmt3[] =
"<shr>%d<use>%d</use></shr>";
1288 static const char statfmt4[] =
"</node>";
1289 static const char statfmt5[] =
1290 "<frq><add>%lld<d>%lld</d></add><rsp>%lld<m>%lld</m></rsp>"
1291 "<lf>%lld</lf><ls>%lld</ls><rf>%lld</rf><rs>%lld</rs></frq>";
1299 int mlen, tlen, n = 0;
1300 char shrBuff[80],
stat[6], *stp;
1308 while((xsp = sp)) {sp = sp->
next;
delete xsp;}
1315 {n =
sizeof(statfmt0) +
1316 sizeof(statfmt1) + 12*3 + 3 + 3 +
1317 (
sizeof(statfmt2) + 10*2 + 256 + 16) *
STMax +
sizeof(statfmt4);
1318 if (AddShr) n +=
sizeof(statfmt3) + 12;
1319 if (AddFrq) n +=
sizeof(statfmt4) + (10*8);
1330 while(sp) {n++; sp = sp->
next;}
1335 long long lclTcnt = SelTcnt, lclRtot = SelRtot, lclWtot = SelWtot;
1336 mlen = snprintf(bfr, bln, statfmt1,
1339 if ((bln -= mlen) <= 0)
return 0;
1340 tlen = mlen; bfr += mlen; n = 0; *shrBuff = 0;
1342 while(sp && bln > 0)
1351 if (AddShr) snprintf(shrBuff,
sizeof(shrBuff), statfmt3,
1353 mlen = snprintf(bfr, bln, statfmt2, n, sp->
Ident,
1356 bfr += mlen; bln -= mlen; tlen += mlen;
1360 if (bln <= (
int)
sizeof(statfmt4))
return 0;
1361 strcpy(bfr, statfmt4); mlen =
sizeof(statfmt4) - 1;
1362 bfr += mlen; bln -= mlen; tlen += mlen;
1364 if (AddFrq && bln > 0)
1365 {mlen = snprintf(bfr, bln, statfmt5, Frq.
Add2Q, Frq.
PBack, Frq.
Resp,
1367 bfr += mlen; bln -= mlen; tlen += mlen;
1372 if (sp || bln < (
int)
sizeof(statfmt0))
return 0;
1373 strcpy(bfr, statfmt0);
1374 return tlen +
sizeof(statfmt0) - 1;
1388 ?
"no eligible servers reachable for"
1389 :
"no eligible servers for");
1392 selR.
reason =
"no eligible servers have space for";
1395 selR.
reason =
"eligible servers overloaded for";
1398 selR.
reason =
"eligible servers suspended for";
1401 selR.
reason =
"eligible servers offline for";
1404 selR.
reason =
"server selection error for";
1419 int XrdCmsCluster::Drop(
int sent,
int sinst,
XrdCmsDrop *djp)
1427 if (djp) STMutex.WriteLock();
1431 if (!(nP = NodeTab[sent]) || nP->Inst() != sinst)
1432 {
if (nP && djp == nP->DropJob) {nP->DropJob = 0; nP->DropTime = 0;}
1433 if (djp) STMutex.UnLock();
1434 DEBUG(sent <<
'.' <<sinst <<
" cancelled.");
1440 if (djp && time(0) < nP->DropTime)
1442 if (djp) STMutex.UnLock();
1460 if (nP->
isPeer) {peerHost &= nP->NodeMask; peerMask = ~peerHost;}
1465 {memset((
void *)&AltMans[sent*AltSize], (
int)
' ', AltSize);
1466 if (sent == AltMent)
1468 while(AltMent >= 0 && NodeTab[AltMent]
1469 && !NodeTab[AltMent]->isMan) AltMent--;
1470 if (AltMent < 0) AltMend = AltMans;
1471 else AltMend = AltMans + ((AltMent+1)*AltSize);
1477 if (sent == STHi)
while(STHi >= 0 && !NodeTab[STHi]) STHi--;
1481 if (nP->NodeMask)
Cache.
Drop(nP->NodeMask, sent, STHi);
1487 if (djp) {STMutex.UnLock(); nP->
Delete(STMutex);}
1492 Say.
Emsg(
"Drop_Node", hname,
"dropped.");
1500 int XrdCmsCluster::Multiple(
SMask_t mVec)
1502 static const unsigned long long Left32 = 0xffffffff00000000LL;
1503 static const unsigned long long Right32 = 0x00000000ffffffffLL;
1504 static const unsigned long long Left16 = 0x00000000ffff0000LL;
1505 static const unsigned long long Right16 = 0x000000000000ffffLL;
1506 static const unsigned long long Left08 = 0x000000000000ff00LL;
1507 static const unsigned long long Right08 = 0x00000000000000ffLL;
1508 static const unsigned long long Left04 = 0x00000000000000f0LL;
1509 static const unsigned long long Right04 = 0x000000000000000fLL;
1511 static const int isMult[16] = {0,0,0,1,0,1,1,1,0,1,1,1,1,1,1,1};
1513 if (mVec & Left32) {
if (mVec & Right32)
return 1;
1514 else mVec = mVec >> 32LL;
1516 if (mVec & Left16) {
if (mVec & Right16)
return 1;
1517 else mVec = mVec >> 16LL;
1519 if (mVec & Left08) {
if (mVec & Right08)
return 1;
1520 else mVec = mVec >> 8LL;
1522 if (mVec & Left04) {
if (mVec & Right04)
return 1;
1523 else mVec = mVec >> 4LL;
1525 return isMult[mVec];
1532 bool XrdCmsCluster::maxBits(
SMask_t mVec,
int mbits)
1539 {mVec &= (mVec - 1);
1541 if (count >= mbits)
return true;
1553 void XrdCmsCluster::Record(
char *path,
const char *reason,
bool force)
1556 static
int msgcnt = 255;
1560 DEBUG(reason <<path);
1562 msgcnt++; skipmsg = msgcnt & (force ? 0x0f : 0xff);
1565 if (!skipmsg)
Say.Emsg(epname, "client deferred;", reason, path);
1576 int affsel = 1, count = 0, isalt = 0, pass = 2;
1584 selR.needNet =
XrdNetIF::Mask(nType);
1590 ? Sel.AltHash : Sel.Path.Hash);
1592 for (count = 0; sVec; count++) sVec &= (sVec - 1);
1593 if (count > 1) selR.
selPack = affsel = (theHash % count) + 1;
1609 mask = pmask & peerMask;
1613 ? SelbyRef(mask,selR)
1615 : SelbyLoadR(pmask, selR));
1626 {
TRACE(Redirect,
"affinity " <<affsel <<
'/' <<count <<
'/'
1628 <<nP->
Name() <<
' ' <<Sel.Path.Val);
1636 Sel.Resp.DLen = nP->netIF.
GetName(Sel.Resp.Data, Sel.Resp.Port, nType);
1637 if (!Sel.Resp.DLen) {nP->
UnLock();
return Unreachable(Sel,
false);}
1638 Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
1642 if (Sel.iovN && Sel.iovP) nP->
Send(Sel.iovP, Sel.iovN);
1658 {
if (isalt) act = (Sel.iovN ?
" staging " :
" assigned ");
1659 else act =
" serving ";
1661 TRACE(Stage, Sel.Resp.Data <<act <<Sel.Path.Val);
1670 Record(Sel.Path.Val,
"insufficient number of nodes",
true);
1678 Record(Sel.Path.Val, selR.
reason);
1687 {
const char *reason1 = selR.
reason;
1688 int delay1 = selR.
delay;
1689 bool noNet = selR.
xNoNet;
1690 if ((mask = (pmask | amask) & peerHost)) nP = SelbyCost(mask, selR);
1693 Sel.Resp.DLen = nP->netIF.
GetName(Sel.Resp.Data,Sel.Resp.Port,nType);
1694 if (!Sel.Resp.DLen) {nP->
UnLock();
return Unreachable(Sel,
false);}
1695 Sel.Resp.DLen++; Sel.smask = nP->NodeMask;
1696 if (Sel.iovN && Sel.iovP) nP->
Send(Sel.iovP, Sel.iovN);
1698 TRACE(Stage,
"Peer " <<Sel.Resp.Data <<
" handling " <<Sel.Path.Val);
1712 {Record(Sel.Path.Val, selR.
reason);
1718 if (selR.
xNoNet)
return Unreachable(Sel,
true);
1719 return Unuseable(Sel);
1729 #define RefCount(sP, sPMulti, NeedSpace) \
1730 if (NeedSpace) {sP->RefTotW++; sP->RefW++;} \
1731 else {sP->RefTotR++; sP->RefR++;} \
1732 if (sPMulti && sP->Share && !sP->Shrem--) \
1733 {sP->RefW += sP->Shrip; sP->RefR += sP->Shrip; \
1734 sP->Shrem = sP->Share; sP->Shrin++; \
1753 selR.
Reset(); SelTcnt++;
1754 for (
int i = 0; i <= STHi; i++)
1755 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1759 if (np->
isBad) {selR.
xSusp =
true;
continue;}
1762 else{
if (abs(sp->myCost - np->myCost) <=
Config.
P_fuzz)
1771 else if (sp->RefR > np->RefR) sp=np;
1773 else if (sp->myCost > np->myCost) sp=np;
1780 if (!sp)
return calcDelay(selR);
1798 selR.
Reset(); SelTcnt++;
1799 for (
int i = 0; i <= STHi; i++)
1800 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1804 if (np->
isBad) {selR.
xSusp =
true;
continue;}
1808 {selR.
xFull =
true;
continue;}
1813 else if (sp->myMass > np->myMass) sp=np;
1820 else if (sp->RefR > np->RefR) sp=np;
1822 else if (sp->myLoad > np->myLoad) sp=np;
1830 if (!sp)
return calcDelay(selR);
1843 static std::random_device rand_dev;
1844 static std::default_random_engine generator(rand_dev());
1856 for (
int i = 0; i <= STHi; ++i) {
1859 if (!((np = NodeTab[i]) && (np->NodeMask & mask)))
1867 if (np->
isBad) { selR.
xSusp =
true;
continue; }
1879 NodeWeight[i] = totWeight;
1882 std::uniform_int_distribution<int> distr(1, totWeight);
1883 int selected = distr(generator);
1885 for (
int i = 0; i <= STHi; ++i) {
1886 if (NodeWeight[i] < selected)
1893 return sp ? sp : calcDelay(selR);
1909 selR.
Reset(); SelTcnt++;
1910 for (
int i = 0; i <= STHi; i++)
1911 if ((np = NodeTab[i]) && (np->NodeMask & mask))
1915 if (np->
isBad) {selR.
xSusp =
true;
continue;}
1918 {selR.
xFull =
true;
continue;}
1927 else if (sp->RefR > np->RefR) sp=np;
1933 if (!sp)
return calcDelay(selR);
1946 static const SMask_t allNodes(~0);
1966 Sel.
Vec.hf = amask; Sel.
Vec.wf = (isRW ? amask : 0);
1984 if (isRW && Sel.
Vec.hf)
1994 return SelFail(Sel, eNoEnt);
2004 void XrdCmsCluster::sendAList(
XrdLink *lp)
2007 static int HdrSize =
sizeof(Req.
Hdr) +
sizeof(Req.
sLen);
2008 static char *AltNext = AltMans;
2009 static struct iovec
iov[4] = {{(caddr_t)&Req, (
size_t)HdrSize},
2012 {(caddr_t)
"\0", 1}};
2017 AltNext = AltNext + AltSize;
2018 if (AltNext >= AltMend)
2021 iov[2].iov_len = dlen = AltMend - AltMans;
2023 iov[1].iov_base = (caddr_t)AltNext;
2024 iov[1].iov_len = AltMend - AltNext;
2025 iov[2].iov_len = AltNext - AltMans;
2026 dlen =
iov[1].iov_len +
iov[2].iov_len;
2032 Req.
Hdr.
datalen = htons(
static_cast<unsigned short>(dlen+
sizeof(Req.
sLen)));
2033 Req.
sLen = htons(
static_cast<unsigned short>(dlen));
2037 lp->
Send(
iov, 4, dlen+HdrSize);
2046 void XrdCmsCluster::setAltMan(
int snum,
XrdLink *lp,
int port)
2049 char *ap = &AltMans[snum*AltSize];
2055 memset(ap,
int(
' '), AltSize);
2069 if (ap >= AltMend) {AltMend = ap + AltSize; AltMent = snum;}
2083 {Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
2084 "No servers are reachable via %s network to %s%s the file.",
2087 Sel.
Resp.DLen = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data)-1,
2088 "Eligible server is unreachable via %s network to %s%s the file.",
2105 int n = snprintf(Sel.
Resp.Data,
sizeof(Sel.
Resp.Data),
2106 "No servers are available to %s%s the %s.",
2107 Xmode, Amode, EType);
2108 if (n < (
int)
sizeof(Sel.
Resp.Data)) Sel.
Resp.DLen = n+1;
2109 else Sel.
Resp.DLen =
sizeof(Sel.
Resp.Data);
void Usage(const char *msg)
#define RefCount(sP, sPMulti, NeedSpace)
unsigned long long SMask_t
int stat(const char *path, struct stat *buf)
int Exists(XrdCmsRRData &Arg, XrdCmsPInfo &Who, int noLim=0)
static int Present(const char *hName, XrdOucTList *bList=0, char *rbuff=0, int rblen=0)
int GetFile(XrdCmsSelect &Sel, SMask_t mask)
int AddFile(XrdCmsSelect &Sel, SMask_t mask)
int UnkFile(XrdCmsSelect &Sel, SMask_t mask)
void Drop(SMask_t mask, int SNum, int xHi)
int WT4File(XrdCmsSelect &Sel, SMask_t mask)
static XrdCmsClustID * AddID(const char *cID)
static SMask_t Mask(const char *cID)
XrdCmsNode * RemNode(XrdCmsNode *nP)
static XrdCmsClustID * Find(const char *cID)
bool AddNode(XrdCmsNode *nP, bool isMan)
SMask_t getMask(const XrdNetAddr *addr)
void Space(XrdCms::SpaceData &sData, SMask_t smask)
int Broadsend(SMask_t smask, XrdCms::CmsRRHdr &Hdr, void *Data, int Dlen)
int Select(XrdCmsSelect &Sel)
int Locate(XrdCmsSelect &Sel)
void ResetRef(SMask_t smask, bool isLocked=false)
SMask_t Broadcast(SMask_t, const struct iovec *, int, int tot=0)
XrdCmsSelected * List(SMask_t mask, CmsLSOpts opts, bool &oksel)
XrdCmsNode * Add(XrdLink *lp, int dport, int Status, int sport, const char *theNID, const char *theIF)
void Remove(XrdCmsNode *theNode)
int Stats(char *bfr, int bln)
virtual void BlackList(XrdOucTList *blP)
int Statt(char *bfr, int bln)
static const int RepStat_shr
static const int RepStat_frq
XrdCmsDrop(int nid, int inst)
XrdCmsDrop(XrdCmsNode *nP)
static const char allowsRW
void Delete(XrdSysRWLock &gMutex)
void n2gLock(XrdSysRWLock &gMutex, bool rdlock=false)
int Send(const char *buff, int blen=0)
static const char allowsSS
static const char isDisabled
int isNode(SMask_t smask)
void g2nLock(XrdSysRWLock &gMutex)
static const char isSuspend
void Disc(const char *reason=0, int needLock=1)
void setName(XrdLink *lnkp, const char *theIF, int port)
static const char isDoomed
static const char isBlisted
int Find(const char *pname, XrdCmsPInfo &masks)
void Statistics(Info &Data)
static const char * Type(RoleID rid)
struct XrdCmsSelect::@94 Resp
struct XrdCmsSelect::@93 Vec
static const int IdentSize
void Update(StateType StateT, int ActivVal, int StageVal=0)
const XrdNetAddr * NetAddr() const
char * ID
Pointer to the client's link identity.
int Send(const char *buff, int blen)
static const int prefipv4
Use if mapped IPV4 actual format.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
@ fmtName
Hostname if it is resolvable o/w use fmtAddr.
bool HasDest(ifType ifT=PublicV6)
static const char * Name(ifType ifT)
int GetName(const char *&name, ifType ifT=PublicV6)
int GetDest(char *dest, int dlen, ifType ifT=PublicV6, bool prefn=false)
ifType
The enum that is used to index into ifData to get appropriate interface.
static int Pack(struct iovec **, const char *, unsigned short &buff)
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
static void Snooze(int seconds)
static struct XrdCl::None none
ZipListImpl< false > List(Ctx< ZipArchive > zip)
Factory for creating ZipStatImpl objects.
static const unsigned char kYR_Version
static const int CMS_isSuper
static const int CMS_noStage
static const int CMS_isMan
static const int CMS_isPeer
static const int CMS_Suspend