File: //proc/self/root/proc/thread-self/root/lib/python3.9/site-packages/up2date_client/rhnreg.py
#
# RHN Registration Client
# Copyright (c) 2000--2016 Red Hat, Inc.
#
# Authors:
# Adrian Likins <alikins@redhat.com>
# Preston Brown <pbrown@redhat.com>
# Daniel Benamy <dbenamy@redhat.com>
import os
import sys
from up2date_client import up2dateUtils
from up2date_client import up2dateErrors
from up2date_client import up2dateAuth
from up2date_client import rhnserver
from up2date_client import up2dateLog
from up2date_client import statistics
from up2date_client.pkgplatform import getPlatform
from rhn.i18n import ustr, sstr
try: # python2
import urlparse
import xmlrpclib
from types import ListType, TupleType, StringType, UnicodeType, DictType, DictionaryType
except ImportError: # python3
import urllib.parse as urlparse
import xmlrpc.client as xmlrpclib
ListType = list
TupleType = tuple
StringType = bytes
UnicodeType = str
DictType = dict
DictionaryType = dict
long = int
import gettext
# Load the 'rhn-client-tools' message catalog. With fallback=True,
# gettext.translation() returns a NullTranslations object instead of raising
# when no catalog can be found.
t = gettext.translation('rhn-client-tools', fallback=True)
# Python 3 translations don't have a ugettext method
if not hasattr(t, 'ugettext'):
    t.ugettext = t.gettext
# Module-level translation function bound to the conventional `_` name.
_ = t.ugettext
# Directory path constant; not referenced in the visible part of this module.
SYSID_DIR = "/etc/sysconfig/rhn"
# File that getAndWriteJWTTokenToFile() writes the JWT token to.
JWT_TOKEN = '/etc/sysconfig/rhn/jwt.token'
# Human-readable edition name -> short product identifier.
_human_readable_to_product = {
    'CloudLinux OS Shared Pro': 'shared_pro',
    'CloudLinux OS Shared': 'shared',
    'CloudLinux OS Solo': 'solo',
    'CloudLinux OS Admin': 'admin',
}
# Reverse lookup: short product identifier -> human-readable edition name.
_product_to_human_readable = {
    v: k for k, v in _human_readable_to_product.items()}
from up2date_client import config
# Module-wide configuration object; the functions below read
# cfg['systemIdPath'] from it.
cfg = config.initUp2dateConfig()
# Module-wide log handle used by the functions below (log_me / log_exception).
log = up2dateLog.initLog()
def registered():
    """Tell whether the configured system id file can be read.

    :return: True when the file at cfg['systemIdPath'] is readable by the
        current process, False otherwise
    """
    system_id_path = cfg['systemIdPath']
    is_readable = os.access(system_id_path, os.R_OK)
    return is_readable
def _write_secure_file(secure_file, file_contents):
""" Write a file to disk that is not readable by other users. """
dir_name = os.path.dirname(secure_file)
if not os.access(dir_name, os.W_OK):
return False
if os.access(secure_file, os.F_OK):
# already have file there; let's back it up
try:
os.rename(secure_file, secure_file + '.save')
except:
return False
fd = os.open(secure_file, os.O_WRONLY | os.O_CREAT, int('0600', 8))
fd_file = os.fdopen(fd, 'w')
try:
fd_file.write(sstr(file_contents))
finally:
fd_file.close()
return True
def writeSystemId(systemId):
    """Store the system id in the file configured as cfg['systemIdPath'].

    :param systemId: system id data to write
    :return: the result reported by ``_write_secure_file`` for that write
    """
    return _write_secure_file(cfg['systemIdPath'], systemId)
