HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //proc/self/root/proc/thread-self/root/lib/python3.9/site-packages/up2date_client/rhnreg.py
#
# RHN Registration Client
# Copyright (c) 2000--2016 Red Hat, Inc.
#
# Authors:
#     Adrian Likins <alikins@redhat.com>
#     Preston Brown <pbrown@redhat.com>
#     Daniel Benamy <dbenamy@redhat.com>
import os
import sys

from up2date_client import up2dateUtils
from up2date_client import up2dateErrors
from up2date_client import up2dateAuth
from up2date_client import rhnserver
from up2date_client import up2dateLog
from up2date_client import statistics
from up2date_client.pkgplatform import getPlatform
from rhn.i18n import ustr, sstr

try: # python2
    import urlparse
    import xmlrpclib
    from types import ListType, TupleType, StringType, UnicodeType, DictType, DictionaryType
except ImportError: # python3
    import urllib.parse as urlparse
    import xmlrpc.client as xmlrpclib
    ListType = list
    TupleType = tuple
    StringType = bytes
    UnicodeType = str
    DictType = dict
    DictionaryType = dict
    long = int

import gettext
t = gettext.translation('rhn-client-tools', fallback=True)
# Python 3 translations don't have a ugettext method
if not hasattr(t, 'ugettext'):
    t.ugettext = t.gettext
_ = t.ugettext

SYSID_DIR = "/etc/sysconfig/rhn"
JWT_TOKEN = '/etc/sysconfig/rhn/jwt.token'

_human_readable_to_product = {
    'CloudLinux OS Shared Pro': 'shared_pro',
    'CloudLinux OS Shared': 'shared',
    'CloudLinux OS Solo': 'solo',
    'CloudLinux OS Admin': 'admin',
}
_product_to_human_readable = {
    v: k for k, v in _human_readable_to_product.items()}

from up2date_client import config
cfg = config.initUp2dateConfig()
log = up2dateLog.initLog()


def registered():
    return os.access(cfg['systemIdPath'], os.R_OK)

def _write_secure_file(secure_file, file_contents):
    """ Write a file to disk that is not readable by other users. """
    dir_name = os.path.dirname(secure_file)
    if not os.access(dir_name, os.W_OK):
        return False

    if os.access(secure_file, os.F_OK):
        # already have file there; let's back it up
        try:
            os.rename(secure_file, secure_file + '.save')
        except:
            return False

    fd = os.open(secure_file, os.O_WRONLY | os.O_CREAT, int('0600', 8))
    fd_file = os.fdopen(fd, 'w')
    try:
        fd_file.write(sstr(file_contents))
    finally:
        fd_file.close()

    return True

def writeSystemId(systemId):
    res = _write_secure_file(cfg['systemIdPath'], systemId)

    return res

def _execute_pre_jwt_update_hook(token: str, allowTransition: bool):
    """
    Execute binary file which we use as hook for jwt token updates
    """
    import subprocess, os
    # during cldeploy we don't have python or any other packages, so just exit
    if not os.path.exists('/opt/cloudlinux/venv/bin/python3'):
        return

    cmd = ['/usr/sbin/cl-pre-jwt-update', '--new-token', token]
    if allowTransition:
        cmd.append('--allow-transition')

    p = subprocess.Popen(cmd)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        log.log_me("Pre jwt update hook failed with stdout=%s and stderr=%s" % (stdout, stderr))


def _execute_post_jwt_update_hook(allowTransition: bool):
    """
    Execute binary file which we use as hook for jwt token updates
    """
    import subprocess, os
    # during cldeploy we don't have python or any other packages, so just exit
    if not os.path.exists('/opt/cloudlinux/venv/bin/python3'):
        return

    cmd = ['/usr/sbin/cl-post-jwt-update']
    if allowTransition:
        cmd.append('--allow-transition')

    p = subprocess.Popen(cmd)
    stdout, stderr = p.communicate()
    if p.returncode != 0:
        log.log_me("Post jwt update hook failed with stdout=%s and stderr=%s" % (stdout, stderr))


def getAndWriteJWTTokenToFile(systemId, allowTransition = False):
    """
    Get a JWT token from CLN and save it to the file
    :param systemId: content of file `/etc/sysconfig/rhn/systemid`
    :return: None
    """

    xmlrpm_server = rhnserver.RhnServer()
    try:
        result = xmlrpm_server.up2date.getJWTToken(systemId)

    except up2dateErrors.UnknownMethodException:
        # if CLN doesn't have this method we do nothing
        return
    except (up2dateErrors.AuthenticationTicketError,
            up2dateErrors.RhnUuidUniquenessError,
            up2dateErrors.CommunicationError,
            up2dateErrors.AuthenticationOrAccountCreationError):
        log.log_exception(*sys.exc_info())
        return

    _execute_pre_jwt_update_hook(result, allowTransition)
    _write_secure_file(JWT_TOKEN, result)
    _execute_post_jwt_update_hook(allowTransition)


