Coverage for drivers/linstorcowutil.py : 23%
Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python3
2#
3# Copyright (C) 2020 Vates SAS - ronan.abhamon@vates.fr
4#
5# This program is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program. If not, see <https://www.gnu.org/licenses/>.
17from sm_typing import List, override
19from linstorjournaler import LinstorJournaler
20from linstorvolumemanager import LinstorVolumeManager
22from concurrent.futures import ThreadPoolExecutor
23import base64
24import errno
25import json
26import socket
27import threading
28import time
30from cowutil import CowImageInfo, CowUtil, getCowUtil
31import util
32import xs_errors
34from vditype import VdiType
36MANAGER_PLUGIN = 'linstor-manager'
def call_remote_method(session, host_ref, method, args):
    """
    Call `method` of the `linstor-manager` plugin on the host `host_ref`.

    The outcome is always logged through `util.SMlog`. The plugin response
    is returned untouched on success; any failure is converted into a
    `util.SMException` built from the original exception message.
    """
    try:
        result = session.xenapi.host.call_plugin(
            host_ref, MANAGER_PLUGIN, method, args
        )
    except Exception as e:
        failure = 'call-plugin on {} ({} with {}) exception: {}'.format(
            host_ref, method, args, e
        )
        util.SMlog(failure)
        raise util.SMException(str(e))
    else:
        # Logging stays outside of the guarded call, a logging failure must
        # not be reported as a plugin failure.
        success = 'call-plugin on {} ({} with {}) returned: {}'.format(
            host_ref, method, args, result
        )
        util.SMlog(success)
        return result
class LinstorCallException(util.SMException):
    """SM exception wrapping the command error stored in `cmd_err`."""

    def __init__(self, cmd_err):
        # Keep the original error reachable for the callers.
        self.cmd_err = cmd_err

    @override
    def __str__(self) -> str:
        # The text of this exception is the text of the wrapped error.
        wrapped_text = str(self.cmd_err)
        return wrapped_text
class ErofsLinstorCallException(LinstorCallException):
    """Raised when a local command fails with EROFS or EMEDIUMTYPE."""
class NoPathLinstorCallException(LinstorCallException):
    """Raised when a local command fails with ENOENT."""
def log_successful_call(target_host, device_path, vdi_uuid, remote_method, response):
    """Log at debug priority that `remote_method` succeeded on `target_host`."""
    message = 'Successful access on {} for device {} ({}): `{}` => {}'.format(
        target_host, device_path, vdi_uuid, remote_method, str(response)
    )
    util.SMlog(message, priority=util.LOG_DEBUG)
def log_failed_call(target_host, next_target, device_path, vdi_uuid, remote_method, e):
    """
    Log at debug priority that `remote_method` failed on `target_host`
    because of `e`, and that `next_target` is going to be tried.
    """
    message = 'Failed to call method on {} for device {} ({}): {}. Trying accessing on {}... (cause: {})'.format(
        target_host, device_path, vdi_uuid, remote_method, next_target, e
    )
    util.SMlog(message, priority=util.LOG_DEBUG)