def _execute_pre_jwt_update_hook(token: str, allowTransition: bool):
"""
Execute binary file which we use as hook for jwt token updates
"""
import subprocess, os
# during cldeploy we don't have python or any other packages, so just exit
if not os.path.exists('/opt/cloudlinux/venv/bin/python3'):
return
cmd = ['/usr/sbin/cl-pre-jwt-update', '--new-token', token]
if allowTransition:
cmd.append('--allow-transition')
p = subprocess.Popen(cmd)
stdout, stderr = p.communicate()
if p.returncode != 0:
log.log_me("Pre jwt update hook failed with stdout=%s and stderr=%s" % (stdout, stderr))
def _execute_post_jwt_update_hook(allowTransition: bool):
"""
Execute binary file which we use as hook for jwt token updates
"""
import subprocess, os
# during cldeploy we don't have python or any other packages, so just exit
if not os.path.exists('/opt/cloudlinux/venv/bin/python3'):
return
cmd = ['/usr/sbin/cl-post-jwt-update']
if allowTransition:
cmd.append('--allow-transition')
p = subprocess.Popen(cmd)
stdout, stderr = p.communicate()
if p.returncode != 0:
log.log_me("Post jwt update hook failed with stdout=%s and stderr=%s" % (stdout, stderr))
def getAndWriteJWTTokenToFile(systemId, allowTransition = False):
    """
    Request a JWT token from CLN and store it in the JWT_TOKEN file.

    The pre-update hook runs before the token is written and the post-update
    hook afterwards. If the server does not know the getJWTToken method
    nothing is done; the authentication, uuid-uniqueness and communication
    errors listed below are logged and the function returns without writing.

    :param systemId: content of file `/etc/sysconfig/rhn/systemid`
    :param allowTransition: forwarded to both update hooks
    :return: None
    """
    server = rhnserver.RhnServer()
    try:
        jwt_token = server.up2date.getJWTToken(systemId)
    except up2dateErrors.UnknownMethodException:
        # CLN without this method: there is nothing to do
        return
    except (up2dateErrors.AuthenticationTicketError,
            up2dateErrors.RhnUuidUniquenessError,
            up2dateErrors.CommunicationError,
            up2dateErrors.AuthenticationOrAccountCreationError):
        log.log_exception(*sys.exc_info())
        return
    else:
        _execute_pre_jwt_update_hook(jwt_token, allowTransition)
        _write_secure_file(JWT_TOKEN, jwt_token)
        _execute_post_jwt_update_hook(allowTransition)
def getServerEdition(human_readable: bool = False):
    """Return the edition recorded in the edition cache file.

    :param human_readable: when True return the display name as stored in
        the cache file (e.g. 'CloudLinux OS Solo'); otherwise return the
        short product identifier (e.g. 'solo')
    :return: the edition in the requested form; the shared edition when the
        cache file does not exist
    :raises KeyError: when the short identifier is requested and the cached
        name is not one of the known editions
    """
    edition_cache_file = '/opt/cloudlinux/cl_edition'
    # edition cache does not exist only in the case when
    # we did not register server yet
    if not os.path.exists(edition_cache_file):
        # Honour the requested form for the default too, so callers asking
        # for a display name never receive the bare product identifier.
        return 'CloudLinux OS Shared' if human_readable else 'shared'

    with open(edition_cache_file) as f:
        raw_edition = f.read().strip('\n')

    if human_readable:
        return raw_edition
    return _human_readable_to_product[raw_edition]
def checkLicenseKey(activationKey, strictEdition, silentMigration):
    """Check that the license behind ``activationKey`` fits the installed edition.

    Returns silently when the server does not support the license check or
    when the license edition equals the current edition. The process is
    terminated with exit status 1 on a communication error, when
    ``strictEdition`` is set and the editions differ, when confirmation is
    needed but stdin is not a terminal, or when the server has more users
    than the new edition allows. In the remaining interactive case the user
    is asked to confirm the migration.

    :param activationKey: activation key passed to ``checkKey``
    :param strictEdition: refuse an edition mismatch instead of migrating
    :param silentMigration: skip the interactive checks and the confirmation
    :return: None
    """
    try:
        licenseInformation = checkKey(activationKey)
    except up2dateErrors.CommunicationError as e:
        print("%s" % e.errmsg)
        sys.exit(1)
    except up2dateErrors.UnknownMethodException:
        # server without the license check method: nothing to verify
        return

    currentEdition = getServerEdition()
    licenseEdition = licenseInformation['edition']

    if licenseEdition == currentEdition:
        return

    if strictEdition:
        print(
            "WARNING: Automatic registration in yum transactions is "
            "only available when edition matches the provided license. "
            "Your current edition is {current_edition} and your license is {new_edition}.".format(
                current_edition=_product_to_human_readable[currentEdition],
                new_edition=_product_to_human_readable[licenseEdition]
            )
        )
        print("Run clnreg_ks manually to complete registration.")
        sys.exit(1)

    if not silentMigration:
        if not sys.stdin.isatty():
            print('Error: interactive input required for edition migration, but tool '
                  'is running in non-interactive mode. Please try running the tool again '
                  'in interactive shell or add `--migrate-silently` flag to accept all '
                  'questions and perform the edition migration silently.')
            # sys.exit rather than the site-provided exit() builtin, which is
            # not guaranteed to exist (e.g. under `python -S`).
            sys.exit(1)

        message = (f"{_product_to_human_readable[currentEdition]} edition installed on your server "
                   f"does not match license you are trying to register server with: "
                   f"{_product_to_human_readable[licenseEdition]}. Migration is required. "
                   f"You may lose access to the services which are not supported by the new edition.")

        # Editions with a cap on the number of hosting accounts.
        edition_to_users_limit = {
            'admin': 5,
            'solo': 1
        }
        license_users_limit = edition_to_users_limit.get(licenseEdition)
        if license_users_limit is not None:
            users_on_server = statistics.count_server_users()
            if users_on_server > license_users_limit:
                print(f"The license you are trying to register with allows a maximum of "
                      f"{license_users_limit} hosting accounts which is less "
                      f"than {users_on_server} users detected on this server. Aborting.")
                sys.exit(1)
            else:
                message = (f"{message} Also, the license you are trying to register with allows a maximum of "
                           f"{license_users_limit} hosting accounts. "
                           f"Make sure that your system complies with this requirement.")

        _askConfirmation(message)
