pyppms.ppms

Core connection module for the PUMAPI communication.

   1"""Core connection module for the PUMAPI communication."""
   2
   3# pylint: disable-msg=dangerous-default-value
   4
   5# NOTE: the "pyppms" package is simply a wrapper for the existing API, so we can't make
   6#       any design decisions here - hence it is pointless to complain about the number
   7#       of instance attributes, public methods or other stuff:
   8# pylint: disable-msg=too-many-instance-attributes
   9# pylint: disable-msg=too-many-public-methods
  10
  11import os
  12import os.path
  13import shutil
  14from io import open
  15
  16import requests
  17from loguru import logger as log
  18
  19from .common import dict_from_single_response, parse_multiline_response
  20from .user import PpmsUser
  21from .system import PpmsSystem
  22from .booking import PpmsBooking
  23from .exceptions import NoDataError
  24
  25
  26class PpmsConnection:
  27
  28    """Connection object to communicate with a PPMS instance.
  29
  30    Attributes
  31    ----------
  32    url : str
  33        The URL of the PUMAPI instance.
  34    api_key : str
  35        The API key used for authenticating against the PUMAPI.
  36    timeout : float
  37        The timeout value used in the ``requests.post`` calls.
  38    cache_path : str
  39        A path to a local directory used for caching responses.
  40    cache_users_only : bool
  41        Flag indicating that only PPMS user details will be stored in the
  42        on-disk cache, nothing else.
  43    last_served_from_cache : bool
  44        Indicates if the last request was served from the cache or on-line.
  45    users : dict
  46        A dict with usernames as keys, mapping to the related
  47        :py:class:`pyppms.user.PpmsUser` object, serves as a cache during the object's
  48        lifetime (can be empty if no calls to :py:meth:`get_user()` have been done yet).
  49    fullname_mapping : dict
  50        A dict mapping a user's *fullname* ("``<LASTNAME> <FIRSTNAME>``") to the
  51        corresponding username. Entries are filled in dynamically by the
  52        :py:meth:`get_user()` method.
    systems : dict
  54        A dict with system IDs as keys, mapping to the related
  55        :py:class:`pyppms.system.PpmsSystem` object. Serves as a cache during the
  56        object's lifetime (can be empty if no calls to the :py:meth:`get_systems()` have
  57        been done yet).
  58    status : dict
  59        A dict with keys ``auth_state``, ``auth_response`` and
  60        ``auth_httpstatus``
  61    """
  62
  63    def __init__(self, url, api_key, timeout=10, cache="", cache_users_only=False):
  64        """Constructor for the PPMS connection object.
  65
  66        Open a connection to the PUMAPI defined in `url` and try to authenticate
  67        against it using the given API key (or use cache-only mode if key is an
  68        empty string). If an optional path to a caching location is specified,
  69        responses will be read from that location unless no matching file can be
  70        found there, in which case an on-line request will be done (with the
  71        response being saved to the cache path).
  72
  73        Parameters
  74        ----------
  75        url : str
  76            The URL of the PUMAPI to connect to.
  77        api_key : str
  78            The API key to use for authenticating against the PUMAPI. If
  79            specified as '' authentication will be skipped and the connection is
  80            running in cache-only (local) mode.
  81        timeout : float, optional
  82            How many seconds to wait for the PUMAPI server to send a response
  83            before giving up, by default 10.
  84        cache : str, optional
  85            A path to a local directory for caching responses from PUMAPI in
  86            individual text files. Useful for testing and for speeding up
  87            slow requests like 'getusers'. By default empty, which will result
  88            in no caching being done.
  89        cache_users_only : bool, optional
  90            If set to `True`, only `getuser` requests will be cached on disk.
  91            This can be used in to speed up the slow requests (through the
  92            cache), while everything else will be handled through online
  93            requests. By default `False`.
  94
  95        Raises
  96        ------
  97        requests.exceptions.ConnectionError
  98            Raised in case authentication fails.
  99        """
 100        self.url = url
 101        self.api_key = api_key
 102        self.timeout = timeout
 103        self.users = {}
 104        self.fullname_mapping = {}
 105        self.systems = {}
 106        self.status = {
 107            "auth_state": "NOT_TRIED",
 108            "auth_response": None,
 109            "auth_httpstatus": -1,
 110        }
 111        self.cache_path = cache
 112        self.cache_users_only = cache_users_only
 113        self.last_served_from_cache = False
 114        """Indicates if the last request was served from the cache or on-line."""
 115
 116        # run in cache-only mode (e.g. for testing or off-line usage) if no API
 117        # key has been specified, skip authentication then:
 118        if api_key != "":
 119            self.__authenticate()
 120        elif cache == "":
 121            raise RuntimeError(
 122                "Neither API key nor cache path given, at least one is required!"
 123            )
 124
 125    def __authenticate(self):
 126        """Try to authenticate to PPMS using the `auth` request.
 127
 128        Raises
 129        ------
 130        requests.exceptions.ConnectionError
 131            Raised in case authentication failed for any reason.
 132        """
 133        log.trace(
 134            "Attempting authentication against {} with key [{}...{}]",
 135            self.url,
 136            self.api_key[:2],
 137            self.api_key[-2:],
 138        )
 139        self.status["auth_state"] = "attempting"
 140        response = self.request("auth")
 141        log.trace(f"Authenticate response: {response.text}")
 142        self.status["auth_response"] = response.text
 143        self.status["auth_httpstatus"] = response.status_code
 144
 145        # NOTE: an unauthorized request has already been caught be the request() method
 146        # above. Our legacy code was additionally testing for 'error' in the response
 147        # text - however, it is unclear if PUMAPI ever returns this:
 148        if "error" in response.text.lower():
 149            self.status["auth_state"] = "FAILED-ERROR"
 150            msg = f"Authentication failed with an error: {response.text}"
 151            log.error(msg)
 152            raise requests.exceptions.ConnectionError(msg)
 153
 154        status_ok = requests.codes.ok  # pylint: disable-msg=no-member
 155
 156        if response.status_code != status_ok:
 157            # NOTE: branch excluded from coverage as we don't have a known way
 158            # to produce such a response from the API
 159            log.warning(
 160                "Unexpected combination of response [{}] and status code [{}], it's "
 161                "unclear if authentication succeeded (assuming it didn't)",
 162                response.status_code,
 163                response.text,
 164            )
 165            self.status["auth_state"] = "FAILED-UNKNOWN"
 166
 167            msg = (
 168                f"Authenticating against {self.url} with key "
 169                f"[{self.api_key[:2]}...{self.api_key[-2:]}] FAILED!"
 170            )
 171            log.error(msg)
 172            raise requests.exceptions.ConnectionError(msg)
 173
 174        log.trace(
 175            "Authentication succeeded, response=[{}], http_status=[{}]",
 176            response.text,
 177            response.status_code,
 178        )
 179        self.status["auth_state"] = "good"
 180
 181    def request(self, action, parameters={}, skip_cache=False):
 182        """Generic method to submit a request to PPMS and return the result.
 183
 184        This convenience method deals with adding the API key to a given
 185        request, submitting it to the PUMAPI and checking the response for some
 186        specific keywords indicating an error.
 187
 188        Parameters
 189        ----------
 190        action : str
 191            The command to be submitted to the PUMAPI.
 192        parameters : dict, optional
 193            A dictionary with additional parameters to be submitted with the
 194            request.
 195        skip_cache : bool, optional
 196            If set to True the request will NOT be served from the local cache,
 197            independent whether a matching response file exists there, by
 198            default False.
 199
 200        Returns
 201        -------
 202        requests.Response
 203            The response object created by posting the request.
 204
 205        Raises
 206        ------
 207        requests.exceptions.ConnectionError
 208            Raised in case the request is not authorized.
 209        """
 210        req_data = {"action": action, "apikey": self.api_key}
 211        req_data.update(parameters)
 212        # log.debug("Request parameters: {}", parameters)
 213
 214        response = None
 215        try:
 216            if skip_cache:  # pragma: no cover
 217                raise LookupError("Skipping the cache has been requested")
 218            response = self.__intercept_read(req_data)
 219            self.last_served_from_cache = True
 220        except LookupError as err:
 221            log.trace(f"Doing an on-line request: {err}")
 222            response = requests.post(self.url, data=req_data, timeout=self.timeout)
 223            self.last_served_from_cache = False
 224
 225        # store the response if it hasn't been read from the cache before:
 226        if not self.last_served_from_cache:  # pragma: no cover
 227            self.__intercept_store(req_data, response)
 228
 229        # NOTE: the HTTP status code returned is always `200` even if
 230        # authentication failed, so we need to check the actual response *TEXT*
 231        # to figure out if we have succeeded:
 232        if "request not authorized" in response.text.lower():
 233            self.status["auth_state"] = "FAILED"
 234            msg = f"Not authorized to run action `{req_data['action']}`"
 235            log.error(msg)
 236            raise requests.exceptions.ConnectionError(msg)
 237
 238        return response
 239
 240    def __interception_path(self, req_data, create_dir=False):
 241        """Derive the path for a local cache file from a request's parameters.
 242
 243        Parameters
 244        ----------
 245        req_data : dict
 246            The request's parameters, used to derive the name of the cache file.
 247        create_dir : bool, optional
 248            If set to True the cache directory will be created if necessary.
 249            Useful when adding responses to the cache. By default False.
 250
 251        Returns
 252        -------
 253        str
 254            The full path to a file name identified by all parameters of the
 255            request (except credentials like 'apikey').
 256        """
 257        action = req_data["action"]
 258
 259        if self.cache_users_only and action != "getuser":
 260            log.trace(f"NOT caching '{action}' (cache_users_only is set)")
 261            return None
 262
 263        intercept_dir = os.path.join(self.cache_path, action)
 264        if create_dir and not os.path.exists(intercept_dir):  # pragma: no cover
 265            try:
 266                os.makedirs(intercept_dir)
 267                log.trace(f"Created dir to store response: {intercept_dir}")
 268            except Exception as err:  # pylint: disable-msg=broad-except
 269                log.warning(f"Failed creating [{intercept_dir}]: {err}")
 270                return None
 271
 272        signature = ""
 273        # different python versions are returning dict items in different order, so
 274        # simply iterating over them will not always produce the same result - hence we
 275        # build up a sorted list of keys first and use that one then:
 276        keylist = list(req_data.keys())
 277        keylist.sort()
 278        for key in keylist:
 279            if key in ["action", "apikey"]:
 280                continue
 281            signature += f"__{key}--{req_data[key]}"
 282        if signature == "":
 283            signature = "__response"
 284        signature = signature[2:] + ".txt"
 285        intercept_file = os.path.join(intercept_dir, signature)
 286        return intercept_file
 287
 288    def __intercept_read(self, req_data):
 289        """Try to read a cached response from a local file.
 290
 291        Parameters
 292        ----------
 293        req_data : dict
 294            The request's parameters, used to derive the name of the cache file.
 295
 296        Returns
 297        -------
 298        PseudoResponse
 299            The response text read from the cache file wrapped in a
 300            PseudoResponse object, or None in case no matching file was found in
 301            the local cache.
 302
 303        Raises
 304        ------
 305        LookupError
 306            Raised in case no cache path has been set or no cache file matching
 307            the request parameters could be found in the cache.
 308        """
 309
 310        # pylint: disable-msg=too-few-public-methods
 311        class PseudoResponse:
 312            """Dummy response object with attribs 'text' and 'status_code'."""
 313
 314            def __init__(self, text, status_code):
 315                self.text = text
 316                self.status_code = int(status_code)
 317
 318        if self.cache_path == "":
 319            raise LookupError("No cache path configured")
 320
 321        intercept_file = self.__interception_path(req_data, create_dir=False)
 322        if not intercept_file or not os.path.exists(intercept_file):  # pragma: no cover
 323            raise LookupError(f"No cache hit for [{intercept_file}]")
 324
 325        with open(intercept_file, "r", encoding="utf-8") as infile:
 326            text = infile.read()
 327        log.debug(
 328            "Read intercepted response text from [{}]",
 329            intercept_file[len(str(self.cache_path)) :],
 330        )
 331
 332        status_code = 200
 333        status_file = os.path.splitext(intercept_file)[0] + "_status-code.txt"
 334        if os.path.exists(status_file):
 335            with open(status_file, "r", encoding="utf-8") as infile:
 336                status_code = infile.read()
 337            log.debug(f"Read intercepted response status code from [{status_file}]")
 338        return PseudoResponse(text, status_code)
 339
 340    def __intercept_store(self, req_data, response):  # pragma: no cover
 341        """Store the response in a local cache file named after the request.
 342
 343        Parameters
 344        ----------
 345        req_data : dict
 346            The request's parameters, used to derive the name of the cache file
 347            so it can be matched later when running the same request again.
 348        response : requests.Response
 349            The response object to store in the local cache.
 350        """
 351        # NOTE: this method is excluded from coverage measurements as it can only be
 352        # triggered when testing in online mode with at least one request not being
 353        # served from the cache (which is orthogonal to off-line testing)
 354        if self.cache_path == "":
 355            return
 356
 357        intercept_file = self.__interception_path(req_data, create_dir=True)
 358        if not intercept_file:
 359            log.trace("Not storing intercepted results in cache.")
 360            return
 361
 362        try:
 363            with open(intercept_file, "w", encoding="utf-8") as outfile:
 364                outfile.write(response.text)
 365            log.debug(
 366                "Wrote response text to [{}] ({} lines)",
 367                intercept_file,
 368                len(response.text.splitlines()),
 369            )
 370        except Exception as err:  # pylint: disable-msg=broad-except
 371            log.error("Storing response text in [{}] failed: {}", intercept_file, err)
 372            log.error("Response text was:\n--------\n{}\n--------", response.text)
 373
 374    def flush_cache(self, keep_users=False):
 375        """Flush the PyPPMS on-disk cache.
 376
 377        Optionally flushes everything *except* the `getuser` cache if the
 378        `keep_users` flag is set to `True`, as this is clearly the most
 379        time-consuming operation when fetching data from PUMAPI and therefore
 380        might want to be retained.
 381
 382        Please note that the `getusers` cache (plural, including the `s` suffix)
 383        will be flushed no matter what, as this is simply a list of user IDs
 384        that can be fetched with a single request. In consequence this means
 385        that using the `keep_users` flag will allow you to have reasonably fast
 386        reaction times while still getting information on *new* users live from
 387        PUMAPI at the only cost of possibly having outdated information on
 388        *existing* users.
 389
 390        Parameters
 391        ----------
 392        keep_users : bool, optional
 393            If set to `True` the `getuser` sub-directory in the cache location
 394            will be kept, by default `False`.
 395        """
 396        if self.cache_path == "":
 397            log.debug("No cache path configured, not flushing!")
 398            return
 399
 400        dirs_to_remove = [self.cache_path]  # by default remove the entire cache dir
 401        keep_msg = ""
 402        if keep_users:
 403            keep_msg = " (keeping user details dirs)"
 404            dirs_to_remove = []
 405            cache_dirs = os.listdir(self.cache_path)
 406            for subdir in cache_dirs:
 407                if subdir == "getuser":
 408                    continue
 409                dirs_to_remove.append(os.path.join(self.cache_path, subdir))
 410
 411        log.debug("Flushing the on-disk cache at [{}] {}...", self.cache_path, keep_msg)
 412        for directory in dirs_to_remove:
 413            try:
 414                shutil.rmtree(directory)
 415                log.trace("Removed directory [{}].", directory)
 416            except Exception as ex:  # pylint: disable-msg=broad-except
 417                log.warning("Removing the cache at [{}] failed: {}", directory, ex)
 418
 419    def get_admins(self):
 420        """Get all PPMS administrator users.
 421
 422        Returns
 423        -------
 424        list(pyppms.user.PpmsUser)
 425            A list with PpmsUser objects that are PPMS administrators.
 426        """
 427        response = self.request("getadmins")
 428
 429        admins = response.text.splitlines()
 430        users = []
 431        for username in admins:
 432            user = self.get_user(username)
 433            users.append(user)
 434        log.trace("{} admins in the PPMS database: {}", len(admins), ", ".join(admins))
 435        return users
 436
 437    def get_booking(self, system_id, booking_type="get"):
 438        """Get the current or next booking of a system.
 439
 440        WARNING: if the next booking is requested but it is too far in the future,
 441        PUMAPI silently ignores it - the response is identical to a system that has no
 442        future bookings and there is no error reported either. Currently it is unclear
 443        where the cutoff is (e.g. lookups for a booking that is two years from now still
 444        work fine, but a booking in about 10 years is silently skipped).
 445
 446        Parameters
 447        ----------
 448        system_id : int or int-like
 449            The ID of the system in PPMS.
 450        booking_type : {'get', 'next'}, optional
 451            The type of booking to request, one of `get` (requesting the
 452            currently running booking) and `next` (requesting the next upcoming
 453            booking), by default `get`.
 454            NOTE: if `next` is requested the resulting booking object will **NOT** have
 455            an end time (`endtime` will be `None`) as PUMAPI doesn't provide one in that
 456            case!
 457
 458        Returns
 459        -------
 460        pyppms.booking.PpmsBooking or None
 461            The booking object, or None if there is no booking for the system or the
 462            request is refused by PUMAPI (e.g. "not authorized").
 463
 464        Raises
 465        ------
 466        ValueError
 467            Raised if the specified `booking_type` is invalid.
 468        """
 469        valid = ["get", "next"]
 470        if booking_type not in valid:
 471            raise ValueError(
 472                f"Value for 'booking_type' ({booking_type}) not in {valid}!"
 473            )
 474
 475        try:
 476            response = self.request(booking_type + "booking", {"id": system_id})
 477        except requests.exceptions.ConnectionError:
 478            log.error("Requesting booking status for system {} failed!", system_id)
 479            return None
 480
 481        desc = "any future bookings"
 482        if booking_type == "get":
 483            desc = "a currently active booking"
 484        if not response.text.strip():
 485            log.trace("System [{}] doesn't have {}", system_id, desc)
 486            return None
 487
 488        return PpmsBooking(response.text, booking_type, system_id)
 489
 490    def get_current_booking(self, system_id):
 491        """Wrapper for `get_booking()` with 'booking_type' set to 'get'."""
 492        return self.get_booking(system_id, "get")
 493
 494    def get_group(self, group_id):
 495        """Fetch group details from PPMS and create a dict from them.
 496
 497        Parameters
 498        ----------
 499        group_id : str
 500            The group's identifier in PPMS, called 'unitlogin' there.
 501
 502        Returns
 503        -------
 504        dict
 505            A dict with the group details, keys being derived from the header
 506            line of the PUMAPI response, values from the data line.
 507        """
 508        response = self.request("getgroup", {"unitlogin": group_id})
 509        log.trace("Group details returned by PPMS (raw): {}", response.text)
 510
 511        if not response.text:
 512            msg = f"Group [{group_id}] is unknown to PPMS"
 513            log.error(msg)
 514            raise KeyError(msg)
 515
 516        details = dict_from_single_response(response.text)
 517
 518        log.trace("Details of group {}: {}", group_id, details)
 519        return details
 520
 521    def get_group_users(self, unitlogin):
 522        """Get all members of a group in PPMS.
 523
 524        Parameters
 525        ----------
 526        unitlogin : str
 527            The group's login ("unique login or id" in the PPMS web interface).
 528
 529        Returns
 530        -------
 531        list(pyppms.user.PpmsUser)
 532            A list with PpmsUser objects that are members of this PPMS group.
 533        """
 534        response = self.request("getgroupusers", {"unitlogin": unitlogin})
 535
 536        members = response.text.splitlines()
 537        users = []
 538        for username in members:
 539            user = self.get_user(username)
 540            users.append(user)
 541        log.trace(
 542            "{} members in PPMS group [{}]: {}",
 543            len(members),
 544            unitlogin,
 545            ", ".join(members),
 546        )
 547        return users
 548
 549    def get_groups(self):
 550        """Get a list of all groups in PPMS.
 551
 552        Returns
 553        -------
 554        list(str)
 555            A list with the group identifiers in PPMS.
 556        """
 557        response = self.request("getgroups")
 558
 559        groups = response.text.splitlines()
 560        log.trace("{} groups in the PPMS database: {}", len(groups), ", ".join(groups))
 561        return groups
 562
 563    def get_next_booking(self, system_id):
 564        """Wrapper for `get_booking()` with 'booking_type' set to 'next'."""
 565        return self.get_booking(system_id, "next")
 566
 567    def get_running_sheet(
 568        self, core_facility_ref, date, ignore_uncached_users=False, localisation=""
 569    ):
 570        """Get the running sheet for a specific day on the given facility.
 571
 572        The so-called "running-sheet" consists of all bookings / reservations of
 573        a facility on a specifc day.
 574
 575        WARNING: PUMAPI doesn't return a proper unique user identifier with the
 576        'getrunningsheet' request, instead the so called "full name" is given to
 577        identify the user - unfortunately this can lead to ambiguities as
 578        multiple different accounts can have the same full name.
 579
 580        Parameters
 581        ----------
 582        core_facility_ref : int or int-like
 583            The core facility ID for PPMS.
 584        date : datetime.datetime
 585            The date to request the running sheet for, e.g. ``datetime.now()`` or
 586            similar. Note that only the date part is relevant, time will be ignored.
 587        ignore_uncached_users : bool, optional
 588            If set to `True` any booking for a user that is not present in the instance
 589            attribute `fullname_mapping` will be ignored in the resulting list.
 590        localisation : str, optional
 591            If given, the runningsheet will be limited to systems where the
 592            `localisation` (~"room") field matches the given value.
 593
 594        Returns
 595        -------
 596        list(pyppms.booking.PpmsBooking)
 597            A list with `PpmsBooking` objects for the given day. Empty in case
 598            there are no bookings or parsing the response failed.
 599        """
 600        bookings = []
 601        parameters = {
 602            "plateformid": f"{core_facility_ref}",
 603            "day": date.strftime("%Y-%m-%d"),
 604        }
 605        log.trace("Requesting runningsheet for {}", parameters["day"])
 606        response = self.request("getrunningsheet", parameters)
 607        try:
 608            entries = parse_multiline_response(response.text, graceful=False)
 609        except NoDataError:
 610            # in case no bookings exist the response will be empty!
 611            log.trace("Runningsheet for the given day was empty!")
 612            return []
 613        except Exception as err:  # pylint: disable-msg=broad-except
 614            log.error("Parsing runningsheet details failed: {}", err)
 615            log.trace("Runningsheet PUMPAI response was: >>>{}<<<", response.text)
 616            return []
 617
 618        for entry in entries:
 619            full = entry["User"]
 620            if full not in self.fullname_mapping:
 621                if ignore_uncached_users:
 622                    log.debug(f"Ignoring booking for uncached / unknown user [{full}]")
 623                    continue
 624
 625                log.debug(f"Booking refers an uncached user ({full}), updating users!")
 626                self.update_users()
 627
 628            if full not in self.fullname_mapping:
 629                log.error("PPMS doesn't seem to know user [{}], skipping", full)
 630                continue
 631
 632            log.trace(
 633                f"Booking for user '{self.fullname_mapping[full]}' ({full}) found"
 634            )
 635            system_name = entry["Object"]
 636            # FIXME: add a test with one system name being a subset of another system
 637            # (this will result in more than one result and should be fixed e.g. by
 638            # adding an optional parameter "exact" to get_systems_matching() or
 639            # similar)
 640            system_ids = self.get_systems_matching(localisation, [system_name])
 641            if len(system_ids) < 1:
 642                if localisation:
 643                    log.debug(f"Given criteria return zero systems for [{system_name}]")
 644                else:
 645                    log.warning(f"No systems matching criteria for [{system_name}]")
 646                continue
 647
 648            if len(system_ids) > 1:
 649                # NOTE: more than one result should not happen as PPMS doesn't allow for
 650                # multiple systems having the same name - no result might happen though!
 651                log.error("Ignoring booking for unknown system [{}]", system_name)
 652                continue
 653
 654            booking = PpmsBooking.from_runningsheet(
 655                entry,
 656                system_ids[0],
 657                self.fullname_mapping[full],
 658                date,
 659            )
 660            bookings.append(booking)
 661
 662        return bookings
 663
 664    def get_systems(self, force_refresh=False):
 665        """Get a dict with all systems in PPMS.
 666
 667        Parameters
 668        ----------
 669        force_refresh : bool, optional
 670            If `True` the list of systems will be refreshed even if the object's
 671            attribute `self.systems` is non-empty, by default `False`. Please
 672            note that this will NOT skip the on-disk cache in case that exists!
 673
 674        Returns
 675        -------
 676        dict(pyppms.system.PpmsSystem)
 677            A dict with `PpmsSystem` objects parsed from the PUMAPI response where
 678            the system ID (int) is used as the dict's key. If parsing a system
 679            fails for any reason, the system is skipped entirely.
 680        """
 681        if self.systems and not force_refresh:
 682            log.trace("Using cached details for {} systems", len(self.systems))
 683        else:
 684            self.update_systems()
 685
 686        return self.systems
 687
 688    def get_systems_matching(self, localisation, name_contains):
 689        """Query PPMS for systems with a specific location and name.
 690
 691        This method assembles a list of PPMS system IDs whose "localisation"
 692        (room) field matches a given string and where the system name contains
 693        at least one of the strings given as the `name_contains` parameter.
 694
 695        Parameters
 696        ----------
 697        localisation : str
 698            A string that the system's "localisation" (i.e. the "Room" field in
 699            the PPMS web interface) has to match. Can be an empty string which
 700            will result in no filtering being done on the "Room" attribute.
 701        name_contains : list(str)
 702            A list of valid names (categories) of which the system's name has to
 703            match at least one for being included. Supply an empty list for
 704            skipping this filter.
 705
 706        Returns
 707        -------
 708        list(int)
 709            A list with PPMS system IDs matching all of the given criteria.
 710
 711        Raises
 712        ------
 713        TypeError
 714            Raised in case the `name_contains` parameter is of type `str` (it
 715            needs to be `list(str)` instead).
 716        """
 717        if isinstance(name_contains, str):
 718            raise TypeError("`name_contains` must be a list of str, not str!")
 719
 720        loc = localisation
 721        loc_desc = f"with location matching [{localisation}]"
 722        if localisation == "":
 723            loc_desc = "(no location filter given)"
 724
 725        log.trace(
 726            "Querying PPMS for systems {}, name matching any of {}",
 727            loc_desc,
 728            name_contains,
 729        )
 730        system_ids = []
 731        systems = self.get_systems()
 732        for sys_id, system in systems.items():
 733            if loc.lower() not in str(system.localisation).lower():
 734                log.trace(
 735                    "System [{}] location ({}) is NOT matching ({}), ignoring",
 736                    system.name,
 737                    system.localisation,
 738                    loc,
 739                )
 740                continue
 741
 742            # log.trace('System [{}] is matching location [{}], checking if '
 743            #           'the name is matching any of the valid pattern {}',
 744            #           system.name, loc, name_contains)
 745            for valid_name in name_contains:
 746                if valid_name in system.name:
 747                    log.trace("System [{}] matches all criteria", system.name)
 748                    system_ids.append(sys_id)
 749                    break
 750
 751            # if sys_id not in system_ids:
 752            #     log.trace('System [{}] does NOT match a valid name: {}',
 753            #               system.name, name_contains)
 754
 755        log.trace("Found {} bookable systems {}", len(system_ids), loc_desc)
 756        log.trace("IDs of matching bookable systems {}: {}", loc_desc, system_ids)
 757        return system_ids
 758
 759    def get_user(self, login_name, skip_cache=False):
 760        """Fetch user details from PPMS and create a PpmsUser object from it.
 761
 762        Parameters
 763        ----------
 764        login_name : str
 765            The user's PPMS login name.
 766        skip_cache : bool, optional
 767            Passed as-is to the :py:meth:`request()` method
 768
 769        Returns
 770        -------
 771        pyppms.user.PpmsUser
 772            The user object created from the PUMAPI response. The object will be
 773            additionally stored in the self.users dict using the login_name as
 774            the dict's key.
 775
 776        Raises
 777        ------
 778        KeyError
 779            Raised if the user doesn't exist in PPMS.
 780        """
 781        response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache)
 782
 783        if not response.text:
 784            msg = f"User [{login_name}] is unknown to PPMS"
 785            log.debug(msg)
 786            raise KeyError(msg)
 787
 788        user = PpmsUser(response.text)
 789        self.users[user.username] = user  # update / add to the cached user objs
 790        self.fullname_mapping[user.fullname] = user.username
 791        return user
 792
 793    def get_user_dict(self, login_name, skip_cache=False):
 794        """Get details on a given user from PPMS.
 795
 796        Parameters
 797        ----------
 798        login_name : str
 799            The PPMS account / login name of the user to query.
 800        skip_cache : bool, optional
 801            Passed as-is to the :py:meth:`request()` method
 802
 803        Returns
 804        -------
 805        dict
 806            A dict with the user details returned by the PUMAPI.
 807
 808        Example
 809        -------
 810        >>> conn.get_user_dict('pyppms')
 811        ... {
 812        ...     u'active': True,
 813        ...     u'affiliation': u'',
 814        ...     u'bcode': u'',
 815        ...     u'email': u'pyppms@python-facility.example',
 816        ...     u'fname': u'PumAPI',
 817        ...     u'lname': u'Python',
 818        ...     u'login': u'pyppms',
 819        ...     u'mustchbcode': False,
 820        ...     u'mustchpwd': False',
 821        ...     u'phone': u'+98 (76) 54 3210',
 822        ...     u'unitlogin': u'pyppms'
 823        ... }
 824
 825        Raises
 826        ------
 827        KeyError
 828            Raised in case the user account is unknown to PPMS.
 829        ValueError
 830            Raised if the user details can't be parsed from the PUMAPI response.
 831        """
 832        response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache)
 833
 834        if not response.text:
 835            msg = f"User [{login_name}] is unknown to PPMS"
 836            log.error(msg)
 837            raise KeyError(msg)
 838
 839        # EXAMPLE:
 840        # response.text = (
 841        #     u'login,lname,fname,email,'
 842        #     u'phone,bcode,affiliation,unitlogin,mustchpwd,mustchbcode,'
 843        #     u'active\r\n'
 844        #     u'"pyppms","Python","PumAPI","pyppms@python-facility.example",'
 845        #     u'"+98 (76) 54 3210","","","pyppms",false,false,'
 846        #     u'true\r\n'
 847        # )
 848        details = dict_from_single_response(response.text)
 849        log.trace("Details for user [{}]: {}", login_name, details)
 850        return details
 851
 852    def get_user_experience(self, login=None, system_id=None):
 853        """Get user experience ("User rights") from PPMS.
 854
 855        Parameters
 856        ----------
 857        login : str, optional
 858            An optional login name to request the experience / permissions for,
 859            by default None
 860        system_id : int, optional
 861            An optional system ID to request the experience / permissions for,
 862            by default None
 863
 864        Returns
 865        -------
 866        list(dict)
 867            A list with dicts parsed from the user experience response.
 868        """
 869        data = {}
 870        if login is not None:
 871            data["login"] = login
 872        if system_id is not None:
 873            data["id"] = system_id
 874        response = self.request("getuserexp", parameters=data)
 875
 876        parsed = parse_multiline_response(response.text)
 877        log.trace(
 878            "Received {} experience entries for filters [user:{}] and [id:{}]",
 879            len(parsed),
 880            login,
 881            system_id,
 882        )
 883        return parsed
 884
 885    def get_user_ids(self, active=False):
 886        """Get a list with all user IDs in the PPMS system.
 887
 888        Parameters
 889        ----------
 890        active : bool, optional
 891            Request only users marked as active in PPMS, by default False.
 892            NOTE: "active" is a tri-state parameter in PPMS: "true", "false"
 893            or empty!
 894
 895        Returns
 896        -------
 897        list
 898            A list of all (or active-only) user IDs in PPMS.
 899        """
 900        # TODO: describe format of returned list and / or give an example!
 901        parameters = {}
 902        if active:
 903            parameters["active"] = "true"
 904
 905        response = self.request("getusers", parameters)
 906
 907        users = response.text.splitlines()
 908        active_desc = "active " if active else ""
 909        log.trace("{} {}users in the PPMS database", len(users), active_desc)
 910        log.trace(", ".join(users))
 911        return users
 912
 913    def get_users(self, force_refresh=False, active_only=True):
 914        """Get user objects for all (or cached) PPMS users.
 915
 916        Parameters
 917        ----------
 918        force_refresh : bool, optional
 919            Re-request information from PPMS even if user details have been
 920            cached locally before, by default False.
 921        active_only : bool, optional
 922            If set to `False` also "inactive" users will be fetched from PPMS,
 923            by default `True`.
 924
 925        Returns
 926        -------
 927        dict(pyppms.user.PpmsUser)
 928            A dict of PpmsUser objects with the username (login) as key.
 929        """
 930        if self.users and not force_refresh:
 931            log.trace("Using cached details for {} users", len(self.users))
 932        else:
 933            self.update_users(active_only=active_only)
 934
 935        return self.users
 936
 937    def get_users_emails(self, users=None, active=False):
 938        """Get a list of user email addresses. WARNING - very slow!
 939
 940        Parameters
 941        ----------
 942        users : list(str), optional
 943            A list of login names to retrieve the email addresses for, if
 944            omitted addresses for all (or active ones) will be requested.
 945        active : bool, optional
 946            Request only addresses of users marked as active in PPMS, by default
 947            False. Will be ignored if a list of usernames is given explicitly.
 948
 949        Returns
 950        -------
 951        list(str)
 952            Email addresses of the users requested.
 953        """
 954        emails = []
 955        if users is None:
 956            users = self.get_user_ids(active=active)
 957        for user in users:
 958            email = self.get_user_dict(user)["email"]
 959            if not email:
 960                log.warning("--- WARNING: no email for user [{}]! ---", user)
 961                continue
 962            # log.trace("{}: {}", user, email)
 963            emails.append(email)
 964
 965        return emails
 966
 967    def get_users_with_access_to_system(self, system_id):
 968        """Get a list of usernames allowed to book the system with the given ID.
 969
 970        Parameters
 971        ----------
 972        system_id : int or int-like
 973            The ID of the system to query permitted users for.
 974
 975        Returns
 976        -------
 977        list(str)
 978            A list of usernames ('login') with permissions to book the system
 979            with the given ID in PPMS.
 980
 981        Raises
 982        ------
 983        ValueError
 984            Raised in case parsing the response failes for any reason.
 985        """
 986        users = []
 987
 988        response = self.request("getsysrights", {"id": system_id})
 989        # this response has a unique format, so parse it directly here:
 990        try:
 991            lines = response.text.splitlines()
 992            for line in lines:
 993                permission, username = line.split(":")
 994                if permission.upper() == "D":
 995                    log.trace(
 996                        "User [{}] is deactivated for booking system [{}], skipping",
 997                        username,
 998                        system_id,
 999                    )
