Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python3 

2# 

3# Copyright (C) 2020 Vates SAS - ronan.abhamon@vates.fr 

4# 

5# This program is free software: you can redistribute it and/or modify 

6# it under the terms of the GNU General Public License as published by 

7# the Free Software Foundation, either version 3 of the License, or 

8# (at your option) any later version. 

9# This program is distributed in the hope that it will be useful, 

10# but WITHOUT ANY WARRANTY; without even the implied warranty of 

11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

12# GNU General Public License for more details. 

13# 

14# You should have received a copy of the GNU General Public License 

15# along with this program. If not, see <https://www.gnu.org/licenses/>. 

16 

17from sm_typing import List, override 

18 

19from linstorjournaler import LinstorJournaler 

20from linstorvolumemanager import LinstorVolumeManager 

21 

22from concurrent.futures import ThreadPoolExecutor 

23import base64 

24import errno 

25import json 

26import socket 

27import threading 

28import time 

29 

30from cowutil import CowImageInfo, CowUtil, getCowUtil 

31import util 

32import xs_errors 

33 

34from vditype import VdiType 

35 

36MANAGER_PLUGIN = 'linstor-manager' 

37 

38 

39def call_remote_method(session, host_ref, method, args): 

40 try: 

41 response = session.xenapi.host.call_plugin( 

42 host_ref, MANAGER_PLUGIN, method, args 

43 ) 

44 except Exception as e: 

45 util.SMlog('call-plugin on {} ({} with {}) exception: {}'.format( 

46 host_ref, method, args, e 

47 )) 

48 raise util.SMException(str(e)) 

49 

50 util.SMlog('call-plugin on {} ({} with {}) returned: {}'.format( 

51 host_ref, method, args, response 

52 )) 

53 

54 return response 

55 

56 

57class LinstorCallException(util.SMException): 

58 def __init__(self, cmd_err): 

59 self.cmd_err = cmd_err 

60 

61 @override 

62 def __str__(self) -> str: 

63 return str(self.cmd_err) 

64 

65 

66class ErofsLinstorCallException(LinstorCallException): 

67 pass 

68 

69 

70class NoPathLinstorCallException(LinstorCallException): 

71 pass 

72 

73def log_successful_call(target_host, device_path, vdi_uuid, remote_method, response): 

74 util.SMlog('Successful access on {} for device {} ({}): `{}` => {}'.format( 

75 target_host, device_path, vdi_uuid, remote_method, str(response) 

76 ), priority=util.LOG_DEBUG) 

77 

78def log_failed_call(target_host, next_target, device_path, vdi_uuid, remote_method, e): 

79 util.SMlog('Failed to call method on {} for device {} ({}): {}. Trying accessing on {}... (cause: {})'.format( 

80 target_host, device_path, vdi_uuid, remote_method, next_target, e 

81 ), priority=util.LOG_DEBUG) 

82 

83def linstorhostcall(local_method, remote_method=None): 

84 if not remote_method: 

85 remote_method = local_method 

86 

87 def decorated(response_parser): 

88 def wrapper(*args, **kwargs): 

89 self = args[0] 

90 vdi_uuid = args[1] 

91 

92 device_path = self._linstor.build_device_path( 

93 self._linstor.get_volume_name(vdi_uuid) 

94 ) 

95 

96 if not self._session: 

97 return self._call_local_method(local_method, device_path, *args[2:], **kwargs) 

98 

99 remote_args = { 

100 'devicePath': device_path, 

101 'groupName': self._linstor.group_name, 

102 'vdiType': self._vdi_type 

103 } 

104 remote_args.update(**kwargs) 

105 remote_args = {str(key): str(value) for key, value in remote_args.items()} 

106 

107 this_host_ref = util.get_this_host_ref(self._session) 

108 def call_method(host_label, host_ref): 

109 if host_ref == this_host_ref: 

110 return self._call_local_method(local_method, device_path, *args[2:], **kwargs) 

111 response = call_remote_method(self._session, host_ref, remote_method, remote_args) 

112 log_successful_call(host_label, device_path, vdi_uuid, remote_method, response) 

113 return response_parser(self, vdi_uuid, response) 

114 

115 # 1. Try on attached host. 

116 try: 

117 host_ref_attached = next(iter(util.get_hosts_attached_on(self._session, [vdi_uuid])), None) 

118 if host_ref_attached: 

119 return call_method('attached host', host_ref_attached) 

120 except Exception as e: 

121 log_failed_call('attached host', 'master', device_path, vdi_uuid, remote_method, e) 