def _askConfirmation(confirmationMessage: str):
    """
    Show the message and ask the user whether the edition migration may go on.

    Any answer other than 'y' (compared case-insensitively) prints
    'Aborted.' and terminates the process with exit status 1.

    :param confirmationMessage: text printed before the question
    :return: None when the user confirmed
    """
    print(confirmationMessage)
    answer = input("Do you want to continue? [N/y]: ")
    if answer.lower() == 'y':
        return
    print('Aborted.')
    sys.exit(1)
def registerSystem(username = None, password = None,
                   profileName = None,
                   token = None, other = None, edition=None):
    """Register this system through the xmlrpc ``registration.new_system`` call.

    :param username: deprecated, must be None
    :param password: deprecated, must be None
    :param profileName: sent to the server as ``profile_name``
    :param token: sent to the server as ``token`` when given
    :param other: optional mapping of extra key/value pairs to send
    :param edition: edition to register with; when None it is derived from
        the ``/etc/cloudlinux-edition-*`` marker files, defaulting to 'shared'
    :return: the value returned by ``registration.new_system``
    """
    assert username is None and password is None, \
        "username and password usage is deprecated"

    registration_data = {}
    registration_data["profile_name"] = profileName
    registration_data["os_release"] = up2dateUtils.getVersion()
    registration_data["release_name"] = up2dateUtils.getOSRelease()
    registration_data["architecture"] = up2dateUtils.getArch()

    # send information about previous registration
    previous_system_id = up2dateAuth.getSystemId()
    if previous_system_id is not None:
        registration_data["system_id"] = previous_system_id

    # dict of other bits to send
    if other:
        registration_data.update(other.items())

    if token:
        registration_data["token"] = token
    else:
        registration_data["username"] = username
        registration_data["password"] = password

    if edition is not None:
        chosen_edition = edition
    elif os.path.exists('/etc/cloudlinux-edition-solo'):
        chosen_edition = 'solo'
    elif os.path.exists('/etc/cloudlinux-edition-admin'):
        chosen_edition = 'admin'
    else:
        chosen_edition = 'shared'
    registration_data['edition'] = chosen_edition

    registration_server = rhnserver.RegistrationRhnServer()
    return registration_server.registration.new_system(registration_data)
def checkKey(activationKey):
    """
    Ask the registration server to check the activation key.

    :param activationKey: activation key passed to ``license_check``
    :return: the server's answer unchanged, e.g.
        {'customerId': 10000327, 'edition': 'solo'}
    """
    registration_server = rhnserver.RegistrationRhnServer()
    return registration_server.registration.license_check(activationKey)
def _encode_characters(*args):
""" All the data we gathered from dmi, bios, gudev are in utf-8,
we need to convert characters beyond ord(127) - e.g \xae to unicode.
"""
result=[]
for item in args:
item_type = type(item)
if item_type == StringType:
item = ustr(item)
elif item_type == TupleType:
item = tuple(_encode_characters(i) for i in item)
elif item_type == ListType:
item = [_encode_characters(i) for i in item]
elif item_type == DictType or item_type == DictionaryType:
item = dict([(_encode_characters(name, val)) for name, val in item.items()])
# else: numbers or UnicodeType - are safe
result.append(item)
if len(result) == 1:
return result[0]
else:
return tuple(result)