From ff55f10a9533538010ece6170ce462d9085d671f Mon Sep 17 00:00:00 2001
From: Daniel afx
Date: Mon, 15 Feb 2016 12:30:43 +0200
Subject: [PATCH] First.

---
 .gitignore              |  66 ++++++++
 LICENSE                 |  18 ++
 README.md               |  13 ++
 clientsdb.py            |  62 +++++++
 config.ini.dist         |  28 +++
 grid.py                 | 396 ++++++++++++++++++++++++++++++++++++++++
 ioconfig.py             |  21 +++
 journaldb.py            |  59 ++++++
 nginx_example_vhost.txt |  10 +
 novnc.py                |  44 +++++
 plugin.py               | 301 ++++++++++++++++++++++++++
 proxmaster.py           | 269 +++++++++++++++++++++++++
 requirements.txt        |   8 +
 runwebsockify.py        |   5 +
 start.sh                |  18 ++
 utils.py                |  73 ++++++++
 16 files changed, 1391 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 LICENSE
 create mode 100644 README.md
 create mode 100644 clientsdb.py
 create mode 100644 config.ini.dist
 create mode 100644 grid.py
 create mode 100644 ioconfig.py
 create mode 100644 journaldb.py
 create mode 100644 nginx_example_vhost.txt
 create mode 100644 novnc.py
 create mode 100644 plugin.py
 create mode 100644 proxmaster.py
 create mode 100644 requirements.txt
 create mode 100644 runwebsockify.py
 create mode 100755 start.sh
 create mode 100644 utils.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f54d41a
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,66 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+.hypothesis/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+#proxmaster custom ignores
+
+log/
+config.ini
+*.json
+
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..3ee9ff6
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,18 @@
+Copyright (c) 2015 deflax.net
+
+This software is provided 'as-is', without any express or implied
+warranty. In no event will the authors be held liable for any damages
+arising from the use of this software.
+
+Permission is granted to anyone to use this software for any purpose,
+including commercial applications, and to alter it and redistribute it
+freely, subject to the following restrictions:
+
+1. The origin of this software must not be misrepresented; you must not
+   claim that you wrote the original software. If you use this software
+   in a product, an acknowledgement in the product documentation would be
+   appreciated but is not required.
+2. Altered source versions must be plainly marked as such, and must not be
+   misrepresented as being the original software.
+3. This notice may not be removed or altered from any source distribution.
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ddc790e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,13 @@
+#Proxmaster
+Python RESTful API for managing a grid of vm slaves
+
+##Installation instructions:
+```
+1. sudo pip3 install -r requirements.txt
+2. create config.ini in the format of the provided template
+3. chmod +x start.sh
+4. create an nginx vhost, using the provided templates:
+- config.ini.dist
+- nginx_example_vhost.txt
+5. ./start.sh o/
+```
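+
+##Example API usage:
+A minimal, illustrative call against the HTTP routes defined in proxmaster.py.
+The host, apipass key and vmid below are placeholders taken from the example
+configs; adjust them to your deployment.
+```
+import requests
+
+API = 'http://EXAMPLE.com'      # server_name from nginx_example_vhost.txt
+KEY = {'apipass': 'CHANGEME'}   # [general] apipass from config.ini
+
+# list the grid (cached cluster status)
+print(requests.get(API + '/instance', params=KEY).text)
+
+# start an existing instance by vmid
+print(requests.post(API + '/instance/start/1000', params=KEY).text)
+```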
diff --git a/clientsdb.py b/clientsdb.py
new file mode 100644
index 0000000..1d466ec
--- /dev/null
+++ b/clientsdb.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8
+#
+# manage clientsdb.json
+
+#import site packages
+import json
+
+#import local packages
+import ioconfig
+import utils
+
+def addclient(vmid, vmname, clientid, clientname):
+    """ add new client to the clientsdb.json """
+    clientsdb = readclientsdb()
+    if str(clientid) in clientsdb:
+        ioconfig.logger.info('clients> client ' + clientid + ' already exists. merging.')
+    else:
+        ioconfig.logger.info('clients> client ' + clientid + ' does not exist. creating.')
+        vcard = { 'name':str(clientname) }
+        newclient = { str(clientid):vcard }
+        clientsdb.update(newclient)
+
+    ioconfig.logger.info('clients> vmid ' + vmid + ' will be owned by ' + clientid + ' (' + clientname + ')')
+    vmdata = { 'name':str(vmname), 'vmid':str(vmid), 'ownerid':str(clientid) }
+    clientsdb[str(clientid)][str(vmid)] = vmdata
+    writeclientsdb(clientsdb)
+
+
+def vmowner(vmid, vmname, verbose):
+    """ find the owner of the vm """
+    clientsdb = readclientsdb()
+    try:
+        clientid = utils.get_rec(clientsdb, str(vmid))[0]['ownerid']
+        clientname = clientsdb[str(clientid)]['name']
+    except:
+        #raise
+        clientid = '0' #unknown owner
+        clientname = 'unknown'
+    if verbose:
+        ioconfig.logger.info('clients> the owner of ' + str(vmid) + ' (' + vmname + ') is ' + str(clientid) + ' (' + clientname + ')')
+    return clientid
+
+
+def readclientsdb():
+    """ read client db """
+    try:
+        with open('clients.json') as dbr:
+            clientsdb = json.load(dbr)
+            dbr.close()
+    except:
+        clientsdb = {}
+        ioconfig.logger.warning('clients> initializing...')
+        #writeclientsdb(clientsdb)
+    return clientsdb
+
+
+def writeclientsdb(clientsdb):
+    """ write db """
+    with open('clients.json', 'w') as dbw:
+        json.dump(clientsdb, dbw)
+        dbw.close()
+
diff --git a/config.ini.dist b/config.ini.dist
new file mode 100644
index 0000000..1ac6eab
--- /dev/null
+++ b/config.ini.dist
@@ -0,0 +1,28 @@
+[uwsgi]
+socket = 127.0.0.1:5117
+workers = 3
+
+[general]
+logfile = log/proxmaster.log
+adminuser = masteradmin@pve
+apipass = CHANGEME
+vmid_min = 1000
+vmid_max = 999999
+novnc_url = http://FQDN/noVNC
+
+[region_0]
+name = CHANGEME
+ipv4_min = 192.168.0.4
+ipv4_max = 192.168.0.254
+
+[slave_0]
+name = CHANGEME
+masterip = 192.168.0.2
+password = CHANGEME
+regionid = 0
+
+[slave_1]
+name = CHANGEME
+masterip = 192.168.0.3
+password = CHANGEME
+regionid = 0
diff --git a/grid.py b/grid.py
new file mode 100644
index 0000000..7acb20b
--- /dev/null
+++ b/grid.py
@@ -0,0 +1,396 @@
+#.
-*- coding: utf-8 +# +# vdc + +#import site packages +import base64 +import json +import re +import datetime +import random +import netaddr + +#import local packages +import utils +import plugin +import ioconfig +import clientsdb +import journaldb + +logger = ioconfig.logger +config = ioconfig.parser + + +def sync(cached=True): + """ calls slave objects and mix their nodes in a common cluster grid """ + a = datetime.datetime.now() + grid_vmid_min = config.get('general', 'vmid_min') + grid_vmid_max = config.get('general', 'vmid_max') + real_grid = {'name':'real', "vmid_min":grid_vmid_min, "vmid_max":grid_vmid_max } + cache_grid = {'name':'cache', "vmid_min":grid_vmid_min, "vmid_max":grid_vmid_max } + regionselector = [i for i, x in enumerate(config.sections()) if re.match(r'\W*' + 'region' + r'\W*', x)] + for ri in range(len(regionselector)): + region_section = config.sections()[int(regionselector[ri])] + region_id = region_section.split("_")[1] + region_name = config.get(region_section, 'name') + region_range_min = config.get(region_section, 'ipv4_min') + region_range_max = config.get(region_section, 'ipv4_max') + slaveselector = [i for i, x in enumerate(config.sections()) if re.match(r'\W*' + 'slave' + r'\W*', x)] + real_region = { "id":region_id, "region":region_name, "ipv4_min":region_range_min, "ipv4_max":region_range_max } + cache_region = real_region.copy() #same region both in cache nad status + + for si in range(len(slaveselector)): + slave_section = config.sections()[int(slaveselector[si])] + slave_id = slave_section.split("_")[1] + slave_name = config.get(slave_section, 'name') + slave_masterip = config.get(slave_section, 'masterip') + slave_password = config.get(slave_section, 'password') + slave_regionid = config.get(slave_section, 'regionid') + enc_slave_password = base64.b64encode(slave_password.encode('ascii')) #encode base64 to avoid shoulder surfers + decoded_password = enc_slave_password.decode('utf-8') + real_slave = { "id":slave_id, "slave":slave_name, "masterip":slave_masterip, "password":decoded_password } + optional_slave = {} + cache_file = 'cache-slave-' + slave_id + '.json' + prefix = 'cache> [' + slave_id + '] ' + + # check if slave is in current region and include it in current dict if it is + if slave_regionid == region_id: + try: + #trying to connect to slave host + #vmlist = plugin.vmlist(slave_id, slave_masterip, enc_slave_password.decode('utf-8')) + proxobject = plugin.auth(slave_id, slave_masterip, enc_slave_password) + vmlist = plugin.vmlist(proxobject) + real_slave['alive'] = 'up' + logger.info(prefix + 'is up') + except: + #raise + #slave cant be read so it will be marked down. + real_slave['alive'] = 'down' + logger.warning(prefix + 'is down') + + if real_slave['alive'] == 'up': + #populate grid with vms then + for vm in vmlist: + #static parameters that CAN go to to cache: + vm_id = vm['vmid'] + vm_name = vm['name'] + vm_owner = clientsdb.vmowner(vm_id, vm_name, cached) #read clientsdb cache + static_vm = { "vmid":str(vm_id), "hostname":vm_name, 'type':vm['vmtype'], 'owner':vm_owner } + real_slave[str(vm_id)] = static_vm + #dynamic parameters that SHOULD NOT go to the cache: + dynamic_vm = { "uptime":vm['uptime'] } + optional_slave[str(vm_id)] = dynamic_vm + + #check current cache + cache_slave = real_slave.copy() #fallback to the current state + try: + with open(cache_file) as fcr: + cache_slave = json.load(fcr) + fcr.close() + except: + logger.info(prefix + 'does not exist in cache. 
Initializing...') + + if cache_slave['alive'] == 'up': + #slave was not down so it must be up... + cache_slave = update_cache(real_slave, cache_file, prefix, 'up') + logger.info(prefix + 'sync success o/') + else: + #if the slave was down before, compare the state before overwriting the cache + cache_slave['alive'] = 'up' #even if alive status in cache is still down we ignore it by forcing it to up + logger.info(prefix + 'was down') + #show the differences in log for manual (or maybe automatic at some point fixing) + findDiff(cache_slave, real_slave) + if cache_slave != real_slave: + logger.warning(prefix + 'cache != current status. please restore host!') + cache_slave = update_cache(cache_slave, cache_file, prefix, 'down') + else: + logger.info(prefix + 'cache == current status. host restored. o/') + cache_slave = update_cache(cache_slave, cache_file, prefix, 'up') + + #what to do with cache if host is down + if real_slave['alive'] == 'down': + try: + logger.warning(prefix + 'loading cache...') + with open(cache_file) as fscr: + cache_slave = json.load(fscr) + fscr.close() + logger.warning(prefix + '...done') + cache_slave = update_cache(cache_slave, cache_file, prefix, 'down') + except: + logger.error(prefix + 'sync failure!') + cache_slave = real_slave.copy() + #raise + + #we safely mix the dynamic ids now that we dont deal with cache anymore + mergedslave = utils.dict_merge({}, real_slave, optional_slave) + real_region[slave_id] = mergedslave + cache_region[slave_id] = cache_slave + #the region is finally included in the grid + real_grid[region_id] = real_region + cache_grid[region_id] = cache_region + + b = datetime.datetime.now() + real_grid["synctime"] = str(b-a) + #dump all data to json + WriteCache(cache_grid, 'grid-cache.json') + WriteCache(real_grid, 'grid-real.json') + if cached == True: + return cache_grid + else: + return real_grid + + +def update_cache(cachedata, cachefile, prefix, newstatus): + """ update administravite status """ + cachedata['alive'] = newstatus + WriteCache(cachedata, cachefile) + #TODO send mail + logger.info(prefix + 'administratively ' + newstatus) + return cachedata + + +def WriteCache(src_data, cache_file): + with open(cache_file, 'w') as fcw: + json.dump(src_data, fcw) + fcw.close() + + +def query_region(region_name): + """ translate region name to region id """ + grid_data = readcache() + + all_regions = [] + for element in grid_data: + try: + if str(element) == grid_data[element]['id']: + all_regions.append(element) + except: + continue + + for region in all_regions: + if grid_data[region]['region'] == region_name: + logger.info('grid> region ' + region_name + ' found') + return grid_data[region]['id'] + break + logger.error('grid> cant find region ' + region_name) + return "-1" + +def query_happiness(region_id): + """ analyzes grid data for the reuqested region and returns proposed slave_id, + based on a "happiness" factor. 
happiness means alive and free :) """ + grid_data = readcache() + grid_data = grid_data[str(region_id)] + + all_slaves = [] + for element in grid_data: + try: + if str(element) == grid_data[element]['id']: + all_slaves.append(element) + except: + continue + all_slaves = [ int(x) for x in all_slaves ] #convert values from str to int + + alive_slaves = [] + for slaveid in all_slaves: + if str(grid_data[str(slaveid)]['alive']) == 'up': + alive_slaves.append(slaveid) + logger.info('grid> alive slaves ' + str(alive_slaves)) + + #happy_slave = random.choice(alive_slaves) + if len(alive_slaves) < 1: + logger.error('grid> grid is full. add more slaves') + else: + happy_slave = 1 #TODO: analyze slaves and make informed decision. + logger.info('grid> ' + str(happy_slave) + ' selected') + return happy_slave + + +def generate_ipv4(region_id, how_many=1): + """ analyzes cached grid data and returns ip addresses for new machines. """ + grid_data = readcache() + ip_range_min = grid_data[str(region_id)]['ipv4_min'] + ip_range_max = grid_data[str(region_id)]['ipv4_max'] + region_ipset = netaddr.IPSet(netaddr.IPRange(ip_range_min, ip_range_max)) + region_ips = [] + for ip in region_ipset: + region_ips.append(ip) + ip_min = 0 + ip_max = len(region_ips) - 1 + tested_ips = [] #initialize ip cache + requested_ips = [] + all_ips = utils.get_rec(grid_data, 'ipaddr') + + for ips in range(int(how_many)): + counter = 0 + while True: + if counter == 50: + logger.error('grid> ip range full') + return None + else: + counter += 1 + + requested_ip_index = random.randint(ip_min, ip_max) + requested_ip = str(region_ips[requested_ip_index]) + + if requested_ip in tested_ips: + logger.warning('grid> ip address ' + str(requested_ip) + ' already tested. cache: ' + str(tested_ips)) + continue + + if requested_ip in requested_ips: + logger.warning('grid> ip address ' + str(requested_ip) + ' already generated') + tested_ips.append(requested_ip) + continue + + if requested_ip in all_ips: + position = used_ips.index(requested_ip) + logger.warning('grid> ip address ' + str(requested_ip) + ' already exist. location:' + str(position)) + tested_ips.append(requested_ip) + continue + else: + tested_ips = [] #clear ip cache + break + + logger.info('grid> ip address ' + requested_ip + ' selected') + requested_ips.append(requested_ip) + logger.info('grid> ip addresses ' + str(requested_ips) + ' selected') + return requested_ips + + +def generate_vmid(): + """ analyzes cached grid data and return proposed vmid for new machines """ + grid_data = readcache() + tested_vmids = [] #initialize id cache + id_min = grid_data['vmid_min'] + id_max = grid_data['vmid_max'] + all_vmid = utils.get_rec(grid_data, 'vmid') #get all vmid values from the nested grid + all_vmid = [ int(x) for x in all_vmid ] #convert values from str to int (its a vmid right?) + counter = 0 + while True: + if counter == 50: + logger.error('grid> ip range full') + return None + else: + counter += 1 + + requested_vmid = random.randint(int(id_min), int(id_max)) #max 90k machines + + if requested_vmid in tested_vmids: + logger.warning('grid> vmid ' + str(requested_vmid) + ' already tested. cache:' + str(tested_vmids)) + continue + + if requested_vmid in all_vmid: + position = all_vmid.index(requested_vmid) + logger.warning('grid> vmid ' + str(requested_vmid) + ' already exist. 
location:' + str(position)) + tested_vmids.append(requested_vmid) + else: + tested_vmids = [] #clear tested vmid cache + break + + logger.info('grid> vmid ' + str(requested_vmid) + ' selected') + return requested_vmid + + +def query_slave_data(slave_id): + """ read the cache for the requested slave_id """ + grid_data = readcache() + all_regions = [] + for element in grid_data: + try: + if str(element) == grid_data[element]['id']: + all_regions.append(element) + except: + continue + try: + for region in all_regions: + cslave = grid_data[region] + result_slave = cslave[str(slave_id)] #select the slave from all regions + if result_slave != None: + return result_slave + break + except: + raise + return {} + + +def query_vm(req_vmid): + """ returns slave_id and vm_type for the requested vmid """ + #read current state (no cache) + sync(False) + grid_data = readreal() + + #compare requested vmid to all vmid's from the grid + #TODO: maybe we should also check the owner somehow + all_vmid = utils.get_rec(grid_data, 'vmid') + target = int(0) + for running_vmid in all_vmid: + if str(req_vmid) == str(running_vmid): + target = req_vmid #=runn? + break + else: + continue + if target == 0: + logger.error('grid> vmid {} cannot be found.' + str(req_vmid)) + return "-1" + + region_id, slave_id = journaldb.getjnode(target) + try: + vm_type = grid_data[str(region_id)][str(slave_id)][str(target)]['type'] + except: + raise + + #we should know them by now. + return slave_id, vm_type + + +def findDiff(d1, d2, path=""): + for k in d1.keys(): + if not k in d2.keys(): + logger.warning('cache> ' + str(k) + ' as key not in d2') + else: + if type(d1[k]) is dict: + if path == "": + path = k + else: + path = path + "->" + k + findDiff(d1[k],d2[k], path) + else: + if d1[k] != d2[k]: + logger.warning('cache> ' + str(k) + ' ' + str(d1[k]) + ' [-]') + logger.warning('cache> ' + str(k) + ' ' + str(d2[k]) + ' [+]') + +def readcache(): + """ read the last saved cache and return its contents """ + try: + with open('grid-cache.json') as gridfile: + grid_data = json.load(gridfile) + gridfile.close() + except: + grid_data = {} + logger.error('cache> cannot read cache file') + return grid_data + + +def readreal(): + """ read the current state and return its contents """ + try: + with open('grid-real.json') as gridfile: + grid_data = json.load(gridfile) + gridfile.close() + resulttime = grid_data['synctime'] + logger.info('grid> sync for ' + resulttime) + except: + grid_data = {} + logger.error('cache> cannot read temp file') + return grid_data + + + +if __name__ == '__main__': + print(sync()) + #print(query_region('Plovdiv, Bulgaria')) + #print(query_happiness(0)) + #print(generate_ipv4(0,3)) + #print(generate_vmid()) + #print(query_slave_data(0)) + #print(query_vm(483039)) + diff --git a/ioconfig.py b/ioconfig.py new file mode 100644 index 0000000..60a7fc1 --- /dev/null +++ b/ioconfig.py @@ -0,0 +1,21 @@ +#. -*- coding: utf-8 +# +# ioconfig.py - read/write config/log etc. 
+ +import configparser +import logging + +""" config reader """ +parser = configparser.ConfigParser() +parser.read('config.ini') + +""" log writer """ +logger = logging.getLogger('proxmaster') +logging.captureWarnings(True) +logger.setLevel(logging.DEBUG) +handler = logging.FileHandler(parser.get('general', 'logfile')) +handler.setLevel(logging.DEBUG) +#formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) diff --git a/journaldb.py b/journaldb.py new file mode 100644 index 0000000..5edc56c --- /dev/null +++ b/journaldb.py @@ -0,0 +1,59 @@ +# -*- coding: utf-8 +# +# manage journaldb.json which is a table of vmid's as indexes update on vmcreate and +# values of region_id and slave_id. should support deletion of unused id's and be +# properly updated on vm migrations + +#site +import json + +#local +import ioconfig +import utils + +def createjnode(vmid, regionid, slaveid): + """ create new record into the journal. invoked on vm creation """ + journaldb = readjournal() + if str(vmid) in journaldb: + ioconfig.logger.warning('journal> overwriting id[{}] !'.format(vmid)) + else: + jnode = { str(vmid):{} } + journaldb.update(jnode) + + ioconfig.logger.info ('journal> r[{}] s[{}] -> id[{}]'.format(regionid, slaveid, vmid)) + jdata = { 'vmid':str(vmid), 'slaveid':str(slaveid), 'regionid':str(regionid) } + journaldb[str(vmid)] = jdata + writedb(journaldb) + + +def getjnode(vmid): + """ query the database for records with requested vmid. invoked on user commands """ + journaldb = readjournal() + try: + regionid = journaldb[str(vmid)]['regionid'] + slaveid = journaldb[str(vmid)]['slaveid'] + ioconfig.logger.info('journal> read: id[{}] -> r[{}] s[{}]'.format(vmid, regionid, slaveid)) + except: + ioconfig.logger.error('journal> invalid id[{}] !'.format(vmid)) + else: + return regionid, slaveid + + +def readjournal(): + """ read journal """ + try: + with open('journal.json') as dbr: + journaldb = json.load(dbr) + dbr.close() + except: + journaldb = {} + ioconfig.logger.warning('journal> initializing...') + return journaldb + + +def writedb(journaldb): + """ write journal """ + with open('journal.json', 'w') as dbw: + json.dump(journaldb, dbw) + dbw.close() + diff --git a/nginx_example_vhost.txt b/nginx_example_vhost.txt new file mode 100644 index 0000000..32f5b5b --- /dev/null +++ b/nginx_example_vhost.txt @@ -0,0 +1,10 @@ +server { + listen 80; + server_name EXAMPLE.com; + location / { + uwsgi_pass 127.0.0.1:5117; + include uwsgi_params; + uwsgi_param UWSGI_SCRIPT proxmaster; + uwsgi_param UWSGI_PYHOME /home/USER/proxmaster; + } +} diff --git a/novnc.py b/novnc.py new file mode 100644 index 0000000..610b2aa --- /dev/null +++ b/novnc.py @@ -0,0 +1,44 @@ +#. 
-*- coding: utf-8 +# +# novnc daemon spawner + +#import site packages +import shlex, subprocess + + +def spawn(target, options): + """ spawn """ + vnctarget = '{}:{} {}:{} '.format(target['listen_host'], target['listen_port'], target['target_host'], target['target_port']) + + a_options = '' + for key, value in options.items(): + if value == True: + c_option = '--{} '.format(key) + else: + c_option = '--{} {} '.format(key, value) + a_options += c_option + + command_line = 'python3 runwebsockify.py ' + a_options + vnctarget + args = shlex.split(command_line) + p = subprocess.Popen(args) + print('spawn!') + + +def spawn2(options): + """ spawn novnc daemon """ + print('daemon spawned') + novncd = threading.Thread(name='novncd', target=start_websockify, args=(options,)) + novncd.setDaemon(True) #daemonic ]:> + novncd.start() + print('stauts', novncd.isAlive()) + + +def start_websockify(options): + """ spawn websockify process """ + print(options) + server = websockify.WebSocketProxy(**options) + server.start_server() + print('daemon exited') + #while True: + # print('daemon') + diff --git a/plugin.py b/plugin.py new file mode 100644 index 0000000..99e82d7 --- /dev/null +++ b/plugin.py @@ -0,0 +1,301 @@ +#. -*- coding: utf-8 - +# required proxmox permissions: PVESysAdmin, PVEVMAdmin +# +# afx 2015-2016 + +# site +from proxmoxer import ProxmoxAPI +import base64 +import json +import time +import socket + +#local +import grid +import clientsdb +import journaldb +import utils +import ioconfig +import novnc + +def auth(slave_id, masterip=None, enc_password=None): + """ captures slave we want to auth from the cache and extract the credentials """ + adminuser = ioconfig.parser.get('general', 'adminuser') + if masterip is None: + result_slave = grid.query_slave_data(slave_id) + masterip = result_slave['masterip'] + enc_password = result_slave['password'] + adminpassword = base64.b64decode(enc_password).decode('ascii') + + #vendor specific + #connection = lib_proxmoxia.Connector(masterip) + #auth_token = connection.get_auth_token(adminuser, adminpassword) + #proxobject = lib_proxmoxia.Proxmox(connection) + proxobject = ProxmoxAPI(masterip, user=adminuser, password=adminpassword, verify_ssl=False) + + return proxobject + + +def vmlist(proxobject): + """ get vmlist """ + #we keep a single node proxmoxes so node id = 0 + #slave_name = proxobject.get('cluster/status')#'name'] + slave_name = proxobject.cluster.status.get()[0]['name'] + #query_kvm = proxobject.get('nodes/%s/qemu' % slave_name) + query_kvm = proxobject.nodes(slave_name).qemu.get() + query_lxc = proxobject.nodes(slave_name).lxc.get() + for kvm_dict in query_kvm: + kvm_dict['vmtype'] = 'kvm' + for lxc_dict in query_lxc: + lxc_dict['vmtype'] = 'lxc' + vmlist = query_kvm + query_lxc #merge machine list + return vmlist + + +def vmcreate(req): + """ create vm. returns JSON with data for whmcs """ + grid.sync() + + region_id = grid.query_region(req['region']) + if region_id == "-1": + logger.error('grid> no region found') + response = 'NO REGION FOUND' + return response + + slave_id = str(grid.query_happiness(region_id)) + vm_id = str(grid.generate_vmid()) + vm_ipv4 = grid.generate_ipv4(region_id, req['vps_ipv4']) + ipv4_dict = {} + ipidx = 0 + + vm_name = req['hostname'] + client_id = req['clientid'] + client_name = req['clientname'] + + proxobject = auth(slave_id) #we dont know the ip of slave_id so we leave the auth function to find it itself. 
+ slave_name = proxobject.cluster.status.get()[0]['name'] + + #ioconfig.logger.info('grid[' + slave_name + ']> recieved data: %s, %s, %s, %s, %s', region_id, slave_id, vm_id, vm_ipv4, req) + for ip in vm_ipv4: + ipv4_dict[str(ipidx)] = str(ip) + ipidx += 1 + response = { 'status':'CREATING', 'vmid':vm_id, 'name':vm_name, 'password':'TODO', 'ipv4_0':vm_ipv4[0] } + + disk_filename = 'vm-' + vm_id + '-disk-1' + description = vm_name + ' (' + vm_id + ')\n' + description += 'owned by ' + client_name + ' (' + client_id + ')\n' + description += 'master ip: ' + vm_ipv4[0] + + #create partition + image_name = 'vm-' + vm_id + '-disk-0' + local_storage = proxobject.nodes(slave_name).storage('lvm') + local_storage.content.post(vmid=vm_id, + filename=image_name, + size=req['vps_disk'] + 'G') + + if req['vps_type'] == 'KVM': + create_result = proxobject.nodes(slave_name).qemu.post(vmid=vm_id, + name=vm_name, + sockets=1, + cores=req['vps_cpu'], + memory=req['vps_ram'], + virtio0='lvm:' + image_name, + ide1='skyblue:iso/' + req['vps_os'] + ',media=cdrom', + net0='e1000,bridge=pub', + onboot=1, + description=description) + if req['vps_type'] == 'LXC': + create_result = proxobject.nodes(slave_name).lxc.post(vmid=vm_id, + hostname=vm_name, + password=req['password'], + sockets=1, + cores=req['vps_cpu'], + memory=req['vps_ram'], + virtio0='lvm:' + image_name, + ip_address=vm_ipv4[0], + onboot=1, + description=description) + + #populate the client db and vm journal + client_id = req['clientid'] + client_name = req['clientname'] + clientsdb.addclient(vm_id, vm_name, client_id, client_name) + journaldb.createjnode(vm_id, region_id, slave_id) + + #start the machihe + time.sleep(7) #wait few seconds for the slave to prepare the machine for initial run + vmstart(vm_id) + return response + + +def vmstatus(vm_id): + """ returns the status of the machine """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> get status of %s %s' % (slave_name, vm_type, vm_id)) + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.current.get() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.current.get() + return result + + + +def vmstart(vm_id): + """ starts a machine """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> starting %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.start.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.start.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'STARTING' } + return response + + +def vmshutdown(vm_id): + """ acpi shutdown the machine.. 
""" + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> acpi shutdown %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.stop.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.stop.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'SHUTDOWN', 'vmid':vm_id } + return response + + +def vmstop(vm_id): + """ poweroff the machine.. """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> power off %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.stop.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.stop.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'STOPPING', 'vmid':vm_id } + return response + + +def vmshutdown(vm_id): + """ graceful stop """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> acpi shutdown sent to %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.shutdown.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.shutdown.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'ACPI SHUTDOWN', 'vmid':vm_id } + return response + + +def vmsuspend(vm_id): + """ suspend machine """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> suspending %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.suspend.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.suspend.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'SUSPEND', 'vmid':vm_id } + return response + + +def vmresume(vm_id): + """ resume machine """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> resuming %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + result = proxobject.nodes(slave_name).qemu(vm_id).status.resume.post() + if vm_type == 'lxc': + result = proxobject.nodes(slave_name).lxc(vm_id).status.resume.post() + #ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result)) + response = { 'status':'RESUME', 'vmid':vm_id } + return response + + +def vmvnc(vm_id): + """ invoke vnc ticket """ + slave_id, vm_type = grid.query_vm(vm_id) + proxobject = auth(slave_id) + vm_type = vm_type.lower() + slave_name = proxobject.cluster.status.get()[0]['name'] + ioconfig.logger.info('grid[%s]> invoking vnc ticket for %s %s' % (slave_name, vm_type, vm_id)) + + if vm_type == 'kvm': + ticket = proxobject.nodes(slave_name).qemu(vm_id).vncproxy.post(websocket=1) + #socket = 
proxobject.nodes(slave_name).qemu(vm_id).vncwebsocket.get(port=ticket['port'], + # vncticket=ticket['ticket']) + if vm_type == 'lxc': + ticket = proxobject.nodes(slave_name).lxc(vm_id).vncproxy.post() + #socket = proxobject.nodes(slave_name).lxc(vm_id).vncwebsocket.get(port=ticket['port'], + # vncticket=ticket['ticket']) + + slaveip = grid.query_slave_data(slave_id)['masterip'] + #slaveport = socket['port'] + slaveport = ticket['port'] + listenport = str(int(slaveport) + 1000 + (int(slave_id) * 100)) #TODO: max 100 parallel connections/slave. + myip = getmyip() + + vnc_target = { 'target_host': slaveip, + 'target_port': slaveport, + 'listen_host': myip, + 'listen_port': listenport } + + vnc_options = { 'idle-timeout': 20, + 'verbose': True, + 'run-once': True } + + novnc.spawn(vnc_target, vnc_options) + + external_url = ioconfig.parser.get('general', 'novnc_url') + prefix = external_url + "/?host=" + myip + "&port=" + listenport + "&encrypt=0&true_color=1&password=" + ioconfig.logger.info('grid[{}]> {}'.format(slave_name, prefix + ticket['ticket'])) + + response = { 'status':'VNC', 'vmid':vm_id } + return response + + +def getmyip(): + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(("gmail.com",80)) + myip = s.getsockname()[0] + s.close + return myip + + +if __name__ == '__main__': + #internal module tests + time.sleep(30) + diff --git a/proxmaster.py b/proxmaster.py new file mode 100644 index 0000000..1642cef --- /dev/null +++ b/proxmaster.py @@ -0,0 +1,269 @@ +#. -*- coding: utf-8 - +# required proxmox permissions: PVESysAdmin, PVEVMAdmin +# +# afx 2015-2016 + +# import site packages +import logging +import falcon +import sys +import json +import urllib.parse + +#import local packages +import ioconfig +import grid +import plugin + +config = ioconfig.parser +logger = ioconfig.logger + +def welcome(): + """displays motd in log as welcome message""" + logger.info('###################################') + logger.info('# proxmaster ][ (c) 2015-2016 afx #') + logger.info('###################################') + +def apicheck(params): + """ compares request params for api key with the config file""" + try: + if params['apipass'] == config.get('general', 'apipass'): + status = True + response = 'OK' + else: + status = False + response = 'GET KEY DENIED' + logger.error('grid> read access denied. key mismatch') + except: + #raise + status = False + response = 'GET URL DENIED' + logger.error('grid> read access denied. url error?') + finally: + return (status, response) + +#API methods +class ClusterResource(object): + def on_get(self, req, resp): + """TEST ONLY. List cluster nodes. 
TEST ONLY""" + logger.info('grid> cache status') + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + resp.body = str(grid.sync()) + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + + def on_post(self, req, resp): + """Create a cluster node, returns array of: status, vmid, pass, ipv4, """ + logger.info('grid> create ' + str(req.params)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmcreate(req.params)) + except: + logger.error('grid> create function cancelled') + raise + resp.status = falcon.HTTP_403 + response = 'CREATE ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class StatusResource(object): + def on_get(self, req, resp, vmid): + """ check vm status """ + logger.info('grid> status ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmstatus(vmid)) + except: + logger.error('grid> status error') + raise + resp.status = falcon.HTTP_403 + response = 'STATUS ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class DeleteResource(object): + def on_post(self, req, resp, vmid): + """ delete machine completely""" + logger.info('grid> delete ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + try: + resp.body = urllib.parse.urlencode(plugin.vmdelete(vmid)) + except: + logger.error('grid> delete error') + raise + resp.status = falcon.HTTP_403 + response = 'DELETE ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class ArchivateResource(object): + def on_post(self, req, resp, vmid): + """ Temporary suspend the instance """ + logger.info('grid> suspend ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmsuspend(vmid)) + except: + logger.error('grid> pause error') + raise + resp.status = falcon.HTTP_403 + response = 'PAUSE ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class UnArchiveResource(object): + def on_post(self, req, resp, vmid): + """ Unuspend the instance """ + logger.info('grid> resume ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmresume(vmid)) + except: + logger.error('grid> resume error') + raise + resp.status = falcon.HTTP_403 + response = 'RESUME ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class StartResource(object): + def on_post(self, req, resp, vmid): + """ Start the instance """ + logger.info('grid> start ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmstart(vmid)) + except: + logger.error('grid> start error') + #raise + resp.status = falcon.HTTP_403 + response = 'START ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + + +class ShutdownResource(object): + def on_post(self, req, resp, vmid): + """ ACPI Shutdown the instance """ + logger.info('grid> shutdown ' + str(vmid)) + 
apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmshutdown(vmid)) + #TODO: Try few times and then return proper status message + except: + logger.error('grid> shutdown error') + #raise + resp.status = falcon.HTTP_403 + response = 'SHUTDOWN ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + + +class StopResource(object): + def on_post(self, req, resp, vmid): + """ Stop the instance """ + logger.info('grid> stop ' + str(vmid)) + apicheck_stat, apicheck_resp = apicheck(req.params) + if apicheck_stat: + resp.status = falcon.HTTP_200 + try: + resp.body = urllib.parse.urlencode(plugin.vmstop(vmid)) + except: + logger.error('grid> stop error') + #raise + resp.status = falcon.HTTP_403 + response = 'STOP ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + +class VNCResource(object): + def on_post(self, req, resp, vmid): + """ Create a VNC link to the instance """ + apicheck_stat, apicheck_resp = apicheck(req.params) + logger.info('grid> vnc ' + str(vmid)) + if apicheck_stat: + try: + resp.status = falcon.HTTP_200 + resp.body = urllib.parse.urlencode(plugin.vmvnc(vmid)) + except: + logger.error('grid> vnc error') + raise + resp.status = falcon.HTTP_403 + response = 'VNC ERR' + resp.body = response + else: + resp.status = falcon.HTTP_403 + resp.body = apicheck_resp + + +if __name__ == '__main__': + sys.exit("invoke proxmaster via uwsgi. thanks. bye. o/") + +#setup routes +wsgi_app = api = application = falcon.API() + +#display motd +welcome() +#logger.info('grid> sync') +#grid.sync() + +# setup routes +res_cluster = ClusterResource() +api.add_route('/instance', res_cluster) + +res_status = StatusResource() +api.add_route('/instance/{vmid}', res_status) + +res_delete = DeleteResource() +api.add_route('/instance/delete/{vmid}', res_delete) + +res_archivate = ArchivateResource() +api.add_route('/instance/archivate/{vmid}', res_archivate) + +res_unarchive = UnArchiveResource() +api.add_route('/instance/unarchive/{vmid}', res_unarchive) + +res_start = StartResource() +api.add_route('/instance/start/{vmid}', res_start) + +res_shutdown = ShutdownResource() +api.add_route('/instance/shutdown/{vmid}', res_shutdown) + +res_stop = StopResource() +api.add_route('/instance/stop/{vmid}', res_stop) + +res_vnc = VNCResource() +api.add_route('/instance/vnc/{vmid}', res_vnc) + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..12df3a9 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +uwsgi +pyOpenSSL +requests +falcon +urllib +netaddr +proxmoxer +websockify diff --git a/runwebsockify.py b/runwebsockify.py new file mode 100644 index 0000000..9ad217c --- /dev/null +++ b/runwebsockify.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python + +import websockify + +websockify.websocketproxy.websockify_init() diff --git a/start.sh b/start.sh new file mode 100755 index 0000000..34916c5 --- /dev/null +++ b/start.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +# Log rotation +LOG_DIR=${HOME}/proxmaster/log +LOG_FILE="${LOG_DIR}/proxmaster.log" + +mkdir -p $LOG_DIR + +TIME=`date -u +%s` + +if [ -e $LOG_FILE ] ; then + mv ${LOG_FILE} ${LOG_FILE}.${TIME} && touch ${LOG_FILE} +else + touch ${LOG_FILE} +fi + +#startuwsgi instance +uwsgi config.ini diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..aae9bc2 --- /dev/null +++ b/utils.py @@ -0,0 +1,73 @@ +#. 
-*- coding: utf-8
+#
+# helper functions
+
+from copy import deepcopy
+import functools
+
+def dict_merge(target, *args):
+    """ Recursively merges multiple dicts """
+    # Merge multiple dicts
+    if len(args) > 1:
+        for obj in args:
+            dict_merge(target, obj)
+        return target
+
+    # Recursively merge dicts and set non-dict values
+    obj = args[0]
+    if not isinstance(obj, dict):
+        return obj
+    for k, v in obj.items():
+        if k in target and isinstance(target[k], dict):
+            dict_merge(target[k], v)
+        else:
+            target[k] = deepcopy(v)
+    return target
+
+def get_rec(search_dict, field):
+    """
+    Takes a dict with nested lists and dicts,
+    and searches all dicts for a key of the field
+    provided.
+    """
+    fields_found = []
+
+    for key, value in search_dict.items():
+        if key == field:
+            fields_found.append(value)
+
+        elif isinstance(value, dict):
+            results = get_rec(value, field)
+            for result in results:
+                fields_found.append(result)
+
+        elif isinstance(value, list):
+            for item in value:
+                if isinstance(item, dict):
+                    more_results = get_rec(item, field)
+                    for another_result in more_results:
+                        fields_found.append(another_result)
+
+    return fields_found
+
+
+def gen_dict_extract(key, var):
+    if hasattr(var, 'items'):
+        for k, v in var.items():
+            if k == key:
+                yield v
+            if isinstance(v, dict):
+                for result in gen_dict_extract(key, v):
+                    yield result
+            elif isinstance(v, list):
+                for d in v:
+                    for result in gen_dict_extract(key, d):
+                        yield result
+
+
+def chained_get(dct, *keys):
+    """ follow keys through nested dicts, returning 'NA' when an intermediate key is missing """
+    SENTRY = object()
+    def getter(level, key):
+        return 'NA' if level is SENTRY else level.get(key, SENTRY)
+    return functools.reduce(getter, keys, dct)
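+
+
+if __name__ == '__main__':
+    # minimal self-test sketch (illustrative, mirroring the __main__ blocks in grid.py and plugin.py);
+    # the sample data below is made up: dict_merge deep-merges dicts into `target`, and get_rec
+    # collects every value stored under a given key anywhere in a nested structure.
+    merged = dict_merge({}, {'1000': {'vmid': '1000'}}, {'1000': {'uptime': 42}})
+    print(merged)  # {'1000': {'vmid': '1000', 'uptime': 42}}
+    print(get_rec({'0': {'5': {'vmid': '1000'}, '6': {'vmid': '1001'}}}, 'vmid'))  # ['1000', '1001']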