1000                    continue
1001
1002                log.trace(
1003                    "User [{}] has permission to book system [{}]", username, system_id
1004                )
1005                users.append(username)
1006
1007        except Exception as err:
1008            msg = (
1009                f"Unable to parse data returned by PUMAPI: {response.text} - "
1010                f"ERROR: {err}"
1011            )
1012            log.error(msg)
1013            raise ValueError(msg) from err
1014
1015        return users
1016
1017    def give_user_access_to_system(self, username, system_id):
1018        """Add permissions for a user to book a given system in PPMS.
1019
1020        Parameters
1021        ----------
1022        username : str
1023            The username ('login') to allow for booking the system.
1024        system_id : int or int-like
1025            The ID of the system to add the permission for.
1026
1027        Returns
1028        -------
1029        bool
1030            True in case the given username now has the permissions to book the
1031            system with the specified ID (or if the user already had them
1032            before), False otherwise.
1033        """
1034        return self.set_system_booking_permissions(username, system_id, "A")
1035
1036    def new_user(  # pylint: disable-msg=too-many-arguments
1037        self, login, lname, fname, email, ppms_group, phone=None, password=None
1038    ):
1039        """Create a new user in PPMS.
1040
1041        The method is asking PPMS to create a new user account with the given details.
1042        In case an account with that login name already exists, it will log a warning
1043        and return without sending any further requests to PPMS.
1044
1045        Parameters
1046        ----------
1047        login : str
1048            The unique identifier for the user.
1049        lname : str
1050            The last name of the user.
1051        fname : str
1052            The first name of the user.
1053        email : str
1054            The email address of the user.
1055        ppms_group : str
1056            The unique identifier of the primary group of the new user. A new group will
1057            be created if no group with the given name exists.
1058        phone : str, optional
1059            The phone number of the user.
1060        password : str, optional
1061            The password for the user. If no password is set the user will not be able
1062            to log on to PPMS.
1063
1064        Raises
1065        ------
1066        RuntimeError
1067            Will be raised in case creating the user fails.
1068        """
1069        if self.user_exists(login):
1070            log.warning("NOT creating user [{}] as it already exists!", login)
1071            return
1072
1073        req_data = {
1074            "login": login,
1075            "lname": lname,
1076            "fname": fname,
1077            "email": email,
1078            "unitlogin": ppms_group,
1079        }
1080        if phone:
1081            req_data["phone"] = phone
1082        if password:
1083            req_data["pwd"] = password
1084
1085        response = self.request("newuser", req_data)
1086        if not "OK newuser" in response.text:
1087            msg = f"Creating new user failed: {response.text}"
1088            log.error(msg)
1089            raise RuntimeError(msg)
1090
1091        log.debug("Created user [{}] in PPMS.", login)
1092        log.trace("Response was: {}", response.text)
1093
1094    def remove_user_access_from_system(self, username, system_id):
1095        """Remove permissions for a user to book a given system in PPMS.
1096
1097        Parameters
1098        ----------
1099        username : str
1100            The username ('login') to remove booking permissions on the system.
1101        system_id : int or int-like
1102            The ID of the system to modify the permission for.
1103
1104        Returns
1105        -------
1106        bool
1107            True in case the given username now has the permissions to book the
1108            system with the specified ID (or if the user already had them
1109            before), False otherwise.
1110        """
1111        return self.set_system_booking_permissions(username, system_id, "D")
1112
1113    def set_system_booking_permissions(self, login, system_id, permission):
1114        """Set permissions for a user on a given system in PPMS.
1115
1116        Parameters
1117        ----------
1118        username : str
1119            The username ('login') to allow for booking the system.
1120        system_id : int or int-like
1121            The ID of the system to add the permission for.
1122        permission : {'D', 'A', 'N', 'S'}
1123            The permission level to set for the user, one of:
1124              - ``D`` : deactivated
1125              - ``A`` : autonomous
1126              - ``N`` : novice
1127              - ``S`` : superuser
1128
1129        Returns
1130        -------
1131        bool
1132            True in case setting permissions for the given username on the
1133            system with the specified ID succeeded (or if the user already had
1134            those permissions before), False otherwise.
1135        """
1136
1137        def permission_name(shortname):
1138            """Closure to validate a permission level and return its long name.
1139
1140            Parameters
1141            ----------
1142            shortname : str
1143                A single character defining the permission level.
1144
1145            Returns
1146            -------
1147            str
1148                The long (human-readable) name of the permission level.
1149
1150            Raises
1151            ------
1152            KeyError
1153                Raised in case an invalid permission level was given.
1154            """
1155            mapping = {
1156                "D": "deactivated",
1157                "A": "autonomous",
1158                "N": "novice",
1159                "S": "superuser",
1160            }
1161            try:
1162                return mapping[shortname]
1163            except KeyError as err:
1164                raise KeyError(f"Invalid permission [{shortname}] given") from err
1165
1166        log.debug(
1167            "Setting permission level [{}] for user [{}] on system [{}]",
1168            permission_name(permission),
1169            login,
1170            system_id,
1171        )
1172
1173        parameters = {"id": system_id, "login": login, "type": permission}
1174        response = self.request("setright", parameters)
1175
1176        # NOTE: the 'setright' action will accept ANY permission type and return 'done'
1177        # on the request, so there is no way to check from the response if setting the
1178        # permission really worked!!
1179        # log.trace('Request returned text: {}', response.text)
1180        if response.text.lower().strip() == "done":
1181            log.trace(
1182                "User [{}] now has permission level [{}] on system [{}]",
1183                login,
1184                permission_name(permission),
1185                system_id,
1186            )
1187            return True
1188
1189        if "invalid user" in response.text.lower():
1190            log.warning("User [{}] doesn't seem to exist in PPMS", login)
1191        elif "system right not authorized" in response.text.lower():
1192            log.error(
1193                "Unable to set permissions for system {}: {}", system_id, response.text
1194            )
1195        else:
1196            log.error("Unexpected response, assuming request failed: {}", response.text)
1197
1198        return False
1199
1200    def update_systems(self):
1201        """Update cached details for all bookable systems from PPMS.
1202
1203        Get the details on all bookable systems from PPMS and store them in the local
1204        cache. If parsing the PUMAPI response for a system fails for any reason, the
1205        system is skipped entirely.
1206        """
1207        log.trace("Updating list of bookable systems...")
1208        systems = {}
1209        parse_fails = 0
1210        response = self.request("getsystems")
1211        details = parse_multiline_response(response.text, graceful=False)
1212        for detail in details:
1213            try:
1214                system = PpmsSystem(detail)
1215            except ValueError as err:
1216                log.error("Error processing `getsystems` response: {}", err)
1217                parse_fails += 1
1218                continue
1219
1220            systems[system.system_id] = system
1221
1222        log.trace(
1223            "Updated {} bookable systems from PPMS ({} systems failed parsing)",
1224            len(systems),
1225            parse_fails,
1226        )
1227
1228        self.systems = systems
1229
1230    def update_users(self, user_ids=[], active_only=True):
1231        """Update cached details for a list of users from PPMS.
1232
1233        Get the user details on a list of users (or all active ones) from PPMS and store
1234        them in the object's `users` dict. As a side effect, this will also fill the
1235        cache directory in case the object's `cache_path` attribute is set.
1236
1237        WARNING - very slow, especially when the PPMS instance has many users!
1238
1239        Parameters
1240        ----------
1241        user_ids : list(str), optional
1242            A list of user IDs (login names) to request the cache for, by
1243            default [] which will result in all *active* users to be requested.
1244        active_only : bool, optional
1245            If set to `False` also "inactive" users will be fetched from PPMS,
1246            by default `True`.
1247        """
1248        if not user_ids:
1249            user_ids = self.get_user_ids(active=active_only)
1250
1251        log.trace("Updating details on {} users", len(user_ids))
1252        for user_id in user_ids:
1253            self.get_user(user_id, skip_cache=True)
1254
1255        log.debug("Collected details on {} users", len(self.users))
1256
1257    def user_exists(self, login):
1258        """Check if an account with the given login name already exists in PPMS.
1259
1260        Parameters
1261        ----------
1262        login : str
1263            The login name to check for.
1264
1265        Returns
1266        -------
1267        bool
1268            True in case an account with that name exists in PPMS, false otherwise.
1269        """
1270        try:
1271            self.get_user(login)
1272            return True
1273        except KeyError:
1274            return False
class PpmsConnection:
  27class PpmsConnection:
  28
  29    """Connection object to communicate with a PPMS instance.
  30
  31    Attributes
  32    ----------
  33    url : str
  34        The URL of the PUMAPI instance.
  35    api_key : str
  36        The API key used for authenticating against the PUMAPI.
  37    timeout : float
  38        The timeout value used in the ``requests.post`` calls.
  39    cache_path : str
  40        A path to a local directory used for caching responses.
  41    cache_users_only : bool
  42        Flag indicating that only PPMS user details will be stored in the
  43        on-disk cache, nothing else.
  44    last_served_from_cache : bool
  45        Indicates if the last request was served from the cache or on-line.
  46    users : dict
  47        A dict with usernames as keys, mapping to the related
  48        :py:class:`pyppms.user.PpmsUser` object, serves as a cache during the object's
  49        lifetime (can be empty if no calls to :py:meth:`get_user()` have been done yet).
  50    fullname_mapping : dict
  51        A dict mapping a user's *fullname* ("``<LASTNAME> <FIRSTNAME>``") to the
  52        corresponding username. Entries are filled in dynamically by the
  53        :py:meth:`get_user()` method.
  54    systems
  55        A dict with system IDs as keys, mapping to the related
  56        :py:class:`pyppms.system.PpmsSystem` object. Serves as a cache during the
  57        object's lifetime (can be empty if no calls to the :py:meth:`get_systems()` have
  58        been done yet).
  59    status : dict
  60        A dict with keys ``auth_state``, ``auth_response`` and
  61        ``auth_httpstatus``
  62    """
  63
  64    def __init__(self, url, api_key, timeout=10, cache="", cache_users_only=False):
  65        """Constructor for the PPMS connection object.
  66
  67        Open a connection to the PUMAPI defined in `url` and try to authenticate
  68        against it using the given API key (or use cache-only mode if key is an
  69        empty string). If an optional path to a caching location is specified,
  70        responses will be read from that location unless no matching file can be
  71        found there, in which case an on-line request will be done (with the
  72        response being saved to the cache path).
  73
  74        Parameters
  75        ----------
  76        url : str
  77            The URL of the PUMAPI to connect to.
  78        api_key : str
  79            The API key to use for authenticating against the PUMAPI. If
  80            specified as '' authentication will be skipped and the connection is
  81            running in cache-only (local) mode.
  82        timeout : float, optional
  83            How many seconds to wait for the PUMAPI server to send a response
  84            before giving up, by default 10.
  85        cache : str, optional
  86            A path to a local directory for caching responses from PUMAPI in
  87            individual text files. Useful for testing and for speeding up
  88            slow requests like 'getusers'. By default empty, which will result
  89            in no caching being done.
  90        cache_users_only : bool, optional
  91            If set to `True`, only `getuser` requests will be cached on disk.
  92            This can be used in to speed up the slow requests (through the
  93            cache), while everything else will be handled through online
  94            requests. By default `False`.
  95
  96        Raises
  97        ------
  98        requests.exceptions.ConnectionError
  99            Raised in case authentication fails.
 100        """
 101        self.url = url
 102        self.api_key = api_key
 103        self.timeout = timeout
 104        self.users = {}
 105        self.fullname_mapping = {}
 106        self.systems = {}
 107        self.status = {
 108            "auth_state": "NOT_TRIED",
 109            "auth_response": None,
 110            "auth_httpstatus": -1,
 111        }
 112        self.cache_path = cache
 113        self.cache_users_only = cache_users_only
 114        self.last_served_from_cache = False
 115        """Indicates if the last request was served from the cache or on-line."""
 116
 117        # run in cache-only mode (e.g. for testing or off-line usage) if no API
 118        # key has been specified, skip authentication then:
 119        if api_key != "":
 120            self.__authenticate()
 121        elif cache == "":
 122            raise RuntimeError(
 123                "Neither API key nor cache path given, at least one is required!"
 124            )
 125
 126    def __authenticate(self):
 127        """Try to authenticate to PPMS using the `auth` request.
 128
 129        Raises
 130        ------
 131        requests.exceptions.ConnectionError
 132            Raised in case authentication failed for any reason.
 133        """
 134        log.trace(
 135            "Attempting authentication against {} with key [{}...{}]",
 136            self.url,
 137            self.api_key[:2],
 138            self.api_key[-2:],
 139        )
 140        self.status["auth_state"] = "attempting"
 141        response = self.request("auth")
 142        log.trace(f"Authenticate response: {response.text}")
 143        self.status["auth_response"] = response.text
 144        self.status["auth_httpstatus"] = response.status_code
 145
 146        # NOTE: an unauthorized request has already been caught be the request() method
 147        # above. Our legacy code was additionally testing for 'error' in the response
 148        # text - however, it is unclear if PUMAPI ever returns this:
 149        if "error" in response.text.lower():
 150            self.status["auth_state"] = "FAILED-ERROR"
 151            msg = f"Authentication failed with an error: {response.text}"
 152            log.error(msg)
 153            raise requests.exceptions.ConnectionError(msg)
 154
 155        status_ok = requests.codes.ok  # pylint: disable-msg=no-member
 156
 157        if response.status_code != status_ok:
 158            # NOTE: branch excluded from coverage as we don't have a known way
 159            # to produce such a response from the API
 160            log.warning(
 161                "Unexpected combination of response [{}] and status code [{}], it's "
 162                "unclear if authentication succeeded (assuming it didn't)",
 163                response.status_code,
 164                response.text,
 165            )
 166            self.status["auth_state"] = "FAILED-UNKNOWN"
 167
 168            msg = (
 169                f"Authenticating against {self.url} with key "
 170                f"[{self.api_key[:2]}...{self.api_key[-2:]}] FAILED!"
 171            )
 172            log.error(msg)
 173            raise requests.exceptions.ConnectionError(msg)
 174
 175        log.trace(
 176            "Authentication succeeded, response=[{}], http_status=[{}]",
 177            response.text,
 178            response.status_code,
 179        )
 180        self.status["auth_state"] = "good"
 181
 182    def request(self, action, parameters={}, skip_cache=False):
 183        """Generic method to submit a request to PPMS and return the result.
 184
 185        This convenience method deals with adding the API key to a given
 186        request, submitting it to the PUMAPI and checking the response for some
 187        specific keywords indicating an error.
 188
 189        Parameters
 190        ----------
 191        action : str
 192            The command to be submitted to the PUMAPI.
 193        parameters : dict, optional
 194            A dictionary with additional parameters to be submitted with the
 195            request.
 196        skip_cache : bool, optional
 197            If set to True the request will NOT be served from the local cache,
 198            independent whether a matching response file exists there, by
 199            default False.
 200
 201        Returns
 202        -------
 203        requests.Response
 204            The response object created by posting the request.
 205
 206        Raises
 207        ------
 208        requests.exceptions.ConnectionError
 209            Raised in case the request is not authorized.
 210        """
 211        req_data = {"action": action, "apikey": self.api_key}
 212        req_data.update(parameters)
 213        # log.debug("Request parameters: {}", parameters)
 214
 215        response = None
 216        try:
 217            if skip_cache:  # pragma: no cover
 218                raise LookupError("Skipping the cache has been requested")
 219            response = self.__intercept_read(req_data)
 220            self.last_served_from_cache = True
 221        except LookupError as err:
 222            log.trace(f"Doing an on-line request: {err}")
 223            response = requests.post(self.url, data=req_data, timeout=self.timeout)
 224            self.last_served_from_cache = False
 225
 226        # store the response if it hasn't been read from the cache before:
 227        if not self.last_served_from_cache:  # pragma: no cover
 228            self.__intercept_store(req_data, response)
 229
 230        # NOTE: the HTTP status code returned is always `200` even if
 231        # authentication failed, so we need to check the actual response *TEXT*
 232        # to figure out if we have succeeded:
 233        if "request not authorized" in response.text.lower():
 234            self.status["auth_state"] = "FAILED"
 235            msg = f"Not authorized to run action `{req_data['action']}`"
 236            log.error(msg)
 237            raise requests.exceptions.ConnectionError(msg)
 238
 239        return response
 240
 241    def __interception_path(self, req_data, create_dir=False):
 242        """Derive the path for a local cache file from a request's parameters.
 243
 244        Parameters
 245        ----------
 246        req_data : dict
 247            The request's parameters, used to derive the name of the cache file.
 248        create_dir : bool, optional
 249            If set to True the cache directory will be created if necessary.
 250            Useful when adding responses to the cache. By default False.
 251
 252        Returns
 253        -------
 254        str
 255            The full path to a file name identified by all parameters of the
 256            request (except credentials like 'apikey').
 257        """
 258        action = req_data["action"]
 259
 260        if self.cache_users_only and action != "getuser":
 261            log.trace(f"NOT caching '{action}' (cache_users_only is set)")
 262            return None
 263
 264        intercept_dir = os.path.join(self.cache_path, action)
 265        if create_dir and not os.path.exists(intercept_dir):  # pragma: no cover
 266            try:
 267                os.makedirs(intercept_dir)
 268                log.trace(f"Created dir to store response: {intercept_dir}")
 269            except Exception as err:  # pylint: disable-msg=broad-except
 270                log.warning(f"Failed creating [{intercept_dir}]: {err}")
 271                return None
 272
 273        signature = ""
 274        # different python versions are returning dict items in different order, so
 275        # simply iterating over them will not always produce the same result - hence we
 276        # build up a sorted list of keys first and use that one then:
 277        keylist = list(req_data.keys())
 278        keylist.sort()
 279        for key in keylist:
 280            if key in ["action", "apikey"]:
 281                continue
 282            signature += f"__{key}--{req_data[key]}"
 283        if signature == "":
 284            signature = "__response"
 285        signature = signature[2:] + ".txt"
 286        intercept_file = os.path.join(intercept_dir, signature)
 287        return intercept_file
 288
 289    def __intercept_read(self, req_data):
 290        """Try to read a cached response from a local file.
 291
 292        Parameters
 293        ----------
 294        req_data : dict
 295            The request's parameters, used to derive the name of the cache file.
 296
 297        Returns
 298        -------
 299        PseudoResponse
 300            The response text read from the cache file wrapped in a
 301            PseudoResponse object, or None in case no matching file was found in
 302            the local cache.
 303
 304        Raises
 305        ------
 306        LookupError
 307            Raised in case no cache path has been set or no cache file matching
 308            the request parameters could be found in the cache.
 309        """
 310
 311        # pylint: disable-msg=too-few-public-methods
 312        class PseudoResponse:
 313            """Dummy response object with attribs 'text' and 'status_code'."""
 314
 315            def __init__(self, text, status_code):
 316                self.text = text
 317                self.status_code = int(status_code)
 318
 319        if self.cache_path == "":
 320            raise LookupError("No cache path configured")
 321
 322        intercept_file = self.__interception_path(req_data, create_dir=False)
 323        if not intercept_file or not os.path.exists(intercept_file):  # pragma: no cover
 324            raise LookupError(f"No cache hit for [{intercept_file}]")
 325
 326        with open(intercept_file, "r", encoding="utf-8") as infile:
 327            text = infile.read()
 328        log.debug(
 329            "Read intercepted response text from [{}]",
 330            intercept_file[len(str(self.cache_path)) :],
 331        )
 332
 333        status_code = 200
 334        status_file = os.path.splitext(intercept_file)[0] + "_status-code.txt"
 335        if os.path.exists(status_file):
 336            with open(status_file, "r", encoding="utf-8") as infile:
 337                status_code = infile.read()
 338            log.debug(f"Read intercepted response status code from [{status_file}]")
 339        return PseudoResponse(text, status_code)
 340
 341    def __intercept_store(self, req_data, response):  # pragma: no cover
 342        """Store the response in a local cache file named after the request.
 343
 344        Parameters
 345        ----------
 346        req_data : dict
 347            The request's parameters, used to derive the name of the cache file
 348            so it can be matched later when running the same request again.
 349        response : requests.Response
 350            The response object to store in the local cache.
 351        """
 352        # NOTE: this method is excluded from coverage measurements as it can only be
 353        # triggered when testing in online mode with at least one request not being
 354        # served from the cache (which is orthogonal to off-line testing)
 355        if self.cache_path == "":
 356            return
 357
 358        intercept_file = self.__interception_path(req_data, create_dir=True)
 359        if not intercept_file:
 360            log.trace("Not storing intercepted results in cache.")
 361            return
 362
 363        try:
 364            with open(intercept_file, "w", encoding="utf-8") as outfile:
 365                outfile.write(response.text)
 366            log.debug(
 367                "Wrote response text to [{}] ({} lines)",
 368                intercept_file,
 369                len(response.text.splitlines()),
 370            )
 371        except Exception as err:  # pylint: disable-msg=broad-except
 372            log.error("Storing response text in [{}] failed: {}", intercept_file, err)
 373            log.error("Response text was:\n--------\n{}\n--------", response.text)
 374
 375    def flush_cache(self, keep_users=False):
 376        """Flush the PyPPMS on-disk cache.
 377
 378        Optionally flushes everything *except* the `getuser` cache if the
 379        `keep_users` flag is set to `True`, as this is clearly the most
 380        time-consuming operation when fetching data from PUMAPI and therefore
 381        might want to be retained.
 382
 383        Please note that the `getusers` cache (plural, including the `s` suffix)
 384        will be flushed no matter what, as this is simply a list of user IDs
 385        that can be fetched with a single request. In consequence this means
 386        that using the `keep_users` flag will allow you to have reasonably fast
 387        reaction times while still getting information on *new* users live from
 388        PUMAPI at the only cost of possibly having outdated information on
 389        *existing* users.
 390
 391        Parameters
 392        ----------
 393        keep_users : bool, optional
 394            If set to `True` the `getuser` sub-directory in the cache location
 395            will be kept, by default `False`.
 396        """
 397        if self.cache_path == "":
 398            log.debug("No cache path configured, not flushing!")
 399            return
 400
 401        dirs_to_remove = [self.cache_path]  # by default remove the entire cache dir
 402        keep_msg = ""
 403        if keep_users:
 404            keep_msg = " (keeping user details dirs)"
 405            dirs_to_remove = []
 406            cache_dirs = os.listdir(self.cache_path)
 407            for subdir in cache_dirs:
 408                if subdir == "getuser":
 409                    continue
 410                dirs_to_remove.append(os.path.join(self.cache_path, subdir))
 411
 412        log.debug("Flushing the on-disk cache at [{}] {}...", self.cache_path, keep_msg)
 413        for directory in dirs_to_remove:
 414            try:
 415                shutil.rmtree(directory)
 416                log.trace("Removed directory [{}].", directory)
 417            except Exception as ex:  # pylint: disable-msg=broad-except
 418                log.warning("Removing the cache at [{}] failed: {}", directory, ex)
 419
 420    def get_admins(self):
 421        """Get all PPMS administrator users.
 422
 423        Returns
 424        -------
 425        list(pyppms.user.PpmsUser)
 426            A list with PpmsUser objects that are PPMS administrators.
 427        """
 428        response = self.request("getadmins")
 429
 430        admins = response.text.splitlines()
 431        users = []
 432        for username in admins:
 433            user = self.get_user(username)
 434            users.append(user)
 435        log.trace("{} admins in the PPMS database: {}", len(admins), ", ".join(admins))
 436        return users
 437
 438    def get_booking(self, system_id, booking_type="get"):
 439        """Get the current or next booking of a system.
 440
 441        WARNING: if the next booking is requested but it is too far in the future,
 442        PUMAPI silently ignores it - the response is identical to a system that has no
 443        future bookings and there is no error reported either. Currently it is unclear
 444        where the cutoff is (e.g. lookups for a booking that is two years from now still
 445        work fine, but a booking in about 10 years is silently skipped).
 446
 447        Parameters
 448        ----------
 449        system_id : int or int-like
 450            The ID of the system in PPMS.
 451        booking_type : {'get', 'next'}, optional
 452            The type of booking to request, one of `get` (requesting the
 453            currently running booking) and `next` (requesting the next upcoming
 454            booking), by default `get`.
 455            NOTE: if `next` is requested the resulting booking object will **NOT** have
 456            an end time (`endtime` will be `None`) as PUMAPI doesn't provide one in that
 457            case!
 458
 459        Returns
 460        -------
 461        pyppms.booking.PpmsBooking or None
 462            The booking object, or None if there is no booking for the system or the
 463            request is refused by PUMAPI (e.g. "not authorized").
 464
 465        Raises
 466        ------
 467        ValueError
 468            Raised if the specified `booking_type` is invalid.
 469        """
 470        valid = ["get", "next"]
 471        if booking_type not in valid:
 472            raise ValueError(
 473                f"Value for 'booking_type' ({booking_type}) not in {valid}!"
 474            )
 475
 476        try:
 477            response = self.request(booking_type + "booking", {"id": system_id})
 478        except requests.exceptions.ConnectionError:
 479            log.error("Requesting booking status for system {} failed!", system_id)
 480            return None
 481
 482        desc = "any future bookings"
 483        if booking_type == "get":
 484            desc = "a currently active booking"
 485        if not response.text.strip():
 486            log.trace("System [{}] doesn't have {}", system_id, desc)
 487            return None
 488
 489        return PpmsBooking(response.text, booking_type, system_id)
 490
 491    def get_current_booking(self, system_id):
 492        """Wrapper for `get_booking()` with 'booking_type' set to 'get'."""
 493        return self.get_booking(system_id, "get")
 494
 495    def get_group(self, group_id):
 496        """Fetch group details from PPMS and create a dict from them.
 497
 498        Parameters
 499        ----------
 500        group_id : str
 501            The group's identifier in PPMS, called 'unitlogin' there.
 502
 503        Returns
 504        -------
 505        dict
 506            A dict with the group details, keys being derived from the header
 507            line of the PUMAPI response, values from the data line.
 508        """
 509        response = self.request("getgroup", {"unitlogin": group_id})
 510        log.trace("Group details returned by PPMS (raw): {}", response.text)
 511
 512        if not response.text:
 513            msg = f"Group [{group_id}] is unknown to PPMS"
 514            log.error(msg)
 515            raise KeyError(msg)
 516
 517        details = dict_from_single_response(response.text)
 518
 519        log.trace("Details of group {}: {}", group_id, details)
 520        return details
 521
 522    def get_group_users(self, unitlogin):
 523        """Get all members of a group in PPMS.
 524
 525        Parameters
 526        ----------
 527        unitlogin : str
 528            The group's login ("unique login or id" in the PPMS web interface).
 529
 530        Returns
 531        -------
 532        list(pyppms.user.PpmsUser)
 533            A list with PpmsUser objects that are members of this PPMS group.
 534        """
 535        response = self.request("getgroupusers", {"unitlogin": unitlogin})
 536
 537        members = response.text.splitlines()
 538        users = []
 539        for username in members:
 540            user = self.get_user(username)
 541            users.append(user)
 542        log.trace(
 543            "{} members in PPMS group [{}]: {}",
 544            len(members),
 545            unitlogin,
 546            ", ".join(members),
 547        )
 548        return users
 549
 550    def get_groups(self):
 551        """Get a list of all groups in PPMS.
 552
 553        Returns
 554        -------
 555        list(str)
 556            A list with the group identifiers in PPMS.
 557        """
 558        response = self.request("getgroups")
 559
 560        groups = response.text.splitlines()
 561        log.trace("{} groups in the PPMS database: {}", len(groups), ", ".join(groups))
 562        return groups
 563
 564    def get_next_booking(self, system_id):
 565        """Wrapper for `get_booking()` with 'booking_type' set to 'next'."""
 566        return self.get_booking(system_id, "next")
 567
 568    def get_running_sheet(
 569        self, core_facility_ref, date, ignore_uncached_users=False, localisation=""
 570    ):
 571        """Get the running sheet for a specific day on the given facility.
 572
 573        The so-called "running-sheet" consists of all bookings / reservations of
 574        a facility on a specifc day.
 575
 576        WARNING: PUMAPI doesn't return a proper unique user identifier with the
 577        'getrunningsheet' request, instead the so called "full name" is given to
 578        identify the user - unfortunately this can lead to ambiguities as
 579        multiple different accounts can have the same full name.
 580
 581        Parameters
 582        ----------
 583        core_facility_ref : int or int-like
 584            The core facility ID for PPMS.
 585        date : datetime.datetime
 586            The date to request the running sheet for, e.g. ``datetime.now()`` or
 587            similar. Note that only the date part is relevant, time will be ignored.
 588        ignore_uncached_users : bool, optional
 589            If set to `True` any booking for a user that is not present in the instance
 590            attribute `fullname_mapping` will be ignored in the resulting list.
 591        localisation : str, optional
 592            If given, the runningsheet will be limited to systems where the
 593            `localisation` (~"room") field matches the given value.
 594
 595        Returns
 596        -------
 597        list(pyppms.booking.PpmsBooking)
 598            A list with `PpmsBooking` objects for the given day. Empty in case
 599            there are no bookings or parsing the response failed.
 600        """
 601        bookings = []
 602        parameters = {
 603            "plateformid": f"{core_facility_ref}",
 604            "day": date.strftime("%Y-%m-%d"),
 605        }
 606        log.trace("Requesting runningsheet for {}", parameters["day"])
 607        response = self.request("getrunningsheet", parameters)
 608        try:
 609            entries = parse_multiline_response(response.text, graceful=False)
 610        except NoDataError:
 611            # in case no bookings exist the response will be empty!
 612            log.trace("Runningsheet for the given day was empty!")
 613            return []
 614        except Exception as err:  # pylint: disable-msg=broad-except
 615            log.error("Parsing runningsheet details failed: {}", err)
 616            log.trace("Runningsheet PUMPAI response was: >>>{}<<<", response.text)
 617            return []
 618
 619        for entry in entries:
 620            full = entry["User"]
 621            if full not in self.fullname_mapping:
 622                if ignore_uncached_users:
 623                    log.debug(f"Ignoring booking for uncached / unknown user [{full}]")
 624                    continue
 625
 626                log.debug(f"Booking refers an uncached user ({full}), updating users!")
 627                self.update_users()
 628
 629            if full not in self.fullname_mapping:
 630                log.error("PPMS doesn't seem to know user [{}], skipping", full)
 631                continue
 632
 633            log.trace(
 634                f"Booking for user '{self.fullname_mapping[full]}' ({full}) found"
 635            )
 636            system_name = entry["Object"]
 637            # FIXME: add a test with one system name being a subset of another system
 638            # (this will result in more than one result and should be fixed e.g. by
 639            # adding an optional parameter "exact" to get_systems_matching() or
 640            # similar)
 641            system_ids = self.get_systems_matching(localisation, [system_name])
 642            if len(system_ids) < 1:
 643                if localisation:
 644                    log.debug(f"Given criteria return zero systems for [{system_name}]")
 645                else:
 646                    log.warning(f"No systems matching criteria for [{system_name}]")
 647                continue
 648
 649            if len(system_ids) > 1:
 650                # NOTE: more than one result should not happen as PPMS doesn't allow for
 651                # multiple systems having the same name - no result might happen though!
 652                log.error("Ignoring booking for unknown system [{}]", system_name)
 653                continue
 654
 655            booking = PpmsBooking.from_runningsheet(
 656                entry,
 657                system_ids[0],
 658                self.fullname_mapping[full],
 659                date,
 660            )
 661            bookings.append(booking)
 662
 663        return bookings
 664
 665    def get_systems(self, force_refresh=False):
 666        """Get a dict with all systems in PPMS.
 667
 668        Parameters
 669        ----------
 670        force_refresh : bool, optional
 671            If `True` the list of systems will be refreshed even if the object's
 672            attribute `self.systems` is non-empty, by default `False`. Please
 673            note that this will NOT skip the on-disk cache in case that exists!
 674
 675        Returns
 676        -------
 677        dict(pyppms.system.PpmsSystem)
 678            A dict with `PpmsSystem` objects parsed from the PUMAPI response where
 679            the system ID (int) is used as the dict's key. If parsing a system
 680            fails for any reason, the system is skipped entirely.
 681        """
 682        if self.systems and not force_refresh:
 683            log.trace("Using cached details for {} systems", len(self.systems))
 684        else:
 685            self.update_systems()
 686
 687        return self.systems
 688
 689    def get_systems_matching(self, localisation, name_contains):
 690        """Query PPMS for systems with a specific location and name.
 691
 692        This method assembles a list of PPMS system IDs whose "localisation"
 693        (room) field matches a given string and where the system name contains
 694        at least one of the strings given as the `name_contains` parameter.
 695
 696        Parameters
 697        ----------
 698        localisation : str
 699            A string that the system's "localisation" (i.e. the "Room" field in
 700            the PPMS web interface) has to match. Can be an empty string which
 701            will result in no filtering being done on the "Room" attribute.
 702        name_contains : list(str)
 703            A list of valid names (categories) of which the system's name has to
 704            match at least one for being included. Supply an empty list for
 705            skipping this filter.
 706
 707        Returns
 708        -------
 709        list(int)
 710            A list with PPMS system IDs matching all of the given criteria.
 711
 712        Raises
 713        ------
 714        TypeError
 715            Raised in case the `name_contains` parameter is of type `str` (it
 716            needs to be `list(str)` instead).
 717        """
 718        if isinstance(name_contains, str):
 719            raise TypeError("`name_contains` must be a list of str, not str!")
 720
 721        loc = localisation
 722        loc_desc = f"with location matching [{localisation}]"
 723        if localisation == "":
 724            loc_desc = "(no location filter given)"
 725
 726        log.trace(
 727            "Querying PPMS for systems {}, name matching any of {}",
 728            loc_desc,
 729            name_contains,
 730        )
 731        system_ids = []
 732        systems = self.get_systems()
 733        for sys_id, system in systems.items():
 734            if loc.lower() not in str(system.localisation).lower():
 735                log.trace(
 736                    "System [{}] location ({}) is NOT matching ({}), ignoring",
 737                    system.name,
 738                    system.localisation,
 739                    loc,
 740                )
 741                continue
 742
 743            # log.trace('System [{}] is matching location [{}], checking if '
 744            #           'the name is matching any of the valid pattern {}',
 745            #           system.name, loc, name_contains)
 746            for valid_name in name_contains:
 747                if valid_name in system.name:
 748                    log.trace("System [{}] matches all criteria", system.name)
 749                    system_ids.append(sys_id)
 750                    break
 751
 752            # if sys_id not in system_ids:
 753            #     log.trace('System [{}] does NOT match a valid name: {}',
 754            #               system.name, name_contains)
 755
 756        log.trace("Found {} bookable systems {}", len(system_ids), loc_desc)
 757        log.trace("IDs of matching bookable systems {}: {}", loc_desc, system_ids)
 758        return system_ids
 759
 760    def get_user(self, login_name, skip_cache=False):
 761        """Fetch user details from PPMS and create a PpmsUser object from it.
 762
 763        Parameters
 764        ----------
 765        login_name : str
 766            The user's PPMS login name.
 767        skip_cache : bool, optional
 768            Passed as-is to the :py:meth:`request()` method
 769
 770        Returns
 771        -------
 772        pyppms.user.PpmsUser
 773            The user object created from the PUMAPI response. The object will be
 774            additionally stored in the self.users dict using the login_name as
 775            the dict's key.
 776
 777        Raises
 778        ------
 779        KeyError
 780            Raised if the user doesn't exist in PPMS.
 781        """
 782        response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache)
 783
 784        if not response.text:
 785            msg = f"User [{login_name}] is unknown to PPMS"
 786            log.debug(msg)
 787            raise KeyError(msg)
 788
 789        user = PpmsUser(response.text)
 790        self.users[user.username] = user  # update / add to the cached user objs
 791        self.fullname_mapping[user.fullname] = user.username
 792        return user
 793
 794    def get_user_dict(self, login_name, skip_cache=False):
 795        """Get details on a given user from PPMS.
 796
 797        Parameters
 798        ----------
 799        login_name : str
 800            The PPMS account / login name of the user to query.
 801        skip_cache : bool, optional
 802            Passed as-is to the :py:meth:`request()` method
 803
 804        Returns
 805        -------
 806        dict
 807            A dict with the user details returned by the PUMAPI.
 808
 809        Example
 810        -------
 811        >>> conn.get_user_dict('pyppms')
 812        ... {
 813        ...     u'active': True,
 814        ...     u'affiliation': u'',
 815        ...     u'bcode': u'',
 816        ...     u'email': u'pyppms@python-facility.example',
 817        ...     u'fname': u'PumAPI',
 818        ...     u'lname': u'Python',
 819        ...     u'login': u'pyppms',
 820        ...     u'mustchbcode': False,
 821        ...     u'mustchpwd': False',
 822        ...     u'phone': u'+98 (76) 54 3210',
 823        ...     u'unitlogin': u'pyppms'
 824        ... }
 825
 826        Raises
 827        ------
 828        KeyError
 829            Raised in case the user account is unknown to PPMS.
 830        ValueError
 831            Raised if the user details can't be parsed from the PUMAPI response.
 832        """
 833        response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache)
 834
 835        if not response.text:
 836            msg = f"User [{login_name}] is unknown to PPMS"
 837            log.error(msg)
 838            raise KeyError(msg)
 839
 840        # EXAMPLE:
 841        # response.text = (
 842        #     u'login,lname,fname,email,'
 843        #     u'phone,bcode,affiliation,unitlogin,mustchpwd,mustchbcode,'
 844        #     u'active\r\n'
 845        #     u'"pyppms","Python","PumAPI","pyppms@python-facility.example",'
 846        #     u'"+98 (76) 54 3210","","","pyppms",false,false,'
 847        #     u'true\r\n'
 848        # )
 849        details = dict_from_single_response(response.text)
 850        log.trace("Details for user [{}]: {}", login_name, details)
 851        return details
 852
 853    def get_user_experience(self, login=None, system_id=None):
 854        """Get user experience ("User rights") from PPMS.
 855
 856        Parameters
 857        ----------
 858        login : str, optional
 859            An optional login name to request the experience / permissions for,
 860            by default None
 861        system_id : int, optional
 862            An optional system ID to request the experience / permissions for,
 863            by default None
 864
 865        Returns
 866        -------
 867        list(dict)
 868            A list with dicts parsed from the user experience response.
 869        """
 870        data = {}
 871        if login is not None:
 872            data["login"] = login
 873        if system_id is not None:
 874            data["id"] = system_id
 875        response = self.request("getuserexp", parameters=data)
 876
 877        parsed = parse_multiline_response(response.text)
 878        log.trace(
 879            "Received {} experience entries for filters [user:{}] and [id:{}]",
 880            len(parsed),
 881            login,
 882            system_id,
 883        )
 884        return parsed
 885
 886    def get_user_ids(self, active=False):
 887        """Get a list with all user IDs in the PPMS system.
 888
 889        Parameters
 890        ----------
 891        active : bool, optional
 892            Request only users marked as active in PPMS, by default False.
 893            NOTE: "active" is a tri-state parameter in PPMS: "true", "false"
 894            or empty!
 895
 896        Returns
 897        -------
 898        list
 899            A list of all (or active-only) user IDs in PPMS.
 900        """
 901        # TODO: describe format of returned list and / or give an example!
 902        parameters = {}
 903        if active:
 904            parameters["active"] = "true"
 905
 906        response = self.request("getusers", parameters)
 907
 908        users = response.text.splitlines()
 909        active_desc = "active " if active else ""
 910        log.trace("{} {}users in the PPMS database", len(users), active_desc)
 911        log.trace(", ".join(users))
 912        return users
 913
 914    def get_users(self, force_refresh=False, active_only=True):
 915        """Get user objects for all (or cached) PPMS users.
 916
 917        Parameters
 918        ----------
 919        force_refresh : bool, optional
 920            Re-request information from PPMS even if user details have been
 921            cached locally before, by default False.
 922        active_only : bool, optional
 923            If set to `False` also "inactive" users will be fetched from PPMS,
 924            by default `True`.
 925
 926        Returns
 927        -------
 928        dict(pyppms.user.PpmsUser)
 929            A dict of PpmsUser objects with the username (login) as key.
 930        """
 931        if self.users and not force_refresh:
 932            log.trace("Using cached details for {} users", len(self.users))
 933        else:
 934            self.update_users(active_only=active_only)
 935
 936        return self.users
 937
 938    def get_users_emails(self, users=None, active=False):
 939        """Get a list of user email addresses. WARNING - very slow!
 940
 941        Parameters
 942        ----------
 943        users : list(str), optional
 944            A list of login names to retrieve the email addresses for, if
 945            omitted addresses for all (or active ones) will be requested.
 946        active : bool, optional
 947            Request only addresses of users marked as active in PPMS, by default
 948            False. Will be ignored if a list of usernames is given explicitly.
 949
 950        Returns
 951        -------
 952        list(str)
 953            Email addresses of the users requested.
 954        """
 955        emails = []
 956        if users is None:
 957            users = self.get_user_ids(active=active)
 958        for user in users:
 959            email = self.get_user_dict(user)["email"]
 960            if not email:
 961                log.warning("--- WARNING: no email for user [{}]! ---", user)
 962                continue
 963            # log.trace("{}: {}", user, email)
 964            emails.append(email)
 965
 966        return emails
 967
 968    def get_users_with_access_to_system(self, system_id):
 969        """Get a list of usernames allowed to book the system with the given ID.
 970
 971        Parameters
 972        ----------
 973        system_id : int or int-like
 974            The ID of the system to query permitted users for.
 975
 976        Returns
 977        -------
 978        list(str)
 979            A list of usernames ('login') with permissions to book the system
 980            with the given ID in PPMS.
 981
 982        Raises
 983        ------
 984        ValueError
 985            Raised in case parsing the response failes for any reason.
 986        """
 987        users = []
 988
 989        response = self.request("getsysrights", {"id": system_id})
 990        # this response has a unique format, so parse it directly here:
 991        try:
 992            lines = response.text.splitlines()
 993            for line in lines:
 994                permission, username = line.split(":")
 995                if permission.upper() == "D":
 996                    log.trace(
 997                        "User [{}] is deactivated for booking system [{}], skipping",
 998                        username,
 999                        system_id,
1000                    )
1001                    continue
1002
1003                log.trace(
1004                    "User [{}] has permission to book system [{}]", username, system_id
1005                )
1006                users.append(username)
1007
1008        except Exception as err:
1009            msg = (
1010                f"Unable to parse data returned by PUMAPI: {response.text} - "
1011                f"ERROR: {err}"
1012            )
1013            log.error(msg)
1014            raise ValueError(msg) from err
1015
1016        return users
1017
1018    def give_user_access_to_system(self, username, system_id):
1019        """Add permissions for a user to book a given system in PPMS.
1020
1021        Parameters
1022        ----------
1023        username : str
1024            The username ('login') to allow for booking the system.
1025        system_id : int or int-like
1026            The ID of the system to add the permission for.
1027
1028        Returns
1029        -------
1030        bool
1031            True in case the given username now has the permissions to book the
1032            system with the specified ID (or if the user already had them
1033            before), False otherwise.
1034        """
1035        return self.set_system_booking_permissions(username, system_id, "A")
1036
1037    def new_user(  # pylint: disable-msg=too-many-arguments
1038        self, login, lname, fname, email, ppms_group, phone=None, password=None
1039    ):
1040        """Create a new user in PPMS.
1041
1042        The method is asking PPMS to create a new user account with the given details.
1043        In case an account with that login name already exists, it will log a warning
1044        and return without sending any further requests to PPMS.
1045
1046        Parameters
1047        ----------
1048        login : str
1049            The unique identifier for the user.
1050        lname : str
1051            The last name of the user.
1052        fname : str
1053            The first name of the user.
1054        email : str
1055            The email address of the user.
1056        ppms_group : str
1057            The unique identifier of the primary group of the new user. A new group will
1058            be created if no group with the given name exists.
1059        phone : str, optional
1060            The phone number of the user.
1061        password : str, optional
1062            The password for the user. If no password is set the user will not be able
1063            to log on to PPMS.
1064
1065        Raises
1066        ------
1067        RuntimeError
1068            Will be raised in case creating the user fails.
1069        """
1070        if self.user_exists(login):
1071            log.warning("NOT creating user [{}] as it already exists!", login)
1072            return
1073
1074        req_data = {
1075            "login": login,
1076            "lname": lname,
1077            "fname": fname,
1078            "email": email,
1079            "unitlogin": ppms_group,
1080        }
1081        if phone:
1082            req_data["phone"] = phone
1083        if password:
1084            req_data["pwd"] = password
1085
1086        response = self.request("newuser", req_data)
1087        if not "OK newuser" in response.text:
1088            msg = f"Creating new user failed: {response.text}"
1089            log.error(msg)
1090            raise RuntimeError(msg)
1091
1092        log.debug("Created user [{}] in PPMS.", login)
1093        log.trace("Response was: {}", response.text)
1094
1095    def remove_user_access_from_system(self, username, system_id):
1096        """Remove permissions for a user to book a given system in PPMS.
1097
1098        Parameters
1099        ----------
1100        username : str
1101            The username ('login') to remove booking permissions on the system.
1102        system_id : int or int-like
1103            The ID of the system to modify the permission for.
1104
1105        Returns
1106        -------
1107        bool
1108            True in case the given username now has the permissions to book the
1109            system with the specified ID (or if the user already had them
1110            before), False otherwise.
1111        """
1112        return self.set_system_booking_permissions(username, system_id, "D")
1113
1114    def set_system_booking_permissions(self, login, system_id, permission):
1115        """Set permissions for a user on a given system in PPMS.
1116
1117        Parameters
1118        ----------
1119        username : str
1120            The username ('login') to allow for booking the system.
1121        system_id : int or int-like
1122            The ID of the system to add the permission for.
1123        permission : {'D', 'A', 'N', 'S'}
1124            The permission level to set for the user, one of:
1125              - ``D`` : deactivated
1126              - ``A`` : autonomous
1127              - ``N`` : novice
1128              - ``S`` : superuser
1129
1130        Returns
1131        -------
1132        bool
1133            True in case setting permissions for the given username on the
1134            system with the specified ID succeeded (or if the user already had
1135            those permissions before), False otherwise.
1136        """
1137
1138        def permission_name(shortname):
1139            """Closure to validate a permission level and return its long name.
1140
1141            Parameters
1142            ----------
1143            shortname : str
1144                A single character defining the permission level.
1145
1146            Returns
1147            -------
1148            str
1149                The long (human-readable) name of the permission level.
1150
1151            Raises
1152            ------
1153            KeyError
1154                Raised in case an invalid permission level was given.
1155            """
1156            mapping = {
1157                "D": "deactivated",
1158                "A": "autonomous",
1159                "N": "novice",
1160                "S": "superuser",
1161            }
1162            try:
1163                return mapping[shortname]
1164            except KeyError as err:
1165                raise KeyError(f"Invalid permission [{shortname}] given") from err
1166
1167        log.debug(
1168            "Setting permission level [{}] for user [{}] on system [{}]",
1169            permission_name(permission),
1170            login,
1171            system_id,
1172        )
1173
1174        parameters = {"id": system_id, "login": login, "type": permission}
1175        response = self.request("setright", parameters)
1176
1177        # NOTE: the 'setright' action will accept ANY permission type and return 'done'
1178        # on the request, so there is no way to check from the response if setting the
1179        # permission really worked!!
1180        # log.trace('Request returned text: {}', response.text)
1181        if response.text.lower().strip() == "done":
1182            log.trace(
1183                "User [{}] now has permission level [{}] on system [{}]",
1184                login,
1185                permission_name(permission),
1186                system_id,
1187            )
1188            return True
1189
1190        if "invalid user" in response.text.lower():
1191            log.warning("User [{}] doesn't seem to exist in PPMS", login)
1192        elif "system right not authorized" in response.text.lower():
1193            log.error(
1194                "Unable to set permissions for system {}: {}", system_id, response.text
1195            )
1196        else:
1197            log.error("Unexpected response, assuming request failed: {}", response.text)
1198
1199        return False
1200
1201    def update_systems(self):
1202        """Update cached details for all bookable systems from PPMS.
1203
1204        Get the details on all bookable systems from PPMS and store them in the local
1205        cache. If parsing the PUMAPI response for a system fails for any reason, the
1206        system is skipped entirely.
1207        """
1208        log.trace("Updating list of bookable systems...")
1209        systems = {}
1210        parse_fails = 0
1211        response = self.request("getsystems")
1212        details = parse_multiline_response(response.text, graceful=False)
1213        for detail in details:
1214            try:
1215                system = PpmsSystem(detail)
1216            except ValueError as err:
1217                log.error("Error processing `getsystems` response: {}", err)
1218                parse_fails += 1
1219                continue
1220
1221            systems[system.system_id] = system
1222
1223        log.trace(
1224            "Updated {} bookable systems from PPMS ({} systems failed parsing)",
1225            len(systems),
1226            parse_fails,
1227        )
1228
1229        self.systems = systems
1230
1231    def update_users(self, user_ids=[], active_only=True):
1232        """Update cached details for a list of users from PPMS.
1233
1234        Get the user details on a list of users (or all active ones) from PPMS and store
1235        them in the object's `users` dict. As a side effect, this will also fill the
1236        cache directory in case the object's `cache_path` attribute is set.
1237
1238        WARNING - very slow, especially when the PPMS instance has many users!
1239
1240        Parameters
1241        ----------
1242        user_ids : list(str), optional
1243            A list of user IDs (login names) to request the cache for, by
1244            default [] which will result in all *active* users to be requested.
1245        active_only : bool, optional
1246            If set to `False` also "inactive" users will be fetched from PPMS,
1247            by default `True`.
1248        """
1249        if not user_ids:
1250            user_ids = self.get_user_ids(active=active_only)
1251
1252        log.trace("Updating details on {} users", len(user_ids))
1253        for user_id in user_ids:
1254            self.get_user(user_id, skip_cache=True)
1255
1256        log.debug("Collected details on {} users", len(self.users))
1257
1258    def user_exists(self, login):
1259        """Check if an account with the given login name already exists in PPMS.
1260
1261        Parameters
1262        ----------
1263        login : str
1264            The login name to check for.
1265
1266        Returns
1267        -------
1268        bool
1269            True in case an account with that name exists in PPMS, false otherwise.
1270        """
1271        try:
1272            self.get_user(login)
1273            return True
1274        except KeyError:
1275            return False

