网页设计模板图片素材上海关键词优化按天计费
想要将内部系统认证与superset打通,必须要了解superset的认证体系。
Superset的认证体系
Superset的认证体系可以通过以下几种方式进行配置:
-
基于LDAP认证:Superset可以集成LDAP以验证用户身份。在这种情况下,Superset将根据LDAP中的用户信息进行身份验证,并从LDAP中获取用户属性和组织结构信息。
-
基于OAuth2认证:Superset支持OAuth2认证和授权。在这种情况下,Superset将重定向用户到OAuth2提供商的登录页,并在用户授权时获取访问令牌以进行身份验证。
-
本地认证:Superset还支持本地认证,其中用户可以在Superset中创建帐户并设置密码。在这种情况下,Superset将使用这些凭据来验证用户身份。
无论使用哪种认证方式,管理员都可以控制哪些用户可以访问Superset中的特定资源。例如,可以定义哪些用户可以访问特定的数据源、仪表板或视图。此外,管理员还可以定义角色和权限,以控制用户在Superset中的操作范围。
二次开发,重写认证
系统配置文件: \superset\config.py
配置缓存
在config.py里面配置
Reids_Url = '127.0.0.1'
Reids_Port = 6379# Cache for datasource metadata and query results
DATA_CACHE_CONFIG: CacheConfig = {'CACHE_TYPE': 'redis', # 使用 Redis'CACHE_REDIS_HOST': Reids_Url, # 配置域名'CACHE_REDIS_PORT': Reids_Port , # 配置端口号'CACHE_REDIS_URL': 'redis://' + Reids_Url + ':' + str(Reids_Port)}
配置认证管理类
在config.py里面配置
from superset.custom_sso_security_manager import CustomSsoSecurityManagerCUSTOM_SECURITY_MANAGER = CustomSsoSecurityManager
新建类
自定义视图:MyAuthRemoteUserView
class MyAuthRemoteUserView(AuthRemoteUserView):# this front-end template should be put under the folder `superset/templates/appbuilder/general/security`# so that superset could find this templates to renderlogin_template = 'appbuilder/general/security/login_my.html'title = "My Login"@expose('/test', methods=['GET', 'POST'])def test(self):data = {}# for name in request.environ:# if (type(request.environ[name]) == str or type(request.environ[name]) == int):# data[name] = request.environ[name]# return dataresponse = _wz_redirect('/login/')print(response.headers)return response# this method is going to overwrite# https://github.com/dpgaspar/Flask-AppBuilder/blob/master/flask_appbuilder/security/views.py#L556@expose('/login/', methods=['GET', 'POST'])def login(self):next_url = get_real_scheme_next_url()if g.user and g.user.get_id():return redirect(next_url)token= request.cookies.get("token")if (tokenNone or token== ''):print("token未获取到!")return redirect(CustomSsoSecurityManager.CAS_LOGIN_SERVER_URL + "?redirect_uri=" + self.get_login_redirect_uri())manager = self.appbuilder.smheader = {"cookie": "token=" + userToken,}result = requests.post(CustomSsoSecurityManager.CAS_CHECK_SERVER_URL,headers=header)resultWarp = json.loads(result.content)if (not resultWarp["success"]):return redirect(CustomSsoSecurityManager.CAS_LOGIN_SERVER_URL + "?redirect_uri=" + self.get_login_redirect_uri())cas_user = resultWarp["result"]username = cas_user['username']user = manager.find_user(username=username)print(user)# User does not exist, create one if auto user registration.if user is None and manager.auth_user_registration:user = manager.add_user(# All we have is REMOTE_USER, so we set# the other fields to blank.username=username,first_name=cas_user['name'],last_name='',email=username,role=manager.find_role(manager.auth_user_registration_role))# If user does not exist on the DB and not auto user registration,# or user is inactive, go away.elif user is None or (not user.is_active):logger.info(LOGMSG_WAR_SEC_LOGIN_FAILED.format(username))return Nonemanager.update_user_auth_stat(user)login_user(user, remember=False)session[SESSION_CAS_USER] = cas_usersession[SESSION_HCP] = hcpCustomSsoSecurityManager.get_data_auth(hcp, cas_user['id'])return redirect(next_url)def get_Login_url(self):return get_root_url() + '/login/'def get_login_redirect_uri(self):next_url = get_real_scheme_next_url()return parse.quote(self.get_Login_url() + "?next=" + next_url)@expose("/logout/")def logout(self):logout_user()userToken= request.cookies.get("token")if userToken:header = {"cookie": "token=" + userToken,}data = {"token": hcp}requests.post(CustomSsoSecurityManager.CAS_LOGINOUT_SERVER_URL, json=data,headers=header)if CustomSsoSecurityManager.SESSION_CAS_USER in session:session.pop(CustomSsoSecurityManager.SESSION_CAS_USER)if CustomSsoSecurityManager.SESSION_HCP in session:session.pop(CustomSsoSecurityManager.SESSION_HCP)if CustomSsoSecurityManager.SESSION_CAS_DATA_AUTH in session:session.pop(CustomSsoSecurityManager.SESSION_CAS_DATA_AUTH)return redirect(CustomSsoSecurityManager.CAS_LOGIN_SERVER_URL + '?redirect_uri=' + self.get_Login_url())
自定义管理类:CustomSsoSecurityManager
class CustomSsoSecurityManager(SupersetSecurityManager):authremoteuserview = MyAuthRemoteUserViewSESSION_CAS_USER = "_CAS_USER"SESSION_CAS_DATA_AUTH = "_CAS_DATA_AUTH"CAS_LOGIN_SERVER_URL = NoneCAS_CHECK_SERVER_URL = NoneCAS_LOGINOUT_SERVER_URL = NoneCAS_DATA_AUTH_URL = Noneconfig = Nonedef __init__(self, appbuilder):super().__init__(appbuilder)CustomSsoSecurityManager.get_cas_url(appbuilder.app.config)@classmethoddef get_cas_url(cls, config):cls.config = configcls.CAS_LOGIN_SERVER_URL = cls.config.get("SSO_LOGIN_URL")cls.CAS_CHECK_SERVER_URL = cls.config.get("SSO_SERVER_API_URL") + '/getCurrentUser'cls.CAS_LOGINOUT_SERVER_URL = cls.config.get("SSO_SERVER_API_URL") + '/logout'cls.CAS_DATA_AUTH_URL = cls.config.get("SSO_SERVER_API_URL") + '/getDataAuthByUserId'# 获取数据权限@classmethoddef get_data_auth(cls, token, user_id):data = {"moduleId": "SAAS_BI_DATA_AUTHORITY", "userId": user_id}result = requests.post(cls.CAS_DATA_AUTH_URL, json=data,headers=header)# print(result.content)result_warp = json.loads(result.content)if (not result_warp["success"]):else:session[cls.SESSION_CAS_DATA_AUTH] = result_warp['result']def load_user(self, pk):return self.get_user_by_id(int(pk))def setCache(self, key, value, expire):r.set(key, value)if expire is None or expire < 10:expire = 60 * 5r.expire(key, expire)def load_user3(self):token= request.cookies.get("token")biUserCacheData = {}if (tokenis None or token== ''):user = self.lm.anonymous_user()return self.lm._update_request_context_with_user(user)biUserKey = tokenbiUserInfoCache = NoneuserCache = NonebiUserCacheData = Noneif r.hexists(biUserKey, "apiUserInfo"):biUserInfoCache = r.hget(biUserKey, "apiUserInfo")userCache = biUserInfoCache.decode()biUserCacheData = json.loads(userCache)if biUserCacheData["success"] == False:biUserInfoCache = Noneif biUserInfoCache:cas_user = biUserCacheData["result"]session[CustomSsoSecurityManager.SESSION_CAS_USER] = cas_usersession[CustomSsoSecurityManager.SESSION_HCP] = tokenusername = cas_user['username']manager = self.appbuilder.smuser = manager.find_user(username=username)apiUserAuth = r.hget(biUserKey, "apiUserAuth")result_warp = json.loads(apiUserAuth.decode())if (not result_warp["success"]):user = self.lm.anonymous_user()return self.lm._update_request_context_with_user(user)else:session[self.SESSION_CAS_DATA_AUTH] = result_warp['result']else:manager = self.appbuilder.smheader = {"cookie": "token=" + token}result = requests.post(CustomSsoSecurityManager.CAS_CHECK_SERVER_URL,headers=header)resultWarp = json.loads(result.content)r.hset(biUserKey, "apiUserInfo", result.content)if (not resultWarp["success"]):user = self.lm.anonymous_user()return self.lm._update_request_context_with_user(user)cas_user = resultWarp["result"]username = cas_user['username']user = manager.find_user(username=username)# User does not exist, create one if auto user registration.if user is None and manager.auth_user_registration:user = manager.add_user(# All we have is REMOTE_USER, so we set# the other fields to blank.username=username,first_name=cas_user['name'],last_name='',email=username,role=manager.find_role(manager.auth_user_registration_role))# If user does not exist on the DB and not auto user registration,# or user is inactive, go away.elif user is None or (not user.is_active):logger.info(LOGMSG_WAR_SEC_LOGIN_FAILED.format(username))user = self.lm.anonymous_user()return self.lm._update_request_context_with_user(user)manager.update_user_auth_stat(user)login_user(user, remember=False)session[CustomSsoSecurityManager.SESSION_CAS_USER] = cas_usersession[CustomSsoSecurityManager.SESSION_HCP] = tokenheader = {"cookie": "token=" + token}data = {"moduleId": "SAAS_BI_DATA_AUTHORITY", "userId": cas_user['id']}result = requests.post(self.CAS_DATA_AUTH_URL, json=data,headers=header)# print(result.content)result_warp = json.loads(result.content)r.hset(biUserKey, "apiUserAuth", result.content)if (not result_warp["success"]):user = self.lm.anonymous_user()return self.lm._update_request_context_with_user(user)else:session[self.SESSION_CAS_DATA_AUTH] = result_warp['result']return self.lm._update_request_context_with_user(user)def create_login_manager(self, app: Flask) -> LoginManager:lm = super().create_login_manager(app)lm.request_loader(self.request_loader)lm._load_user = self.load_user3self.lm = lmreturn lm@classmethoddef get_data_auth_from_session(cls):return session[cls.SESSION_CAS_DATA_AUTH]@classmethoddef get_cas_user_from_session(cls):return session[cls.SESSION_CAS_USER]
参考资料:
- zhuanlan.zhihu.com/p/516553212