def linstorhostcall(local_method, remote_method=None):
    """
    Build a decorator that turns a response parser into a host-aware getter.

    The decorated function must have the signature
    `(self, vdi_uuid, response)`; it is only used to parse the response of
    the remote plugin method. The resulting method is called as
    `(self, vdi_uuid, *extra, **kwargs)` and:

    - without session, runs `local_method` through `self._call_local_method`;
    - otherwise tries, in this order: the host where the VDI is attached,
      the master, the primary host, then any up-to-date diskful node.

    When the selected host is the current host, the local method is used and
    its result is returned as is (the response parser is not applied).

    `remote_method` defaults to `local_method` when it is not given.
    """
    plugin_method = remote_method if remote_method else local_method

    def decorated(response_parser):
        def wrapper(*args, **kwargs):
            self, vdi_uuid = args[0], args[1]
            extra_args = args[2:]

            volume_name = self._linstor.get_volume_name(vdi_uuid)
            device_path = self._linstor.build_device_path(volume_name)

            def run_locally():
                return self._call_local_method(local_method, device_path, *extra_args, **kwargs)

            # No session: nothing can be done remotely.
            if not self._session:
                return run_locally()

            # The plugin only accepts string keys and values.
            raw_args = dict(
                devicePath=device_path,
                groupName=self._linstor.group_name,
                vdiType=self._vdi_type
            )
            raw_args.update(**kwargs)
            plugin_args = {str(name): str(val) for name, val in raw_args.items()}

            local_host_ref = util.get_this_host_ref(self._session)

            def run_on(label, target_ref):
                if target_ref == local_host_ref:
                    return run_locally()
                answer = call_remote_method(self._session, target_ref, plugin_method, plugin_args)
                log_successful_call(label, device_path, vdi_uuid, plugin_method, answer)
                return response_parser(self, vdi_uuid, answer)

            # 1. Try on attached host.
            try:
                attached_refs = util.get_hosts_attached_on(self._session, [vdi_uuid])
                attached_ref = next(iter(attached_refs), None)
                if attached_ref:
                    return run_on('attached host', attached_ref)
            except Exception as attached_e:
                log_failed_call('attached host', 'master', device_path, vdi_uuid, plugin_method, attached_e)

            # 2. Try on master host.
            try:
                return run_on('master', util.get_master_ref(self._session))
            except Exception as master_e:
                log_failed_call('master', 'primary', device_path, vdi_uuid, plugin_method, master_e)

            # 3. Try on a primary.
            hosts = self._get_hosts(plugin_method, device_path)

            nodes, primary_hostname = self._linstor.find_up_to_date_diskful_nodes(vdi_uuid)
            if primary_hostname:
                try:
                    primary_ref = self._find_host_ref_from_hostname(hosts, primary_hostname)
                    return run_on('primary', primary_ref)
                except Exception as primary_e:
                    # A failure on the primary is final: report it with the openers.
                    self._raise_openers_exception(device_path, primary_e)

            log_failed_call('primary', 'another node', device_path, vdi_uuid, plugin_method, 'no primary')

            # 4. Try on any host with local data.
            try:
                known_refs = (
                    self._find_host_ref_from_hostname(hosts, node_name) for node_name in nodes
                )
                fallback_ref = next((ref for ref in known_refs if ref), None)
                return run_on('another node', fallback_ref)
            except Exception as fallback_e:
                self._raise_openers_exception(device_path, fallback_e)

        return wrapper
    return decorated