122 

123 # 2. Try on master host. 

124 try: 

125 return call_method('master', util.get_master_ref(self._session)) 

126 except Exception as e: 

127 log_failed_call('master', 'primary', device_path, vdi_uuid, remote_method, e) 

128 

129 # 3. Try on a primary. 

130 hosts = self._get_hosts(remote_method, device_path) 

131 

132 nodes, primary_hostname = self._linstor.find_up_to_date_diskful_nodes(vdi_uuid) 

133 if primary_hostname: 

134 try: 

135 return call_method('primary', self._find_host_ref_from_hostname(hosts, primary_hostname)) 

136 except Exception as remote_e: 

137 self._raise_openers_exception(device_path, remote_e) 

138 

139 log_failed_call('primary', 'another node', device_path, vdi_uuid, remote_method, 'no primary') 

140 

141 # 4. Try on any host with local data. 

142 try: 

143 return call_method('another node', next(filter(None, 

144 (self._find_host_ref_from_hostname(hosts, hostname) for hostname in nodes) 

145 ), None)) 

146 except Exception as remote_e: 

147 self._raise_openers_exception(device_path, remote_e) 

148 

149 return wrapper 

150 return decorated 

151 

152 

153def linstormodifier(): 

154 def decorated(func): 

155 def wrapper(*args, **kwargs): 

156 self = args[0] 

157 

158 ret = func(*args, **kwargs) 

159 self._linstor.invalidate_resource_cache() 

160 return ret 

161 return wrapper 

162 return decorated 

163 

164 

165class LinstorCowUtil(object): 

166 def __init__(self, session, linstor, vdi_type: str): 

167 self._session = session 

168 self._linstor = linstor 

169 self._cowutil = getCowUtil(vdi_type) 

170 self._vdi_type = vdi_type 

171 

172 @property 

173 def cowutil(self) -> CowUtil: 

174 return self._cowutil 

175 

176 def create_chain_paths(self, vdi_uuid, readonly=False): 

177 # OPTIMIZE: Add a limit_to_first_allocated_block param to limit cowutil calls. 

178 # Useful for the snapshot code algorithm. 

179 

180 leaf_vdi_path = self._linstor.get_device_path(vdi_uuid) 

181 path = leaf_vdi_path 

182 while True: 

183 if not util.pathexists(path): 

184 raise xs_errors.XenError( 

185 'VDIUnavailable', opterr='Could not find: {}'.format(path) 

186 ) 

187 

188 # Diskless path can be created on the fly, ensure we can open it. 

189 def check_volume_usable(): 

190 while True: 

191 try: 

192 with open(path, 'r' if readonly else 'r+'): 

193 pass 

194 except IOError as e: 

195 if e.errno == errno.ENODATA: 

196 time.sleep(2) 

197 continue 

198 if e.errno == errno.EROFS or e.errno == errno.EMEDIUMTYPE: 

199 util.SMlog('Volume not attachable because used. Openers: {}'.format( 

200 self._linstor.get_volume_openers(vdi_uuid) 

201 )) 

202 raise 

203 break 

204 util.retry(check_volume_usable, 15, 2) 

205 

206 vdi_uuid = self.get_info(vdi_uuid).parentUuid 

207 if not vdi_uuid: 

208 break 

209 path = self._linstor.get_device_path(vdi_uuid) 

210 readonly = True # Non-leaf is always readonly. 

211 

212 return leaf_vdi_path 

213 

214 # -------------------------------------------------------------------------- 

215 # Getters: read locally and try on another host in case of failure. 

216 # -------------------------------------------------------------------------- 

217 

218 def check(self, vdi_uuid, ignore_missing_footer=False, fast=False): 

219 kwargs = { 

220 'ignoreMissingFooter': ignore_missing_footer, 

221 'fast': fast 

222 } 

223 return self._check(vdi_uuid, **kwargs) 

224 

225 @linstorhostcall('check') 

226 def _check(self, vdi_uuid, response): 

227 return CowUtil.CheckResult(response) 

228 

229 def get_info(self, vdi_uuid, include_parent=True): 

230 kwargs = { 

231 'includeParent': include_parent, 

232 'resolveParent': False 

233 } 

234 

235 try: 

236 return self._get_info(vdi_uuid, self._extract_uuid, **kwargs) 

237 except Exception as e: 

238 # Backward compatibility with non-QCOW2 versions. 

239 if str(e).startswith("['UNKNOWN_XENAPI_PLUGIN_FUNCTION', 'getInfo']"): 

