Source code for slixmpp_omemo.base_session_manager

from abc import abstractmethod
import enum
import sys
from typing import FrozenSet, Optional, Set

from omemo.session_manager import SessionManager, UnknownTrustLevel
from omemo.types import DeviceInformation, TrustLevel as CoreTrustLevel

if sys.version_info >= (3, 11):
    from typing import assert_never
else:
    from typing_extensions import assert_never


# Names exported by a star-import of this module; both are defined further down in this file.
__all__ = [
    "TrustLevel",
    "BaseSessionManager"
]


@enum.unique
class TrustLevel(enum.Enum):
    """
    Trust levels modeling Blind Trust Before Verification (BTBV) and manual trust.

    Each member's value equals its name. The value is the string that is looked up when a stored trust
    level name is converted back into a member via ``TrustLevel(name)``.
    """

    # Maps directly to the core "trusted" level.
    TRUSTED = "TRUSTED"
    # Assigned automatically by blind trust; counts as trusted only while BTBV is enabled, and as
    # undecided otherwise.
    BLINDLY_TRUSTED = "BLINDLY_TRUSTED"
    # Maps directly to the core "undecided" level.
    UNDECIDED = "UNDECIDED"
    # Maps directly to the core "distrusted" level.
    DISTRUSTED = "DISTRUSTED"