Connection object to communicate with a PPMS instance.

Attributes
  • url (str): The URL of the PUMAPI instance.
  • api_key (str): The API key used for authenticating against the PUMAPI.
  • timeout (float): The timeout value used in the requests.post calls.
  • cache_path (str): A path to a local directory used for caching responses.
  • cache_users_only (bool): Flag indicating that only PPMS user details will be stored in the on-disk cache, nothing else.
  • last_served_from_cache (bool): Indicates if the last request was served from the cache or on-line.
  • users (dict): A dict with usernames as keys, mapping to the related pyppms.user.PpmsUser object, serves as a cache during the object's lifetime (can be empty if no calls to get_user() have been done yet).
  • fullname_mapping (dict): A dict mapping a user's fullname ("<LASTNAME> <FIRSTNAME>") to the corresponding username. Entries are filled in dynamically by the get_user() method.
  • systems (dict): A dict with system IDs as keys, mapping to the related pyppms.system.PpmsSystem object. Serves as a cache during the object's lifetime (can be empty if no calls to get_systems() have been done yet).
  • status (dict): A dict with keys auth_state, auth_response and auth_httpstatus
PpmsConnection(url, api_key, timeout=10, cache='', cache_users_only=False)
 64    def __init__(self, url, api_key, timeout=10, cache="", cache_users_only=False):
 65        """Constructor for the PPMS connection object.
 66
 67        Open a connection to the PUMAPI defined in `url` and try to authenticate
 68        against it using the given API key (or use cache-only mode if key is an
 69        empty string). If an optional path to a caching location is specified,
 70        responses will be read from that location unless no matching file can be
 71        found there, in which case an on-line request will be done (with the
 72        response being saved to the cache path).
 73
 74        Parameters
 75        ----------
 76        url : str
 77            The URL of the PUMAPI to connect to.
 78        api_key : str
 79            The API key to use for authenticating against the PUMAPI. If
 80            specified as '' authentication will be skipped and the connection is
 81            running in cache-only (local) mode.
 82        timeout : float, optional
 83            How many seconds to wait for the PUMAPI server to send a response
 84            before giving up, by default 10.
 85        cache : str, optional
 86            A path to a local directory for caching responses from PUMAPI in
 87            individual text files. Useful for testing and for speeding up
 88            slow requests like 'getusers'. By default empty, which will result
 89            in no caching being done.
 90        cache_users_only : bool, optional
 91            If set to `True`, only `getuser` requests will be cached on disk.
 92            This can be used in to speed up the slow requests (through the
 93            cache), while everything else will be handled through online
 94            requests. By default `False`.
 95
 96        Raises
 97        ------
 98        requests.exceptions.ConnectionError
 99            Raised in case authentication fails.
100        """
101        self.url = url
102        self.api_key = api_key
103        self.timeout = timeout
104        self.users = {}
105        self.fullname_mapping = {}
106        self.systems = {}
107        self.status = {
108            "auth_state": "NOT_TRIED",
109            "auth_response": None,
110            "auth_httpstatus": -1,
111        }
112        self.cache_path = cache
113        self.cache_users_only = cache_users_only
114        self.last_served_from_cache = False
115        """Indicates if the last request was served from the cache or on-line."""
116
117        # run in cache-only mode (e.g. for testing or off-line usage) if no API
118        # key has been specified, skip authentication then:
119        if api_key != "":
120            self.__authenticate()
121        elif cache == "":
122            raise RuntimeError(
123                "Neither API key nor cache path given, at least one is required!"
124            )

