pyppms.ppms
Core connection module for the PUMAPI communication.
1"""Core connection module for the PUMAPI communication.""" 2 3# pylint: disable-msg=dangerous-default-value 4 5# NOTE: the "pyppms" package is simply a wrapper for the existing API, so we can't make 6# any design decisions here - hence it is pointless to complain about the number 7# of instance attributes, public methods or other stuff: 8# pylint: disable-msg=too-many-instance-attributes 9# pylint: disable-msg=too-many-public-methods 10 11import os 12import os.path 13import shutil 14from io import open 15 16import requests 17from loguru import logger as log 18 19from .common import dict_from_single_response, parse_multiline_response 20from .user import PpmsUser 21from .system import PpmsSystem 22from .booking import PpmsBooking 23from .exceptions import NoDataError 24 25 26class PpmsConnection: 27 28 """Connection object to communicate with a PPMS instance. 29 30 Attributes 31 ---------- 32 url : str 33 The URL of the PUMAPI instance. 34 api_key : str 35 The API key used for authenticating against the PUMAPI. 36 timeout : float 37 The timeout value used in the ``requests.post`` calls. 38 cache_path : str 39 A path to a local directory used for caching responses. 40 cache_users_only : bool 41 Flag indicating that only PPMS user details will be stored in the 42 on-disk cache, nothing else. 43 last_served_from_cache : bool 44 Indicates if the last request was served from the cache or on-line. 45 users : dict 46 A dict with usernames as keys, mapping to the related 47 :py:class:`pyppms.user.PpmsUser` object, serves as a cache during the object's 48 lifetime (can be empty if no calls to :py:meth:`get_user()` have been done yet). 49 fullname_mapping : dict 50 A dict mapping a user's *fullname* ("``<LASTNAME> <FIRSTNAME>``") to the 51 corresponding username. Entries are filled in dynamically by the 52 :py:meth:`get_user()` method. 53 systems 54 A dict with system IDs as keys, mapping to the related 55 :py:class:`pyppms.system.PpmsSystem` object. Serves as a cache during the 56 object's lifetime (can be empty if no calls to the :py:meth:`get_systems()` have 57 been done yet). 58 status : dict 59 A dict with keys ``auth_state``, ``auth_response`` and 60 ``auth_httpstatus`` 61 """ 62 63 def __init__(self, url, api_key, timeout=10, cache="", cache_users_only=False): 64 """Constructor for the PPMS connection object. 65 66 Open a connection to the PUMAPI defined in `url` and try to authenticate 67 against it using the given API key (or use cache-only mode if key is an 68 empty string). If an optional path to a caching location is specified, 69 responses will be read from that location unless no matching file can be 70 found there, in which case an on-line request will be done (with the 71 response being saved to the cache path). 72 73 Parameters 74 ---------- 75 url : str 76 The URL of the PUMAPI to connect to. 77 api_key : str 78 The API key to use for authenticating against the PUMAPI. If 79 specified as '' authentication will be skipped and the connection is 80 running in cache-only (local) mode. 81 timeout : float, optional 82 How many seconds to wait for the PUMAPI server to send a response 83 before giving up, by default 10. 84 cache : str, optional 85 A path to a local directory for caching responses from PUMAPI in 86 individual text files. Useful for testing and for speeding up 87 slow requests like 'getusers'. By default empty, which will result 88 in no caching being done. 89 cache_users_only : bool, optional 90 If set to `True`, only `getuser` requests will be cached on disk. 91 This can be used in to speed up the slow requests (through the 92 cache), while everything else will be handled through online 93 requests. By default `False`. 94 95 Raises 96 ------ 97 requests.exceptions.ConnectionError 98 Raised in case authentication fails. 99 """ 100 self.url = url 101 self.api_key = api_key 102 self.timeout = timeout 103 self.users = {} 104 self.fullname_mapping = {} 105 self.systems = {} 106 self.status = { 107 "auth_state": "NOT_TRIED", 108 "auth_response": None, 109 "auth_httpstatus": -1, 110 } 111 self.cache_path = cache 112 self.cache_users_only = cache_users_only 113 self.last_served_from_cache = False 114 """Indicates if the last request was served from the cache or on-line.""" 115 116 # run in cache-only mode (e.g. for testing or off-line usage) if no API 117 # key has been specified, skip authentication then: 118 if api_key != "": 119 self.__authenticate() 120 elif cache == "": 121 raise RuntimeError( 122 "Neither API key nor cache path given, at least one is required!" 123 ) 124 125 def __authenticate(self): 126 """Try to authenticate to PPMS using the `auth` request. 127 128 Raises 129 ------ 130 requests.exceptions.ConnectionError 131 Raised in case authentication failed for any reason. 132 """ 133 log.trace( 134 "Attempting authentication against {} with key [{}...{}]", 135 self.url, 136 self.api_key[:2], 137 self.api_key[-2:], 138 ) 139 self.status["auth_state"] = "attempting" 140 response = self.request("auth") 141 log.trace(f"Authenticate response: {response.text}") 142 self.status["auth_response"] = response.text 143 self.status["auth_httpstatus"] = response.status_code 144 145 # NOTE: an unauthorized request has already been caught be the request() method 146 # above. Our legacy code was additionally testing for 'error' in the response 147 # text - however, it is unclear if PUMAPI ever returns this: 148 if "error" in response.text.lower(): 149 self.status["auth_state"] = "FAILED-ERROR" 150 msg = f"Authentication failed with an error: {response.text}" 151 log.error(msg) 152 raise requests.exceptions.ConnectionError(msg) 153 154 status_ok = requests.codes.ok # pylint: disable-msg=no-member 155 156 if response.status_code != status_ok: 157 # NOTE: branch excluded from coverage as we don't have a known way 158 # to produce such a response from the API 159 log.warning( 160 "Unexpected combination of response [{}] and status code [{}], it's " 161 "unclear if authentication succeeded (assuming it didn't)", 162 response.status_code, 163 response.text, 164 ) 165 self.status["auth_state"] = "FAILED-UNKNOWN" 166 167 msg = ( 168 f"Authenticating against {self.url} with key " 169 f"[{self.api_key[:2]}...{self.api_key[-2:]}] FAILED!" 170 ) 171 log.error(msg) 172 raise requests.exceptions.ConnectionError(msg) 173 174 log.trace( 175 "Authentication succeeded, response=[{}], http_status=[{}]", 176 response.text, 177 response.status_code, 178 ) 179 self.status["auth_state"] = "good" 180 181 def request(self, action, parameters={}, skip_cache=False): 182 """Generic method to submit a request to PPMS and return the result. 183 184 This convenience method deals with adding the API key to a given 185 request, submitting it to the PUMAPI and checking the response for some 186 specific keywords indicating an error. 187 188 Parameters 189 ---------- 190 action : str 191 The command to be submitted to the PUMAPI. 192 parameters : dict, optional 193 A dictionary with additional parameters to be submitted with the 194 request. 195 skip_cache : bool, optional 196 If set to True the request will NOT be served from the local cache, 197 independent whether a matching response file exists there, by 198 default False. 199 200 Returns 201 ------- 202 requests.Response 203 The response object created by posting the request. 204 205 Raises 206 ------ 207 requests.exceptions.ConnectionError 208 Raised in case the request is not authorized. 209 """ 210 req_data = {"action": action, "apikey": self.api_key} 211 req_data.update(parameters) 212 # log.debug("Request parameters: {}", parameters) 213 214 response = None 215 try: 216 if skip_cache: # pragma: no cover 217 raise LookupError("Skipping the cache has been requested") 218 response = self.__intercept_read(req_data) 219 self.last_served_from_cache = True 220 except LookupError as err: 221 log.trace(f"Doing an on-line request: {err}") 222 response = requests.post(self.url, data=req_data, timeout=self.timeout) 223 self.last_served_from_cache = False 224 225 # store the response if it hasn't been read from the cache before: 226 if not self.last_served_from_cache: # pragma: no cover 227 self.__intercept_store(req_data, response) 228 229 # NOTE: the HTTP status code returned is always `200` even if 230 # authentication failed, so we need to check the actual response *TEXT* 231 # to figure out if we have succeeded: 232 if "request not authorized" in response.text.lower(): 233 self.status["auth_state"] = "FAILED" 234 msg = f"Not authorized to run action `{req_data['action']}`" 235 log.error(msg) 236 raise requests.exceptions.ConnectionError(msg) 237 238 return response 239 240 def __interception_path(self, req_data, create_dir=False): 241 """Derive the path for a local cache file from a request's parameters. 242 243 Parameters 244 ---------- 245 req_data : dict 246 The request's parameters, used to derive the name of the cache file. 247 create_dir : bool, optional 248 If set to True the cache directory will be created if necessary. 249 Useful when adding responses to the cache. By default False. 250 251 Returns 252 ------- 253 str 254 The full path to a file name identified by all parameters of the 255 request (except credentials like 'apikey'). 256 """ 257 action = req_data["action"] 258 259 if self.cache_users_only and action != "getuser": 260 log.trace(f"NOT caching '{action}' (cache_users_only is set)") 261 return None 262 263 intercept_dir = os.path.join(self.cache_path, action) 264 if create_dir and not os.path.exists(intercept_dir): # pragma: no cover 265 try: 266 os.makedirs(intercept_dir) 267 log.trace(f"Created dir to store response: {intercept_dir}") 268 except Exception as err: # pylint: disable-msg=broad-except 269 log.warning(f"Failed creating [{intercept_dir}]: {err}") 270 return None 271 272 signature = "" 273 # different python versions are returning dict items in different order, so 274 # simply iterating over them will not always produce the same result - hence we 275 # build up a sorted list of keys first and use that one then: 276 keylist = list(req_data.keys()) 277 keylist.sort() 278 for key in keylist: 279 if key in ["action", "apikey"]: 280 continue 281 signature += f"__{key}--{req_data[key]}" 282 if signature == "": 283 signature = "__response" 284 signature = signature[2:] + ".txt" 285 intercept_file = os.path.join(intercept_dir, signature) 286 return intercept_file 287 288 def __intercept_read(self, req_data): 289 """Try to read a cached response from a local file. 290 291 Parameters 292 ---------- 293 req_data : dict 294 The request's parameters, used to derive the name of the cache file. 295 296 Returns 297 ------- 298 PseudoResponse 299 The response text read from the cache file wrapped in a 300 PseudoResponse object, or None in case no matching file was found in 301 the local cache. 302 303 Raises 304 ------ 305 LookupError 306 Raised in case no cache path has been set or no cache file matching 307 the request parameters could be found in the cache. 308 """ 309 310 # pylint: disable-msg=too-few-public-methods 311 class PseudoResponse: 312 """Dummy response object with attribs 'text' and 'status_code'.""" 313 314 def __init__(self, text, status_code): 315 self.text = text 316 self.status_code = int(status_code) 317 318 if self.cache_path == "": 319 raise LookupError("No cache path configured") 320 321 intercept_file = self.__interception_path(req_data, create_dir=False) 322 if not intercept_file or not os.path.exists(intercept_file): # pragma: no cover 323 raise LookupError(f"No cache hit for [{intercept_file}]") 324 325 with open(intercept_file, "r", encoding="utf-8") as infile: 326 text = infile.read() 327 log.debug( 328 "Read intercepted response text from [{}]", 329 intercept_file[len(str(self.cache_path)) :], 330 ) 331 332 status_code = 200 333 status_file = os.path.splitext(intercept_file)[0] + "_status-code.txt" 334 if os.path.exists(status_file): 335 with open(status_file, "r", encoding="utf-8") as infile: 336 status_code = infile.read() 337 log.debug(f"Read intercepted response status code from [{status_file}]") 338 return PseudoResponse(text, status_code) 339 340 def __intercept_store(self, req_data, response): # pragma: no cover 341 """Store the response in a local cache file named after the request. 342 343 Parameters 344 ---------- 345 req_data : dict 346 The request's parameters, used to derive the name of the cache file 347 so it can be matched later when running the same request again. 348 response : requests.Response 349 The response object to store in the local cache. 350 """ 351 # NOTE: this method is excluded from coverage measurements as it can only be 352 # triggered when testing in online mode with at least one request not being 353 # served from the cache (which is orthogonal to off-line testing) 354 if self.cache_path == "": 355 return 356 357 intercept_file = self.__interception_path(req_data, create_dir=True) 358 if not intercept_file: 359 log.trace("Not storing intercepted results in cache.") 360 return 361 362 try: 363 with open(intercept_file, "w", encoding="utf-8") as outfile: 364 outfile.write(response.text) 365 log.debug( 366 "Wrote response text to [{}] ({} lines)", 367 intercept_file, 368 len(response.text.splitlines()), 369 ) 370 except Exception as err: # pylint: disable-msg=broad-except 371 log.error("Storing response text in [{}] failed: {}", intercept_file, err) 372 log.error("Response text was:\n--------\n{}\n--------", response.text) 373 374 def flush_cache(self, keep_users=False): 375 """Flush the PyPPMS on-disk cache. 376 377 Optionally flushes everything *except* the `getuser` cache if the 378 `keep_users` flag is set to `True`, as this is clearly the most 379 time-consuming operation when fetching data from PUMAPI and therefore 380 might want to be retained. 381 382 Please note that the `getusers` cache (plural, including the `s` suffix) 383 will be flushed no matter what, as this is simply a list of user IDs 384 that can be fetched with a single request. In consequence this means 385 that using the `keep_users` flag will allow you to have reasonably fast 386 reaction times while still getting information on *new* users live from 387 PUMAPI at the only cost of possibly having outdated information on 388 *existing* users. 389 390 Parameters 391 ---------- 392 keep_users : bool, optional 393 If set to `True` the `getuser` sub-directory in the cache location 394 will be kept, by default `False`. 395 """ 396 if self.cache_path == "": 397 log.debug("No cache path configured, not flushing!") 398 return 399 400 dirs_to_remove = [self.cache_path] # by default remove the entire cache dir 401 keep_msg = "" 402 if keep_users: 403 keep_msg = " (keeping user details dirs)" 404 dirs_to_remove = [] 405 cache_dirs = os.listdir(self.cache_path) 406 for subdir in cache_dirs: 407 if subdir == "getuser": 408 continue 409 dirs_to_remove.append(os.path.join(self.cache_path, subdir)) 410 411 log.debug("Flushing the on-disk cache at [{}] {}...", self.cache_path, keep_msg) 412 for directory in dirs_to_remove: 413 try: 414 shutil.rmtree(directory) 415 log.trace("Removed directory [{}].", directory) 416 except Exception as ex: # pylint: disable-msg=broad-except 417 log.warning("Removing the cache at [{}] failed: {}", directory, ex) 418 419 def get_admins(self): 420 """Get all PPMS administrator users. 421 422 Returns 423 ------- 424 list(pyppms.user.PpmsUser) 425 A list with PpmsUser objects that are PPMS administrators. 426 """ 427 response = self.request("getadmins") 428 429 admins = response.text.splitlines() 430 users = [] 431 for username in admins: 432 user = self.get_user(username) 433 users.append(user) 434 log.trace("{} admins in the PPMS database: {}", len(admins), ", ".join(admins)) 435 return users 436 437 def get_booking(self, system_id, booking_type="get"): 438 """Get the current or next booking of a system. 439 440 WARNING: if the next booking is requested but it is too far in the future, 441 PUMAPI silently ignores it - the response is identical to a system that has no 442 future bookings and there is no error reported either. Currently it is unclear 443 where the cutoff is (e.g. lookups for a booking that is two years from now still 444 work fine, but a booking in about 10 years is silently skipped). 445 446 Parameters 447 ---------- 448 system_id : int or int-like 449 The ID of the system in PPMS. 450 booking_type : {'get', 'next'}, optional 451 The type of booking to request, one of `get` (requesting the 452 currently running booking) and `next` (requesting the next upcoming 453 booking), by default `get`. 454 NOTE: if `next` is requested the resulting booking object will **NOT** have 455 an end time (`endtime` will be `None`) as PUMAPI doesn't provide one in that 456 case! 457 458 Returns 459 ------- 460 pyppms.booking.PpmsBooking or None 461 The booking object, or None if there is no booking for the system or the 462 request is refused by PUMAPI (e.g. "not authorized"). 463 464 Raises 465 ------ 466 ValueError 467 Raised if the specified `booking_type` is invalid. 468 """ 469 valid = ["get", "next"] 470 if booking_type not in valid: 471 raise ValueError( 472 f"Value for 'booking_type' ({booking_type}) not in {valid}!" 473 ) 474 475 try: 476 response = self.request(booking_type + "booking", {"id": system_id}) 477 except requests.exceptions.ConnectionError: 478 log.error("Requesting booking status for system {} failed!", system_id) 479 return None 480 481 desc = "any future bookings" 482 if booking_type == "get": 483 desc = "a currently active booking" 484 if not response.text.strip(): 485 log.trace("System [{}] doesn't have {}", system_id, desc) 486 return None 487 488 return PpmsBooking(response.text, booking_type, system_id) 489 490 def get_current_booking(self, system_id): 491 """Wrapper for `get_booking()` with 'booking_type' set to 'get'.""" 492 return self.get_booking(system_id, "get") 493 494 def get_group(self, group_id): 495 """Fetch group details from PPMS and create a dict from them. 496 497 Parameters 498 ---------- 499 group_id : str 500 The group's identifier in PPMS, called 'unitlogin' there. 501 502 Returns 503 ------- 504 dict 505 A dict with the group details, keys being derived from the header 506 line of the PUMAPI response, values from the data line. 507 """ 508 response = self.request("getgroup", {"unitlogin": group_id}) 509 log.trace("Group details returned by PPMS (raw): {}", response.text) 510 511 if not response.text: 512 msg = f"Group [{group_id}] is unknown to PPMS" 513 log.error(msg) 514 raise KeyError(msg) 515 516 details = dict_from_single_response(response.text) 517 518 log.trace("Details of group {}: {}", group_id, details) 519 return details 520 521 def get_group_users(self, unitlogin): 522 """Get all members of a group in PPMS. 523 524 Parameters 525 ---------- 526 unitlogin : str 527 The group's login ("unique login or id" in the PPMS web interface). 528 529 Returns 530 ------- 531 list(pyppms.user.PpmsUser) 532 A list with PpmsUser objects that are members of this PPMS group. 533 """ 534 response = self.request("getgroupusers", {"unitlogin": unitlogin}) 535 536 members = response.text.splitlines() 537 users = [] 538 for username in members: 539 user = self.get_user(username) 540 users.append(user) 541 log.trace( 542 "{} members in PPMS group [{}]: {}", 543 len(members), 544 unitlogin, 545 ", ".join(members), 546 ) 547 return users 548 549 def get_groups(self): 550 """Get a list of all groups in PPMS. 551 552 Returns 553 ------- 554 list(str) 555 A list with the group identifiers in PPMS. 556 """ 557 response = self.request("getgroups") 558 559 groups = response.text.splitlines() 560 log.trace("{} groups in the PPMS database: {}", len(groups), ", ".join(groups)) 561 return groups 562 563 def get_next_booking(self, system_id): 564 """Wrapper for `get_booking()` with 'booking_type' set to 'next'.""" 565 return self.get_booking(system_id, "next") 566 567 def get_running_sheet( 568 self, core_facility_ref, date, ignore_uncached_users=False, localisation="" 569 ): 570 """Get the running sheet for a specific day on the given facility. 571 572 The so-called "running-sheet" consists of all bookings / reservations of 573 a facility on a specifc day. 574 575 WARNING: PUMAPI doesn't return a proper unique user identifier with the 576 'getrunningsheet' request, instead the so called "full name" is given to 577 identify the user - unfortunately this can lead to ambiguities as 578 multiple different accounts can have the same full name. 579 580 Parameters 581 ---------- 582 core_facility_ref : int or int-like 583 The core facility ID for PPMS. 584 date : datetime.datetime 585 The date to request the running sheet for, e.g. ``datetime.now()`` or 586 similar. Note that only the date part is relevant, time will be ignored. 587 ignore_uncached_users : bool, optional 588 If set to `True` any booking for a user that is not present in the instance 589 attribute `fullname_mapping` will be ignored in the resulting list. 590 localisation : str, optional 591 If given, the runningsheet will be limited to systems where the 592 `localisation` (~"room") field matches the given value. 593 594 Returns 595 ------- 596 list(pyppms.booking.PpmsBooking) 597 A list with `PpmsBooking` objects for the given day. Empty in case 598 there are no bookings or parsing the response failed. 599 """ 600 bookings = [] 601 parameters = { 602 "plateformid": f"{core_facility_ref}", 603 "day": date.strftime("%Y-%m-%d"), 604 } 605 log.trace("Requesting runningsheet for {}", parameters["day"]) 606 response = self.request("getrunningsheet", parameters) 607 try: 608 entries = parse_multiline_response(response.text, graceful=False) 609 except NoDataError: 610 # in case no bookings exist the response will be empty! 611 log.trace("Runningsheet for the given day was empty!") 612 return [] 613 except Exception as err: # pylint: disable-msg=broad-except 614 log.error("Parsing runningsheet details failed: {}", err) 615 log.trace("Runningsheet PUMPAI response was: >>>{}<<<", response.text) 616 return [] 617 618 for entry in entries: 619 full = entry["User"] 620 if full not in self.fullname_mapping: 621 if ignore_uncached_users: 622 log.debug(f"Ignoring booking for uncached / unknown user [{full}]") 623 continue 624 625 log.debug(f"Booking refers an uncached user ({full}), updating users!") 626 self.update_users() 627 628 if full not in self.fullname_mapping: 629 log.error("PPMS doesn't seem to know user [{}], skipping", full) 630 continue 631 632 log.trace( 633 f"Booking for user '{self.fullname_mapping[full]}' ({full}) found" 634 ) 635 system_name = entry["Object"] 636 # FIXME: add a test with one system name being a subset of another system 637 # (this will result in more than one result and should be fixed e.g. by 638 # adding an optional parameter "exact" to get_systems_matching() or 639 # similar) 640 system_ids = self.get_systems_matching(localisation, [system_name]) 641 if len(system_ids) < 1: 642 if localisation: 643 log.debug(f"Given criteria return zero systems for [{system_name}]") 644 else: 645 log.warning(f"No systems matching criteria for [{system_name}]") 646 continue 647 648 if len(system_ids) > 1: 649 # NOTE: more than one result should not happen as PPMS doesn't allow for 650 # multiple systems having the same name - no result might happen though! 651 log.error("Ignoring booking for unknown system [{}]", system_name) 652 continue 653 654 booking = PpmsBooking.from_runningsheet( 655 entry, 656 system_ids[0], 657 self.fullname_mapping[full], 658 date, 659 ) 660 bookings.append(booking) 661 662 return bookings 663 664 def get_systems(self, force_refresh=False): 665 """Get a dict with all systems in PPMS. 666 667 Parameters 668 ---------- 669 force_refresh : bool, optional 670 If `True` the list of systems will be refreshed even if the object's 671 attribute `self.systems` is non-empty, by default `False`. Please 672 note that this will NOT skip the on-disk cache in case that exists! 673 674 Returns 675 ------- 676 dict(pyppms.system.PpmsSystem) 677 A dict with `PpmsSystem` objects parsed from the PUMAPI response where 678 the system ID (int) is used as the dict's key. If parsing a system 679 fails for any reason, the system is skipped entirely. 680 """ 681 if self.systems and not force_refresh: 682 log.trace("Using cached details for {} systems", len(self.systems)) 683 else: 684 self.update_systems() 685 686 return self.systems 687 688 def get_systems_matching(self, localisation, name_contains): 689 """Query PPMS for systems with a specific location and name. 690 691 This method assembles a list of PPMS system IDs whose "localisation" 692 (room) field matches a given string and where the system name contains 693 at least one of the strings given as the `name_contains` parameter. 694 695 Parameters 696 ---------- 697 localisation : str 698 A string that the system's "localisation" (i.e. the "Room" field in 699 the PPMS web interface) has to match. Can be an empty string which 700 will result in no filtering being done on the "Room" attribute. 701 name_contains : list(str) 702 A list of valid names (categories) of which the system's name has to 703 match at least one for being included. Supply an empty list for 704 skipping this filter. 705 706 Returns 707 ------- 708 list(int) 709 A list with PPMS system IDs matching all of the given criteria. 710 711 Raises 712 ------ 713 TypeError 714 Raised in case the `name_contains` parameter is of type `str` (it 715 needs to be `list(str)` instead). 716 """ 717 if isinstance(name_contains, str): 718 raise TypeError("`name_contains` must be a list of str, not str!") 719 720 loc = localisation 721 loc_desc = f"with location matching [{localisation}]" 722 if localisation == "": 723 loc_desc = "(no location filter given)" 724 725 log.trace( 726 "Querying PPMS for systems {}, name matching any of {}", 727 loc_desc, 728 name_contains, 729 ) 730 system_ids = [] 731 systems = self.get_systems() 732 for sys_id, system in systems.items(): 733 if loc.lower() not in str(system.localisation).lower(): 734 log.trace( 735 "System [{}] location ({}) is NOT matching ({}), ignoring", 736 system.name, 737 system.localisation, 738 loc, 739 ) 740 continue 741 742 # log.trace('System [{}] is matching location [{}], checking if ' 743 # 'the name is matching any of the valid pattern {}', 744 # system.name, loc, name_contains) 745 for valid_name in name_contains: 746 if valid_name in system.name: 747 log.trace("System [{}] matches all criteria", system.name) 748 system_ids.append(sys_id) 749 break 750 751 # if sys_id not in system_ids: 752 # log.trace('System [{}] does NOT match a valid name: {}', 753 # system.name, name_contains) 754 755 log.trace("Found {} bookable systems {}", len(system_ids), loc_desc) 756 log.trace("IDs of matching bookable systems {}: {}", loc_desc, system_ids) 757 return system_ids 758 759 def get_user(self, login_name, skip_cache=False): 760 """Fetch user details from PPMS and create a PpmsUser object from it. 761 762 Parameters 763 ---------- 764 login_name : str 765 The user's PPMS login name. 766 skip_cache : bool, optional 767 Passed as-is to the :py:meth:`request()` method 768 769 Returns 770 ------- 771 pyppms.user.PpmsUser 772 The user object created from the PUMAPI response. The object will be 773 additionally stored in the self.users dict using the login_name as 774 the dict's key. 775 776 Raises 777 ------ 778 KeyError 779 Raised if the user doesn't exist in PPMS. 780 """ 781 response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache) 782 783 if not response.text: 784 msg = f"User [{login_name}] is unknown to PPMS" 785 log.debug(msg) 786 raise KeyError(msg) 787 788 user = PpmsUser(response.text) 789 self.users[user.username] = user # update / add to the cached user objs 790 self.fullname_mapping[user.fullname] = user.username 791 return user 792 793 def get_user_dict(self, login_name, skip_cache=False): 794 """Get details on a given user from PPMS. 795 796 Parameters 797 ---------- 798 login_name : str 799 The PPMS account / login name of the user to query. 800 skip_cache : bool, optional 801 Passed as-is to the :py:meth:`request()` method 802 803 Returns 804 ------- 805 dict 806 A dict with the user details returned by the PUMAPI. 807 808 Example 809 ------- 810 >>> conn.get_user_dict('pyppms') 811 ... { 812 ... u'active': True, 813 ... u'affiliation': u'', 814 ... u'bcode': u'', 815 ... u'email': u'pyppms@python-facility.example', 816 ... u'fname': u'PumAPI', 817 ... u'lname': u'Python', 818 ... u'login': u'pyppms', 819 ... u'mustchbcode': False, 820 ... u'mustchpwd': False', 821 ... u'phone': u'+98 (76) 54 3210', 822 ... u'unitlogin': u'pyppms' 823 ... } 824 825 Raises 826 ------ 827 KeyError 828 Raised in case the user account is unknown to PPMS. 829 ValueError 830 Raised if the user details can't be parsed from the PUMAPI response. 831 """ 832 response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache) 833 834 if not response.text: 835 msg = f"User [{login_name}] is unknown to PPMS" 836 log.error(msg) 837 raise KeyError(msg) 838 839 # EXAMPLE: 840 # response.text = ( 841 # u'login,lname,fname,email,' 842 # u'phone,bcode,affiliation,unitlogin,mustchpwd,mustchbcode,' 843 # u'active\r\n' 844 # u'"pyppms","Python","PumAPI","pyppms@python-facility.example",' 845 # u'"+98 (76) 54 3210","","","pyppms",false,false,' 846 # u'true\r\n' 847 # ) 848 details = dict_from_single_response(response.text) 849 log.trace("Details for user [{}]: {}", login_name, details) 850 return details 851 852 def get_user_experience(self, login=None, system_id=None): 853 """Get user experience ("User rights") from PPMS. 854 855 Parameters 856 ---------- 857 login : str, optional 858 An optional login name to request the experience / permissions for, 859 by default None 860 system_id : int, optional 861 An optional system ID to request the experience / permissions for, 862 by default None 863 864 Returns 865 ------- 866 list(dict) 867 A list with dicts parsed from the user experience response. 868 """ 869 data = {} 870 if login is not None: 871 data["login"] = login 872 if system_id is not None: 873 data["id"] = system_id 874 response = self.request("getuserexp", parameters=data) 875 876 parsed = parse_multiline_response(response.text) 877 log.trace( 878 "Received {} experience entries for filters [user:{}] and [id:{}]", 879 len(parsed), 880 login, 881 system_id, 882 ) 883 return parsed 884 885 def get_user_ids(self, active=False): 886 """Get a list with all user IDs in the PPMS system. 887 888 Parameters 889 ---------- 890 active : bool, optional 891 Request only users marked as active in PPMS, by default False. 892 NOTE: "active" is a tri-state parameter in PPMS: "true", "false" 893 or empty! 894 895 Returns 896 ------- 897 list 898 A list of all (or active-only) user IDs in PPMS. 899 """ 900 # TODO: describe format of returned list and / or give an example! 901 parameters = {} 902 if active: 903 parameters["active"] = "true" 904 905 response = self.request("getusers", parameters) 906 907 users = response.text.splitlines() 908 active_desc = "active " if active else "" 909 log.trace("{} {}users in the PPMS database", len(users), active_desc) 910 log.trace(", ".join(users)) 911 return users 912 913 def get_users(self, force_refresh=False, active_only=True): 914 """Get user objects for all (or cached) PPMS users. 915 916 Parameters 917 ---------- 918 force_refresh : bool, optional 919 Re-request information from PPMS even if user details have been 920 cached locally before, by default False. 921 active_only : bool, optional 922 If set to `False` also "inactive" users will be fetched from PPMS, 923 by default `True`. 924 925 Returns 926 ------- 927 dict(pyppms.user.PpmsUser) 928 A dict of PpmsUser objects with the username (login) as key. 929 """ 930 if self.users and not force_refresh: 931 log.trace("Using cached details for {} users", len(self.users)) 932 else: 933 self.update_users(active_only=active_only) 934 935 return self.users 936 937 def get_users_emails(self, users=None, active=False): 938 """Get a list of user email addresses. WARNING - very slow! 939 940 Parameters 941 ---------- 942 users : list(str), optional 943 A list of login names to retrieve the email addresses for, if 944 omitted addresses for all (or active ones) will be requested. 945 active : bool, optional 946 Request only addresses of users marked as active in PPMS, by default 947 False. Will be ignored if a list of usernames is given explicitly. 948 949 Returns 950 ------- 951 list(str) 952 Email addresses of the users requested. 953 """ 954 emails = [] 955 if users is None: 956 users = self.get_user_ids(active=active) 957 for user in users: 958 email = self.get_user_dict(user)["email"] 959 if not email: 960 log.warning("--- WARNING: no email for user [{}]! ---", user) 961 continue 962 # log.trace("{}: {}", user, email) 963 emails.append(email) 964 965 return emails 966 967 def get_users_with_access_to_system(self, system_id): 968 """Get a list of usernames allowed to book the system with the given ID. 969 970 Parameters 971 ---------- 972 system_id : int or int-like 973 The ID of the system to query permitted users for. 974 975 Returns 976 ------- 977 list(str) 978 A list of usernames ('login') with permissions to book the system 979 with the given ID in PPMS. 980 981 Raises 982 ------ 983 ValueError 984 Raised in case parsing the response failes for any reason. 985 """ 986 users = [] 987 988 response = self.request("getsysrights", {"id": system_id}) 989 # this response has a unique format, so parse it directly here: 990 try: 991 lines = response.text.splitlines() 992 for line in lines: 993 permission, username = line.split(":") 994 if permission.upper() == "D": 995 log.trace( 996 "User [{}] is deactivated for booking system [{}], skipping", 997 username, 998 system_id, 999 ) 1000 continue 1001 1002 log.trace( 1003 "User [{}] has permission to book system [{}]", username, system_id 1004 ) 1005 users.append(username) 1006 1007 except Exception as err: 1008 msg = ( 1009 f"Unable to parse data returned by PUMAPI: {response.text} - " 1010 f"ERROR: {err}" 1011 ) 1012 log.error(msg) 1013 raise ValueError(msg) from err 1014 1015 return users 1016 1017 def give_user_access_to_system(self, username, system_id): 1018 """Add permissions for a user to book a given system in PPMS. 1019 1020 Parameters 1021 ---------- 1022 username : str 1023 The username ('login') to allow for booking the system. 1024 system_id : int or int-like 1025 The ID of the system to add the permission for. 1026 1027 Returns 1028 ------- 1029 bool 1030 True in case the given username now has the permissions to book the 1031 system with the specified ID (or if the user already had them 1032 before), False otherwise. 1033 """ 1034 return self.set_system_booking_permissions(username, system_id, "A") 1035 1036 def new_user( # pylint: disable-msg=too-many-arguments 1037 self, login, lname, fname, email, ppms_group, phone=None, password=None 1038 ): 1039 """Create a new user in PPMS. 1040 1041 The method is asking PPMS to create a new user account with the given details. 1042 In case an account with that login name already exists, it will log a warning 1043 and return without sending any further requests to PPMS. 1044 1045 Parameters 1046 ---------- 1047 login : str 1048 The unique identifier for the user. 1049 lname : str 1050 The last name of the user. 1051 fname : str 1052 The first name of the user. 1053 email : str 1054 The email address of the user. 1055 ppms_group : str 1056 The unique identifier of the primary group of the new user. A new group will 1057 be created if no group with the given name exists. 1058 phone : str, optional 1059 The phone number of the user. 1060 password : str, optional 1061 The password for the user. If no password is set the user will not be able 1062 to log on to PPMS. 1063 1064 Raises 1065 ------ 1066 RuntimeError 1067 Will be raised in case creating the user fails. 1068 """ 1069 if self.user_exists(login): 1070 log.warning("NOT creating user [{}] as it already exists!", login) 1071 return 1072 1073 req_data = { 1074 "login": login, 1075 "lname": lname, 1076 "fname": fname, 1077 "email": email, 1078 "unitlogin": ppms_group, 1079 } 1080 if phone: 1081 req_data["phone"] = phone 1082 if password: 1083 req_data["pwd"] = password 1084 1085 response = self.request("newuser", req_data) 1086 if not "OK newuser" in response.text: 1087 msg = f"Creating new user failed: {response.text}" 1088 log.error(msg) 1089 raise RuntimeError(msg) 1090 1091 log.debug("Created user [{}] in PPMS.", login) 1092 log.trace("Response was: {}", response.text) 1093 1094 def remove_user_access_from_system(self, username, system_id): 1095 """Remove permissions for a user to book a given system in PPMS. 1096 1097 Parameters 1098 ---------- 1099 username : str 1100 The username ('login') to remove booking permissions on the system. 1101 system_id : int or int-like 1102 The ID of the system to modify the permission for. 1103 1104 Returns 1105 ------- 1106 bool 1107 True in case the given username now has the permissions to book the 1108 system with the specified ID (or if the user already had them 1109 before), False otherwise. 1110 """ 1111 return self.set_system_booking_permissions(username, system_id, "D") 1112 1113 def set_system_booking_permissions(self, login, system_id, permission): 1114 """Set permissions for a user on a given system in PPMS. 1115 1116 Parameters 1117 ---------- 1118 username : str 1119 The username ('login') to allow for booking the system. 1120 system_id : int or int-like 1121 The ID of the system to add the permission for. 1122 permission : {'D', 'A', 'N', 'S'} 1123 The permission level to set for the user, one of: 1124 - ``D`` : deactivated 1125 - ``A`` : autonomous 1126 - ``N`` : novice 1127 - ``S`` : superuser 1128 1129 Returns 1130 ------- 1131 bool 1132 True in case setting permissions for the given username on the 1133 system with the specified ID succeeded (or if the user already had 1134 those permissions before), False otherwise. 1135 """ 1136 1137 def permission_name(shortname): 1138 """Closure to validate a permission level and return its long name. 1139 1140 Parameters 1141 ---------- 1142 shortname : str 1143 A single character defining the permission level. 1144 1145 Returns 1146 ------- 1147 str 1148 The long (human-readable) name of the permission level. 1149 1150 Raises 1151 ------ 1152 KeyError 1153 Raised in case an invalid permission level was given. 1154 """ 1155 mapping = { 1156 "D": "deactivated", 1157 "A": "autonomous", 1158 "N": "novice", 1159 "S": "superuser", 1160 } 1161 try: 1162 return mapping[shortname] 1163 except KeyError as err: 1164 raise KeyError(f"Invalid permission [{shortname}] given") from err 1165 1166 log.debug( 1167 "Setting permission level [{}] for user [{}] on system [{}]", 1168 permission_name(permission), 1169 login, 1170 system_id, 1171 ) 1172 1173 parameters = {"id": system_id, "login": login, "type": permission} 1174 response = self.request("setright", parameters) 1175 1176 # NOTE: the 'setright' action will accept ANY permission type and return 'done' 1177 # on the request, so there is no way to check from the response if setting the 1178 # permission really worked!! 1179 # log.trace('Request returned text: {}', response.text) 1180 if response.text.lower().strip() == "done": 1181 log.trace( 1182 "User [{}] now has permission level [{}] on system [{}]", 1183 login, 1184 permission_name(permission), 1185 system_id, 1186 ) 1187 return True 1188 1189 if "invalid user" in response.text.lower(): 1190 log.warning("User [{}] doesn't seem to exist in PPMS", login) 1191 elif "system right not authorized" in response.text.lower(): 1192 log.error( 1193 "Unable to set permissions for system {}: {}", system_id, response.text 1194 ) 1195 else: 1196 log.error("Unexpected response, assuming request failed: {}", response.text) 1197 1198 return False 1199 1200 def update_systems(self): 1201 """Update cached details for all bookable systems from PPMS. 1202 1203 Get the details on all bookable systems from PPMS and store them in the local 1204 cache. If parsing the PUMAPI response for a system fails for any reason, the 1205 system is skipped entirely. 1206 """ 1207 log.trace("Updating list of bookable systems...") 1208 systems = {} 1209 parse_fails = 0 1210 response = self.request("getsystems") 1211 details = parse_multiline_response(response.text, graceful=False) 1212 for detail in details: 1213 try: 1214 system = PpmsSystem(detail) 1215 except ValueError as err: 1216 log.error("Error processing `getsystems` response: {}", err) 1217 parse_fails += 1 1218 continue 1219 1220 systems[system.system_id] = system 1221 1222 log.trace( 1223 "Updated {} bookable systems from PPMS ({} systems failed parsing)", 1224 len(systems), 1225 parse_fails, 1226 ) 1227 1228 self.systems = systems 1229 1230 def update_users(self, user_ids=[], active_only=True): 1231 """Update cached details for a list of users from PPMS. 1232 1233 Get the user details on a list of users (or all active ones) from PPMS and store 1234 them in the object's `users` dict. As a side effect, this will also fill the 1235 cache directory in case the object's `cache_path` attribute is set. 1236 1237 WARNING - very slow, especially when the PPMS instance has many users! 1238 1239 Parameters 1240 ---------- 1241 user_ids : list(str), optional 1242 A list of user IDs (login names) to request the cache for, by 1243 default [] which will result in all *active* users to be requested. 1244 active_only : bool, optional 1245 If set to `False` also "inactive" users will be fetched from PPMS, 1246 by default `True`. 1247 """ 1248 if not user_ids: 1249 user_ids = self.get_user_ids(active=active_only) 1250 1251 log.trace("Updating details on {} users", len(user_ids)) 1252 for user_id in user_ids: 1253 self.get_user(user_id, skip_cache=True) 1254 1255 log.debug("Collected details on {} users", len(self.users)) 1256 1257 def user_exists(self, login): 1258 """Check if an account with the given login name already exists in PPMS. 1259 1260 Parameters 1261 ---------- 1262 login : str 1263 The login name to check for. 1264 1265 Returns 1266 ------- 1267 bool 1268 True in case an account with that name exists in PPMS, false otherwise. 1269 """ 1270 try: 1271 self.get_user(login) 1272 return True 1273 except KeyError: 1274 return False
27class PpmsConnection: 28 29 """Connection object to communicate with a PPMS instance. 30 31 Attributes 32 ---------- 33 url : str 34 The URL of the PUMAPI instance. 35 api_key : str 36 The API key used for authenticating against the PUMAPI. 37 timeout : float 38 The timeout value used in the ``requests.post`` calls. 39 cache_path : str 40 A path to a local directory used for caching responses. 41 cache_users_only : bool 42 Flag indicating that only PPMS user details will be stored in the 43 on-disk cache, nothing else. 44 last_served_from_cache : bool 45 Indicates if the last request was served from the cache or on-line. 46 users : dict 47 A dict with usernames as keys, mapping to the related 48 :py:class:`pyppms.user.PpmsUser` object, serves as a cache during the object's 49 lifetime (can be empty if no calls to :py:meth:`get_user()` have been done yet). 50 fullname_mapping : dict 51 A dict mapping a user's *fullname* ("``<LASTNAME> <FIRSTNAME>``") to the 52 corresponding username. Entries are filled in dynamically by the 53 :py:meth:`get_user()` method. 54 systems 55 A dict with system IDs as keys, mapping to the related 56 :py:class:`pyppms.system.PpmsSystem` object. Serves as a cache during the 57 object's lifetime (can be empty if no calls to the :py:meth:`get_systems()` have 58 been done yet). 59 status : dict 60 A dict with keys ``auth_state``, ``auth_response`` and 61 ``auth_httpstatus`` 62 """ 63 64 def __init__(self, url, api_key, timeout=10, cache="", cache_users_only=False): 65 """Constructor for the PPMS connection object. 66 67 Open a connection to the PUMAPI defined in `url` and try to authenticate 68 against it using the given API key (or use cache-only mode if key is an 69 empty string). If an optional path to a caching location is specified, 70 responses will be read from that location unless no matching file can be 71 found there, in which case an on-line request will be done (with the 72 response being saved to the cache path). 73 74 Parameters 75 ---------- 76 url : str 77 The URL of the PUMAPI to connect to. 78 api_key : str 79 The API key to use for authenticating against the PUMAPI. If 80 specified as '' authentication will be skipped and the connection is 81 running in cache-only (local) mode. 82 timeout : float, optional 83 How many seconds to wait for the PUMAPI server to send a response 84 before giving up, by default 10. 85 cache : str, optional 86 A path to a local directory for caching responses from PUMAPI in 87 individual text files. Useful for testing and for speeding up 88 slow requests like 'getusers'. By default empty, which will result 89 in no caching being done. 90 cache_users_only : bool, optional 91 If set to `True`, only `getuser` requests will be cached on disk. 92 This can be used in to speed up the slow requests (through the 93 cache), while everything else will be handled through online 94 requests. By default `False`. 95 96 Raises 97 ------ 98 requests.exceptions.ConnectionError 99 Raised in case authentication fails. 100 """ 101 self.url = url 102 self.api_key = api_key 103 self.timeout = timeout 104 self.users = {} 105 self.fullname_mapping = {} 106 self.systems = {} 107 self.status = { 108 "auth_state": "NOT_TRIED", 109 "auth_response": None, 110 "auth_httpstatus": -1, 111 } 112 self.cache_path = cache 113 self.cache_users_only = cache_users_only 114 self.last_served_from_cache = False 115 """Indicates if the last request was served from the cache or on-line.""" 116 117 # run in cache-only mode (e.g. for testing or off-line usage) if no API 118 # key has been specified, skip authentication then: 119 if api_key != "": 120 self.__authenticate() 121 elif cache == "": 122 raise RuntimeError( 123 "Neither API key nor cache path given, at least one is required!" 124 ) 125 126 def __authenticate(self): 127 """Try to authenticate to PPMS using the `auth` request. 128 129 Raises 130 ------ 131 requests.exceptions.ConnectionError 132 Raised in case authentication failed for any reason. 133 """ 134 log.trace( 135 "Attempting authentication against {} with key [{}...{}]", 136 self.url, 137 self.api_key[:2], 138 self.api_key[-2:], 139 ) 140 self.status["auth_state"] = "attempting" 141 response = self.request("auth") 142 log.trace(f"Authenticate response: {response.text}") 143 self.status["auth_response"] = response.text 144 self.status["auth_httpstatus"] = response.status_code 145 146 # NOTE: an unauthorized request has already been caught be the request() method 147 # above. Our legacy code was additionally testing for 'error' in the response 148 # text - however, it is unclear if PUMAPI ever returns this: 149 if "error" in response.text.lower(): 150 self.status["auth_state"] = "FAILED-ERROR" 151 msg = f"Authentication failed with an error: {response.text}" 152 log.error(msg) 153 raise requests.exceptions.ConnectionError(msg) 154 155 status_ok = requests.codes.ok # pylint: disable-msg=no-member 156 157 if response.status_code != status_ok: 158 # NOTE: branch excluded from coverage as we don't have a known way 159 # to produce such a response from the API 160 log.warning( 161 "Unexpected combination of response [{}] and status code [{}], it's " 162 "unclear if authentication succeeded (assuming it didn't)", 163 response.status_code, 164 response.text, 165 ) 166 self.status["auth_state"] = "FAILED-UNKNOWN" 167 168 msg = ( 169 f"Authenticating against {self.url} with key " 170 f"[{self.api_key[:2]}...{self.api_key[-2:]}] FAILED!" 171 ) 172 log.error(msg) 173 raise requests.exceptions.ConnectionError(msg) 174 175 log.trace( 176 "Authentication succeeded, response=[{}], http_status=[{}]", 177 response.text, 178 response.status_code, 179 ) 180 self.status["auth_state"] = "good" 181 182 def request(self, action, parameters={}, skip_cache=False): 183 """Generic method to submit a request to PPMS and return the result. 184 185 This convenience method deals with adding the API key to a given 186 request, submitting it to the PUMAPI and checking the response for some 187 specific keywords indicating an error. 188 189 Parameters 190 ---------- 191 action : str 192 The command to be submitted to the PUMAPI. 193 parameters : dict, optional 194 A dictionary with additional parameters to be submitted with the 195 request. 196 skip_cache : bool, optional 197 If set to True the request will NOT be served from the local cache, 198 independent whether a matching response file exists there, by 199 default False. 200 201 Returns 202 ------- 203 requests.Response 204 The response object created by posting the request. 205 206 Raises 207 ------ 208 requests.exceptions.ConnectionError 209 Raised in case the request is not authorized. 210 """ 211 req_data = {"action": action, "apikey": self.api_key} 212 req_data.update(parameters) 213 # log.debug("Request parameters: {}", parameters) 214 215 response = None 216 try: 217 if skip_cache: # pragma: no cover 218 raise LookupError("Skipping the cache has been requested") 219 response = self.__intercept_read(req_data) 220 self.last_served_from_cache = True 221 except LookupError as err: 222 log.trace(f"Doing an on-line request: {err}") 223 response = requests.post(self.url, data=req_data, timeout=self.timeout) 224 self.last_served_from_cache = False 225 226 # store the response if it hasn't been read from the cache before: 227 if not self.last_served_from_cache: # pragma: no cover 228 self.__intercept_store(req_data, response) 229 230 # NOTE: the HTTP status code returned is always `200` even if 231 # authentication failed, so we need to check the actual response *TEXT* 232 # to figure out if we have succeeded: 233 if "request not authorized" in response.text.lower(): 234 self.status["auth_state"] = "FAILED" 235 msg = f"Not authorized to run action `{req_data['action']}`" 236 log.error(msg) 237 raise requests.exceptions.ConnectionError(msg) 238 239 return response 240 241 def __interception_path(self, req_data, create_dir=False): 242 """Derive the path for a local cache file from a request's parameters. 243 244 Parameters 245 ---------- 246 req_data : dict 247 The request's parameters, used to derive the name of the cache file. 248 create_dir : bool, optional 249 If set to True the cache directory will be created if necessary. 250 Useful when adding responses to the cache. By default False. 251 252 Returns 253 ------- 254 str 255 The full path to a file name identified by all parameters of the 256 request (except credentials like 'apikey'). 257 """ 258 action = req_data["action"] 259 260 if self.cache_users_only and action != "getuser": 261 log.trace(f"NOT caching '{action}' (cache_users_only is set)") 262 return None 263 264 intercept_dir = os.path.join(self.cache_path, action) 265 if create_dir and not os.path.exists(intercept_dir): # pragma: no cover 266 try: 267 os.makedirs(intercept_dir) 268 log.trace(f"Created dir to store response: {intercept_dir}") 269 except Exception as err: # pylint: disable-msg=broad-except 270 log.warning(f"Failed creating [{intercept_dir}]: {err}") 271 return None 272 273 signature = "" 274 # different python versions are returning dict items in different order, so 275 # simply iterating over them will not always produce the same result - hence we 276 # build up a sorted list of keys first and use that one then: 277 keylist = list(req_data.keys()) 278 keylist.sort() 279 for key in keylist: 280 if key in ["action", "apikey"]: 281 continue 282 signature += f"__{key}--{req_data[key]}" 283 if signature == "": 284 signature = "__response" 285 signature = signature[2:] + ".txt" 286 intercept_file = os.path.join(intercept_dir, signature) 287 return intercept_file 288 289 def __intercept_read(self, req_data): 290 """Try to read a cached response from a local file. 291 292 Parameters 293 ---------- 294 req_data : dict 295 The request's parameters, used to derive the name of the cache file. 296 297 Returns 298 ------- 299 PseudoResponse 300 The response text read from the cache file wrapped in a 301 PseudoResponse object, or None in case no matching file was found in 302 the local cache. 303 304 Raises 305 ------ 306 LookupError 307 Raised in case no cache path has been set or no cache file matching 308 the request parameters could be found in the cache. 309 """ 310 311 # pylint: disable-msg=too-few-public-methods 312 class PseudoResponse: 313 """Dummy response object with attribs 'text' and 'status_code'.""" 314 315 def __init__(self, text, status_code): 316 self.text = text 317 self.status_code = int(status_code) 318 319 if self.cache_path == "": 320 raise LookupError("No cache path configured") 321 322 intercept_file = self.__interception_path(req_data, create_dir=False) 323 if not intercept_file or not os.path.exists(intercept_file): # pragma: no cover 324 raise LookupError(f"No cache hit for [{intercept_file}]") 325 326 with open(intercept_file, "r", encoding="utf-8") as infile: 327 text = infile.read() 328 log.debug( 329 "Read intercepted response text from [{}]", 330 intercept_file[len(str(self.cache_path)) :], 331 ) 332 333 status_code = 200 334 status_file = os.path.splitext(intercept_file)[0] + "_status-code.txt" 335 if os.path.exists(status_file): 336 with open(status_file, "r", encoding="utf-8") as infile: 337 status_code = infile.read() 338 log.debug(f"Read intercepted response status code from [{status_file}]") 339 return PseudoResponse(text, status_code) 340 341 def __intercept_store(self, req_data, response): # pragma: no cover 342 """Store the response in a local cache file named after the request. 343 344 Parameters 345 ---------- 346 req_data : dict 347 The request's parameters, used to derive the name of the cache file 348 so it can be matched later when running the same request again. 349 response : requests.Response 350 The response object to store in the local cache. 351 """ 352 # NOTE: this method is excluded from coverage measurements as it can only be 353 # triggered when testing in online mode with at least one request not being 354 # served from the cache (which is orthogonal to off-line testing) 355 if self.cache_path == "": 356 return 357 358 intercept_file = self.__interception_path(req_data, create_dir=True) 359 if not intercept_file: 360 log.trace("Not storing intercepted results in cache.") 361 return 362 363 try: 364 with open(intercept_file, "w", encoding="utf-8") as outfile: 365 outfile.write(response.text) 366 log.debug( 367 "Wrote response text to [{}] ({} lines)", 368 intercept_file, 369 len(response.text.splitlines()), 370 ) 371 except Exception as err: # pylint: disable-msg=broad-except 372 log.error("Storing response text in [{}] failed: {}", intercept_file, err) 373 log.error("Response text was:\n--------\n{}\n--------", response.text) 374 375 def flush_cache(self, keep_users=False): 376 """Flush the PyPPMS on-disk cache. 377 378 Optionally flushes everything *except* the `getuser` cache if the 379 `keep_users` flag is set to `True`, as this is clearly the most 380 time-consuming operation when fetching data from PUMAPI and therefore 381 might want to be retained. 382 383 Please note that the `getusers` cache (plural, including the `s` suffix) 384 will be flushed no matter what, as this is simply a list of user IDs 385 that can be fetched with a single request. In consequence this means 386 that using the `keep_users` flag will allow you to have reasonably fast 387 reaction times while still getting information on *new* users live from 388 PUMAPI at the only cost of possibly having outdated information on 389 *existing* users. 390 391 Parameters 392 ---------- 393 keep_users : bool, optional 394 If set to `True` the `getuser` sub-directory in the cache location 395 will be kept, by default `False`. 396 """ 397 if self.cache_path == "": 398 log.debug("No cache path configured, not flushing!") 399 return 400 401 dirs_to_remove = [self.cache_path] # by default remove the entire cache dir 402 keep_msg = "" 403 if keep_users: 404 keep_msg = " (keeping user details dirs)" 405 dirs_to_remove = [] 406 cache_dirs = os.listdir(self.cache_path) 407 for subdir in cache_dirs: 408 if subdir == "getuser": 409 continue 410 dirs_to_remove.append(os.path.join(self.cache_path, subdir)) 411 412 log.debug("Flushing the on-disk cache at [{}] {}...", self.cache_path, keep_msg) 413 for directory in dirs_to_remove: 414 try: 415 shutil.rmtree(directory) 416 log.trace("Removed directory [{}].", directory) 417 except Exception as ex: # pylint: disable-msg=broad-except 418 log.warning("Removing the cache at [{}] failed: {}", directory, ex) 419 420 def get_admins(self): 421 """Get all PPMS administrator users. 422 423 Returns 424 ------- 425 list(pyppms.user.PpmsUser) 426 A list with PpmsUser objects that are PPMS administrators. 427 """ 428 response = self.request("getadmins") 429 430 admins = response.text.splitlines() 431 users = [] 432 for username in admins: 433 user = self.get_user(username) 434 users.append(user) 435 log.trace("{} admins in the PPMS database: {}", len(admins), ", ".join(admins)) 436 return users 437 438 def get_booking(self, system_id, booking_type="get"): 439 """Get the current or next booking of a system. 440 441 WARNING: if the next booking is requested but it is too far in the future, 442 PUMAPI silently ignores it - the response is identical to a system that has no 443 future bookings and there is no error reported either. Currently it is unclear 444 where the cutoff is (e.g. lookups for a booking that is two years from now still 445 work fine, but a booking in about 10 years is silently skipped). 446 447 Parameters 448 ---------- 449 system_id : int or int-like 450 The ID of the system in PPMS. 451 booking_type : {'get', 'next'}, optional 452 The type of booking to request, one of `get` (requesting the 453 currently running booking) and `next` (requesting the next upcoming 454 booking), by default `get`. 455 NOTE: if `next` is requested the resulting booking object will **NOT** have 456 an end time (`endtime` will be `None`) as PUMAPI doesn't provide one in that 457 case! 458 459 Returns 460 ------- 461 pyppms.booking.PpmsBooking or None 462 The booking object, or None if there is no booking for the system or the 463 request is refused by PUMAPI (e.g. "not authorized"). 464 465 Raises 466 ------ 467 ValueError 468 Raised if the specified `booking_type` is invalid. 469 """ 470 valid = ["get", "next"] 471 if booking_type not in valid: 472 raise ValueError( 473 f"Value for 'booking_type' ({booking_type}) not in {valid}!" 474 ) 475 476 try: 477 response = self.request(booking_type + "booking", {"id": system_id}) 478 except requests.exceptions.ConnectionError: 479 log.error("Requesting booking status for system {} failed!", system_id) 480 return None 481 482 desc = "any future bookings" 483 if booking_type == "get": 484 desc = "a currently active booking" 485 if not response.text.strip(): 486 log.trace("System [{}] doesn't have {}", system_id, desc) 487 return None 488 489 return PpmsBooking(response.text, booking_type, system_id) 490 491 def get_current_booking(self, system_id): 492 """Wrapper for `get_booking()` with 'booking_type' set to 'get'.""" 493 return self.get_booking(system_id, "get") 494 495 def get_group(self, group_id): 496 """Fetch group details from PPMS and create a dict from them. 497 498 Parameters 499 ---------- 500 group_id : str 501 The group's identifier in PPMS, called 'unitlogin' there. 502 503 Returns 504 ------- 505 dict 506 A dict with the group details, keys being derived from the header 507 line of the PUMAPI response, values from the data line. 508 """ 509 response = self.request("getgroup", {"unitlogin": group_id}) 510 log.trace("Group details returned by PPMS (raw): {}", response.text) 511 512 if not response.text: 513 msg = f"Group [{group_id}] is unknown to PPMS" 514 log.error(msg) 515 raise KeyError(msg) 516 517 details = dict_from_single_response(response.text) 518 519 log.trace("Details of group {}: {}", group_id, details) 520 return details 521 522 def get_group_users(self, unitlogin): 523 """Get all members of a group in PPMS. 524 525 Parameters 526 ---------- 527 unitlogin : str 528 The group's login ("unique login or id" in the PPMS web interface). 529 530 Returns 531 ------- 532 list(pyppms.user.PpmsUser) 533 A list with PpmsUser objects that are members of this PPMS group. 534 """ 535 response = self.request("getgroupusers", {"unitlogin": unitlogin}) 536 537 members = response.text.splitlines() 538 users = [] 539 for username in members: 540 user = self.get_user(username) 541 users.append(user) 542 log.trace( 543 "{} members in PPMS group [{}]: {}", 544 len(members), 545 unitlogin, 546 ", ".join(members), 547 ) 548 return users 549 550 def get_groups(self): 551 """Get a list of all groups in PPMS. 552 553 Returns 554 ------- 555 list(str) 556 A list with the group identifiers in PPMS. 557 """ 558 response = self.request("getgroups") 559 560 groups = response.text.splitlines() 561 log.trace("{} groups in the PPMS database: {}", len(groups), ", ".join(groups)) 562 return groups 563 564 def get_next_booking(self, system_id): 565 """Wrapper for `get_booking()` with 'booking_type' set to 'next'.""" 566 return self.get_booking(system_id, "next") 567 568 def get_running_sheet( 569 self, core_facility_ref, date, ignore_uncached_users=False, localisation="" 570 ): 571 """Get the running sheet for a specific day on the given facility. 572 573 The so-called "running-sheet" consists of all bookings / reservations of 574 a facility on a specifc day. 575 576 WARNING: PUMAPI doesn't return a proper unique user identifier with the 577 'getrunningsheet' request, instead the so called "full name" is given to 578 identify the user - unfortunately this can lead to ambiguities as 579 multiple different accounts can have the same full name. 580 581 Parameters 582 ---------- 583 core_facility_ref : int or int-like 584 The core facility ID for PPMS. 585 date : datetime.datetime 586 The date to request the running sheet for, e.g. ``datetime.now()`` or 587 similar. Note that only the date part is relevant, time will be ignored. 588 ignore_uncached_users : bool, optional 589 If set to `True` any booking for a user that is not present in the instance 590 attribute `fullname_mapping` will be ignored in the resulting list. 591 localisation : str, optional 592 If given, the runningsheet will be limited to systems where the 593 `localisation` (~"room") field matches the given value. 594 595 Returns 596 ------- 597 list(pyppms.booking.PpmsBooking) 598 A list with `PpmsBooking` objects for the given day. Empty in case 599 there are no bookings or parsing the response failed. 600 """ 601 bookings = [] 602 parameters = { 603 "plateformid": f"{core_facility_ref}", 604 "day": date.strftime("%Y-%m-%d"), 605 } 606 log.trace("Requesting runningsheet for {}", parameters["day"]) 607 response = self.request("getrunningsheet", parameters) 608 try: 609 entries = parse_multiline_response(response.text, graceful=False) 610 except NoDataError: 611 # in case no bookings exist the response will be empty! 612 log.trace("Runningsheet for the given day was empty!") 613 return [] 614 except Exception as err: # pylint: disable-msg=broad-except 615 log.error("Parsing runningsheet details failed: {}", err) 616 log.trace("Runningsheet PUMPAI response was: >>>{}<<<", response.text) 617 return [] 618 619 for entry in entries: 620 full = entry["User"] 621 if full not in self.fullname_mapping: 622 if ignore_uncached_users: 623 log.debug(f"Ignoring booking for uncached / unknown user [{full}]") 624 continue 625 626 log.debug(f"Booking refers an uncached user ({full}), updating users!") 627 self.update_users() 628 629 if full not in self.fullname_mapping: 630 log.error("PPMS doesn't seem to know user [{}], skipping", full) 631 continue 632 633 log.trace( 634 f"Booking for user '{self.fullname_mapping[full]}' ({full}) found" 635 ) 636 system_name = entry["Object"] 637 # FIXME: add a test with one system name being a subset of another system 638 # (this will result in more than one result and should be fixed e.g. by 639 # adding an optional parameter "exact" to get_systems_matching() or 640 # similar) 641 system_ids = self.get_systems_matching(localisation, [system_name]) 642 if len(system_ids) < 1: 643 if localisation: 644 log.debug(f"Given criteria return zero systems for [{system_name}]") 645 else: 646 log.warning(f"No systems matching criteria for [{system_name}]") 647 continue 648 649 if len(system_ids) > 1: 650 # NOTE: more than one result should not happen as PPMS doesn't allow for 651 # multiple systems having the same name - no result might happen though! 652 log.error("Ignoring booking for unknown system [{}]", system_name) 653 continue 654 655 booking = PpmsBooking.from_runningsheet( 656 entry, 657 system_ids[0], 658 self.fullname_mapping[full], 659 date, 660 ) 661 bookings.append(booking) 662 663 return bookings 664 665 def get_systems(self, force_refresh=False): 666 """Get a dict with all systems in PPMS. 667 668 Parameters 669 ---------- 670 force_refresh : bool, optional 671 If `True` the list of systems will be refreshed even if the object's 672 attribute `self.systems` is non-empty, by default `False`. Please 673 note that this will NOT skip the on-disk cache in case that exists! 674 675 Returns 676 ------- 677 dict(pyppms.system.PpmsSystem) 678 A dict with `PpmsSystem` objects parsed from the PUMAPI response where 679 the system ID (int) is used as the dict's key. If parsing a system 680 fails for any reason, the system is skipped entirely. 681 """ 682 if self.systems and not force_refresh: 683 log.trace("Using cached details for {} systems", len(self.systems)) 684 else: 685 self.update_systems() 686 687 return self.systems 688 689 def get_systems_matching(self, localisation, name_contains): 690 """Query PPMS for systems with a specific location and name. 691 692 This method assembles a list of PPMS system IDs whose "localisation" 693 (room) field matches a given string and where the system name contains 694 at least one of the strings given as the `name_contains` parameter. 695 696 Parameters 697 ---------- 698 localisation : str 699 A string that the system's "localisation" (i.e. the "Room" field in 700 the PPMS web interface) has to match. Can be an empty string which 701 will result in no filtering being done on the "Room" attribute. 702 name_contains : list(str) 703 A list of valid names (categories) of which the system's name has to 704 match at least one for being included. Supply an empty list for 705 skipping this filter. 706 707 Returns 708 ------- 709 list(int) 710 A list with PPMS system IDs matching all of the given criteria. 711 712 Raises 713 ------ 714 TypeError 715 Raised in case the `name_contains` parameter is of type `str` (it 716 needs to be `list(str)` instead). 717 """ 718 if isinstance(name_contains, str): 719 raise TypeError("`name_contains` must be a list of str, not str!") 720 721 loc = localisation 722 loc_desc = f"with location matching [{localisation}]" 723 if localisation == "": 724 loc_desc = "(no location filter given)" 725 726 log.trace( 727 "Querying PPMS for systems {}, name matching any of {}", 728 loc_desc, 729 name_contains, 730 ) 731 system_ids = [] 732 systems = self.get_systems() 733 for sys_id, system in systems.items(): 734 if loc.lower() not in str(system.localisation).lower(): 735 log.trace( 736 "System [{}] location ({}) is NOT matching ({}), ignoring", 737 system.name, 738 system.localisation, 739 loc, 740 ) 741 continue 742 743 # log.trace('System [{}] is matching location [{}], checking if ' 744 # 'the name is matching any of the valid pattern {}', 745 # system.name, loc, name_contains) 746 for valid_name in name_contains: 747 if valid_name in system.name: 748 log.trace("System [{}] matches all criteria", system.name) 749 system_ids.append(sys_id) 750 break 751 752 # if sys_id not in system_ids: 753 # log.trace('System [{}] does NOT match a valid name: {}', 754 # system.name, name_contains) 755 756 log.trace("Found {} bookable systems {}", len(system_ids), loc_desc) 757 log.trace("IDs of matching bookable systems {}: {}", loc_desc, system_ids) 758 return system_ids 759 760 def get_user(self, login_name, skip_cache=False): 761 """Fetch user details from PPMS and create a PpmsUser object from it. 762 763 Parameters 764 ---------- 765 login_name : str 766 The user's PPMS login name. 767 skip_cache : bool, optional 768 Passed as-is to the :py:meth:`request()` method 769 770 Returns 771 ------- 772 pyppms.user.PpmsUser 773 The user object created from the PUMAPI response. The object will be 774 additionally stored in the self.users dict using the login_name as 775 the dict's key. 776 777 Raises 778 ------ 779 KeyError 780 Raised if the user doesn't exist in PPMS. 781 """ 782 response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache) 783 784 if not response.text: 785 msg = f"User [{login_name}] is unknown to PPMS" 786 log.debug(msg) 787 raise KeyError(msg) 788 789 user = PpmsUser(response.text) 790 self.users[user.username] = user # update / add to the cached user objs 791 self.fullname_mapping[user.fullname] = user.username 792 return user 793 794 def get_user_dict(self, login_name, skip_cache=False): 795 """Get details on a given user from PPMS. 796 797 Parameters 798 ---------- 799 login_name : str 800 The PPMS account / login name of the user to query. 801 skip_cache : bool, optional 802 Passed as-is to the :py:meth:`request()` method 803 804 Returns 805 ------- 806 dict 807 A dict with the user details returned by the PUMAPI. 808 809 Example 810 ------- 811 >>> conn.get_user_dict('pyppms') 812 ... { 813 ... u'active': True, 814 ... u'affiliation': u'', 815 ... u'bcode': u'', 816 ... u'email': u'pyppms@python-facility.example', 817 ... u'fname': u'PumAPI', 818 ... u'lname': u'Python', 819 ... u'login': u'pyppms', 820 ... u'mustchbcode': False, 821 ... u'mustchpwd': False', 822 ... u'phone': u'+98 (76) 54 3210', 823 ... u'unitlogin': u'pyppms' 824 ... } 825 826 Raises 827 ------ 828 KeyError 829 Raised in case the user account is unknown to PPMS. 830 ValueError 831 Raised if the user details can't be parsed from the PUMAPI response. 832 """ 833 response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache) 834 835 if not response.text: 836 msg = f"User [{login_name}] is unknown to PPMS" 837 log.error(msg) 838 raise KeyError(msg) 839 840 # EXAMPLE: 841 # response.text = ( 842 # u'login,lname,fname,email,' 843 # u'phone,bcode,affiliation,unitlogin,mustchpwd,mustchbcode,' 844 # u'active\r\n' 845 # u'"pyppms","Python","PumAPI","pyppms@python-facility.example",' 846 # u'"+98 (76) 54 3210","","","pyppms",false,false,' 847 # u'true\r\n' 848 # ) 849 details = dict_from_single_response(response.text) 850 log.trace("Details for user [{}]: {}", login_name, details) 851 return details 852 853 def get_user_experience(self, login=None, system_id=None): 854 """Get user experience ("User rights") from PPMS. 855 856 Parameters 857 ---------- 858 login : str, optional 859 An optional login name to request the experience / permissions for, 860 by default None 861 system_id : int, optional 862 An optional system ID to request the experience / permissions for, 863 by default None 864 865 Returns 866 ------- 867 list(dict) 868 A list with dicts parsed from the user experience response. 869 """ 870 data = {} 871 if login is not None: 872 data["login"] = login 873 if system_id is not None: 874 data["id"] = system_id 875 response = self.request("getuserexp", parameters=data) 876 877 parsed = parse_multiline_response(response.text) 878 log.trace( 879 "Received {} experience entries for filters [user:{}] and [id:{}]", 880 len(parsed), 881 login, 882 system_id, 883 ) 884 return parsed 885 886 def get_user_ids(self, active=False): 887 """Get a list with all user IDs in the PPMS system. 888 889 Parameters 890 ---------- 891 active : bool, optional 892 Request only users marked as active in PPMS, by default False. 893 NOTE: "active" is a tri-state parameter in PPMS: "true", "false" 894 or empty! 895 896 Returns 897 ------- 898 list 899 A list of all (or active-only) user IDs in PPMS. 900 """ 901 # TODO: describe format of returned list and / or give an example! 902 parameters = {} 903 if active: 904 parameters["active"] = "true" 905 906 response = self.request("getusers", parameters) 907 908 users = response.text.splitlines() 909 active_desc = "active " if active else "" 910 log.trace("{} {}users in the PPMS database", len(users), active_desc) 911 log.trace(", ".join(users)) 912 return users 913 914 def get_users(self, force_refresh=False, active_only=True): 915 """Get user objects for all (or cached) PPMS users. 916 917 Parameters 918 ---------- 919 force_refresh : bool, optional 920 Re-request information from PPMS even if user details have been 921 cached locally before, by default False. 922 active_only : bool, optional 923 If set to `False` also "inactive" users will be fetched from PPMS, 924 by default `True`. 925 926 Returns 927 ------- 928 dict(pyppms.user.PpmsUser) 929 A dict of PpmsUser objects with the username (login) as key. 930 """ 931 if self.users and not force_refresh: 932 log.trace("Using cached details for {} users", len(self.users)) 933 else: 934 self.update_users(active_only=active_only) 935 936 return self.users 937 938 def get_users_emails(self, users=None, active=False): 939 """Get a list of user email addresses. WARNING - very slow! 940 941 Parameters 942 ---------- 943 users : list(str), optional 944 A list of login names to retrieve the email addresses for, if 945 omitted addresses for all (or active ones) will be requested. 946 active : bool, optional 947 Request only addresses of users marked as active in PPMS, by default 948 False. Will be ignored if a list of usernames is given explicitly. 949 950 Returns 951 ------- 952 list(str) 953 Email addresses of the users requested. 954 """ 955 emails = [] 956 if users is None: 957 users = self.get_user_ids(active=active) 958 for user in users: 959 email = self.get_user_dict(user)["email"] 960 if not email: 961 log.warning("--- WARNING: no email for user [{}]! ---", user) 962 continue 963 # log.trace("{}: {}", user, email) 964 emails.append(email) 965 966 return emails 967 968 def get_users_with_access_to_system(self, system_id): 969 """Get a list of usernames allowed to book the system with the given ID. 970 971 Parameters 972 ---------- 973 system_id : int or int-like 974 The ID of the system to query permitted users for. 975 976 Returns 977 ------- 978 list(str) 979 A list of usernames ('login') with permissions to book the system 980 with the given ID in PPMS. 981 982 Raises 983 ------ 984 ValueError 985 Raised in case parsing the response failes for any reason. 986 """ 987 users = [] 988 989 response = self.request("getsysrights", {"id": system_id}) 990 # this response has a unique format, so parse it directly here: 991 try: 992 lines = response.text.splitlines() 993 for line in lines: 994 permission, username = line.split(":") 995 if permission.upper() == "D": 996 log.trace( 997 "User [{}] is deactivated for booking system [{}], skipping", 998 username, 999 system_id, 1000 ) 1001 continue 1002 1003 log.trace( 1004 "User [{}] has permission to book system [{}]", username, system_id 1005 ) 1006 users.append(username) 1007 1008 except Exception as err: 1009 msg = ( 1010 f"Unable to parse data returned by PUMAPI: {response.text} - " 1011 f"ERROR: {err}" 1012 ) 1013 log.error(msg) 1014 raise ValueError(msg) from err 1015 1016 return users 1017 1018 def give_user_access_to_system(self, username, system_id): 1019 """Add permissions for a user to book a given system in PPMS. 1020 1021 Parameters 1022 ---------- 1023 username : str 1024 The username ('login') to allow for booking the system. 1025 system_id : int or int-like 1026 The ID of the system to add the permission for. 1027 1028 Returns 1029 ------- 1030 bool 1031 True in case the given username now has the permissions to book the 1032 system with the specified ID (or if the user already had them 1033 before), False otherwise. 1034 """ 1035 return self.set_system_booking_permissions(username, system_id, "A") 1036 1037 def new_user( # pylint: disable-msg=too-many-arguments 1038 self, login, lname, fname, email, ppms_group, phone=None, password=None 1039 ): 1040 """Create a new user in PPMS. 1041 1042 The method is asking PPMS to create a new user account with the given details. 1043 In case an account with that login name already exists, it will log a warning 1044 and return without sending any further requests to PPMS. 1045 1046 Parameters 1047 ---------- 1048 login : str 1049 The unique identifier for the user. 1050 lname : str 1051 The last name of the user. 1052 fname : str 1053 The first name of the user. 1054 email : str 1055 The email address of the user. 1056 ppms_group : str 1057 The unique identifier of the primary group of the new user. A new group will 1058 be created if no group with the given name exists. 1059 phone : str, optional 1060 The phone number of the user. 1061 password : str, optional 1062 The password for the user. If no password is set the user will not be able 1063 to log on to PPMS. 1064 1065 Raises 1066 ------ 1067 RuntimeError 1068 Will be raised in case creating the user fails. 1069 """ 1070 if self.user_exists(login): 1071 log.warning("NOT creating user [{}] as it already exists!", login) 1072 return 1073 1074 req_data = { 1075 "login": login, 1076 "lname": lname, 1077 "fname": fname, 1078 "email": email, 1079 "unitlogin": ppms_group, 1080 } 1081 if phone: 1082 req_data["phone"] = phone 1083 if password: 1084 req_data["pwd"] = password 1085 1086 response = self.request("newuser", req_data) 1087 if not "OK newuser" in response.text: 1088 msg = f"Creating new user failed: {response.text}" 1089 log.error(msg) 1090 raise RuntimeError(msg) 1091 1092 log.debug("Created user [{}] in PPMS.", login) 1093 log.trace("Response was: {}", response.text) 1094 1095 def remove_user_access_from_system(self, username, system_id): 1096 """Remove permissions for a user to book a given system in PPMS. 1097 1098 Parameters 1099 ---------- 1100 username : str 1101 The username ('login') to remove booking permissions on the system. 1102 system_id : int or int-like 1103 The ID of the system to modify the permission for. 1104 1105 Returns 1106 ------- 1107 bool 1108 True in case the given username now has the permissions to book the 1109 system with the specified ID (or if the user already had them 1110 before), False otherwise. 1111 """ 1112 return self.set_system_booking_permissions(username, system_id, "D") 1113 1114 def set_system_booking_permissions(self, login, system_id, permission): 1115 """Set permissions for a user on a given system in PPMS. 1116 1117 Parameters 1118 ---------- 1119 username : str 1120 The username ('login') to allow for booking the system. 1121 system_id : int or int-like 1122 The ID of the system to add the permission for. 1123 permission : {'D', 'A', 'N', 'S'} 1124 The permission level to set for the user, one of: 1125 - ``D`` : deactivated 1126 - ``A`` : autonomous 1127 - ``N`` : novice 1128 - ``S`` : superuser 1129 1130 Returns 1131 ------- 1132 bool 1133 True in case setting permissions for the given username on the 1134 system with the specified ID succeeded (or if the user already had 1135 those permissions before), False otherwise. 1136 """ 1137 1138 def permission_name(shortname): 1139 """Closure to validate a permission level and return its long name. 1140 1141 Parameters 1142 ---------- 1143 shortname : str 1144 A single character defining the permission level. 1145 1146 Returns 1147 ------- 1148 str 1149 The long (human-readable) name of the permission level. 1150 1151 Raises 1152 ------ 1153 KeyError 1154 Raised in case an invalid permission level was given. 1155 """ 1156 mapping = { 1157 "D": "deactivated", 1158 "A": "autonomous", 1159 "N": "novice", 1160 "S": "superuser", 1161 } 1162 try: 1163 return mapping[shortname] 1164 except KeyError as err: 1165 raise KeyError(f"Invalid permission [{shortname}] given") from err 1166 1167 log.debug( 1168 "Setting permission level [{}] for user [{}] on system [{}]", 1169 permission_name(permission), 1170 login, 1171 system_id, 1172 ) 1173 1174 parameters = {"id": system_id, "login": login, "type": permission} 1175 response = self.request("setright", parameters) 1176 1177 # NOTE: the 'setright' action will accept ANY permission type and return 'done' 1178 # on the request, so there is no way to check from the response if setting the 1179 # permission really worked!! 1180 # log.trace('Request returned text: {}', response.text) 1181 if response.text.lower().strip() == "done": 1182 log.trace( 1183 "User [{}] now has permission level [{}] on system [{}]", 1184 login, 1185 permission_name(permission), 1186 system_id, 1187 ) 1188 return True 1189 1190 if "invalid user" in response.text.lower(): 1191 log.warning("User [{}] doesn't seem to exist in PPMS", login) 1192 elif "system right not authorized" in response.text.lower(): 1193 log.error( 1194 "Unable to set permissions for system {}: {}", system_id, response.text 1195 ) 1196 else: 1197 log.error("Unexpected response, assuming request failed: {}", response.text) 1198 1199 return False 1200 1201 def update_systems(self): 1202 """Update cached details for all bookable systems from PPMS. 1203 1204 Get the details on all bookable systems from PPMS and store them in the local 1205 cache. If parsing the PUMAPI response for a system fails for any reason, the 1206 system is skipped entirely. 1207 """ 1208 log.trace("Updating list of bookable systems...") 1209 systems = {} 1210 parse_fails = 0 1211 response = self.request("getsystems") 1212 details = parse_multiline_response(response.text, graceful=False) 1213 for detail in details: 1214 try: 1215 system = PpmsSystem(detail) 1216 except ValueError as err: 1217 log.error("Error processing `getsystems` response: {}", err) 1218 parse_fails += 1 1219 continue 1220 1221 systems[system.system_id] = system 1222 1223 log.trace( 1224 "Updated {} bookable systems from PPMS ({} systems failed parsing)", 1225 len(systems), 1226 parse_fails, 1227 ) 1228 1229 self.systems = systems 1230 1231 def update_users(self, user_ids=[], active_only=True): 1232 """Update cached details for a list of users from PPMS. 1233 1234 Get the user details on a list of users (or all active ones) from PPMS and store 1235 them in the object's `users` dict. As a side effect, this will also fill the 1236 cache directory in case the object's `cache_path` attribute is set. 1237 1238 WARNING - very slow, especially when the PPMS instance has many users! 1239 1240 Parameters 1241 ---------- 1242 user_ids : list(str), optional 1243 A list of user IDs (login names) to request the cache for, by 1244 default [] which will result in all *active* users to be requested. 1245 active_only : bool, optional 1246 If set to `False` also "inactive" users will be fetched from PPMS, 1247 by default `True`. 1248 """ 1249 if not user_ids: 1250 user_ids = self.get_user_ids(active=active_only) 1251 1252 log.trace("Updating details on {} users", len(user_ids)) 1253 for user_id in user_ids: 1254 self.get_user(user_id, skip_cache=True) 1255 1256 log.debug("Collected details on {} users", len(self.users)) 1257 1258 def user_exists(self, login): 1259 """Check if an account with the given login name already exists in PPMS. 1260 1261 Parameters 1262 ---------- 1263 login : str 1264 The login name to check for. 1265 1266 Returns 1267 ------- 1268 bool 1269 True in case an account with that name exists in PPMS, false otherwise. 1270 """ 1271 try: 1272 self.get_user(login) 1273 return True 1274 except KeyError: 1275 return False
Connection object to communicate with a PPMS instance.
Attributes
- url (str): The URL of the PUMAPI instance.
- api_key (str): The API key used for authenticating against the PUMAPI.
- timeout (float):
The timeout value used in the
requests.post
calls. - cache_path (str): A path to a local directory used for caching responses.
- cache_users_only (bool): Flag indicating that only PPMS user details will be stored in the on-disk cache, nothing else.
- last_served_from_cache (bool): Indicates if the last request was served from the cache or on-line.
- users (dict):
A dict with usernames as keys, mapping to the related
pyppms.user.PpmsUser
object, serves as a cache during the object's lifetime (can be empty if no calls toget_user()()
have been done yet). - fullname_mapping (dict):
A dict mapping a user's fullname ("
<LASTNAME> <FIRSTNAME>
") to the corresponding username. Entries are filled in dynamically by theget_user()()
method. - systems: A dict with system IDs as keys, mapping to the related
pyppms.system.PpmsSystem
object. Serves as a cache during the object's lifetime (can be empty if no calls to theget_systems()()
have been done yet). - status (dict):
A dict with keys
auth_state
,auth_response
andauth_httpstatus
64 def __init__(self, url, api_key, timeout=10, cache="", cache_users_only=False): 65 """Constructor for the PPMS connection object. 66 67 Open a connection to the PUMAPI defined in `url` and try to authenticate 68 against it using the given API key (or use cache-only mode if key is an 69 empty string). If an optional path to a caching location is specified, 70 responses will be read from that location unless no matching file can be 71 found there, in which case an on-line request will be done (with the 72 response being saved to the cache path). 73 74 Parameters 75 ---------- 76 url : str 77 The URL of the PUMAPI to connect to. 78 api_key : str 79 The API key to use for authenticating against the PUMAPI. If 80 specified as '' authentication will be skipped and the connection is 81 running in cache-only (local) mode. 82 timeout : float, optional 83 How many seconds to wait for the PUMAPI server to send a response 84 before giving up, by default 10. 85 cache : str, optional 86 A path to a local directory for caching responses from PUMAPI in 87 individual text files. Useful for testing and for speeding up 88 slow requests like 'getusers'. By default empty, which will result 89 in no caching being done. 90 cache_users_only : bool, optional 91 If set to `True`, only `getuser` requests will be cached on disk. 92 This can be used in to speed up the slow requests (through the 93 cache), while everything else will be handled through online 94 requests. By default `False`. 95 96 Raises 97 ------ 98 requests.exceptions.ConnectionError 99 Raised in case authentication fails. 100 """ 101 self.url = url 102 self.api_key = api_key 103 self.timeout = timeout 104 self.users = {} 105 self.fullname_mapping = {} 106 self.systems = {} 107 self.status = { 108 "auth_state": "NOT_TRIED", 109 "auth_response": None, 110 "auth_httpstatus": -1, 111 } 112 self.cache_path = cache 113 self.cache_users_only = cache_users_only 114 self.last_served_from_cache = False 115 """Indicates if the last request was served from the cache or on-line.""" 116 117 # run in cache-only mode (e.g. for testing or off-line usage) if no API 118 # key has been specified, skip authentication then: 119 if api_key != "": 120 self.__authenticate() 121 elif cache == "": 122 raise RuntimeError( 123 "Neither API key nor cache path given, at least one is required!" 124 )
Constructor for the PPMS connection object.
Open a connection to the PUMAPI defined in url
and try to authenticate
against it using the given API key (or use cache-only mode if key is an
empty string). If an optional path to a caching location is specified,
responses will be read from that location unless no matching file can be
found there, in which case an on-line request will be done (with the
response being saved to the cache path).
Parameters
- url (str): The URL of the PUMAPI to connect to.
- api_key (str): The API key to use for authenticating against the PUMAPI. If specified as '' authentication will be skipped and the connection is running in cache-only (local) mode.
- timeout (float, optional): How many seconds to wait for the PUMAPI server to send a response before giving up, by default 10.
- cache (str, optional): A path to a local directory for caching responses from PUMAPI in individual text files. Useful for testing and for speeding up slow requests like 'getusers'. By default empty, which will result in no caching being done.
- cache_users_only (bool, optional):
If set to
True
, onlygetuser
requests will be cached on disk. This can be used in to speed up the slow requests (through the cache), while everything else will be handled through online requests. By defaultFalse
.
Raises
- requests.exceptions.ConnectionError: Raised in case authentication fails.
182 def request(self, action, parameters={}, skip_cache=False): 183 """Generic method to submit a request to PPMS and return the result. 184 185 This convenience method deals with adding the API key to a given 186 request, submitting it to the PUMAPI and checking the response for some 187 specific keywords indicating an error. 188 189 Parameters 190 ---------- 191 action : str 192 The command to be submitted to the PUMAPI. 193 parameters : dict, optional 194 A dictionary with additional parameters to be submitted with the 195 request. 196 skip_cache : bool, optional 197 If set to True the request will NOT be served from the local cache, 198 independent whether a matching response file exists there, by 199 default False. 200 201 Returns 202 ------- 203 requests.Response 204 The response object created by posting the request. 205 206 Raises 207 ------ 208 requests.exceptions.ConnectionError 209 Raised in case the request is not authorized. 210 """ 211 req_data = {"action": action, "apikey": self.api_key} 212 req_data.update(parameters) 213 # log.debug("Request parameters: {}", parameters) 214 215 response = None 216 try: 217 if skip_cache: # pragma: no cover 218 raise LookupError("Skipping the cache has been requested") 219 response = self.__intercept_read(req_data) 220 self.last_served_from_cache = True 221 except LookupError as err: 222 log.trace(f"Doing an on-line request: {err}") 223 response = requests.post(self.url, data=req_data, timeout=self.timeout) 224 self.last_served_from_cache = False 225 226 # store the response if it hasn't been read from the cache before: 227 if not self.last_served_from_cache: # pragma: no cover 228 self.__intercept_store(req_data, response) 229 230 # NOTE: the HTTP status code returned is always `200` even if 231 # authentication failed, so we need to check the actual response *TEXT* 232 # to figure out if we have succeeded: 233 if "request not authorized" in response.text.lower(): 234 self.status["auth_state"] = "FAILED" 235 msg = f"Not authorized to run action `{req_data['action']}`" 236 log.error(msg) 237 raise requests.exceptions.ConnectionError(msg) 238 239 return response
Generic method to submit a request to PPMS and return the result.
This convenience method deals with adding the API key to a given request, submitting it to the PUMAPI and checking the response for some specific keywords indicating an error.
Parameters
- action (str): The command to be submitted to the PUMAPI.
- parameters (dict, optional): A dictionary with additional parameters to be submitted with the request.
- skip_cache (bool, optional): If set to True the request will NOT be served from the local cache, independent whether a matching response file exists there, by default False.
Returns
- requests.Response: The response object created by posting the request.
Raises
- requests.exceptions.ConnectionError: Raised in case the request is not authorized.
375 def flush_cache(self, keep_users=False): 376 """Flush the PyPPMS on-disk cache. 377 378 Optionally flushes everything *except* the `getuser` cache if the 379 `keep_users` flag is set to `True`, as this is clearly the most 380 time-consuming operation when fetching data from PUMAPI and therefore 381 might want to be retained. 382 383 Please note that the `getusers` cache (plural, including the `s` suffix) 384 will be flushed no matter what, as this is simply a list of user IDs 385 that can be fetched with a single request. In consequence this means 386 that using the `keep_users` flag will allow you to have reasonably fast 387 reaction times while still getting information on *new* users live from 388 PUMAPI at the only cost of possibly having outdated information on 389 *existing* users. 390 391 Parameters 392 ---------- 393 keep_users : bool, optional 394 If set to `True` the `getuser` sub-directory in the cache location 395 will be kept, by default `False`. 396 """ 397 if self.cache_path == "": 398 log.debug("No cache path configured, not flushing!") 399 return 400 401 dirs_to_remove = [self.cache_path] # by default remove the entire cache dir 402 keep_msg = "" 403 if keep_users: 404 keep_msg = " (keeping user details dirs)" 405 dirs_to_remove = [] 406 cache_dirs = os.listdir(self.cache_path) 407 for subdir in cache_dirs: 408 if subdir == "getuser": 409 continue 410 dirs_to_remove.append(os.path.join(self.cache_path, subdir)) 411 412 log.debug("Flushing the on-disk cache at [{}] {}...", self.cache_path, keep_msg) 413 for directory in dirs_to_remove: 414 try: 415 shutil.rmtree(directory) 416 log.trace("Removed directory [{}].", directory) 417 except Exception as ex: # pylint: disable-msg=broad-except 418 log.warning("Removing the cache at [{}] failed: {}", directory, ex)
Flush the PyPPMS on-disk cache.
Optionally flushes everything except the getuser
cache if the
keep_users
flag is set to True
, as this is clearly the most
time-consuming operation when fetching data from PUMAPI and therefore
might want to be retained.
Please note that the getusers
cache (plural, including the s
suffix)
will be flushed no matter what, as this is simply a list of user IDs
that can be fetched with a single request. In consequence this means
that using the keep_users
flag will allow you to have reasonably fast
reaction times while still getting information on new users live from
PUMAPI at the only cost of possibly having outdated information on
existing users.
Parameters
- keep_users (bool, optional):
If set to
True
thegetuser
sub-directory in the cache location will be kept, by defaultFalse
.
420 def get_admins(self): 421 """Get all PPMS administrator users. 422 423 Returns 424 ------- 425 list(pyppms.user.PpmsUser) 426 A list with PpmsUser objects that are PPMS administrators. 427 """ 428 response = self.request("getadmins") 429 430 admins = response.text.splitlines() 431 users = [] 432 for username in admins: 433 user = self.get_user(username) 434 users.append(user) 435 log.trace("{} admins in the PPMS database: {}", len(admins), ", ".join(admins)) 436 return users
Get all PPMS administrator users.
Returns
- list(pyppms.user.PpmsUser): A list with PpmsUser objects that are PPMS administrators.
438 def get_booking(self, system_id, booking_type="get"): 439 """Get the current or next booking of a system. 440 441 WARNING: if the next booking is requested but it is too far in the future, 442 PUMAPI silently ignores it - the response is identical to a system that has no 443 future bookings and there is no error reported either. Currently it is unclear 444 where the cutoff is (e.g. lookups for a booking that is two years from now still 445 work fine, but a booking in about 10 years is silently skipped). 446 447 Parameters 448 ---------- 449 system_id : int or int-like 450 The ID of the system in PPMS. 451 booking_type : {'get', 'next'}, optional 452 The type of booking to request, one of `get` (requesting the 453 currently running booking) and `next` (requesting the next upcoming 454 booking), by default `get`. 455 NOTE: if `next` is requested the resulting booking object will **NOT** have 456 an end time (`endtime` will be `None`) as PUMAPI doesn't provide one in that 457 case! 458 459 Returns 460 ------- 461 pyppms.booking.PpmsBooking or None 462 The booking object, or None if there is no booking for the system or the 463 request is refused by PUMAPI (e.g. "not authorized"). 464 465 Raises 466 ------ 467 ValueError 468 Raised if the specified `booking_type` is invalid. 469 """ 470 valid = ["get", "next"] 471 if booking_type not in valid: 472 raise ValueError( 473 f"Value for 'booking_type' ({booking_type}) not in {valid}!" 474 ) 475 476 try: 477 response = self.request(booking_type + "booking", {"id": system_id}) 478 except requests.exceptions.ConnectionError: 479 log.error("Requesting booking status for system {} failed!", system_id) 480 return None 481 482 desc = "any future bookings" 483 if booking_type == "get": 484 desc = "a currently active booking" 485 if not response.text.strip(): 486 log.trace("System [{}] doesn't have {}", system_id, desc) 487 return None 488 489 return PpmsBooking(response.text, booking_type, system_id)
Get the current or next booking of a system.
WARNING: if the next booking is requested but it is too far in the future, PUMAPI silently ignores it - the response is identical to a system that has no future bookings and there is no error reported either. Currently it is unclear where the cutoff is (e.g. lookups for a booking that is two years from now still work fine, but a booking in about 10 years is silently skipped).
Parameters
- system_id (int or int-like): The ID of the system in PPMS.
- booking_type ({'get', 'next'}, optional):
The type of booking to request, one of
get
(requesting the currently running booking) andnext
(requesting the next upcoming booking), by defaultget
. NOTE: ifnext
is requested the resulting booking object will NOT have an end time (endtime
will beNone
) as PUMAPI doesn't provide one in that case!
Returns
- pyppms.booking.PpmsBooking or None: The booking object, or None if there is no booking for the system or the request is refused by PUMAPI (e.g. "not authorized").
Raises
- ValueError: Raised if the specified
booking_type
is invalid.
491 def get_current_booking(self, system_id): 492 """Wrapper for `get_booking()` with 'booking_type' set to 'get'.""" 493 return self.get_booking(system_id, "get")
Wrapper for get_booking()
with 'booking_type' set to 'get'.
495 def get_group(self, group_id): 496 """Fetch group details from PPMS and create a dict from them. 497 498 Parameters 499 ---------- 500 group_id : str 501 The group's identifier in PPMS, called 'unitlogin' there. 502 503 Returns 504 ------- 505 dict 506 A dict with the group details, keys being derived from the header 507 line of the PUMAPI response, values from the data line. 508 """ 509 response = self.request("getgroup", {"unitlogin": group_id}) 510 log.trace("Group details returned by PPMS (raw): {}", response.text) 511 512 if not response.text: 513 msg = f"Group [{group_id}] is unknown to PPMS" 514 log.error(msg) 515 raise KeyError(msg) 516 517 details = dict_from_single_response(response.text) 518 519 log.trace("Details of group {}: {}", group_id, details) 520 return details
Fetch group details from PPMS and create a dict from them.
Parameters
- group_id (str): The group's identifier in PPMS, called 'unitlogin' there.
Returns
- dict: A dict with the group details, keys being derived from the header line of the PUMAPI response, values from the data line.
522 def get_group_users(self, unitlogin): 523 """Get all members of a group in PPMS. 524 525 Parameters 526 ---------- 527 unitlogin : str 528 The group's login ("unique login or id" in the PPMS web interface). 529 530 Returns 531 ------- 532 list(pyppms.user.PpmsUser) 533 A list with PpmsUser objects that are members of this PPMS group. 534 """ 535 response = self.request("getgroupusers", {"unitlogin": unitlogin}) 536 537 members = response.text.splitlines() 538 users = [] 539 for username in members: 540 user = self.get_user(username) 541 users.append(user) 542 log.trace( 543 "{} members in PPMS group [{}]: {}", 544 len(members), 545 unitlogin, 546 ", ".join(members), 547 ) 548 return users
Get all members of a group in PPMS.
Parameters
- unitlogin (str): The group's login ("unique login or id" in the PPMS web interface).
Returns
- list(pyppms.user.PpmsUser): A list with PpmsUser objects that are members of this PPMS group.
550 def get_groups(self): 551 """Get a list of all groups in PPMS. 552 553 Returns 554 ------- 555 list(str) 556 A list with the group identifiers in PPMS. 557 """ 558 response = self.request("getgroups") 559 560 groups = response.text.splitlines() 561 log.trace("{} groups in the PPMS database: {}", len(groups), ", ".join(groups)) 562 return groups
Get a list of all groups in PPMS.
Returns
- list(str): A list with the group identifiers in PPMS.
564 def get_next_booking(self, system_id): 565 """Wrapper for `get_booking()` with 'booking_type' set to 'next'.""" 566 return self.get_booking(system_id, "next")
Wrapper for get_booking()
with 'booking_type' set to 'next'.
568 def get_running_sheet( 569 self, core_facility_ref, date, ignore_uncached_users=False, localisation="" 570 ): 571 """Get the running sheet for a specific day on the given facility. 572 573 The so-called "running-sheet" consists of all bookings / reservations of 574 a facility on a specifc day. 575 576 WARNING: PUMAPI doesn't return a proper unique user identifier with the 577 'getrunningsheet' request, instead the so called "full name" is given to 578 identify the user - unfortunately this can lead to ambiguities as 579 multiple different accounts can have the same full name. 580 581 Parameters 582 ---------- 583 core_facility_ref : int or int-like 584 The core facility ID for PPMS. 585 date : datetime.datetime 586 The date to request the running sheet for, e.g. ``datetime.now()`` or 587 similar. Note that only the date part is relevant, time will be ignored. 588 ignore_uncached_users : bool, optional 589 If set to `True` any booking for a user that is not present in the instance 590 attribute `fullname_mapping` will be ignored in the resulting list. 591 localisation : str, optional 592 If given, the runningsheet will be limited to systems where the 593 `localisation` (~"room") field matches the given value. 594 595 Returns 596 ------- 597 list(pyppms.booking.PpmsBooking) 598 A list with `PpmsBooking` objects for the given day. Empty in case 599 there are no bookings or parsing the response failed. 600 """ 601 bookings = [] 602 parameters = { 603 "plateformid": f"{core_facility_ref}", 604 "day": date.strftime("%Y-%m-%d"), 605 } 606 log.trace("Requesting runningsheet for {}", parameters["day"]) 607 response = self.request("getrunningsheet", parameters) 608 try: 609 entries = parse_multiline_response(response.text, graceful=False) 610 except NoDataError: 611 # in case no bookings exist the response will be empty! 612 log.trace("Runningsheet for the given day was empty!") 613 return [] 614 except Exception as err: # pylint: disable-msg=broad-except 615 log.error("Parsing runningsheet details failed: {}", err) 616 log.trace("Runningsheet PUMPAI response was: >>>{}<<<", response.text) 617 return [] 618 619 for entry in entries: 620 full = entry["User"] 621 if full not in self.fullname_mapping: 622 if ignore_uncached_users: 623 log.debug(f"Ignoring booking for uncached / unknown user [{full}]") 624 continue 625 626 log.debug(f"Booking refers an uncached user ({full}), updating users!") 627 self.update_users() 628 629 if full not in self.fullname_mapping: 630 log.error("PPMS doesn't seem to know user [{}], skipping", full) 631 continue 632 633 log.trace( 634 f"Booking for user '{self.fullname_mapping[full]}' ({full}) found" 635 ) 636 system_name = entry["Object"] 637 # FIXME: add a test with one system name being a subset of another system 638 # (this will result in more than one result and should be fixed e.g. by 639 # adding an optional parameter "exact" to get_systems_matching() or 640 # similar) 641 system_ids = self.get_systems_matching(localisation, [system_name]) 642 if len(system_ids) < 1: 643 if localisation: 644 log.debug(f"Given criteria return zero systems for [{system_name}]") 645 else: 646 log.warning(f"No systems matching criteria for [{system_name}]") 647 continue 648 649 if len(system_ids) > 1: 650 # NOTE: more than one result should not happen as PPMS doesn't allow for 651 # multiple systems having the same name - no result might happen though! 652 log.error("Ignoring booking for unknown system [{}]", system_name) 653 continue 654 655 booking = PpmsBooking.from_runningsheet( 656 entry, 657 system_ids[0], 658 self.fullname_mapping[full], 659 date, 660 ) 661 bookings.append(booking) 662 663 return bookings
Get the running sheet for a specific day on the given facility.
The so-called "running-sheet" consists of all bookings / reservations of a facility on a specifc day.
WARNING: PUMAPI doesn't return a proper unique user identifier with the 'getrunningsheet' request, instead the so called "full name" is given to identify the user - unfortunately this can lead to ambiguities as multiple different accounts can have the same full name.
Parameters
- core_facility_ref (int or int-like): The core facility ID for PPMS.
- date (datetime.datetime):
The date to request the running sheet for, e.g.
datetime.now()
or similar. Note that only the date part is relevant, time will be ignored. - ignore_uncached_users (bool, optional):
If set to
True
any booking for a user that is not present in the instance attributefullname_mapping
will be ignored in the resulting list. - localisation (str, optional):
If given, the runningsheet will be limited to systems where the
localisation
(~"room") field matches the given value.
Returns
- list(pyppms.booking.PpmsBooking): A list with
PpmsBooking
objects for the given day. Empty in case there are no bookings or parsing the response failed.
665 def get_systems(self, force_refresh=False): 666 """Get a dict with all systems in PPMS. 667 668 Parameters 669 ---------- 670 force_refresh : bool, optional 671 If `True` the list of systems will be refreshed even if the object's 672 attribute `self.systems` is non-empty, by default `False`. Please 673 note that this will NOT skip the on-disk cache in case that exists! 674 675 Returns 676 ------- 677 dict(pyppms.system.PpmsSystem) 678 A dict with `PpmsSystem` objects parsed from the PUMAPI response where 679 the system ID (int) is used as the dict's key. If parsing a system 680 fails for any reason, the system is skipped entirely. 681 """ 682 if self.systems and not force_refresh: 683 log.trace("Using cached details for {} systems", len(self.systems)) 684 else: 685 self.update_systems() 686 687 return self.systems
Get a dict with all systems in PPMS.
Parameters
- force_refresh (bool, optional):
If
True
the list of systems will be refreshed even if the object's attributeself.systems
is non-empty, by defaultFalse
. Please note that this will NOT skip the on-disk cache in case that exists!
Returns
- dict(pyppms.system.PpmsSystem): A dict with
PpmsSystem
objects parsed from the PUMAPI response where the system ID (int) is used as the dict's key. If parsing a system fails for any reason, the system is skipped entirely.
689 def get_systems_matching(self, localisation, name_contains): 690 """Query PPMS for systems with a specific location and name. 691 692 This method assembles a list of PPMS system IDs whose "localisation" 693 (room) field matches a given string and where the system name contains 694 at least one of the strings given as the `name_contains` parameter. 695 696 Parameters 697 ---------- 698 localisation : str 699 A string that the system's "localisation" (i.e. the "Room" field in 700 the PPMS web interface) has to match. Can be an empty string which 701 will result in no filtering being done on the "Room" attribute. 702 name_contains : list(str) 703 A list of valid names (categories) of which the system's name has to 704 match at least one for being included. Supply an empty list for 705 skipping this filter. 706 707 Returns 708 ------- 709 list(int) 710 A list with PPMS system IDs matching all of the given criteria. 711 712 Raises 713 ------ 714 TypeError 715 Raised in case the `name_contains` parameter is of type `str` (it 716 needs to be `list(str)` instead). 717 """ 718 if isinstance(name_contains, str): 719 raise TypeError("`name_contains` must be a list of str, not str!") 720 721 loc = localisation 722 loc_desc = f"with location matching [{localisation}]" 723 if localisation == "": 724 loc_desc = "(no location filter given)" 725 726 log.trace( 727 "Querying PPMS for systems {}, name matching any of {}", 728 loc_desc, 729 name_contains, 730 ) 731 system_ids = [] 732 systems = self.get_systems() 733 for sys_id, system in systems.items(): 734 if loc.lower() not in str(system.localisation).lower(): 735 log.trace( 736 "System [{}] location ({}) is NOT matching ({}), ignoring", 737 system.name, 738 system.localisation, 739 loc, 740 ) 741 continue 742 743 # log.trace('System [{}] is matching location [{}], checking if ' 744 # 'the name is matching any of the valid pattern {}', 745 # system.name, loc, name_contains) 746 for valid_name in name_contains: 747 if valid_name in system.name: 748 log.trace("System [{}] matches all criteria", system.name) 749 system_ids.append(sys_id) 750 break 751 752 # if sys_id not in system_ids: 753 # log.trace('System [{}] does NOT match a valid name: {}', 754 # system.name, name_contains) 755 756 log.trace("Found {} bookable systems {}", len(system_ids), loc_desc) 757 log.trace("IDs of matching bookable systems {}: {}", loc_desc, system_ids) 758 return system_ids
Query PPMS for systems with a specific location and name.
This method assembles a list of PPMS system IDs whose "localisation"
(room) field matches a given string and where the system name contains
at least one of the strings given as the name_contains
parameter.
Parameters
- localisation (str): A string that the system's "localisation" (i.e. the "Room" field in the PPMS web interface) has to match. Can be an empty string which will result in no filtering being done on the "Room" attribute.
- name_contains (list(str)): A list of valid names (categories) of which the system's name has to match at least one for being included. Supply an empty list for skipping this filter.
Returns
- list(int): A list with PPMS system IDs matching all of the given criteria.
Raises
- TypeError: Raised in case the
name_contains
parameter is of typestr
(it needs to belist(str)
instead).
760 def get_user(self, login_name, skip_cache=False): 761 """Fetch user details from PPMS and create a PpmsUser object from it. 762 763 Parameters 764 ---------- 765 login_name : str 766 The user's PPMS login name. 767 skip_cache : bool, optional 768 Passed as-is to the :py:meth:`request()` method 769 770 Returns 771 ------- 772 pyppms.user.PpmsUser 773 The user object created from the PUMAPI response. The object will be 774 additionally stored in the self.users dict using the login_name as 775 the dict's key. 776 777 Raises 778 ------ 779 KeyError 780 Raised if the user doesn't exist in PPMS. 781 """ 782 response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache) 783 784 if not response.text: 785 msg = f"User [{login_name}] is unknown to PPMS" 786 log.debug(msg) 787 raise KeyError(msg) 788 789 user = PpmsUser(response.text) 790 self.users[user.username] = user # update / add to the cached user objs 791 self.fullname_mapping[user.fullname] = user.username 792 return user
Fetch user details from PPMS and create a PpmsUser object from it.
Parameters
- login_name (str): The user's PPMS login name.
- skip_cache (bool, optional):
Passed as-is to the
request()()
method
Returns
- pyppms.user.PpmsUser: The user object created from the PUMAPI response. The object will be additionally stored in the self.users dict using the login_name as the dict's key.
Raises
- KeyError: Raised if the user doesn't exist in PPMS.
794 def get_user_dict(self, login_name, skip_cache=False): 795 """Get details on a given user from PPMS. 796 797 Parameters 798 ---------- 799 login_name : str 800 The PPMS account / login name of the user to query. 801 skip_cache : bool, optional 802 Passed as-is to the :py:meth:`request()` method 803 804 Returns 805 ------- 806 dict 807 A dict with the user details returned by the PUMAPI. 808 809 Example 810 ------- 811 >>> conn.get_user_dict('pyppms') 812 ... { 813 ... u'active': True, 814 ... u'affiliation': u'', 815 ... u'bcode': u'', 816 ... u'email': u'pyppms@python-facility.example', 817 ... u'fname': u'PumAPI', 818 ... u'lname': u'Python', 819 ... u'login': u'pyppms', 820 ... u'mustchbcode': False, 821 ... u'mustchpwd': False', 822 ... u'phone': u'+98 (76) 54 3210', 823 ... u'unitlogin': u'pyppms' 824 ... } 825 826 Raises 827 ------ 828 KeyError 829 Raised in case the user account is unknown to PPMS. 830 ValueError 831 Raised if the user details can't be parsed from the PUMAPI response. 832 """ 833 response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache) 834 835 if not response.text: 836 msg = f"User [{login_name}] is unknown to PPMS" 837 log.error(msg) 838 raise KeyError(msg) 839 840 # EXAMPLE: 841 # response.text = ( 842 # u'login,lname,fname,email,' 843 # u'phone,bcode,affiliation,unitlogin,mustchpwd,mustchbcode,' 844 # u'active\r\n' 845 # u'"pyppms","Python","PumAPI","pyppms@python-facility.example",' 846 # u'"+98 (76) 54 3210","","","pyppms",false,false,' 847 # u'true\r\n' 848 # ) 849 details = dict_from_single_response(response.text) 850 log.trace("Details for user [{}]: {}", login_name, details) 851 return details
Get details on a given user from PPMS.
Parameters
- login_name (str): The PPMS account / login name of the user to query.
- skip_cache (bool, optional):
Passed as-is to the
request()()
method
Returns
- dict: A dict with the user details returned by the PUMAPI.
Example
>>> conn.get_user_dict('pyppms')
... {
... u'active': True,
... u'affiliation': u'',
... u'bcode': u'',
... u'email': u'pyppms@python-facility.example',
... u'fname': u'PumAPI',
... u'lname': u'Python',
... u'login': u'pyppms',
... u'mustchbcode': False,
... u'mustchpwd': False',
... u'phone': u'+98 (76) 54 3210',
... u'unitlogin': u'pyppms'
... }
Raises
- KeyError: Raised in case the user account is unknown to PPMS.
- ValueError: Raised if the user details can't be parsed from the PUMAPI response.
853 def get_user_experience(self, login=None, system_id=None): 854 """Get user experience ("User rights") from PPMS. 855 856 Parameters 857 ---------- 858 login : str, optional 859 An optional login name to request the experience / permissions for, 860 by default None 861 system_id : int, optional 862 An optional system ID to request the experience / permissions for, 863 by default None 864 865 Returns 866 ------- 867 list(dict) 868 A list with dicts parsed from the user experience response. 869 """ 870 data = {} 871 if login is not None: 872 data["login"] = login 873 if system_id is not None: 874 data["id"] = system_id 875 response = self.request("getuserexp", parameters=data) 876 877 parsed = parse_multiline_response(response.text) 878 log.trace( 879 "Received {} experience entries for filters [user:{}] and [id:{}]", 880 len(parsed), 881 login, 882 system_id, 883 ) 884 return parsed
Get user experience ("User rights") from PPMS.
Parameters
- login (str, optional): An optional login name to request the experience / permissions for, by default None
- system_id (int, optional): An optional system ID to request the experience / permissions for, by default None
Returns
- list(dict): A list with dicts parsed from the user experience response.
886 def get_user_ids(self, active=False): 887 """Get a list with all user IDs in the PPMS system. 888 889 Parameters 890 ---------- 891 active : bool, optional 892 Request only users marked as active in PPMS, by default False. 893 NOTE: "active" is a tri-state parameter in PPMS: "true", "false" 894 or empty! 895 896 Returns 897 ------- 898 list 899 A list of all (or active-only) user IDs in PPMS. 900 """ 901 # TODO: describe format of returned list and / or give an example! 902 parameters = {} 903 if active: 904 parameters["active"] = "true" 905 906 response = self.request("getusers", parameters) 907 908 users = response.text.splitlines() 909 active_desc = "active " if active else "" 910 log.trace("{} {}users in the PPMS database", len(users), active_desc) 911 log.trace(", ".join(users)) 912 return users
Get a list with all user IDs in the PPMS system.
Parameters
- active (bool, optional): Request only users marked as active in PPMS, by default False. NOTE: "active" is a tri-state parameter in PPMS: "true", "false" or empty!
Returns
- list: A list of all (or active-only) user IDs in PPMS.
914 def get_users(self, force_refresh=False, active_only=True): 915 """Get user objects for all (or cached) PPMS users. 916 917 Parameters 918 ---------- 919 force_refresh : bool, optional 920 Re-request information from PPMS even if user details have been 921 cached locally before, by default False. 922 active_only : bool, optional 923 If set to `False` also "inactive" users will be fetched from PPMS, 924 by default `True`. 925 926 Returns 927 ------- 928 dict(pyppms.user.PpmsUser) 929 A dict of PpmsUser objects with the username (login) as key. 930 """ 931 if self.users and not force_refresh: 932 log.trace("Using cached details for {} users", len(self.users)) 933 else: 934 self.update_users(active_only=active_only) 935 936 return self.users
Get user objects for all (or cached) PPMS users.
Parameters
- force_refresh (bool, optional): Re-request information from PPMS even if user details have been cached locally before, by default False.
- active_only (bool, optional):
If set to
False
also "inactive" users will be fetched from PPMS, by defaultTrue
.
Returns
- dict(pyppms.user.PpmsUser): A dict of PpmsUser objects with the username (login) as key.
938 def get_users_emails(self, users=None, active=False): 939 """Get a list of user email addresses. WARNING - very slow! 940 941 Parameters 942 ---------- 943 users : list(str), optional 944 A list of login names to retrieve the email addresses for, if 945 omitted addresses for all (or active ones) will be requested. 946 active : bool, optional 947 Request only addresses of users marked as active in PPMS, by default 948 False. Will be ignored if a list of usernames is given explicitly. 949 950 Returns 951 ------- 952 list(str) 953 Email addresses of the users requested. 954 """ 955 emails = [] 956 if users is None: 957 users = self.get_user_ids(active=active) 958 for user in users: 959 email = self.get_user_dict(user)["email"] 960 if not email: 961 log.warning("--- WARNING: no email for user [{}]! ---", user) 962 continue 963 # log.trace("{}: {}", user, email) 964 emails.append(email) 965 966 return emails
Get a list of user email addresses. WARNING - very slow!
Parameters
- users (list(str), optional): A list of login names to retrieve the email addresses for, if omitted addresses for all (or active ones) will be requested.
- active (bool, optional): Request only addresses of users marked as active in PPMS, by default False. Will be ignored if a list of usernames is given explicitly.
Returns
- list(str): Email addresses of the users requested.
968 def get_users_with_access_to_system(self, system_id): 969 """Get a list of usernames allowed to book the system with the given ID. 970 971 Parameters 972 ---------- 973 system_id : int or int-like 974 The ID of the system to query permitted users for. 975 976 Returns 977 ------- 978 list(str) 979 A list of usernames ('login') with permissions to book the system 980 with the given ID in PPMS. 981 982 Raises 983 ------ 984 ValueError 985 Raised in case parsing the response failes for any reason. 986 """ 987 users = [] 988 989 response = self.request("getsysrights", {"id": system_id}) 990 # this response has a unique format, so parse it directly here: 991 try: 992 lines = response.text.splitlines() 993 for line in lines: 994 permission, username = line.split(":") 995 if permission.upper() == "D": 996 log.trace( 997 "User [{}] is deactivated for booking system [{}], skipping", 998 username, 999 system_id, 1000 ) 1001 continue 1002 1003 log.trace( 1004 "User [{}] has permission to book system [{}]", username, system_id 1005 ) 1006 users.append(username) 1007 1008 except Exception as err: 1009 msg = ( 1010 f"Unable to parse data returned by PUMAPI: {response.text} - " 1011 f"ERROR: {err}" 1012 ) 1013 log.error(msg) 1014 raise ValueError(msg) from err 1015 1016 return users
Get a list of usernames allowed to book the system with the given ID.
Parameters
- system_id (int or int-like): The ID of the system to query permitted users for.
Returns
- list(str): A list of usernames ('login') with permissions to book the system with the given ID in PPMS.
Raises
- ValueError: Raised in case parsing the response failes for any reason.
1018 def give_user_access_to_system(self, username, system_id): 1019 """Add permissions for a user to book a given system in PPMS. 1020 1021 Parameters 1022 ---------- 1023 username : str 1024 The username ('login') to allow for booking the system. 1025 system_id : int or int-like 1026 The ID of the system to add the permission for. 1027 1028 Returns 1029 ------- 1030 bool 1031 True in case the given username now has the permissions to book the 1032 system with the specified ID (or if the user already had them 1033 before), False otherwise. 1034 """ 1035 return self.set_system_booking_permissions(username, system_id, "A")
Add permissions for a user to book a given system in PPMS.
Parameters
- username (str): The username ('login') to allow for booking the system.
- system_id (int or int-like): The ID of the system to add the permission for.
Returns
- bool: True in case the given username now has the permissions to book the system with the specified ID (or if the user already had them before), False otherwise.
1037 def new_user( # pylint: disable-msg=too-many-arguments 1038 self, login, lname, fname, email, ppms_group, phone=None, password=None 1039 ): 1040 """Create a new user in PPMS. 1041 1042 The method is asking PPMS to create a new user account with the given details. 1043 In case an account with that login name already exists, it will log a warning 1044 and return without sending any further requests to PPMS. 1045 1046 Parameters 1047 ---------- 1048 login : str 1049 The unique identifier for the user. 1050 lname : str 1051 The last name of the user. 1052 fname : str 1053 The first name of the user. 1054 email : str 1055 The email address of the user. 1056 ppms_group : str 1057 The unique identifier of the primary group of the new user. A new group will 1058 be created if no group with the given name exists. 1059 phone : str, optional 1060 The phone number of the user. 1061 password : str, optional 1062 The password for the user. If no password is set the user will not be able 1063 to log on to PPMS. 1064 1065 Raises 1066 ------ 1067 RuntimeError 1068 Will be raised in case creating the user fails. 1069 """ 1070 if self.user_exists(login): 1071 log.warning("NOT creating user [{}] as it already exists!", login) 1072 return 1073 1074 req_data = { 1075 "login": login, 1076 "lname": lname, 1077 "fname": fname, 1078 "email": email, 1079 "unitlogin": ppms_group, 1080 } 1081 if phone: 1082 req_data["phone"] = phone 1083 if password: 1084 req_data["pwd"] = password 1085 1086 response = self.request("newuser", req_data) 1087 if not "OK newuser" in response.text: 1088 msg = f"Creating new user failed: {response.text}" 1089 log.error(msg) 1090 raise RuntimeError(msg) 1091 1092 log.debug("Created user [{}] in PPMS.", login) 1093 log.trace("Response was: {}", response.text)
Create a new user in PPMS.
The method is asking PPMS to create a new user account with the given details. In case an account with that login name already exists, it will log a warning and return without sending any further requests to PPMS.
Parameters
- login (str): The unique identifier for the user.
- lname (str): The last name of the user.
- fname (str): The first name of the user.
- email (str): The email address of the user.
- ppms_group (str): The unique identifier of the primary group of the new user. A new group will be created if no group with the given name exists.
- phone (str, optional): The phone number of the user.
- password (str, optional): The password for the user. If no password is set the user will not be able to log on to PPMS.
Raises
- RuntimeError: Will be raised in case creating the user fails.
1095 def remove_user_access_from_system(self, username, system_id): 1096 """Remove permissions for a user to book a given system in PPMS. 1097 1098 Parameters 1099 ---------- 1100 username : str 1101 The username ('login') to remove booking permissions on the system. 1102 system_id : int or int-like 1103 The ID of the system to modify the permission for. 1104 1105 Returns 1106 ------- 1107 bool 1108 True in case the given username now has the permissions to book the 1109 system with the specified ID (or if the user already had them 1110 before), False otherwise. 1111 """ 1112 return self.set_system_booking_permissions(username, system_id, "D")
Remove permissions for a user to book a given system in PPMS.
Parameters
- username (str): The username ('login') to remove booking permissions on the system.
- system_id (int or int-like): The ID of the system to modify the permission for.
Returns
- bool: True in case the given username now has the permissions to book the system with the specified ID (or if the user already had them before), False otherwise.
1114 def set_system_booking_permissions(self, login, system_id, permission): 1115 """Set permissions for a user on a given system in PPMS. 1116 1117 Parameters 1118 ---------- 1119 username : str 1120 The username ('login') to allow for booking the system. 1121 system_id : int or int-like 1122 The ID of the system to add the permission for. 1123 permission : {'D', 'A', 'N', 'S'} 1124 The permission level to set for the user, one of: 1125 - ``D`` : deactivated 1126 - ``A`` : autonomous 1127 - ``N`` : novice 1128 - ``S`` : superuser 1129 1130 Returns 1131 ------- 1132 bool 1133 True in case setting permissions for the given username on the 1134 system with the specified ID succeeded (or if the user already had 1135 those permissions before), False otherwise. 1136 """ 1137 1138 def permission_name(shortname): 1139 """Closure to validate a permission level and return its long name. 1140 1141 Parameters 1142 ---------- 1143 shortname : str 1144 A single character defining the permission level. 1145 1146 Returns 1147 ------- 1148 str 1149 The long (human-readable) name of the permission level. 1150 1151 Raises 1152 ------ 1153 KeyError 1154 Raised in case an invalid permission level was given. 1155 """ 1156 mapping = { 1157 "D": "deactivated", 1158 "A": "autonomous", 1159 "N": "novice", 1160 "S": "superuser", 1161 } 1162 try: 1163 return mapping[shortname] 1164 except KeyError as err: 1165 raise KeyError(f"Invalid permission [{shortname}] given") from err 1166 1167 log.debug( 1168 "Setting permission level [{}] for user [{}] on system [{}]", 1169 permission_name(permission), 1170 login, 1171 system_id, 1172 ) 1173 1174 parameters = {"id": system_id, "login": login, "type": permission} 1175 response = self.request("setright", parameters) 1176 1177 # NOTE: the 'setright' action will accept ANY permission type and return 'done' 1178 # on the request, so there is no way to check from the response if setting the 1179 # permission really worked!! 1180 # log.trace('Request returned text: {}', response.text) 1181 if response.text.lower().strip() == "done": 1182 log.trace( 1183 "User [{}] now has permission level [{}] on system [{}]", 1184 login, 1185 permission_name(permission), 1186 system_id, 1187 ) 1188 return True 1189 1190 if "invalid user" in response.text.lower(): 1191 log.warning("User [{}] doesn't seem to exist in PPMS", login) 1192 elif "system right not authorized" in response.text.lower(): 1193 log.error( 1194 "Unable to set permissions for system {}: {}", system_id, response.text 1195 ) 1196 else: 1197 log.error("Unexpected response, assuming request failed: {}", response.text) 1198 1199 return False
Set permissions for a user on a given system in PPMS.
Parameters
- username (str): The username ('login') to allow for booking the system.
- system_id (int or int-like): The ID of the system to add the permission for.
- permission ({'D', 'A', 'N', 'S'}):
The permission level to set for the user, one of:
D
: deactivatedA
: autonomousN
: noviceS
: superuser
Returns
- bool: True in case setting permissions for the given username on the system with the specified ID succeeded (or if the user already had those permissions before), False otherwise.
1201 def update_systems(self): 1202 """Update cached details for all bookable systems from PPMS. 1203 1204 Get the details on all bookable systems from PPMS and store them in the local 1205 cache. If parsing the PUMAPI response for a system fails for any reason, the 1206 system is skipped entirely. 1207 """ 1208 log.trace("Updating list of bookable systems...") 1209 systems = {} 1210 parse_fails = 0 1211 response = self.request("getsystems") 1212 details = parse_multiline_response(response.text, graceful=False) 1213 for detail in details: 1214 try: 1215 system = PpmsSystem(detail) 1216 except ValueError as err: 1217 log.error("Error processing `getsystems` response: {}", err) 1218 parse_fails += 1 1219 continue 1220 1221 systems[system.system_id] = system 1222 1223 log.trace( 1224 "Updated {} bookable systems from PPMS ({} systems failed parsing)", 1225 len(systems), 1226 parse_fails, 1227 ) 1228 1229 self.systems = systems
Update cached details for all bookable systems from PPMS.
Get the details on all bookable systems from PPMS and store them in the local cache. If parsing the PUMAPI response for a system fails for any reason, the system is skipped entirely.
1231 def update_users(self, user_ids=[], active_only=True): 1232 """Update cached details for a list of users from PPMS. 1233 1234 Get the user details on a list of users (or all active ones) from PPMS and store 1235 them in the object's `users` dict. As a side effect, this will also fill the 1236 cache directory in case the object's `cache_path` attribute is set. 1237 1238 WARNING - very slow, especially when the PPMS instance has many users! 1239 1240 Parameters 1241 ---------- 1242 user_ids : list(str), optional 1243 A list of user IDs (login names) to request the cache for, by 1244 default [] which will result in all *active* users to be requested. 1245 active_only : bool, optional 1246 If set to `False` also "inactive" users will be fetched from PPMS, 1247 by default `True`. 1248 """ 1249 if not user_ids: 1250 user_ids = self.get_user_ids(active=active_only) 1251 1252 log.trace("Updating details on {} users", len(user_ids)) 1253 for user_id in user_ids: 1254 self.get_user(user_id, skip_cache=True) 1255 1256 log.debug("Collected details on {} users", len(self.users))
Update cached details for a list of users from PPMS.
Get the user details on a list of users (or all active ones) from PPMS and store
them in the object's users
dict. As a side effect, this will also fill the
cache directory in case the object's cache_path
attribute is set.
WARNING - very slow, especially when the PPMS instance has many users!
Parameters
- user_ids (list(str), optional): A list of user IDs (login names) to request the cache for, by default [] which will result in all active users to be requested.
- active_only (bool, optional):
If set to
False
also "inactive" users will be fetched from PPMS, by defaultTrue
.
1258 def user_exists(self, login): 1259 """Check if an account with the given login name already exists in PPMS. 1260 1261 Parameters 1262 ---------- 1263 login : str 1264 The login name to check for. 1265 1266 Returns 1267 ------- 1268 bool 1269 True in case an account with that name exists in PPMS, false otherwise. 1270 """ 1271 try: 1272 self.get_user(login) 1273 return True 1274 except KeyError: 1275 return False
Check if an account with the given login name already exists in PPMS.
Parameters
- login (str): The login name to check for.
Returns
- bool: True in case an account with that name exists in PPMS, false otherwise.