240 return self._get_vhd_info(vdi_uuid, self._extract_uuid, **kwargs) 

241 raise 

242 

243 @linstorhostcall('getInfo') 

244 def _get_info(self, vdi_uuid, response): 

245 return self._get_info_impl(vdi_uuid, response) 

246 

247 # Backward compatibility with non-QCOW2 versions. 

248 @linstorhostcall('getVHDInfo') 

249 def _get_vhd_info(self, vdi_uuid, response): 

250 return self._get_info_impl(vdi_uuid, response) 

251 

252 def _get_info_impl(self, vdi_uuid, response): 

253 obj = json.loads(response) 

254 

255 image_info = CowImageInfo(vdi_uuid) 

256 image_info.sizeVirt = obj['sizeVirt'] 

257 image_info.sizePhys = obj['sizePhys'] 

258 if 'parentPath' in obj: 

259 image_info.parentPath = obj['parentPath'] 

260 image_info.parentUuid = obj['parentUuid'] 

261 image_info.hidden = obj['hidden'] 

262 image_info.path = obj['path'] 

263 

264 return image_info 

265 

266 @linstorhostcall('hasParent') 

267 def has_parent(self, vdi_uuid, response): 

268 return util.strtobool(response) 

269 

270 def get_parent(self, vdi_uuid): 

271 return self._get_parent(vdi_uuid, self._extract_uuid) 

272 

273 @linstorhostcall('getParent') 

274 def _get_parent(self, vdi_uuid, response): 

275 return response 

276 

277 @linstorhostcall('getSizeVirt') 

278 def get_size_virt(self, vdi_uuid, response): 

279 return int(response) 

280 

281 @linstorhostcall('getMaxResizeSize') 

282 def get_max_resize_size(self, vdi_uuid, response): 

283 return int(response) 

284 

285 @linstorhostcall('getSizePhys') 

286 def get_size_phys(self, vdi_uuid, response): 

287 return int(response) 

288 

289 @linstorhostcall('getAllocatedSize') 

290 def get_allocated_size(self, vdi_uuid, response): 

291 return int(response) 

292 

293 @linstorhostcall('getDepth') 

294 def get_depth(self, vdi_uuid, response): 

295 return int(response) 

296 

297 @linstorhostcall('getKeyHash') 

298 def get_key_hash(self, vdi_uuid, response): 

299 return response or None 

300 

301 @linstorhostcall('getBlockBitmap') 

302 def get_block_bitmap(self, vdi_uuid, response): 

303 return base64.b64decode(response) 

304 

305 @linstorhostcall('_get_drbd_size', 'getDrbdSize') 

306 def get_drbd_size(self, vdi_uuid, response): 

307 return int(response) 

308 

309 def _get_drbd_size(self, path): 

310 (ret, stdout, stderr) = util.doexec(['blockdev', '--getsize64', path]) 

311 if ret == 0: 

312 return int(stdout.strip()) 

313 raise util.SMException('Failed to get DRBD size: {}'.format(stderr)) 

314 

315 # -------------------------------------------------------------------------- 

316 # Setters: only used locally. 

317 # -------------------------------------------------------------------------- 

318 

319 @linstormodifier() 

320 def create(self, path, size, static, msize=0): 

321 return self._call_local_method_or_fail(self._cowutil.create, path, size, static, msize) 

322 

323 @linstormodifier() 

324 def set_size_phys(self, path, size, debug=True): 

325 return self._call_local_method_or_fail(self._cowutil.setSizePhys, path, size, debug) 

326 

327 @linstormodifier() 

328 def set_parent(self, path, parentPath, parentRaw=False): 

329 return self._call_local_method_or_fail(self._cowutil.setParent, path, parentPath, parentRaw) 

330 

331 @linstormodifier() 

332 def set_hidden(self, path, hidden=True): 

333 return self._call_local_method_or_fail(self._cowutil.setHidden, path, hidden) 

334 

335 @linstormodifier() 

336 def set_key(self, path, key_hash): 

337 return self._call_local_method_or_fail(self._cowutil.setKey, path, key_hash) 

338 

339 @linstormodifier() 

340 def kill_data(self, path): 

341 return self._call_local_method_or_fail(self._cowutil.killData, path) 

342 

343 @linstormodifier() 

344 def snapshot(self, path, parent, parentRaw, msize=0, checkEmpty=True): 

345 return self._call_local_method_or_fail(self._cowutil.snapshot, path, parent, parentRaw, msize, checkEmpty) 