def linstormodifier():
    """
    Build a decorator for methods that modify a LINSTOR volume.

    The first positional argument of the decorated method must expose a
    `_linstor` attribute. `_linstor.invalidate_resource_cache()` is called
    after every execution of the method, including when the method raises:
    a modifier that fails half-way may already have changed the volume, so
    the cache must not be kept in that case either.

    The return value (or the exception) of the decorated method is
    forwarded to the caller.
    """
    import functools

    def decorated(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self = args[0]
            try:
                return func(*args, **kwargs)
            finally:
                # Always drop the cache, even after a failure.
                self._linstor.invalidate_resource_cache()
        return wrapper
    return decorated
class LinstorCowUtil(object):
    """
    COW image helper for LINSTOR volumes.

    Wraps the `CowUtil` instance returned by `getCowUtil(vdi_type)`:
    getters are executed locally or through the `linstor-manager` plugin of
    another host, setters are executed locally (optionally with a remote
    fallback) and invalidate the LINSTOR resource cache.
    """

    def __init__(self, session, linstor, vdi_type: str):
        """
        :param session: XAPI session used for remote calls. When falsy,
            the getters only run locally.
        :param linstor: Volume manager object used to resolve paths, UUIDs
            and openers.
        :param vdi_type: Image type given to `getCowUtil` and forwarded to
            the remote plugin calls.
        """
        self._session = session
        self._linstor = linstor
        self._cowutil = getCowUtil(vdi_type)
        self._vdi_type = vdi_type

    @property
    def cowutil(self) -> CowUtil:
        """Underlying `CowUtil` instance."""
        return self._cowutil

    def create_chain_paths(self, vdi_uuid, readonly=False):
        """
        Ensure that the device path of `vdi_uuid` and of each of its
        ancestors exists and can be opened.

        The leaf is opened in read/write mode unless `readonly` is set, the
        parents are always opened in read-only mode.

        :return: The device path of the leaf.
        :raises xs_errors.XenError: `VDIUnavailable` if a path of the chain
            does not exist.
        """
        # OPTIMIZE: Add a limit_to_first_allocated_block param to limit cowutil calls.
        # Useful for the snapshot code algorithm.

        leaf_vdi_path = self._linstor.get_device_path(vdi_uuid)
        path = leaf_vdi_path
        while True:
            if not util.pathexists(path):
                raise xs_errors.XenError(
                    'VDIUnavailable', opterr='Could not find: {}'.format(path)
                )

            # Diskless path can be created on the fly, ensure we can open it.
            def check_volume_usable():
                while True:
                    try:
                        with open(path, 'r' if readonly else 'r+'):
                            pass
                    except IOError as e:
                        if e.errno == errno.ENODATA:
                            # Wait and try to open the path again.
                            time.sleep(2)
                            continue
                        if e.errno == errno.EROFS or e.errno == errno.EMEDIUMTYPE:
                            util.SMlog('Volume not attachable because used. Openers: {}'.format(
                                self._linstor.get_volume_openers(vdi_uuid)
                            ))
                        raise
                    break
            util.retry(check_volume_usable, 15, 2)

            # Walk up the chain until an image without parent is reached.
            vdi_uuid = self.get_info(vdi_uuid).parentUuid
            if not vdi_uuid:
                break
            path = self._linstor.get_device_path(vdi_uuid)
            readonly = True  # Non-leaf is always readonly.

        return leaf_vdi_path

    # --------------------------------------------------------------------------
    # Getters: read locally and try on another host in case of failure.
    # --------------------------------------------------------------------------

    def check(self, vdi_uuid, ignore_missing_footer=False, fast=False):
        """Run the `check` method on the image of `vdi_uuid`."""
        kwargs = {
            'ignoreMissingFooter': ignore_missing_footer,
            'fast': fast
        }
        return self._check(vdi_uuid, **kwargs)

    @linstorhostcall('check')
    def _check(self, vdi_uuid, response):
        # Parser of the remote `check` response.
        return CowUtil.CheckResult(response)

    def get_info(self, vdi_uuid, include_parent=True):
        """
        Get the image info of `vdi_uuid`.

        `getVHDInfo` is used instead of `getInfo` when the plugin reports
        that `getInfo` is an unknown function.
        """
        kwargs = {
            'includeParent': include_parent,
            'resolveParent': False
        }

        try:
            return self._get_info(vdi_uuid, self._extract_uuid, **kwargs)
        except Exception as e:
            # Backward compatibility with non-QCOW2 versions.
            if str(e).startswith("['UNKNOWN_XENAPI_PLUGIN_FUNCTION', 'getInfo']"):
                return self._get_vhd_info(vdi_uuid, self._extract_uuid, **kwargs)
            raise

    @linstorhostcall('getInfo')
    def _get_info(self, vdi_uuid, response):
        # Parser of the remote `getInfo` response.
        return self._get_info_impl(vdi_uuid, response)

    # Backward compatibility with non-QCOW2 versions.
    @linstorhostcall('getVHDInfo')
    def _get_vhd_info(self, vdi_uuid, response):
        # Parser of the remote `getVHDInfo` response.
        return self._get_info_impl(vdi_uuid, response)

    def _get_info_impl(self, vdi_uuid, response):
        """
        Build a `CowImageInfo` from a JSON `response`.

        The parent attributes are only set when `parentPath` is present in
        the decoded object.
        """
        obj = json.loads(response)

        image_info = CowImageInfo(vdi_uuid)
        image_info.sizeVirt = obj['sizeVirt']
        image_info.sizePhys = obj['sizePhys']
        if 'parentPath' in obj:
            image_info.parentPath = obj['parentPath']
            image_info.parentUuid = obj['parentUuid']
        image_info.hidden = obj['hidden']
        image_info.path = obj['path']

        return image_info

    @linstorhostcall('hasParent')
    def has_parent(self, vdi_uuid, response):
        # Parser of the remote `hasParent` response.
        return util.strtobool(response)

    def get_parent(self, vdi_uuid):
        """Get the parent of `vdi_uuid` using the `getParent` method."""
        return self._get_parent(vdi_uuid, self._extract_uuid)

    @linstorhostcall('getParent')
    def _get_parent(self, vdi_uuid, response):
        # The remote `getParent` response is returned as is.
        return response

    @linstorhostcall('getSizeVirt')
    def get_size_virt(self, vdi_uuid, response):
        # Parser of the remote `getSizeVirt` response.
        return int(response)

    @linstorhostcall('getMaxResizeSize')
    def get_max_resize_size(self, vdi_uuid, response):
        # Parser of the remote `getMaxResizeSize` response.
        return int(response)

    @linstorhostcall('getSizePhys')
    def get_size_phys(self, vdi_uuid, response):
        # Parser of the remote `getSizePhys` response.
        return int(response)

    @linstorhostcall('getAllocatedSize')
    def get_allocated_size(self, vdi_uuid, response):
        # Parser of the remote `getAllocatedSize` response.
        return int(response)

    @linstorhostcall('getDepth')
    def get_depth(self, vdi_uuid, response):
        # Parser of the remote `getDepth` response.
        return int(response)

    @linstorhostcall('getKeyHash')
    def get_key_hash(self, vdi_uuid, response):
        # An empty remote response is mapped to None.
        return response or None

    @linstorhostcall('getBlockBitmap')
    def get_block_bitmap(self, vdi_uuid, response):
        # The remote response is base64 encoded.
        return base64.b64decode(response)

    @linstorhostcall('_get_drbd_size', 'getDrbdSize')
    def get_drbd_size(self, vdi_uuid, response):
        # Parser of the remote `getDrbdSize` response.
        return int(response)

    def _get_drbd_size(self, path):
        """
        Get the size of the device `path` using `blockdev --getsize64`.

        :raises util.SMException: If the command fails.
        """
        (ret, stdout, stderr) = util.doexec(['blockdev', '--getsize64', path])
        if ret == 0:
            return int(stdout.strip())
        raise util.SMException('Failed to get DRBD size: {}'.format(stderr))

    # --------------------------------------------------------------------------
    # Setters: only used locally.
    # --------------------------------------------------------------------------

    @linstormodifier()
    def create(self, path, size, static, msize=0):
        """Call `create` of the cowutil instance on `path`, locally."""
        return self._call_local_method_or_fail(self._cowutil.create, path, size, static, msize)

    @linstormodifier()
    def set_size_phys(self, path, size, debug=True):
        """Call `setSizePhys` of the cowutil instance on `path`, locally."""
        return self._call_local_method_or_fail(self._cowutil.setSizePhys, path, size, debug)

    @linstormodifier()
    def set_parent(self, path, parentPath, parentRaw=False):
        """Call `setParent` of the cowutil instance on `path`, locally."""
        return self._call_local_method_or_fail(self._cowutil.setParent, path, parentPath, parentRaw)

    @linstormodifier()
    def set_hidden(self, path, hidden=True):
        """Call `setHidden` of the cowutil instance on `path`, locally."""
        return self._call_local_method_or_fail(self._cowutil.setHidden, path, hidden)

    @linstormodifier()
    def set_key(self, path, key_hash):
        """Call `setKey` of the cowutil instance on `path`, locally."""
        return self._call_local_method_or_fail(self._cowutil.setKey, path, key_hash)

    @linstormodifier()
    def kill_data(self, path):
        """Call `killData` of the cowutil instance on `path`, locally."""
        return self._call_local_method_or_fail(self._cowutil.killData, path)

    @linstormodifier()
    def snapshot(self, path, parent, parentRaw, msize=0, checkEmpty=True):
        """Call `snapshot` of the cowutil instance on `path`, locally."""
        return self._call_local_method_or_fail(self._cowutil.snapshot, path, parent, parentRaw, msize, checkEmpty)

    def inflate(self, journaler, vdi_uuid, vdi_path, new_size, old_size):
        """
        Grow the LINSTOR volume of `vdi_uuid` to `new_size` (rounded up).

        Nothing is done when the rounded size is not greater than
        `old_size`. An INFLATE journal entry holding `old_size` exists
        during the operation and is only removed when every step succeeds.
        """
        # Only inflate if the LINSTOR volume capacity is not enough.
        new_size = LinstorVolumeManager.round_up_volume_size(new_size)
        if new_size <= old_size:
            return

        util.SMlog(
            'Inflate {} (size={}, previous={})'
            .format(vdi_path, new_size, old_size)
        )

        journaler.create(
            LinstorJournaler.INFLATE, vdi_uuid, old_size
        )
        self._linstor.resize_volume(vdi_uuid, new_size)

        # The real size of the device is used for the next steps, it can be
        # different from the requested one.
        result_size = self.get_drbd_size(vdi_uuid)
        if result_size < new_size:
            util.SMlog(
                'WARNING: Cannot inflate volume to {}B, result size: {}B'
                .format(new_size, result_size)
            )

        self._zeroize(vdi_path, result_size - self._cowutil.getFooterSize())
        self.set_size_phys(vdi_path, result_size, False)
        journaler.remove(LinstorJournaler.INFLATE, vdi_uuid)

    def deflate(self, vdi_path, new_size, old_size, zeroize=False):
        """
        Reduce the physical size of the image `vdi_path` to `new_size`
        (rounded up).

        When `zeroize` is set, the footer area located at the end of
        `old_size` is zeroed first, even if no size change follows. Nothing
        else is done when the rounded size is not lower than `old_size`.
        """
        if zeroize:
            assert old_size > self._cowutil.getFooterSize()
            self._zeroize(vdi_path, old_size - self._cowutil.getFooterSize())

        new_size = LinstorVolumeManager.round_up_volume_size(new_size)
        if new_size >= old_size:
            return

        util.SMlog(
            'Deflate {} (new size={}, previous={})'
            .format(vdi_path, new_size, old_size)
        )

        self.set_size_phys(vdi_path, new_size)
        # TODO: Change the LINSTOR volume size using linstor.resize_volume.

    # --------------------------------------------------------------------------
    # Remote setters: write locally and try on another host in case of failure.
    # --------------------------------------------------------------------------

    @linstormodifier()
    def set_size_virt(self, path, size, jFile):
        """Run `setSizeVirt` on `path`, locally or on another host."""
        kwargs = {
            'size': size,
            'jFile': jFile
        }
        return self._call_method(self._cowutil.setSizeVirt, 'setSizeVirt', path, use_parent=False, **kwargs)

    @linstormodifier()
    def set_size_virt_fast(self, path, size):
        """Run `setSizeVirtFast` on `path`, locally or on another host."""
        kwargs = {
            'size': size
        }
        return self._call_method(self._cowutil.setSizeVirtFast, 'setSizeVirtFast', path, use_parent=False, **kwargs)

    @linstormodifier()
    def force_parent(self, path, parentPath, parentRaw=False):
        """Run `setParent` on `path`, locally or on another host."""
        kwargs = {
            'parentPath': str(parentPath),
            'parentRaw': parentRaw
        }
        return self._call_method(self._cowutil.setParent, 'setParent', path, use_parent=False, **kwargs)

    @linstormodifier()
    def force_coalesce(self, path):
        """
        Run `coalesce` on `path`, locally or on another host.

        The result is converted to int. The openers of the parent are used
        to find a remote host.
        """
        return int(self._call_method(self._cowutil.coalesce, 'coalesce', path, use_parent=True))

    @linstormodifier()
    def force_repair(self, path):
        """Run `repair` on `path`, locally or on another host."""
        return self._call_method(self._cowutil.repair, 'repair', path, use_parent=False)

    @linstormodifier()
    def force_deflate(self, path, newSize, oldSize, zeroize):
        """Run a deflate on `path`, locally or on another host."""
        kwargs = {
            'newSize': newSize,
            'oldSize': oldSize,
            'zeroize': zeroize
        }
        return self._call_method('_force_deflate', 'deflate', path, use_parent=False, **kwargs)

    def _force_deflate(self, path, newSize, oldSize, zeroize):
        # Local counterpart of the remote `deflate` method, the parameter
        # names must match the keys built in `force_deflate`.
        self.deflate(path, newSize, oldSize, zeroize)

    # --------------------------------------------------------------------------
    # Helpers.
    # --------------------------------------------------------------------------

    def compute_volume_size(self, virtual_size: int) -> int:
        """
        Compute the LINSTOR volume size required for an image of
        `virtual_size` bytes: metadata and bitmap overheads are added and
        the result is rounded up.

        :raises Exception: If the type of this instance is not a COW image
            type.
        """
        if VdiType.isCowImage(self._vdi_type):
            # All LINSTOR VDIs have the metadata area preallocated for
            # the maximum possible virtual size (for fast online VDI.resize).
            meta_overhead = self._cowutil.calcOverheadEmpty(
                max(virtual_size, self._cowutil.getDefaultPreallocationSizeVirt())
            )
            bitmap_overhead = self._cowutil.calcOverheadBitmap(virtual_size)
            virtual_size += meta_overhead + bitmap_overhead
        else:
            raise Exception('Invalid image type: {}'.format(self._vdi_type))

        return LinstorVolumeManager.round_up_volume_size(virtual_size)

    def _extract_uuid(self, device_path):
        """Get the volume UUID of `device_path`, trailing new lines are ignored."""
        # TODO: Remove new line in the vhdutil module. Not here.
        return self._linstor.get_volume_uuid_from_device_path(
            device_path.rstrip('\n')
        )

    def _get_hosts(self, remote_method, device_path):
        """
        Get all the host records using the session.

        `remote_method` and `device_path` are only used to build the error
        message.

        :raises xs_errors.XenError: `VDIUnavailable` if the records cannot
            be fetched.
        """
        try:
            return self._session.xenapi.host.get_all_records()
        except Exception as e:
            raise xs_errors.XenError(
                'VDIUnavailable',
                opterr='Unable to get host list to run cowutil command `{}` (path={}): {}'
                .format(remote_method, device_path, e)
            )

    # --------------------------------------------------------------------------

    @staticmethod
    def _find_host_ref_from_hostname(hosts, hostname):
        """
        Find in `hosts` (host ref => host record) the ref of the first
        record whose `hostname` field is equal to `hostname`.

        :return: The host ref, or None if there is no match.
        """
        return next((ref for ref, rec in hosts.items() if rec['hostname'] == hostname), None)

    def _raise_openers_exception(self, device_path, e):
        """
        Always raise an `Exception` made of the description of `e` and of
        the openers of the volume `device_path`.

        If the openers cannot be fetched, the cause of this failure is used
        in the message instead.
        """
        if isinstance(e, util.CommandException):
            e_str = 'cmd: `{}`, code: `{}`, reason: `{}`'.format(e.cmd, e.code, e.reason)
        else:
            e_str = str(e)

        try:
            volume_uuid = self._linstor.get_volume_uuid_from_device_path(
                device_path
            )
            e_wrapper = Exception(
                e_str + ' (openers: {})'.format(
                    self._linstor.get_volume_openers(volume_uuid)
                )
            )
        except Exception as illformed_e:
            e_wrapper = Exception(
                e_str + ' (unable to get openers: {})'.format(illformed_e)
            )
        util.SMlog('raise opener exception: {}'.format(e_wrapper))
        raise e_wrapper  # pylint: disable = E0702

    def _sanitize_local_method(self, local_method):
        """
        Resolve `local_method` to a callable.

        A string starting with `_` is an attribute of this object, any
        other string is an attribute of the cowutil instance. Other values
        are returned untouched.
        """
        if isinstance(local_method, str):
            return getattr(self if local_method.startswith('_') else self._cowutil, local_method)
        return local_method

    def _call_local_method(self, local_method, device_path, *args, **kwargs):
        """
        Run `local_method(device_path, *args, **kwargs)` on this host.

        A `util.CommandException` is retried through `util.retry`, except
        for these error codes which are converted right away:

        - EROFS and EMEDIUMTYPE: `ErofsLinstorCallException`;
        - ENOENT: `NoPathLinstorCallException`.
        """
        local_method = self._sanitize_local_method(local_method)

        try:
            def local_call():
                try:
                    return local_method(device_path, *args, **kwargs)
                except util.CommandException as e:
                    if e.code == errno.EROFS or e.code == errno.EMEDIUMTYPE:
                        raise ErofsLinstorCallException(e)  # Break retry calls.
                    if e.code == errno.ENOENT:
                        raise NoPathLinstorCallException(e)
                    raise e
            # Retry only locally if it's not an EROFS exception.
            return util.retry(local_call, 5, 2, exceptions=[util.CommandException])
        except util.CommandException as e:
            util.SMlog('failed to execute locally CowUtil (sys {})'.format(e.code))
            raise e

    def _call_local_method_or_fail(self, local_method, device_path, *args, **kwargs):
        """
        Same as `_call_local_method`, but an `ErofsLinstorCallException` is
        replaced by an exception listing the openers of the volume.
        """
        try:
            return self._call_local_method(local_method, device_path, *args, **kwargs)
        except ErofsLinstorCallException as e:
            # Volume is locked on a host, find openers.
            self._raise_openers_exception(device_path, e.cmd_err)

    def _call_method(self, local_method, remote_method, device_path, use_parent, *args, **kwargs):
        """
        Run `local_method` on this host and, in case of failure, run
        `remote_method` through the plugin of a host having openers.

        :param local_method: Callable or name, see `_sanitize_local_method`.
        :param remote_method: Name of the plugin method.
        :param device_path: Device path of the volume.
        :param use_parent: Search the openers of the parent instead of the
            openers of the volume itself.
        :raises xs_errors.XenError: `VDIUnavailable` if the hosts or the
            openers cannot be fetched, or if every remote call failed.
        """
        # Note: `use_parent` exists to know if the COW image parent is used by the local/remote method.
        # Normally in case of failure, if the parent is unused we try to execute the method on
        # another host using the DRBD opener list. In the other case, if the parent is required,
        # we must check where this last one is open instead of the child.

        local_method = self._sanitize_local_method(local_method)

        # A. Try to write locally...
        try:
            return self._call_local_method(local_method, device_path, *args, **kwargs)
        except Exception:
            # Deliberate: any local failure leads to the remote attempt.
            pass

        util.SMlog('unable to execute `{}` locally, retry using a writable host...'.format(remote_method))

        # B. Execute the command on another host.
        # B.1. Get host list.
        hosts = self._get_hosts(remote_method, device_path)

        # B.2. Prepare remote args.
        remote_args = {
            'devicePath': device_path,
            'groupName': self._linstor.group_name,
            'vdiType': self._vdi_type
        }
        remote_args.update(**kwargs)
        remote_args = {str(key): str(value) for key, value in remote_args.items()}

        volume_uuid = self._linstor.get_volume_uuid_from_device_path(
            device_path
        )
        parent_volume_uuid = None
        if use_parent:
            parent_volume_uuid = self.get_parent(volume_uuid)

        openers_uuid = parent_volume_uuid if use_parent else volume_uuid

        # B.3. Call!
        def remote_call():
            try:
                all_openers = self._linstor.get_volume_openers(openers_uuid)
            except Exception as e:
                raise xs_errors.XenError(
                    'VDIUnavailable',
                    opterr='Unable to get DRBD openers to run CowUtil command `{}` (path={}): {}'
                    .format(remote_method, device_path, e)
                )

            no_host_found = True
            for hostname, openers in all_openers.items():
                if not openers:
                    continue

                host_ref = self._find_host_ref_from_hostname(hosts, hostname)
                if not host_ref:
                    continue

                no_host_found = False
                try:
                    return call_remote_method(self._session, host_ref, remote_method, remote_args)
                except Exception:
                    # Deliberate: try the next host having openers.
                    pass

            if no_host_found:
                # Nobody uses the volume: last local attempt.
                try:
                    return local_method(device_path, *args, **kwargs)
                except Exception as e:
                    self._raise_openers_exception(device_path, e)

            # Report the openers of all hosts, not only the ones of the
            # last host visited by the loop above.
            raise xs_errors.XenError(
                'VDIUnavailable',
                opterr='No valid host found to run CowUtil command `{}` (path=`{}`, openers=`{}`)'
                .format(remote_method, device_path, all_openers)
            )
        return util.retry(remote_call, 5, 2)

    def _zeroize(self, path, size):
        """
        Zero out the footer area of `path`: `util.zeroOut` is called with
        `size` and the footer size of the cowutil instance.

        :raises xs_errors.XenError: `EIO` if `util.zeroOut` returns a falsy
            value.
        """
        if not util.zeroOut(path, size, self._cowutil.getFooterSize()):
            raise xs_errors.XenError(
                'EIO',
                opterr='Failed to zero out COW image footer {}'.format(path)
            )
class MultiLinstorCowUtil:
    """
    Run callbacks in a thread pool where each worker thread owns its own
    session, `LinstorVolumeManager` and `LinstorCowUtil` instances.

    The sessions opened for the workers are logged out by `_cleanup`,
    which is called when this object is deleted.
    """

    class ExecutorData(threading.local):
        """Thread-local storage of the objects owned by a worker thread."""

        def __init__(self):
            self.clear()

        def clear(self):
            # Reset the storage of the current thread.
            self.session = None
            self.linstor = None
            self.vdi_type_to_cowutil = {}

    class Load:
        """Holder of a session opened for a worker thread."""

        def __init__(self, session):
            self.session = session

        def cleanup(self):
            # Log out once, further calls are no-ops.
            if not self.session:
                return
            self.session.xenapi.session.logout()
            self.session = None

    def __init__(self, uri, group_name) -> None:
        """
        :param uri: URI given to each `LinstorVolumeManager`.
        :param group_name: Group name given to each `LinstorVolumeManager`.
        """
        self._uri = uri
        self._group_name = group_name
        self._loads: List[MultiLinstorCowUtil.Load] = []
        self._executor_data = self.ExecutorData()

    def __del__(self):
        self._cleanup()

    def run(self, func, user_data_list):
        """
        Call `func(user_data, self)` in a thread pool for each item of
        `user_data_list`.

        :return: The iterator returned by the `map` method of the executor,
            available after the shutdown of the pool.
        """
        def task(user_data):
            # First task of this thread: create its session and manager.
            if not self._executor_data.session:
                self._init_executor_thread()
            return func(user_data, self)

        with ThreadPoolExecutor(thread_name_prefix="CowUtil") as pool:
            return pool.map(task, user_data_list)

    def get_local_cowutil(self, vdi_type):
        """
        Get the `LinstorCowUtil` of the current thread for `vdi_type`,
        the instance is created during the first request.
        """
        thread_data = self._executor_data
        cached = thread_data.vdi_type_to_cowutil.get(vdi_type)
        if cached:
            return cached

        created = LinstorCowUtil(
            thread_data.session,
            thread_data.linstor,
            vdi_type
        )
        thread_data.vdi_type_to_cowutil[vdi_type] = created
        return created

    def _init_executor_thread(self):
        """
        Open a session and create a `LinstorVolumeManager` for the current
        thread. In case of failure the thread storage is reset, the session
        is logged out and the error is raised again.
        """
        session = util.get_localAPI_session()
        load = self.Load(session)
        try:
            manager = LinstorVolumeManager(
                self._uri,
                self._group_name,
                repair=False,
                logger=util.SMlog
            )
            self._executor_data.linstor = manager
            self._executor_data.session = session
        except BaseException:
            self._executor_data.clear()
            load.cleanup()
            raise

        # Remember the session to log it out later.
        self._loads.append(load)

    def _cleanup(self):
        """Log out every recorded session, failures are only logged."""
        for load in self._loads:
            try:
                load.cleanup()
            except Exception as e:
                util.SMlog(f"Failed to clean load executor: {e}")
        self._loads.clear()