def getServerEdition(human_readable: bool = False):
    edition_cache_file = '/opt/cloudlinux/cl_edition'

    # edition cache does not exist only in the case when
    # we did not register server yet
    if not os.path.exists(edition_cache_file):
        return 'shared'

    with open(edition_cache_file) as f:
        raw_edition = f.read().strip('\n')
        if human_readable:
            return raw_edition
        return _human_readable_to_product[raw_edition]


def checkLicenseKey(activationKey, strictEdition, silentMigration):
    try:
        licenseInformation = checkKey(activationKey)
    except up2dateErrors.CommunicationError as e:
        print("%s" % e.errmsg)
        sys.exit(1)
    except up2dateErrors.UnknownMethodException:
        return

    currentEdition = getServerEdition()
    licenseEdition = licenseInformation['edition']

    if licenseEdition == currentEdition:
        return

    if strictEdition:
        print(
            "WARNING: Automatic registration in yum transactions is "
            "only available when edition matches the provided license. "
            "Your current edition is {current_edition} and your license is {new_edition}.".format(
                current_edition=_product_to_human_readable[currentEdition],
                new_edition=_product_to_human_readable[licenseEdition]
            )
        )
        print("Run clnreg_ks manually to complete registration.")
        sys.exit(1)

    if not silentMigration:
        if not sys.stdin.isatty():
            print('Error: interactive input required for edition migration, but tool '
                  'is running in non-interactive mode. Please try running the tool again '
                  'in interactive shell or add `--migrate-silently` flag to accept all'
                  'questions and perform the edition migration silently.')
            exit(1)

        message = (f"{_product_to_human_readable[currentEdition]} edition installed on your server "
                   f"does not match license you are trying to register server with: "
                   f"{_product_to_human_readable[licenseEdition]}. Migration is required. "
                   f"You may lose access to the services which are not supported by the new edition.")

        edition_to_users_limit = {
            'admin': 5,
            'solo': 1
        }
        license_users_limit = edition_to_users_limit.get(licenseEdition)
        if license_users_limit is not None:
            users_on_server = statistics.count_server_users()

            if users_on_server > license_users_limit:
                print(f"The license you are trying to register with allows a maximum of "
                      f"{edition_to_users_limit[licenseEdition]} hosting accounts which is less "
                      f"than {users_on_server} users detected on this server. Aborting.")
                sys.exit(1)
            else:
                message = (f"{message} Also, the license you are trying to register with allows a maximum of "
                           f"{edition_to_users_limit[licenseEdition]} hosting accounts. "
                           f"Make sure that your system complies with this requirement.")

        _askConfirmation(message)



def _askConfirmation(confirmationMessage: str):
    """
    Prints message and makes sure that client is ready for edition migration.
    """
    print(confirmationMessage)

    response = input("Do you want to continue? [N/y]: ", )
    if response.lower() != 'y':
        print('Aborted.')
        sys.exit(1)


def registerSystem(username = None, password = None,
                   profileName = None,
                   token = None, other = None, edition=None):
    """Wrapper for the old xmlrpc to register a system. Activates subscriptions
    if a reg num is given.

    """
    assert username is None and password is None, \
        "username and password usage is deprecated"

    auth_dict = { "profile_name" : profileName,
                  "os_release" : up2dateUtils.getVersion(),
                  "release_name" : up2dateUtils.getOSRelease(),
                  "architecture" : up2dateUtils.getArch()
                }

    # send information about previous registration
    system_id_xml = up2dateAuth.getSystemId()
    if system_id_xml is not None:
        auth_dict["system_id"] = system_id_xml

    # dict of other bits to send
    if other:
        for (key, item) in other.items():
            auth_dict[key] = item
    if token:
        auth_dict["token"] = token
    else:
        auth_dict["username"] = username
        auth_dict["password"] = password

    if edition is not None:
        auth_dict['edition'] = edition
    else:
        auth_dict['edition'] = 'solo' \
            if os.path.exists('/etc/cloudlinux-edition-solo') \
            else 'admin' if os.path.exists('/etc/cloudlinux-edition-admin') else 'shared'

    s = rhnserver.RegistrationRhnServer()
    ret = s.registration.new_system(auth_dict)

    return ret


def checkKey(activationKey):
    """
    Check the activation key and return it's edition and customer
    """
    s = rhnserver.RegistrationRhnServer()
    ret = s.registration.license_check(activationKey)

    # {'customerId': 10000327, 'edition': 'solo'}
    return ret


def _encode_characters(*args):
        """ All the data we gathered from dmi, bios, gudev are in utf-8,
            we need to convert characters beyond ord(127) - e.g \xae to unicode.
        """
        result=[]
        for item in args:
            item_type = type(item)
            if item_type == StringType:
                item = ustr(item)
            elif item_type == TupleType:
                item = tuple(_encode_characters(i) for i in item)
            elif item_type == ListType:
                item = [_encode_characters(i) for i in item]
            elif item_type == DictType or item_type == DictionaryType:
                item = dict([(_encode_characters(name, val)) for name, val in item.items()])
            # else: numbers or UnicodeType - are safe
            result.append(item)
        if len(result) == 1:
            return result[0]
        else:
            return tuple(result)