346 

347 def inflate(self, journaler, vdi_uuid, vdi_path, new_size, old_size): 

348 # Only inflate if the LINSTOR volume capacity is not enough. 

349 new_size = LinstorVolumeManager.round_up_volume_size(new_size) 

350 if new_size <= old_size: 

351 return 

352 

353 util.SMlog( 

354 'Inflate {} (size={}, previous={})' 

355 .format(vdi_path, new_size, old_size) 

356 ) 

357 

358 journaler.create( 

359 LinstorJournaler.INFLATE, vdi_uuid, old_size 

360 ) 

361 self._linstor.resize_volume(vdi_uuid, new_size) 

362 

363 result_size = self.get_drbd_size(vdi_uuid) 

364 if result_size < new_size: 

365 util.SMlog( 

366 'WARNING: Cannot inflate volume to {}B, result size: {}B' 

367 .format(new_size, result_size) 

368 ) 

369 

370 self._zeroize(vdi_path, result_size - self._cowutil.getFooterSize()) 

371 self.set_size_phys(vdi_path, result_size, False) 

372 journaler.remove(LinstorJournaler.INFLATE, vdi_uuid) 

373 

374 def deflate(self, vdi_path, new_size, old_size, zeroize=False): 

375 if zeroize: 

376 assert old_size > self._cowutil.getFooterSize() 

377 self._zeroize(vdi_path, old_size - self._cowutil.getFooterSize()) 

378 

379 new_size = LinstorVolumeManager.round_up_volume_size(new_size) 

380 if new_size >= old_size: 

381 return 

382 

383 util.SMlog( 

384 'Deflate {} (new size={}, previous={})' 

385 .format(vdi_path, new_size, old_size) 

386 ) 

387 

388 self.set_size_phys(vdi_path, new_size) 

389 # TODO: Change the LINSTOR volume size using linstor.resize_volume. 

390 

391 # -------------------------------------------------------------------------- 

392 # Remote setters: write locally and try on another host in case of failure. 

393 # -------------------------------------------------------------------------- 

394 

395 @linstormodifier() 

396 def set_size_virt(self, path, size, jFile): 

397 kwargs = { 

398 'size': size, 

399 'jFile': jFile 

400 } 

401 return self._call_method(self._cowutil.setSizeVirt, 'setSizeVirt', path, use_parent=False, **kwargs) 

402 

403 @linstormodifier() 

404 def set_size_virt_fast(self, path, size): 

405 kwargs = { 

406 'size': size 

407 } 

408 return self._call_method(self._cowutil.setSizeVirtFast, 'setSizeVirtFast', path, use_parent=False, **kwargs) 

409 

410 @linstormodifier() 

411 def force_parent(self, path, parentPath, parentRaw=False): 

412 kwargs = { 

413 'parentPath': str(parentPath), 

414 'parentRaw': parentRaw 

415 } 

416 return self._call_method(self._cowutil.setParent, 'setParent', path, use_parent=False, **kwargs) 

417 

418 @linstormodifier() 

419 def force_coalesce(self, path): 

420 return int(self._call_method(self._cowutil.coalesce, 'coalesce', path, use_parent=True)) 

421 

422 @linstormodifier() 

423 def force_repair(self, path): 

424 return self._call_method(self._cowutil.repair, 'repair', path, use_parent=False) 

425 

426 @linstormodifier() 

427 def force_deflate(self, path, newSize, oldSize, zeroize): 

428 kwargs = { 

429 'newSize': newSize, 

430 'oldSize': oldSize, 

431 'zeroize': zeroize 

432 } 

433 return self._call_method('_force_deflate', 'deflate', path, use_parent=False, **kwargs) 

434 

435 def _force_deflate(self, path, newSize, oldSize, zeroize): 

436 self.deflate(path, newSize, oldSize, zeroize) 

437 

438 # -------------------------------------------------------------------------- 

439 # Helpers. 

440 # -------------------------------------------------------------------------- 

441 

442 def compute_volume_size(self, virtual_size: int) -> int: 

443 if VdiType.isCowImage(self._vdi_type): 

444 # All LINSTOR VDIs have the metadata area preallocated for 

445 # the maximum possible virtual size (for fast online VDI.resize). 

446 meta_overhead = self._cowutil.calcOverheadEmpty( 

447 max(virtual_size, self._cowutil.getDefaultPreallocationSizeVirt()) 

448 ) 

449 bitmap_overhead = self._cowutil.calcOverheadBitmap(virtual_size) 