Constructor for the PPMS connection object.

Open a connection to the PUMAPI defined in url and try to authenticate against it using the given API key (or use cache-only mode if key is an empty string). If an optional path to a caching location is specified, responses will be read from that location unless no matching file can be found there, in which case an on-line request will be done (with the response being saved to the cache path).

Parameters
  • url (str): The URL of the PUMAPI to connect to.
  • api_key (str): The API key to use for authenticating against the PUMAPI. If specified as '' authentication will be skipped and the connection is running in cache-only (local) mode.
  • timeout (float, optional): How many seconds to wait for the PUMAPI server to send a response before giving up, by default 10.
  • cache (str, optional): A path to a local directory for caching responses from PUMAPI in individual text files. Useful for testing and for speeding up slow requests like 'getusers'. By default empty, which will result in no caching being done.
  • cache_users_only (bool, optional): If set to True, only getuser requests will be cached on disk. This can be used to speed up the slow requests (through the cache), while everything else will be handled through online requests. By default False.
Raises
  • requests.exceptions.ConnectionError: Raised in case authentication fails.
last_served_from_cache

Indicates if the last request was served from the cache or on-line.

def request(self, action, parameters={}, skip_cache=False):
    def request(self, action, parameters={}, skip_cache=False):
        """Generic method to submit a request to PPMS and return the result.

        This convenience method deals with adding the API key to a given
        request, submitting it to the PUMAPI and checking the response for some
        specific keywords indicating an error.

        Unless `skip_cache` is set, the cache is consulted first and an on-line
        request is only done if the cache lookup raises a `LookupError`. The
        attribute `last_served_from_cache` is updated accordingly, and responses
        fetched on-line are handed over to the cache-storing helper.

        Parameters
        ----------
        action : str
            The command to be submitted to the PUMAPI.
        parameters : dict, optional
            A dictionary with additional parameters to be submitted with the
            request. The dict is only read (merged into a new dict), never
            modified.
        skip_cache : bool, optional
            If set to True the request will NOT be served from the local cache,
            independent whether a matching response file exists there, by
            default False.

        Returns
        -------
        requests.Response
            The response object created by posting the request.

        Raises
        ------
        requests.exceptions.ConnectionError
            Raised in case the request is not authorized. In this case the
            `auth_state` entry of the `status` attribute is set to ``FAILED``.
        """
        # entries in `parameters` take precedence over "action" and "apikey" in
        # case of identical keys, as they are merged in last:
        req_data = {"action": action, "apikey": self.api_key}
        req_data.update(parameters)
        # log.debug("Request parameters: {}", parameters)

        response = None
        try:
            # raising the LookupError here re-uses the "not in the cache" code path
            # of the `except` branch below to enforce an on-line request:
            if skip_cache:  # pragma: no cover
                raise LookupError("Skipping the cache has been requested")
            # presumably raises a LookupError if no cached response is available
            # (the helper is defined elsewhere in the class) - TODO confirm
            response = self.__intercept_read(req_data)
            self.last_served_from_cache = True
        except LookupError as err:
            log.trace(f"Doing an on-line request: {err}")
            response = requests.post(self.url, data=req_data, timeout=self.timeout)
            self.last_served_from_cache = False

        # store the response if it hasn't been read from the cache before:
        if not self.last_served_from_cache:  # pragma: no cover
            self.__intercept_store(req_data, response)

        # NOTE: the HTTP status code returned is always `200` even if
        # authentication failed, so we need to check the actual response *TEXT*
        # to figure out if we have succeeded:
        if "request not authorized" in response.text.lower():
            self.status["auth_state"] = "FAILED"
            msg = f"Not authorized to run action `{req_data['action']}`"
            log.error(msg)
            raise requests.exceptions.ConnectionError(msg)

        return response

