67 lines
1.9 KiB
Python
67 lines
1.9 KiB
Python
|
import logging
|
||
|
from ldap3 import Server, Connection, ALL
|
||
|
from ldap3.core.exceptions import LDAPException
|
||
|
|
||
|
|
||
|
class LLDAP:
    """Small helper for listing users and groups from an LDAP directory.

    Every ``list_*`` call binds with the configured account, performs a
    single search and unbinds again, so no connection is kept open between
    calls.
    """

    def __init__(
        self,
        ldap_server: str,
        bind_dn: str,
        bind_password: str,
        user_dn: str = "ou=people,dc=example,dc=com",
        # NOTE(review): this default is identical to ``user_dn``; it may have
        # been meant to point at a groups OU -- confirm before changing.
        group_dn: str = "ou=people,dc=example,dc=com",
    ):
        """Store the bind credentials and search bases.

        No network connection is opened here; binding happens in
        :meth:`do_bind`.

        Args:
            ldap_server: Host/URL handed to ``ldap3.Server``.
            bind_dn: DN used as the bind user.
            bind_password: Password for ``bind_dn``.
            user_dn: Search base used by :meth:`list_users`.
            group_dn: Search base used by :meth:`list_groups`.
        """
        self.bind_dn = bind_dn
        self.bind_password = bind_password

        self.server = Server(ldap_server, get_info=ALL)
        # Active ldap3 Connection, or None while unbound.
        self.conn = None

        self.user_dn = user_dn
        self.group_dn = group_dn

    def do_bind(self) -> None:
        """Open and bind a connection unless one is already held.

        Raises:
            LDAPException: If the connection or bind fails; the error is
                logged and re-raised unchanged.
        """
        if self.conn:
            # Already bound: nothing to do, and nothing new to report.
            return

        try:
            self.conn = Connection(
                self.server,
                user=self.bind_dn,
                password=self.bind_password,
                auto_bind=True,
            )
        except LDAPException as e:
            logging.error("Failed to bind to LDAP server: %s", e)
            raise

        logging.info("Successfully bound to LDAP server.")

    def do_unbind(self) -> None:
        """Unbind the current connection, if any, and forget it.

        The reference is dropped *before* ``unbind()`` is called so that a
        failing unbind can never leave a dead connection behind for the next
        :meth:`do_bind` to reuse.
        """
        conn, self.conn = self.conn, None
        if not conn:
            return

        conn.unbind()
        logging.info("Successfully unbound from the LDAP server.")

    def _search(self, base_dn, search_filter, attributes):
        """Bind, run one search under ``base_dn``, unbind, return the entries."""
        try:
            self.do_bind()
            self.conn.search(base_dn, search_filter, attributes=attributes)
            return self.conn.entries
        except LDAPException as e:
            logging.error("Error during LDAP search: %s", e)
            raise
        finally:
            # Always release the connection, on success and on failure.
            self.do_unbind()

    def list_groups(self):
        """Return the entries found under ``group_dn`` with their ``cn``.

        The filter ``(objectClass=*)`` matches every entry in scope.

        Raises:
            LDAPException: If binding or searching fails.
        """
        return self._search(self.group_dn, "(objectClass=*)", ['cn'])

    def list_users(self):
        """Return the entries under ``user_dn`` that have a ``uid``.

        Raises:
            LDAPException: If binding or searching fails.
        """
        # NOTE(review): 'dn' is requested as an attribute here; ldap3 exposes
        # the DN as ``entry.entry_dn`` and may reject 'dn' when attribute
        # name checking is active -- confirm against the target server.
        return self._search(self.user_dn, "(uid=*)", ['dn'])
|