450 virtual_size += meta_overhead + bitmap_overhead 

451 else: 

452 raise Exception('Invalid image type: {}'.format(self._vdi_type)) 

453 

454 return LinstorVolumeManager.round_up_volume_size(virtual_size) 

455 

456 def _extract_uuid(self, device_path): 

457 # TODO: Remove new line in the vhdutil module. Not here. 

458 return self._linstor.get_volume_uuid_from_device_path( 

459 device_path.rstrip('\n') 

460 ) 

461 

462 def _get_hosts(self, remote_method, device_path): 

463 try: 

464 return self._session.xenapi.host.get_all_records() 

465 except Exception as e: 

466 raise xs_errors.XenError( 

467 'VDIUnavailable', 

468 opterr='Unable to get host list to run cowutil command `{}` (path={}): {}' 

469 .format(remote_method, device_path, e) 

470 ) 

471 

472 # -------------------------------------------------------------------------- 

473 

474 @staticmethod 

475 def _find_host_ref_from_hostname(hosts, hostname): 

476 return next((ref for ref, rec in hosts.items() if rec['hostname'] == hostname), None) 

477 

478 def _raise_openers_exception(self, device_path, e): 

479 if isinstance(e, util.CommandException): 

480 e_str = 'cmd: `{}`, code: `{}`, reason: `{}`'.format(e.cmd, e.code, e.reason) 

481 else: 

482 e_str = str(e) 

483 

484 try: 

485 volume_uuid = self._linstor.get_volume_uuid_from_device_path( 

486 device_path 

487 ) 

488 e_wrapper = Exception( 

489 e_str + ' (openers: {})'.format( 

490 self._linstor.get_volume_openers(volume_uuid) 

491 ) 

492 ) 

493 except Exception as illformed_e: 

494 e_wrapper = Exception( 

495 e_str + ' (unable to get openers: {})'.format(illformed_e) 

496 ) 

497 util.SMlog('raise opener exception: {}'.format(e_wrapper)) 

498 raise e_wrapper # pylint: disable = E0702 

499 

500 def _sanitize_local_method(self, local_method): 

501 if isinstance(local_method, str): 

502 return getattr(self if local_method.startswith('_') else self._cowutil, local_method) 

503 return local_method 

504 

505 def _call_local_method(self, local_method, device_path, *args, **kwargs): 

506 local_method = self._sanitize_local_method(local_method) 

507 

508 try: 

509 def local_call(): 

510 try: 

511 return local_method(device_path, *args, **kwargs) 

512 except util.CommandException as e: 

513 if e.code == errno.EROFS or e.code == errno.EMEDIUMTYPE: 

514 raise ErofsLinstorCallException(e) # Break retry calls. 

515 if e.code == errno.ENOENT: 

516 raise NoPathLinstorCallException(e) 

517 raise e 

518 # Retry only locally if it's not an EROFS exception. 

519 return util.retry(local_call, 5, 2, exceptions=[util.CommandException]) 

520 except util.CommandException as e: 

521 util.SMlog('failed to execute locally CowUtil (sys {})'.format(e.code)) 

522 raise e 

523 

524 def _call_local_method_or_fail(self, local_method, device_path, *args, **kwargs): 

525 try: 

526 return self._call_local_method(local_method, device_path, *args, **kwargs) 

527 except ErofsLinstorCallException as e: 

528 # Volume is locked on a host, find openers. 

529 self._raise_openers_exception(device_path, e.cmd_err) 

530 

531 def _call_method(self, local_method, remote_method, device_path, use_parent, *args, **kwargs): 

532 # Note: `use_parent` exists to know if the COW image parent is used by the local/remote method. 

533 # Normally in case of failure, if the parent is unused we try to execute the method on 

534 # another host using the DRBD opener list. In the other case, if the parent is required, 

535 # we must check where this last one is open instead of the child. 

536 

537 local_method = self._sanitize_local_method(local_method) 

538 

539 # A. Try to write locally... 

540 try: 

541 return self._call_local_method(local_method, device_path, *args, **kwargs) 

542 except Exception: 

543 pass 

544 

545 util.SMlog('unable to execute `{}` locally, retry using a writable host...'.format(remote_method)) 

546 

547 # B. Execute the command on another host. 

548 # B.1. Get host list. 

549 hosts = self._get_hosts(remote_method, device_path) 

550 

551 # B.2. Prepare remote args. 

552 remote_args = { 

553 'devicePath': device_path, 

554 'groupName': self._linstor.group_name, 

555 'vdiType': self._vdi_type 

556 } 