Generic method to submit a request to PPMS and return the result.

This convenience method deals with adding the API key to a given request, submitting it to the PUMAPI and checking the response for some specific keywords indicating an error.

Parameters
  • action (str): The command to be submitted to the PUMAPI.
  • parameters (dict, optional): A dictionary with additional parameters to be submitted with the request.
  • skip_cache (bool, optional): If set to True the request will NOT be served from the local cache, independent whether a matching response file exists there, by default False.
Returns
  • requests.Response: The response object created by posting the request.
Raises
  • requests.exceptions.ConnectionError: Raised in case the request is not authorized.
def flush_cache(self, keep_users=False):
375    def flush_cache(self, keep_users=False):
376        """Flush the PyPPMS on-disk cache.
377
378        Optionally flushes everything *except* the `getuser` cache if the
379        `keep_users` flag is set to `True`, as this is clearly the most
380        time-consuming operation when fetching data from PUMAPI and therefore
381        might want to be retained.
382
383        Please note that the `getusers` cache (plural, including the `s` suffix)
384        will be flushed no matter what, as this is simply a list of user IDs
385        that can be fetched with a single request. In consequence this means
386        that using the `keep_users` flag will allow you to have reasonably fast
387        reaction times while still getting information on *new* users live from
388        PUMAPI at the only cost of possibly having outdated information on
389        *existing* users.
390
391        Parameters
392        ----------
393        keep_users : bool, optional
394            If set to `True` the `getuser` sub-directory in the cache location
395            will be kept, by default `False`.
396        """
397        if self.cache_path == "":
398            log.debug("No cache path configured, not flushing!")
399            return
400
401        dirs_to_remove = [self.cache_path]  # by default remove the entire cache dir
402        keep_msg = ""
403        if keep_users:
404            keep_msg = " (keeping user details dirs)"
405            dirs_to_remove = []
406            cache_dirs = os.listdir(self.cache_path)
407            for subdir in cache_dirs:
408                if subdir == "getuser":
409                    continue
410                dirs_to_remove.append(os.path.join(self.cache_path, subdir))
411
412        log.debug("Flushing the on-disk cache at [{}] {}...", self.cache_path, keep_msg)
413        for directory in dirs_to_remove:
414            try:
415                shutil.rmtree(directory)
416                log.trace("Removed directory [{}].", directory)
417            except Exception as ex:  # pylint: disable-msg=broad-except
418                log.warning("Removing the cache at [{}] failed: {}", directory, ex)

