|
| 1 | +""" |
| 2 | +connection.py - wraps class _ldap.LDAPObject |
| 3 | +
|
| 4 | +See https://www.python-ldap.org/ for details. |
| 5 | +""" |
| 6 | + |
| 7 | +from ldap.pkginfo import __version__, __author__, __license__ |
| 8 | + |
| 9 | +__all__ = [ |
| 10 | + 'Connection', |
| 11 | +] |
| 12 | + |
| 13 | + |
| 14 | +from numbers import Real |
| 15 | +from typing import Optional, Union |
| 16 | + |
| 17 | +import ldap |
| 18 | +from ldap.controls import DecodeControlTuples, LDAPControl |
| 19 | +from ldap.extop import ExtendedRequest |
| 20 | +from ldap.ldapobject import SimpleLDAPObject, NO_UNIQUE_ENTRY |
| 21 | +from ldap.response import ( |
| 22 | + Response, |
| 23 | + SearchEntry, SearchReference, SearchResult, |
| 24 | + IntermediateResponse, ExtendedResult, |
| 25 | +) |
| 26 | + |
| 27 | +Controls = Optional[list[LDAPControl]] |
| 28 | + |
| 29 | + |
| 30 | +# TODO: remove _ext and _s functions as we rework request API |
| 31 | +class Connection(SimpleLDAPObject): |
| 32 | + resp_ctrl_classes = None |
| 33 | + |
| 34 | + def result(self, msgid: int = ldap.RES_ANY, *, all: int = 1, |
| 35 | + timeout: Optional[float] = None) -> Optional[list[Response]]: |
| 36 | + """ |
| 37 | + result([msgid: int = RES_ANY [, all: int = 1 [, timeout : |
| 38 | + Optional[float] = None]]]) -> Optional[list[Response]] |
| 39 | +
|
| 40 | + This method is used to wait for and return the result of an |
| 41 | + operation previously initiated by one of the LDAP asynchronous |
| 42 | + operation routines (e.g. search(), modify(), etc.) They all |
| 43 | + return an invocation identifier (a message id) upon successful |
| 44 | + initiation of their operation. This id is guaranteed to be |
| 45 | + unique across an LDAP session, and can be used to request the |
| 46 | + result of a specific operation via the msgid parameter of the |
| 47 | + result() method. |
| 48 | +
|
| 49 | + If the result of a specific operation is required, msgid should |
| 50 | + be set to the invocation message id returned when the operation |
| 51 | + was initiated; otherwise RES_ANY should be supplied. |
| 52 | +
|
| 53 | + The all parameter is used to wait until a final response for |
| 54 | + a given operation is received, this is useful with operations |
| 55 | + (like search) that generate multiple responses and is used |
| 56 | + to select whether a single item should be returned or to wait |
| 57 | + for all the responses before returning. |
| 58 | +
|
| 59 | + Using search as an example: A search response is made up of |
| 60 | + zero or more search entries followed by a search result. If all |
| 61 | + is 0, search entries will be returned one at a time as they |
| 62 | + come in, via separate calls to result(). If all is 1, the |
| 63 | + search response will be returned in its entirety, i.e. after |
| 64 | + all entries and the final search result have been received. If |
| 65 | + all is 2, all search entries that have been received so far |
| 66 | + will be returned. |
| 67 | +
|
| 68 | + The method returns a list of messages or None if polling and no |
| 69 | + messages arrived yet. |
| 70 | +
|
| 71 | + The result() method will block for timeout seconds, or |
| 72 | + indefinitely if timeout is negative. A timeout of 0 will |
| 73 | + effect a poll. The timeout can be expressed as a floating-point |
| 74 | + value. If timeout is None the default in self.timeout is used. |
| 75 | +
|
| 76 | + If a timeout occurs, a TIMEOUT exception is raised, unless |
| 77 | + polling (timeout = 0), in which case None is returned. |
| 78 | + """ |
| 79 | + |
| 80 | + if timeout is None: |
| 81 | + timeout = self.timeout |
| 82 | + |
| 83 | + messages = self._ldap_call(self._l.result, msgid, all, timeout) |
| 84 | + |
| 85 | + if messages is None: |
| 86 | + return None |
| 87 | + |
| 88 | + results = [] |
| 89 | + for msgid, msgtype, controls, data in messages: |
| 90 | + controls = DecodeControlTuples(controls, self.resp_ctrl_classes) |
| 91 | + |
| 92 | + m = Response(msgid, msgtype, controls, **data) |
| 93 | + results.append(m) |
| 94 | + |
| 95 | + return results |
| 96 | + |
| 97 | + def bind_s(self, dn: Optional[str] = None, |
| 98 | + cred: Union[None, str, bytes] = None, *, |
| 99 | + method: int = ldap.AUTH_SIMPLE, |
| 100 | + ctrls: Controls = None) -> ldap.response.BindResult: |
| 101 | + msgid = self.bind(dn, cred, method) |
| 102 | + responses = self.result(msgid) |
| 103 | + result, = responses |
| 104 | + return result |
| 105 | + |
| 106 | + def compare_s(self, dn: str, attr: str, value: bytes, *, |
| 107 | + ctrls: Controls = None) -> ldap.response.CompareResult: |
| 108 | + "TODO: remove _s functions introducing a better request API" |
| 109 | + msgid = self.compare_ext(dn, attr, value, serverctrls=ctrls) |
| 110 | + responses = self.result(msgid) |
| 111 | + result, = responses |
| 112 | + return bool(result) |
| 113 | + |
| 114 | + def delete_s(self, dn: str, *, |
| 115 | + ctrls: Controls = None) -> ldap.response.DeleteResult: |
| 116 | + msgid = self.delete_ext(dn, serverctrls=ctrls) |
| 117 | + responses = self.result(msgid) |
| 118 | + result, = responses |
| 119 | + return result |
| 120 | + |
| 121 | + def extop_s(self, oid: Optional[str] = None, |
| 122 | + value: Optional[bytes] = None, *, |
| 123 | + request: Optional[ExtendedRequest] = None, |
| 124 | + ctrls: Controls = None |
| 125 | + ) -> list[Union[IntermediateResponse, ExtendedResult]]: |
| 126 | + if request is not None: |
| 127 | + oid = request.requestName |
| 128 | + value = request.encodedRequestValue() |
| 129 | + |
| 130 | + msgid = self.extop(oid, value, serverctrls=ctrls) |
| 131 | + return self.result(msgid) |
| 132 | + |
| 133 | + def search_s(self, base: Optional[str] = None, |
| 134 | + scope: int = ldap.SCOPE_SUBTREE, |
| 135 | + filter: str = "(objectClass=*)", |
| 136 | + attrlist: Optional[list[str]] = None, *, |
| 137 | + attrsonly: bool = False, |
| 138 | + ctrls: Controls = None, |
| 139 | + sizelimit: int = 0, timelimit: int = -1, |
| 140 | + timeout: Optional[Real] = None |
| 141 | + ) -> list[Union[SearchEntry, SearchReference]]: |
| 142 | + if timeout is None: |
| 143 | + timeout = timelimit |
| 144 | + |
| 145 | + msgid = self.search_ext(base, scope, filter, attrlist=attrlist, |
| 146 | + attrsonly=attrsonly, serverctrls=ctrls, |
| 147 | + sizelimit=sizelimit, timeout=timelimit) |
| 148 | + result = self.result(msgid, timeout=timeout) |
| 149 | + result[-1].raise_for_result() |
| 150 | + return result[:-1] |
| 151 | + |
| 152 | + def search_subschemasubentry_s( |
| 153 | + self, dn: Optional[str] = None) -> Optional[str]: |
| 154 | + """ |
| 155 | + Returns the distinguished name of the sub schema sub entry |
| 156 | + for a part of a DIT specified by dn. |
| 157 | +
|
| 158 | + None as result indicates that the DN of the sub schema sub entry could |
| 159 | + not be determined. |
| 160 | + """ |
| 161 | + empty_dn = '' |
| 162 | + attrname = 'subschemaSubentry' |
| 163 | + if dn is None: |
| 164 | + dn = empty_dn |
| 165 | + try: |
| 166 | + r = self.search_s(dn, ldap.SCOPE_BASE, None, [attrname]) |
| 167 | + except (ldap.NO_SUCH_OBJECT, ldap.NO_SUCH_ATTRIBUTE, |
| 168 | + ldap.INSUFFICIENT_ACCESS): |
| 169 | + r = [] |
| 170 | + except ldap.UNDEFINED_TYPE: |
| 171 | + return None |
| 172 | + |
| 173 | + attr = r and ldap.cidict.cidict(r[0].attrs).get(attrname) |
| 174 | + if attr: |
| 175 | + return attr[0].decode('utf-8') |
| 176 | + elif dn: |
| 177 | + # Try to find sub schema sub entry in root DSE |
| 178 | + return self.search_subschemasubentry_s(dn=empty_dn) |
| 179 | + else: |
| 180 | + # If dn was already rootDSE we can return here |
| 181 | + return None |
| 182 | + |
| 183 | + def read_s(self, dn: str, filterstr: Optional[str] = None, |
| 184 | + attrlist: Optional[list[str]] = None, ctrls: Controls = None, |
| 185 | + timeout: int = -1) -> dict[str, bytes]: |
| 186 | + """ |
| 187 | + Reads and returns a single entry specified by `dn'. |
| 188 | +
|
| 189 | + Other attributes just like those passed to `search_s()' |
| 190 | + """ |
| 191 | + r = self.search_s(dn, ldap.SCOPE_BASE, filterstr, |
| 192 | + attrlist=attrlist, ctrls=ctrls, timeout=timeout) |
| 193 | + if r: |
| 194 | + return r[0].attrs |
| 195 | + else: |
| 196 | + return None |
| 197 | + |
| 198 | + def find_unique_entry(self, base: Optional[str] = None, |
| 199 | + scope: int = ldap.SCOPE_SUBTREE, |
| 200 | + filter: str = "(objectClass=*)", |
| 201 | + attrlist: Optional[list[str]] = None, *, |
| 202 | + attrsonly: bool = False, |
| 203 | + ctrls: Controls = None, |
| 204 | + timelimit: int = -1, |
| 205 | + timeout: Optional[Real] = None |
| 206 | + ) -> list[Union[SearchEntry, SearchReference]]: |
| 207 | + """ |
| 208 | + Returns a unique entry, raises exception if not unique |
| 209 | + """ |
| 210 | + r = self.search_s(base, scope, filter, attrlist=attrlist, |
| 211 | + attrsonly=attrsonly, ctrls=ctrls, timeout=timeout, |
| 212 | + sizelimit=2) |
| 213 | + if len(r) != 1: |
| 214 | + raise NO_UNIQUE_ENTRY(f'No or non-unique search result for {filter}') |
| 215 | + return r[0] |
0 commit comments