class BaseSessionManager(SessionManager):
    """
    Partial :class:`~omemo.session_manager.SessionManager` implementation with BTBV and manual trust as its
    trust systems.
    """

    async def _evaluate_custom_trust_level(self, device: DeviceInformation) -> CoreTrustLevel:
        """
        Map the custom trust level stored for a device to one of the core trust levels.

        Args:
            device: The device whose ``trust_level_name`` is to be evaluated.

        Returns:
            The core trust level that the custom trust level of the device corresponds to.

        Raises:
            UnknownTrustLevel: if the trust level name of the device is not the value of any
                :class:`TrustLevel` member.
        """

        try:
            trust_level = TrustLevel(device.trust_level_name)
        except ValueError as e:
            raise UnknownTrustLevel(f"Unknown trust level name: {device.trust_level_name}") from e

        # Those custom trust levels map directly to core trust levels
        if trust_level is TrustLevel.TRUSTED:
            return CoreTrustLevel.TRUSTED
        if trust_level is TrustLevel.UNDECIDED:
            return CoreTrustLevel.UNDECIDED
        if trust_level is TrustLevel.DISTRUSTED:
            return CoreTrustLevel.DISTRUSTED

        # The blindly trusted state maps differently depending on whether BTBV is enabled
        if trust_level is TrustLevel.BLINDLY_TRUSTED:
            if self._btbv_enabled:
                # The blindly trusted state is equivalent to the trusted state when BTBV is enabled.
                return CoreTrustLevel.TRUSTED

            # The blindly trusted state is equivalent to the undecided state when BTBV is disabled.
            return CoreTrustLevel.UNDECIDED

        # Every member is handled above; this lets static type checkers flag members added later.
        assert_never(trust_level)

    async def _make_trust_decision(
        self,
        undecided: FrozenSet[DeviceInformation],
        identifier: Optional[str]
    ) -> None:
        """
        Decide on the trust of a set of undecided devices, using blind trust where it applies and manual
        trust otherwise.

        Args:
            undecided: The devices whose trust has to be decided.
            identifier: Passed through unchanged to :meth:`_devices_blindly_trusted` and
                :meth:`_prompt_manual_trust`.
        """

        # For BTBV, affected JIDs can be separated into two pools: one pool of JIDs for which blind trust is
        # active, i.e. no manual verification was performed before, and one pool of JIDs to use manual trust
        # with instead.
        bare_jids = { device.bare_jid for device in undecided }

        blind_trust_bare_jids: Set[str] = set()
        manual_trust_bare_jids: Set[str] = set()

        # The trust levels that are compatible with blind trust still being active for a bare JID
        blind_trust_levels = { TrustLevel.UNDECIDED, TrustLevel.BLINDLY_TRUSTED }

        # For each bare JID, decide whether blind trust applies
        for bare_jid in bare_jids:
            # If BTBV is disabled, use manual trust. This is checked before loading the device information,
            # since that information is only needed to decide whether blind trust applies.
            if not self._btbv_enabled:
                manual_trust_bare_jids.add(bare_jid)
                continue

            # Get all known devices belonging to the bare JID
            devices = await self.get_device_information(bare_jid)

            # If the trust levels of all devices correspond to those used by blind trust, blind trust
            # applies.
            # Otherwise, fall back to manual trust.
            # NOTE(review): an unknown trust level name raises a plain ValueError here, whereas
            # _evaluate_custom_trust_level raises UnknownTrustLevel - confirm whether that is intended.
            if all(TrustLevel(device.trust_level_name) in blind_trust_levels for device in devices):
                blind_trust_bare_jids.add(bare_jid)
            else:
                manual_trust_bare_jids.add(bare_jid)

        # With the JIDs sorted into their respective pools, the undecided devices can be categorized too
        blindly_trusted_devices = { dev for dev in undecided if dev.bare_jid in blind_trust_bare_jids }
        manually_trusted_devices = { dev for dev in undecided if dev.bare_jid in manual_trust_bare_jids }

        # Blindly trust devices handled by blind trust
        if blindly_trusted_devices:
            for device in blindly_trusted_devices:
                # Persist the member's value, since stored names are parsed back with TrustLevel(name),
                # which looks members up by value.
                await self.set_trust(
                    device.bare_jid,
                    device.identity_key,
                    TrustLevel.BLINDLY_TRUSTED.value
                )

            await self._devices_blindly_trusted(frozenset(blindly_trusted_devices), identifier)

        # Prompt the user for manual trust decisions on the devices handled by manual trust
        if manually_trusted_devices:
            await self._prompt_manual_trust(frozenset(manually_trusted_devices), identifier)

    @property
    @abstractmethod
    def _btbv_enabled(self) -> bool:
        """
        Returns:
            Whether BTBV is enabled.
        """

    async def _devices_blindly_trusted(
        self,
        blindly_trusted: FrozenSet[DeviceInformation],
        identifier: Optional[str]
    ) -> None:
        """
        Get notified about newly blindly trusted devices. This method is called automatically by
        :meth:`_make_trust_decision` whenever at least one device was blindly trusted. You can use this method
        for example to notify the user about the automated change in trust.

        Does nothing by default.

        Args:
            blindly_trusted: A set of devices that were blindly trusted.
            identifier: Forwarded from :meth:`_make_trust_decision`, refer to its documentation for details.
        """

    @abstractmethod
    async def _prompt_manual_trust(
        self,
        manually_trusted: FrozenSet[DeviceInformation],
        identifier: Optional[str]
    ) -> None:
        """
        Prompt manual trust decision on a set of undecided identity keys. The trust decisions are expected to
        be persisted by calling :meth:`~omemo.session_manager.SessionManager.set_trust`.

        Args:
            manually_trusted: A set of devices whose trust has to be manually decided by the user.
            identifier: Forwarded from :meth:`_make_trust_decision`, refer to its documentation for details.

        Raises:
            TrustDecisionFailed: if for any reason the trust decision failed/could not be completed. Feel
                free to raise a subclass instead.

        Note:
            This is called when the encryption needs to know whether it is allowed to encrypt for these
            devices or not. When this method returns, all previously undecided trust levels should have been
            replaced by calling :meth:`~omemo.session_manager.SessionManager.set_trust` with a different
            trust level. If they are not replaced or still evaluate to the undecided trust level after the
            call, the encryption will fail with an exception. See
            :meth:`~omemo.session_manager.SessionManager.encrypt` for details.
        """