Flush the PyPPMS on-disk cache.

Optionally flushes everything except the getuser cache if the keep_users flag is set to True, as this is clearly the most time-consuming operation when fetching data from PUMAPI and therefore might want to be retained.

Please note that the getusers cache (plural, including the s suffix) will be flushed no matter what, as this is simply a list of user IDs that can be fetched with a single request. In consequence this means that using the keep_users flag will allow you to have reasonably fast reaction times while still getting information on new users live from PUMAPI at the only cost of possibly having outdated information on existing users.

Parameters
  • keep_users (bool, optional): If set to True the getuser sub-directory in the cache location will be kept, by default False.
def get_admins(self):
420    def get_admins(self):
421        """Get all PPMS administrator users.
422
423        Returns
424        -------
425        list(pyppms.user.PpmsUser)
426            A list with PpmsUser objects that are PPMS administrators.
427        """
428        response = self.request("getadmins")
429
430        admins = response.text.splitlines()
431        users = []
432        for username in admins:
433            user = self.get_user(username)
434            users.append(user)
435        log.trace("{} admins in the PPMS database: {}", len(admins), ", ".join(admins))
436        return users

Get all PPMS administrator users.

Returns
  • list(pyppms.user.PpmsUser): A list with PpmsUser objects that are PPMS administrators.
def get_booking(self, system_id, booking_type='get'):
438    def get_booking(self, system_id, booking_type="get"):
439        """Get the current or next booking of a system.
440
441        WARNING: if the next booking is requested but it is too far in the future,
442        PUMAPI silently ignores it - the response is identical to a system that has no
443        future bookings and there is no error reported either. Currently it is unclear
444        where the cutoff is (e.g. lookups for a booking that is two years from now still
445        work fine, but a booking in about 10 years is silently skipped).
446
447        Parameters
448        ----------
449        system_id : int or int-like
450            The ID of the system in PPMS.
451        booking_type : {'get', 'next'}, optional
452            The type of booking to request, one of `get` (requesting the
453            currently running booking) and `next` (requesting the next upcoming
454            booking), by default `get`.
455            NOTE: if `next` is requested the resulting booking object will **NOT** have
456            an end time (`endtime` will be `None`) as PUMAPI doesn't provide one in that
457            case!
458
459        Returns
460        -------
461        pyppms.booking.PpmsBooking or None
462            The booking object, or None if there is no booking for the system or the
463            request is refused by PUMAPI (e.g. "not authorized").
464
465        Raises
466        ------
467        ValueError
468            Raised if the specified `booking_type` is invalid.
469        """
470        valid = ["get", "next"]
471        if booking_type not in valid:
472            raise ValueError(
473                f"Value for 'booking_type' ({booking_type}) not in {valid}!"
474            )
475
476        try:
477            response = self.request(booking_type + "booking", {"id": system_id})
478        except requests.exceptions.ConnectionError:
479            log.error("Requesting booking status for system {} failed!", system_id)
480            return None
481
482        desc = "any future bookings"
483        if booking_type == "get":
484            desc = "a currently active booking"
485        if not response.text.strip():
486            log.trace("System [{}] doesn't have {}", system_id, desc)
487            return None
488
489        return PpmsBooking(response.text, booking_type, system_id)

Get the current or next booking of a system.

WARNING: if the next booking is requested but it is too far in the future, PUMAPI silently ignores it - the response is identical to a system that has no future bookings and there is no error reported either. Currently it is unclear where the cutoff is (e.g. lookups for a booking that is two years from now still work fine, but a booking in about 10 years is silently skipped).

Parameters
  • system_id (int or int-like): The ID of the system in PPMS.
  • booking_type ({'get', 'next'}, optional): The type of booking to request, one of get (requesting the currently running booking) and next (requesting the next upcoming booking), by default get. NOTE: if next is requested the resulting booking object will NOT have an end time (endtime will be None) as PUMAPI doesn't provide one in that case!
Returns
  • pyppms.booking.PpmsBooking or None: The booking object, or None if there is no booking for the system or the request is refused by PUMAPI (e.g. "not authorized").
Raises
  • ValueError: Raised if the specified booking_type is invalid.