557 remote_args.update(**kwargs) 

558 remote_args = {str(key): str(value) for key, value in remote_args.items()} 

559 

560 volume_uuid = self._linstor.get_volume_uuid_from_device_path( 

561 device_path 

562 ) 

563 parent_volume_uuid = None 

564 if use_parent: 

565 parent_volume_uuid = self.get_parent(volume_uuid) 

566 

567 openers_uuid = parent_volume_uuid if use_parent else volume_uuid 

568 

569 # B.3. Call! 

570 def remote_call(): 

571 try: 

572 all_openers = self._linstor.get_volume_openers(openers_uuid) 

573 except Exception as e: 

574 raise xs_errors.XenError( 

575 'VDIUnavailable', 

576 opterr='Unable to get DRBD openers to run CowUtil command `{}` (path={}): {}' 

577 .format(remote_method, device_path, e) 

578 ) 

579 

580 no_host_found = True 

581 for hostname, openers in all_openers.items(): 

582 if not openers: 

583 continue 

584 

585 host_ref = self._find_host_ref_from_hostname(hosts, hostname) 

586 if not host_ref: 

587 continue 

588 

589 no_host_found = False 

590 try: 

591 return call_remote_method(self._session, host_ref, remote_method, remote_args) 

592 except Exception: 

593 pass 

594 

595 if no_host_found: 

596 try: 

597 return local_method(device_path, *args, **kwargs) 

598 except Exception as e: 

599 self._raise_openers_exception(device_path, e) 

600 

601 raise xs_errors.XenError( 

602 'VDIUnavailable', 

603 opterr='No valid host found to run CowUtil command `{}` (path=`{}`, openers=`{}`)' 

604 .format(remote_method, device_path, openers) 

605 ) 

606 return util.retry(remote_call, 5, 2) 

607 

608 def _zeroize(self, path, size): 

609 if not util.zeroOut(path, size, self._cowutil.getFooterSize()): 

610 raise xs_errors.XenError( 

611 'EIO', 

612 opterr='Failed to zero out COW image footer {}'.format(path) 

613 ) 

614 

615class MultiLinstorCowUtil: 

616 class ExecutorData(threading.local): 

617 def __init__(self): 

618 self.clear() 

619 

620 def clear(self): 

621 self.session = None 

622 self.linstor = None 

623 self.vdi_type_to_cowutil = {} 

624 

625 class Load: 

626 def __init__(self, session): 

627 self.session = session 

628 

629 def cleanup(self): 

630 if self.session: 

631 self.session.xenapi.session.logout() 

632 self.session = None 

633 

634 def __init__(self, uri, group_name) -> None: 

635 self._uri = uri 

636 self._group_name = group_name 

637 self._loads: List[MultiLinstorCowUtil.Load] = [] 

638 self._executor_data = self.ExecutorData() 

639 

640 def __del__(self): 

641 self._cleanup() 

642 

643 def run(self, func, user_data_list): 

644 def wrapper(func, user_data): 

645 if not self._executor_data.session: 

646 self._init_executor_thread() 

647 return func(user_data, self) 

648 

649 with ThreadPoolExecutor(thread_name_prefix="CowUtil") as executor: 

650 return executor.map(lambda user_data: wrapper(func, user_data), user_data_list) 

651 

652 def get_local_cowutil(self, vdi_type): 

653 instance = self._executor_data.vdi_type_to_cowutil.get(vdi_type) 

654 if not instance: 

655 instance = LinstorCowUtil( 

656 self._executor_data.session, 

657 self._executor_data.linstor, 

658 vdi_type 

659 ) 

660 self._executor_data.vdi_type_to_cowutil[vdi_type] = instance 

661 return instance 

662 

663 def _init_executor_thread(self): 

664 session = util.get_localAPI_session() 

665 load = self.Load(session) 

666 try: 

667 linstor = LinstorVolumeManager( 

668 self._uri, 

669 self._group_name, 

670 repair=False, 

671 logger=util.SMlog 

672 ) 

673 self._executor_data.linstor = linstor 

674 self._executor_data.session = session 

675 except: 

676 self._executor_data.clear() 

677 load.cleanup() 

678 raise 

679 

680 self._loads.append(load) 

681 

682 def _cleanup(self): 

683 for load in self._loads: 

684 try: 

685 load.cleanup() 

686 except Exception as e: 

687 util.SMlog(f"Failed to clean load executor: {e}") 

688 self._loads.clear()