def get_current_booking(self, system_id):
491    def get_current_booking(self, system_id):
492        """Wrapper for `get_booking()` with 'booking_type' set to 'get'."""
493        return self.get_booking(system_id, "get")

Wrapper for get_booking() with 'booking_type' set to 'get'.

def get_group(self, group_id):
495    def get_group(self, group_id):
496        """Fetch group details from PPMS and create a dict from them.
497
498        Parameters
499        ----------
500        group_id : str
501            The group's identifier in PPMS, called 'unitlogin' there.
502
503        Returns
504        -------
505        dict
506            A dict with the group details, keys being derived from the header
507            line of the PUMAPI response, values from the data line.
508        """
509        response = self.request("getgroup", {"unitlogin": group_id})
510        log.trace("Group details returned by PPMS (raw): {}", response.text)
511
512        if not response.text:
513            msg = f"Group [{group_id}] is unknown to PPMS"
514            log.error(msg)
515            raise KeyError(msg)
516
517        details = dict_from_single_response(response.text)
518
519        log.trace("Details of group {}: {}", group_id, details)
520        return details

Fetch group details from PPMS and create a dict from them.

Parameters
  • group_id (str): The group's identifier in PPMS, called 'unitlogin' there.
Returns
  • dict: A dict with the group details, keys being derived from the header line of the PUMAPI response, values from the data line.
def get_group_users(self, unitlogin):
522    def get_group_users(self, unitlogin):
523        """Get all members of a group in PPMS.
524
525        Parameters
526        ----------
527        unitlogin : str
528            The group's login ("unique login or id" in the PPMS web interface).
529
530        Returns
531        -------
532        list(pyppms.user.PpmsUser)
533            A list with PpmsUser objects that are members of this PPMS group.
534        """
535        response = self.request("getgroupusers", {"unitlogin": unitlogin})
536
537        members = response.text.splitlines()
538        users = []
539        for username in members:
540            user = self.get_user(username)
541            users.append(user)
542        log.trace(
543            "{} members in PPMS group [{}]: {}",
544            len(members),
545            unitlogin,
546            ", ".join(members),
547        )
548        return users

Get all members of a group in PPMS.

Parameters
  • unitlogin (str): The group's login ("unique login or id" in the PPMS web interface).
Returns
  • list(pyppms.user.PpmsUser): A list with PpmsUser objects that are members of this PPMS group.
def get_groups(self):
550    def get_groups(self):
551        """Get a list of all groups in PPMS.
552
553        Returns
554        -------
555        list(str)
556            A list with the group identifiers in PPMS.
557        """
558        response = self.request("getgroups")
559
560        groups = response.text.splitlines()
561        log.trace("{} groups in the PPMS database: {}", len(groups), ", ".join(groups))
562        return groups

Get a list of all groups in PPMS.

Returns
  • list(str): A list with the group identifiers in PPMS.
def get_next_booking(self, system_id):
564    def get_next_booking(self, system_id):
565        """Wrapper for `get_booking()` with 'booking_type' set to 'next'."""
566        return self.get_booking(system_id, "next")

Wrapper for get_booking() with 'booking_type' set to 'next'.

def get_running_sheet( self, core_facility_ref, date, ignore_uncached_users=False, localisation=''):
568    def get_running_sheet(
569        self, core_facility_ref, date, ignore_uncached_users=False, localisation=""
570    ):
571        """Get the running sheet for a specific day on the given facility.
572
573        The so-called "running-sheet" consists of all bookings / reservations of
574        a facility on a specifc day.
575
576        WARNING: PUMAPI doesn't return a proper unique user identifier with the
577        'getrunningsheet' request, instead the so called "full name" is given to
578        identify the user - unfortunately this can lead to ambiguities as
579        multiple different accounts can have the same full name.
580
581        Parameters
582        ----------
583        core_facility_ref : int or int-like
584            The core facility ID for PPMS.
585        date : datetime.datetime
586            The date to request the running sheet for, e.g. ``datetime.now()`` or
587            similar. Note that only the date part is relevant, time will be ignored.
588        ignore_uncached_users : bool, optional
589            If set to `True` any booking for a user that is not present in the instance
590            attribute `fullname_mapping` will be ignored in the resulting list.
591        localisation : str, optional
592            If given, the runningsheet will be limited to systems where the
593            `localisation` (~"room") field matches the given value.
594
595        Returns
596        -------
597        list(pyppms.booking.PpmsBooking)
598            A list with `PpmsBooking` objects for the given day. Empty in case
599            there are no bookings or parsing the response failed.
600        """
601        bookings = []
602        parameters = {
603            "plateformid": f"{core_facility_ref}",
604            "day": date.strftime("%Y-%m-%d"),
605        }
606        log.trace("Requesting runningsheet for {}", parameters["day"])
607        response = self.request("getrunningsheet", parameters)
608        try:
609            entries = parse_multiline_response(response.text, graceful=False)
610        except NoDataError:
611            # in case no bookings exist the response will be empty!
612            log.trace("Runningsheet for the given day was empty!")
613            return []
614        except Exception as err:  # pylint: disable-msg=broad-except
615            log.error("Parsing runningsheet details failed: {}", err)
616            log.trace("Runningsheet PUMPAI response was: >>>{}<<<", response.text)
617            return []
618
619        for entry in entries:
620            full = entry["User"]
621            if full not in self.fullname_mapping:
622                if ignore_uncached_users:
623                    log.debug(f"Ignoring booking for uncached / unknown user [{full}]")
624                    continue
625
626                log.debug(f"Booking refers an uncached user ({full}), updating users!")
627                self.update_users()
628
629            if full not in self.fullname_mapping:
630                log.error("PPMS doesn't seem to know user [{}], skipping", full)
631                continue
632
633            log.trace(
634                f"Booking for user '{self.fullname_mapping[full]}' ({full}) found"
635            )
636            system_name = entry["Object"]
637            # FIXME: add a test with one system name being a subset of another system
638            # (this will result in more than one result and should be fixed e.g. by
639            # adding an optional parameter "exact" to get_systems_matching() or
640            # similar)
641            system_ids = self.get_systems_matching(localisation, [system_name])
642            if len(system_ids) < 1:
643                if localisation:
644                    log.debug(f"Given criteria return zero systems for [{system_name}]")
645                else:
646                    log.warning(f"No systems matching criteria for [{system_name}]")
647                continue
648
649            if len(system_ids) > 1:
650                # NOTE: more than one result should not happen as PPMS doesn't allow for
651                # multiple systems having the same name - no result might happen though!
652                log.error("Ignoring booking for unknown system [{}]", system_name)
653                continue
654
655            booking = PpmsBooking.from_runningsheet(
656                entry,
657                system_ids[0],
658                self.fullname_mapping[full],
659                date,
660            )
661            bookings.append(booking)
662
663        return bookings

Get the running sheet for a specific day on the given facility.

The so-called "running-sheet" consists of all bookings / reservations of a facility on a specific day.

WARNING: PUMAPI doesn't return a proper unique user identifier with the 'getrunningsheet' request, instead the so called "full name" is given to identify the user - unfortunately this can lead to ambiguities as multiple different accounts can have the same full name.

Parameters
  • core_facility_ref (int or int-like): The core facility ID for PPMS.
  • date (datetime.datetime): The date to request the running sheet for, e.g. datetime.now() or similar. Note that only the date part is relevant, time will be ignored.
  • ignore_uncached_users (bool, optional): If set to True any booking for a user that is not present in the instance attribute fullname_mapping will be ignored in the resulting list.
  • localisation (str, optional): If given, the runningsheet will be limited to systems where the localisation (~"room") field matches the given value.
Returns
  • list(pyppms.booking.PpmsBooking): A list with PpmsBooking objects for the given day. Empty in case there are no bookings or parsing the response failed.
def get_systems(self, force_refresh=False):
665    def get_systems(self, force_refresh=False):
666        """Get a dict with all systems in PPMS.
667
668        Parameters
669        ----------
670        force_refresh : bool, optional
671            If `True` the list of systems will be refreshed even if the object's
672            attribute `self.systems` is non-empty, by default `False`. Please
673            note that this will NOT skip the on-disk cache in case that exists!
674
675        Returns
676        -------
677        dict(pyppms.system.PpmsSystem)
678            A dict with `PpmsSystem` objects parsed from the PUMAPI response where
679            the system ID (int) is used as the dict's key. If parsing a system
680            fails for any reason, the system is skipped entirely.
681        """
682        if self.systems and not force_refresh:
683            log.trace("Using cached details for {} systems", len(self.systems))
684        else:
685            self.update_systems()
686
687        return self.systems

Get a dict with all systems in PPMS.

Parameters
  • force_refresh (bool, optional): If True the list of systems will be refreshed even if the object's attribute self.systems is non-empty, by default False. Please note that this will NOT skip the on-disk cache in case that exists!
Returns
  • dict(pyppms.system.PpmsSystem): A dict with PpmsSystem objects parsed from the PUMAPI response where the system ID (int) is used as the dict's key. If parsing a system fails for any reason, the system is skipped entirely.
def get_systems_matching(self, localisation, name_contains):
689    def get_systems_matching(self, localisation, name_contains):
690        """Query PPMS for systems with a specific location and name.
691
692        This method assembles a list of PPMS system IDs whose "localisation"
693        (room) field matches a given string and where the system name contains
694        at least one of the strings given as the `name_contains` parameter.
695
696        Parameters
697        ----------
698        localisation : str
699            A string that the system's "localisation" (i.e. the "Room" field in
700            the PPMS web interface) has to match. Can be an empty string which
701            will result in no filtering being done on the "Room" attribute.
702        name_contains : list(str)
703            A list of valid names (categories) of which the system's name has to
704            match at least one for being included. Supply an empty list for
705            skipping this filter.
706
707        Returns
708        -------
709        list(int)
710            A list with PPMS system IDs matching all of the given criteria.
711
712        Raises
713        ------
714        TypeError
715            Raised in case the `name_contains` parameter is of type `str` (it
716            needs to be `list(str)` instead).
717        """
718        if isinstance(name_contains, str):
719            raise TypeError("`name_contains` must be a list of str, not str!")
720
721        loc = localisation
722        loc_desc = f"with location matching [{localisation}]"
723        if localisation == "":
724            loc_desc = "(no location filter given)"
725
726        log.trace(
727            "Querying PPMS for systems {}, name matching any of {}",
728            loc_desc,
729            name_contains,
730        )
731        system_ids = []
732        systems = self.get_systems()
733        for sys_id, system in systems.items():
734            if loc.lower() not in str(system.localisation).lower():
735                log.trace(
736                    "System [{}] location ({}) is NOT matching ({}), ignoring",
737                    system.name,
738                    system.localisation,
739                    loc,
740                )
741                continue
742
743            # log.trace('System [{}] is matching location [{}], checking if '
744            #           'the name is matching any of the valid pattern {}',
745            #           system.name, loc, name_contains)
746            for valid_name in name_contains:
747                if valid_name in system.name:
748                    log.trace("System [{}] matches all criteria", system.name)
749                    system_ids.append(sys_id)
750                    break
751
752            # if sys_id not in system_ids:
753            #     log.trace('System [{}] does NOT match a valid name: {}',
754            #               system.name, name_contains)
755
756        log.trace("Found {} bookable systems {}", len(system_ids), loc_desc)
757        log.trace("IDs of matching bookable systems {}: {}", loc_desc, system_ids)
758        return system_ids

Query PPMS for systems with a specific location and name.

This method assembles a list of PPMS system IDs whose "localisation" (room) field matches a given string and where the system name contains at least one of the strings given as the name_contains parameter.

Parameters
  • localisation (str): A string that the system's "localisation" (i.e. the "Room" field in the PPMS web interface) has to match. Can be an empty string which will result in no filtering being done on the "Room" attribute.
  • name_contains (list(str)): A list of valid names (categories) of which the system's name has to match at least one for being included. Supply an empty list for skipping this filter.
Returns
  • list(int): A list with PPMS system IDs matching all of the given criteria.
Raises
  • TypeError: Raised in case the name_contains parameter is of type str (it needs to be list(str) instead).
def get_user(self, login_name, skip_cache=False):
760    def get_user(self, login_name, skip_cache=False):
761        """Fetch user details from PPMS and create a PpmsUser object from it.
762
763        Parameters
764        ----------
765        login_name : str
766            The user's PPMS login name.
767        skip_cache : bool, optional
768            Passed as-is to the :py:meth:`request()` method
769
770        Returns
771        -------
772        pyppms.user.PpmsUser
773            The user object created from the PUMAPI response. The object will be
774            additionally stored in the self.users dict using the login_name as
775            the dict's key.
776
777        Raises
778        ------
779        KeyError
780            Raised if the user doesn't exist in PPMS.
781        """
782        response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache)
783
784        if not response.text:
785            msg = f"User [{login_name}] is unknown to PPMS"
786            log.debug(msg)
787            raise KeyError(msg)
788
789        user = PpmsUser(response.text)
790        self.users[user.username] = user  # update / add to the cached user objs
791        self.fullname_mapping[user.fullname] = user.username
792        return user

Fetch user details from PPMS and create a PpmsUser object from it.

Parameters
  • login_name (str): The user's PPMS login name.
  • skip_cache (bool, optional): Passed as-is to the request() method.
Returns
  • pyppms.user.PpmsUser: The user object created from the PUMAPI response. The object will be additionally stored in the self.users dict using the login_name as the dict's key.
Raises
  • KeyError: Raised if the user doesn't exist in PPMS.
def get_user_dict(self, login_name, skip_cache=False):
794    def get_user_dict(self, login_name, skip_cache=False):
795        """Get details on a given user from PPMS.
796
797        Parameters
798        ----------
799        login_name : str
800            The PPMS account / login name of the user to query.
801        skip_cache : bool, optional
802            Passed as-is to the :py:meth:`request()` method
803
804        Returns
805        -------
806        dict
807            A dict with the user details returned by the PUMAPI.
808
809        Example
810        -------
811        >>> conn.get_user_dict('pyppms')
812        ... {
813        ...     u'active': True,
814        ...     u'affiliation': u'',
815        ...     u'bcode': u'',
816        ...     u'email': u'pyppms@python-facility.example',
817        ...     u'fname': u'PumAPI',
818        ...     u'lname': u'Python',
819        ...     u'login': u'pyppms',
820        ...     u'mustchbcode': False,
821        ...     u'mustchpwd': False',
822        ...     u'phone': u'+98 (76) 54 3210',
823        ...     u'unitlogin': u'pyppms'
824        ... }
825
826        Raises
827        ------
828        KeyError
829            Raised in case the user account is unknown to PPMS.
830        ValueError
831            Raised if the user details can't be parsed from the PUMAPI response.
832        """
833        response = self.request("getuser", {"login": login_name}, skip_cache=skip_cache)
834
835        if not response.text:
836            msg = f"User [{login_name}] is unknown to PPMS"
837            log.error(msg)
838            raise KeyError(msg)
839
840        # EXAMPLE:
841        # response.text = (
842        #     u'login,lname,fname,email,'
843        #     u'phone,bcode,affiliation,unitlogin,mustchpwd,mustchbcode,'
844        #     u'active\r\n'
845        #     u'"pyppms","Python","PumAPI","pyppms@python-facility.example",'
846        #     u'"+98 (76) 54 3210","","","pyppms",false,false,'
847        #     u'true\r\n'
848        # )
849        details = dict_from_single_response(response.text)
850        log.trace("Details for user [{}]: {}", login_name, details)
851        return details

Get details on a given user from PPMS.

Parameters
  • login_name (str): The PPMS account / login name of the user to query.
  • skip_cache (bool, optional): Passed as-is to the request() method.
Returns
  • dict: A dict with the user details returned by the PUMAPI.
Example
>>> conn.get_user_dict('pyppms')
... {
...     u'active': True,
...     u'affiliation': u'',
...     u'bcode': u'',
...     u'email': u'pyppms@python-facility.example',
...     u'fname': u'PumAPI',
...     u'lname': u'Python',
...     u'login': u'pyppms',
...     u'mustchbcode': False,
...     u'mustchpwd': False,
...     u'phone': u'+98 (76) 54 3210',
...     u'unitlogin': u'pyppms'
... }
Raises
  • KeyError: Raised in case the user account is unknown to PPMS.
  • ValueError: Raised if the user details can't be parsed from the PUMAPI response.
def get_user_experience(self, login=None, system_id=None):
853    def get_user_experience(self, login=None, system_id=None):
854        """Get user experience ("User rights") from PPMS.
855
856        Parameters
857        ----------
858        login : str, optional
859            An optional login name to request the experience / permissions for,
860            by default None
861        system_id : int, optional
862            An optional system ID to request the experience / permissions for,
863            by default None
864
865        Returns
866        -------
867        list(dict)
868            A list with dicts parsed from the user experience response.
869        """
870        data = {}
871        if login is not None:
872            data["login"] = login
873        if system_id is not None:
874            data["id"] = system_id
875        response = self.request("getuserexp", parameters=data)
876
877        parsed = parse_multiline_response(response.text)
878        log.trace(
879            "Received {} experience entries for filters [user:{}] and [id:{}]",
880            len(parsed),
881            login,
882            system_id,
883        )
884        return parsed

Get user experience ("User rights") from PPMS.

Parameters
  • login (str, optional): An optional login name to request the experience / permissions for, by default None
  • system_id (int, optional): An optional system ID to request the experience / permissions for, by default None
Returns
  • list(dict): A list with dicts parsed from the user experience response.
def get_user_ids(self, active=False):
886    def get_user_ids(self, active=False):
887        """Get a list with all user IDs in the PPMS system.
888
889        Parameters
890        ----------
891        active : bool, optional
892            Request only users marked as active in PPMS, by default False.
893            NOTE: "active" is a tri-state parameter in PPMS: "true", "false"
894            or empty!
895
896        Returns
897        -------
898        list
899            A list of all (or active-only) user IDs in PPMS.
900        """
901        # TODO: describe format of returned list and / or give an example!
902        parameters = {}
903        if active:
904            parameters["active"] = "true"
905
906        response = self.request("getusers", parameters)
907
908        users = response.text.splitlines()
909        active_desc = "active " if active else ""
910        log.trace("{} {}users in the PPMS database", len(users), active_desc)
911        log.trace(", ".join(users))
912        return users

Get a list with all user IDs in the PPMS system.

Parameters
  • active (bool, optional): Request only users marked as active in PPMS, by default False. NOTE: "active" is a tri-state parameter in PPMS: "true", "false" or empty!
Returns
  • list: A list of all (or active-only) user IDs in PPMS.
def get_users(self, force_refresh=False, active_only=True):
914    def get_users(self, force_refresh=False, active_only=True):
915        """Get user objects for all (or cached) PPMS users.
916
917        Parameters
918        ----------
919        force_refresh : bool, optional
920            Re-request information from PPMS even if user details have been
921            cached locally before, by default False.
922        active_only : bool, optional
923            If set to `False` also "inactive" users will be fetched from PPMS,
924            by default `True`.
925
926        Returns
927        -------
928        dict(pyppms.user.PpmsUser)
929            A dict of PpmsUser objects with the username (login) as key.
930        """
931        if self.users and not force_refresh:
932            log.trace("Using cached details for {} users", len(self.users))
933        else:
934            self.update_users(active_only=active_only)
935
936        return self.users

Get user objects for all (or cached) PPMS users.

Parameters
  • force_refresh (bool, optional): Re-request information from PPMS even if user details have been cached locally before, by default False.
  • active_only (bool, optional): If set to False also "inactive" users will be fetched from PPMS, by default True.
Returns
def get_users_emails(self, users=None, active=False):
938    def get_users_emails(self, users=None, active=False):
939        """Get a list of user email addresses. WARNING - very slow!
940
941        Parameters
942        ----------
943        users : list(str), optional
944            A list of login names to retrieve the email addresses for, if
945            omitted addresses for all (or active ones) will be requested.
946        active : bool, optional
947            Request only addresses of users marked as active in PPMS, by default
948            False. Will be ignored if a list of usernames is given explicitly.
949
950        Returns
951        -------
952        list(str)
953            Email addresses of the users requested.
954        """
955        emails = []
956        if users is None:
957            users = self.get_user_ids(active=active)
958        for user in users:
959            email = self.get_user_dict(user)["email"]
960            if not email:
961                log.warning("--- WARNING: no email for user [{}]! ---", user)
962                continue
963            # log.trace("{}: {}", user, email)
964            emails.append(email)
965
966        return emails

Get a list of user email addresses. WARNING - very slow!

Parameters
  • users (list(str), optional): A list of login names to retrieve the email addresses for, if omitted addresses for all (or active ones) will be requested.
  • active (bool, optional): Request only addresses of users marked as active in PPMS, by default False. Will be ignored if a list of usernames is given explicitly.
Returns
  • list(str): Email addresses of the users requested.
def get_users_with_access_to_system(self, system_id):
 968    def get_users_with_access_to_system(self, system_id):
 969        """Get a list of usernames allowed to book the system with the given ID.
 970
 971        Parameters
 972        ----------
 973        system_id : int or int-like
 974            The ID of the system to query permitted users for.
 975
 976        Returns
 977        -------
 978        list(str)
 979            A list of usernames ('login') with permissions to book the system
 980            with the given ID in PPMS.
 981
 982        Raises
 983        ------
 984        ValueError
 985            Raised in case parsing the response failes for any reason.
 986        """
 987        users = []
 988
 989        response = self.request("getsysrights", {"id": system_id})
 990        # this response has a unique format, so parse it directly here:
 991        try:
 992            lines = response.text.splitlines()
 993            for line in lines:
 994                permission, username = line.split(":")
 995                if permission.upper() == "D":
 996                    log.trace(
 997                        "User [{}] is deactivated for booking system [{}], skipping",
 998                        username,
 999                        system_id,
1000                    )
1001                    continue
1002
1003                log.trace(
1004                    "User [{}] has permission to book system [{}]", username, system_id
1005                )
1006                users.append(username)
1007
1008        except Exception as err:
1009            msg = (
1010                f"Unable to parse data returned by PUMAPI: {response.text} - "
1011                f"ERROR: {err}"
1012            )
1013            log.error(msg)
1014            raise ValueError(msg) from err
1015
1016        return users

Get a list of usernames allowed to book the system with the given ID.

Parameters
  • system_id (int or int-like): The ID of the system to query permitted users for.
Returns
  • list(str): A list of usernames ('login') with permissions to book the system with the given ID in PPMS.
Raises
  • ValueError: Raised in case parsing the response fails for any reason.
def give_user_access_to_system(self, username, system_id):
1018    def give_user_access_to_system(self, username, system_id):
1019        """Add permissions for a user to book a given system in PPMS.
1020
1021        Parameters
1022        ----------
1023        username : str
1024            The username ('login') to allow for booking the system.
1025        system_id : int or int-like
1026            The ID of the system to add the permission for.
1027
1028        Returns
1029        -------
1030        bool
1031            True in case the given username now has the permissions to book the
1032            system with the specified ID (or if the user already had them
1033            before), False otherwise.
1034        """
1035        return self.set_system_booking_permissions(username, system_id, "A")

Add permissions for a user to book a given system in PPMS.

Parameters
  • username (str): The username ('login') to allow for booking the system.
  • system_id (int or int-like): The ID of the system to add the permission for.
Returns
  • bool: True in case the given username now has the permissions to book the system with the specified ID (or if the user already had them before), False otherwise.
def new_user( self, login, lname, fname, email, ppms_group, phone=None, password=None):
1037    def new_user(  # pylint: disable-msg=too-many-arguments
1038        self, login, lname, fname, email, ppms_group, phone=None, password=None
1039    ):
1040        """Create a new user in PPMS.
1041
1042        The method is asking PPMS to create a new user account with the given details.
1043        In case an account with that login name already exists, it will log a warning
1044        and return without sending any further requests to PPMS.
1045
1046        Parameters
1047        ----------
1048        login : str
1049            The unique identifier for the user.
1050        lname : str
1051            The last name of the user.
1052        fname : str
1053            The first name of the user.
1054        email : str
1055            The email address of the user.
1056        ppms_group : str
1057            The unique identifier of the primary group of the new user. A new group will
1058            be created if no group with the given name exists.
1059        phone : str, optional
1060            The phone number of the user.
1061        password : str, optional
1062            The password for the user. If no password is set the user will not be able
1063            to log on to PPMS.
1064
1065        Raises
1066        ------
1067        RuntimeError
1068            Will be raised in case creating the user fails.
1069        """
1070        if self.user_exists(login):
1071            log.warning("NOT creating user [{}] as it already exists!", login)
1072            return
1073
1074        req_data = {
1075            "login": login,
1076            "lname": lname,
1077            "fname": fname,
1078            "email": email,
1079            "unitlogin": ppms_group,
1080        }
1081        if phone:
1082            req_data["phone"] = phone
1083        if password:
1084            req_data["pwd"] = password
1085
1086        response = self.request("newuser", req_data)
1087        if not "OK newuser" in response.text:
1088            msg = f"Creating new user failed: {response.text}"
1089            log.error(msg)
1090            raise RuntimeError(msg)
1091
1092        log.debug("Created user [{}] in PPMS.", login)
1093        log.trace("Response was: {}", response.text)

Create a new user in PPMS.

The method is asking PPMS to create a new user account with the given details. In case an account with that login name already exists, it will log a warning and return without sending any further requests to PPMS.

Parameters
  • login (str): The unique identifier for the user.
  • lname (str): The last name of the user.
  • fname (str): The first name of the user.
  • email (str): The email address of the user.
  • ppms_group (str): The unique identifier of the primary group of the new user. A new group will be created if no group with the given name exists.
  • phone (str, optional): The phone number of the user.
  • password (str, optional): The password for the user. If no password is set the user will not be able to log on to PPMS.
Raises
  • RuntimeError: Will be raised in case creating the user fails.
def remove_user_access_from_system(self, username, system_id):
1095    def remove_user_access_from_system(self, username, system_id):
1096        """Remove permissions for a user to book a given system in PPMS.
1097
1098        Parameters
1099        ----------
1100        username : str
1101            The username ('login') to remove booking permissions on the system.
1102        system_id : int or int-like
1103            The ID of the system to modify the permission for.
1104
1105        Returns
1106        -------
1107        bool
1108            True in case the given username now has the permissions to book the
1109            system with the specified ID (or if the user already had them
1110            before), False otherwise.
1111        """
1112        return self.set_system_booking_permissions(username, system_id, "D")

Remove permissions for a user to book a given system in PPMS.

Parameters
  • username (str): The username ('login') to remove booking permissions on the system.
  • system_id (int or int-like): The ID of the system to modify the permission for.
Returns
  • bool: True in case deactivating the booking permissions of the given username on the system with the specified ID succeeded, False otherwise.
def set_system_booking_permissions(self, login, system_id, permission):
1114    def set_system_booking_permissions(self, login, system_id, permission):
1115        """Set permissions for a user on a given system in PPMS.
1116
1117        Parameters
1118        ----------
1119        username : str
1120            The username ('login') to allow for booking the system.
1121        system_id : int or int-like
1122            The ID of the system to add the permission for.
1123        permission : {'D', 'A', 'N', 'S'}
1124            The permission level to set for the user, one of:
1125              - ``D`` : deactivated
1126              - ``A`` : autonomous
1127              - ``N`` : novice
1128              - ``S`` : superuser
1129
1130        Returns
1131        -------
1132        bool
1133            True in case setting permissions for the given username on the
1134            system with the specified ID succeeded (or if the user already had
1135            those permissions before), False otherwise.
1136        """
1137
1138        def permission_name(shortname):
1139            """Closure to validate a permission level and return its long name.
1140
1141            Parameters
1142            ----------
1143            shortname : str
1144                A single character defining the permission level.
1145
1146            Returns
1147            -------
1148            str
1149                The long (human-readable) name of the permission level.
1150
1151            Raises
1152            ------
1153            KeyError
1154                Raised in case an invalid permission level was given.
1155            """
1156            mapping = {
1157                "D": "deactivated",
1158                "A": "autonomous",
1159                "N": "novice",
1160                "S": "superuser",
1161            }
1162            try:
1163                return mapping[shortname]
1164            except KeyError as err:
1165                raise KeyError(f"Invalid permission [{shortname}] given") from err
1166
1167        log.debug(
1168            "Setting permission level [{}] for user [{}] on system [{}]",
1169            permission_name(permission),
1170            login,
1171            system_id,
1172        )
1173
1174        parameters = {"id": system_id, "login": login, "type": permission}
1175        response = self.request("setright", parameters)
1176
1177        # NOTE: the 'setright' action will accept ANY permission type and return 'done'
1178        # on the request, so there is no way to check from the response if setting the
1179        # permission really worked!!
1180        # log.trace('Request returned text: {}', response.text)
1181        if response.text.lower().strip() == "done":
1182            log.trace(
1183                "User [{}] now has permission level [{}] on system [{}]",
1184                login,
1185                permission_name(permission),
1186                system_id,
1187            )
1188            return True
1189
1190        if "invalid user" in response.text.lower():
1191            log.warning("User [{}] doesn't seem to exist in PPMS", login)
1192        elif "system right not authorized" in response.text.lower():
1193            log.error(
1194                "Unable to set permissions for system {}: {}", system_id, response.text
1195            )
1196        else:
1197            log.error("Unexpected response, assuming request failed: {}", response.text)
1198
1199        return False

Set permissions for a user on a given system in PPMS.

Parameters
  • login (str): The username ('login') to set the permission level for.
  • system_id (int or int-like): The ID of the system to add the permission for.
  • permission ({'D', 'A', 'N', 'S'}): The permission level to set for the user, one of:
    • D : deactivated
    • A : autonomous
    • N : novice
    • S : superuser
Returns
  • bool: True in case setting permissions for the given username on the system with the specified ID succeeded (or if the user already had those permissions before), False otherwise.
def update_systems(self):
1201    def update_systems(self):
1202        """Update cached details for all bookable systems from PPMS.
1203
1204        Get the details on all bookable systems from PPMS and store them in the local
1205        cache. If parsing the PUMAPI response for a system fails for any reason, the
1206        system is skipped entirely.
1207        """
1208        log.trace("Updating list of bookable systems...")
1209        systems = {}
1210        parse_fails = 0
1211        response = self.request("getsystems")
1212        details = parse_multiline_response(response.text, graceful=False)
1213        for detail in details:
1214            try:
1215                system = PpmsSystem(detail)
1216            except ValueError as err:
1217                log.error("Error processing `getsystems` response: {}", err)
1218                parse_fails += 1
1219                continue
1220
1221            systems[system.system_id] = system
1222
1223        log.trace(
1224            "Updated {} bookable systems from PPMS ({} systems failed parsing)",
1225            len(systems),
1226            parse_fails,
1227        )
1228
1229        self.systems = systems

Update cached details for all bookable systems from PPMS.

Get the details on all bookable systems from PPMS and store them in the local cache. If parsing the PUMAPI response for a system fails for any reason, the system is skipped entirely.

def update_users(self, user_ids=[], active_only=True):
1231    def update_users(self, user_ids=[], active_only=True):
1232        """Update cached details for a list of users from PPMS.
1233
1234        Get the user details on a list of users (or all active ones) from PPMS and store
1235        them in the object's `users` dict. As a side effect, this will also fill the
1236        cache directory in case the object's `cache_path` attribute is set.
1237
1238        WARNING - very slow, especially when the PPMS instance has many users!
1239
1240        Parameters
1241        ----------
1242        user_ids : list(str), optional
1243            A list of user IDs (login names) to request the cache for, by
1244            default [] which will result in all *active* users to be requested.
1245        active_only : bool, optional
1246            If set to `False` also "inactive" users will be fetched from PPMS,
1247            by default `True`.
1248        """
1249        if not user_ids:
1250            user_ids = self.get_user_ids(active=active_only)
1251
1252        log.trace("Updating details on {} users", len(user_ids))
1253        for user_id in user_ids:
1254            self.get_user(user_id, skip_cache=True)
1255
1256        log.debug("Collected details on {} users", len(self.users))

Update cached details for a list of users from PPMS.

Get the user details on a list of users (or all active ones) from PPMS and store them in the object's users dict. As a side effect, this will also fill the cache directory in case the object's cache_path attribute is set.

WARNING - very slow, especially when the PPMS instance has many users!

Parameters
  • user_ids (list(str), optional): A list of user IDs (login names) whose details should be updated, by default [], which will result in all active users being requested.
  • active_only (bool, optional): If set to False also "inactive" users will be fetched from PPMS, by default True.
def user_exists(self, login):
1258    def user_exists(self, login):
1259        """Check if an account with the given login name already exists in PPMS.
1260
1261        Parameters
1262        ----------
1263        login : str
1264            The login name to check for.
1265
1266        Returns
1267        -------
1268        bool
1269            True in case an account with that name exists in PPMS, false otherwise.
1270        """
1271        try:
1272            self.get_user(login)
1273            return True
1274        except KeyError:
1275            return False

Check if an account with the given login name already exists in PPMS.

Parameters
  • login (str): The login name to check for.
Returns
  • bool: True if an account with that name exists in PPMS, False otherwise.