This commit is contained in:
Daniel afx 2016-02-15 12:30:43 +02:00
commit ff55f10a95
16 changed files with 1391 additions and 0 deletions

66
.gitignore vendored Normal file

@@ -0,0 +1,66 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
#proxmaster custom ignores
log/
config.ini
*.json

18
LICENSE Normal file

@@ -0,0 +1,18 @@
Copyright (c) 2015 deflax.net
This software is provided 'as-is', without any express or implied
warranty. In no event will the authors be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not
claim that you wrote the original software. If you use this software
in a product, an acknowledgement in the product documentation would be
appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution.

13
README.md Normal file

@@ -0,0 +1,13 @@
# Proxmaster
Python RESTful API for managing a grid of VM slaves
## Installation instructions:
```
1. sudo pip3 install -r requirements.txt
2. create config.ini based on the provided config.ini.dist template
3. create an nginx vhost based on the provided nginx_example_vhost.txt template
4. chmod +x start.sh
5. o/
```
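Once the stack is running, a quick sanity check is to call the test-only cluster route with the API key from config.ini. A minimal sketch using the requests package from requirements.txt, assuming the vhost from nginx_example_vhost.txt answers on localhost and `apipass` is still `CHANGEME`:
```
import requests

# test-only GET route; returns the cluster grid as text
r = requests.get('http://localhost/instance', params={'apipass': 'CHANGEME'})
print(r.status_code, r.text)
```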

62
clientsdb.py Normal file

@@ -0,0 +1,62 @@
# -*- coding: utf-8
#
# manage clientsdb.json
#import site packages
import json
#import local packages
import ioconfig
import utils
def addclient(vmid, vmname, clientid, clientname):
""" add new client to the clientsdb.json """
clientsdb = readclientsdb()
if str(clientid) in clientsdb:
ioconfig.logger.info('clients> client ' + clientid + ' already exists. merging.')
else:
ioconfig.logger.info('clients> client ' + clientid + ' does not exist. creating.')
vcard = { 'name':str(clientname) }
newclient = { str(clientid):vcard }
clientsdb.update(newclient)
ioconfig.logger.info('clients> vmid ' + vmid + ' will be owned by ' + clientid + ' (' + clientname + ')')
vmdata = { 'name':str(vmname), 'vmid':str(vmid), 'ownerid':str(clientid) }
clientsdb[str(clientid)][str(vmid)] = vmdata
writeclientsdb(clientsdb)
def vmowner(vmid, vmname, verbose):
""" find the owner of the vm """
clientsdb = readclientsdb()
try:
clientid = utils.get_rec(clientsdb, str(vmid))[0]['ownerid']
clientname = clientsdb[str(clientid)]['name']
    except:
        clientid = '0' #unknown owner
        clientname = 'unknown'
if verbose:
ioconfig.logger.info('clients> the owner of ' + str(vmid) + ' (' + vmname + ') is ' + str(clientid) + ' (' + clientname + ')')
return clientid
def readclientsdb():
""" read client db """
try:
with open('clients.json') as dbr:
clientsdb = json.load(dbr)
dbr.close()
except:
clientsdb = {}
ioconfig.logger.warning('clients> initializing...')
#writeclientsdb(clientsdb)
return clientsdb
def writeclientsdb(clientsdb):
""" write db """
with open('clients.json', 'w') as dbw:
json.dump(clientsdb, dbw)
dbw.close()
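addclient() keys clients.json by client id and nests one record per vmid, so ownership lookups only need the vmid. A minimal usage sketch; the ids and names below are made up for illustration:
```
import clientsdb

# register vmid 1000 for client 42; clients.json is created on first write
clientsdb.addclient('1000', 'web01', '42', 'Example Client')

# resolve the owner; falls back to '0' (unknown) for missing vmids
owner = clientsdb.vmowner('1000', 'web01', True)
```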

28
config.ini.dist Normal file

@@ -0,0 +1,28 @@
[uwsgi]
socket = 127.0.0.1:5117
workers = 3
[general]
logfile = log/proxmaster.log
adminuser = masteradmin@pve
apipass = CHANGEME
vmid_min = 1000
vmid_max = 999999
novnc_url = http://FQDN/noVNC
[region_0]
name = CHANGEME
ipv4_min = 192.168.0.4
ipv4_max = 192.168.0.254
[slave_0]
name = CHANGEME
masterip = 192.168.0.2
password = CHANGEME
regionid = 0
[slave_1]
name = CHANGEME
masterip = 192.168.0.3
password = CHANGEME
regionid = 0

396
grid.py Normal file

@@ -0,0 +1,396 @@
# -*- coding: utf-8 -*-
#
# vdc
#import site packages
import base64
import json
import re
import datetime
import random
import netaddr
#import local packages
import utils
import plugin
import ioconfig
import clientsdb
import journaldb
logger = ioconfig.logger
config = ioconfig.parser
def sync(cached=True):
""" calls slave objects and mix their nodes in a common cluster grid """
a = datetime.datetime.now()
grid_vmid_min = config.get('general', 'vmid_min')
grid_vmid_max = config.get('general', 'vmid_max')
real_grid = {'name':'real', "vmid_min":grid_vmid_min, "vmid_max":grid_vmid_max }
cache_grid = {'name':'cache', "vmid_min":grid_vmid_min, "vmid_max":grid_vmid_max }
regionselector = [i for i, x in enumerate(config.sections()) if re.match(r'\W*' + 'region' + r'\W*', x)]
for ri in range(len(regionselector)):
region_section = config.sections()[int(regionselector[ri])]
region_id = region_section.split("_")[1]
region_name = config.get(region_section, 'name')
region_range_min = config.get(region_section, 'ipv4_min')
region_range_max = config.get(region_section, 'ipv4_max')
slaveselector = [i for i, x in enumerate(config.sections()) if re.match(r'\W*' + 'slave' + r'\W*', x)]
real_region = { "id":region_id, "region":region_name, "ipv4_min":region_range_min, "ipv4_max":region_range_max }
        cache_region = real_region.copy() #same region both in cache and status
for si in range(len(slaveselector)):
slave_section = config.sections()[int(slaveselector[si])]
slave_id = slave_section.split("_")[1]
slave_name = config.get(slave_section, 'name')
slave_masterip = config.get(slave_section, 'masterip')
slave_password = config.get(slave_section, 'password')
slave_regionid = config.get(slave_section, 'regionid')
enc_slave_password = base64.b64encode(slave_password.encode('ascii')) #encode base64 to avoid shoulder surfers
decoded_password = enc_slave_password.decode('utf-8')
real_slave = { "id":slave_id, "slave":slave_name, "masterip":slave_masterip, "password":decoded_password }
optional_slave = {}
cache_file = 'cache-slave-' + slave_id + '.json'
prefix = 'cache> [' + slave_id + '] '
# check if slave is in current region and include it in current dict if it is
if slave_regionid == region_id:
try:
#trying to connect to slave host
#vmlist = plugin.vmlist(slave_id, slave_masterip, enc_slave_password.decode('utf-8'))
proxobject = plugin.auth(slave_id, slave_masterip, enc_slave_password)
vmlist = plugin.vmlist(proxobject)
real_slave['alive'] = 'up'
logger.info(prefix + 'is up')
except:
#raise
#slave cant be read so it will be marked down.
real_slave['alive'] = 'down'
logger.warning(prefix + 'is down')
if real_slave['alive'] == 'up':
#populate grid with vms then
for vm in vmlist:
#static parameters that CAN go to to cache:
vm_id = vm['vmid']
vm_name = vm['name']
vm_owner = clientsdb.vmowner(vm_id, vm_name, cached) #read clientsdb cache
static_vm = { "vmid":str(vm_id), "hostname":vm_name, 'type':vm['vmtype'], 'owner':vm_owner }
real_slave[str(vm_id)] = static_vm
#dynamic parameters that SHOULD NOT go to the cache:
dynamic_vm = { "uptime":vm['uptime'] }
optional_slave[str(vm_id)] = dynamic_vm
#check current cache
cache_slave = real_slave.copy() #fallback to the current state
try:
with open(cache_file) as fcr:
cache_slave = json.load(fcr)
fcr.close()
except:
logger.info(prefix + 'does not exist in cache. Initializing...')
if cache_slave['alive'] == 'up':
#slave was not down so it must be up...
cache_slave = update_cache(real_slave, cache_file, prefix, 'up')
logger.info(prefix + 'sync success o/')
else:
#if the slave was down before, compare the state before overwriting the cache
cache_slave['alive'] = 'up' #even if alive status in cache is still down we ignore it by forcing it to up
logger.info(prefix + 'was down')
#show the differences in log for manual (or maybe automatic at some point fixing)
findDiff(cache_slave, real_slave)
if cache_slave != real_slave:
logger.warning(prefix + 'cache != current status. please restore host!')
cache_slave = update_cache(cache_slave, cache_file, prefix, 'down')
else:
logger.info(prefix + 'cache == current status. host restored. o/')
cache_slave = update_cache(cache_slave, cache_file, prefix, 'up')
#what to do with cache if host is down
if real_slave['alive'] == 'down':
try:
logger.warning(prefix + 'loading cache...')
with open(cache_file) as fscr:
cache_slave = json.load(fscr)
fscr.close()
logger.warning(prefix + '...done')
cache_slave = update_cache(cache_slave, cache_file, prefix, 'down')
except:
logger.error(prefix + 'sync failure!')
cache_slave = real_slave.copy()
#raise
#we safely mix the dynamic ids now that we dont deal with cache anymore
mergedslave = utils.dict_merge({}, real_slave, optional_slave)
real_region[slave_id] = mergedslave
cache_region[slave_id] = cache_slave
#the region is finally included in the grid
real_grid[region_id] = real_region
cache_grid[region_id] = cache_region
b = datetime.datetime.now()
real_grid["synctime"] = str(b-a)
#dump all data to json
WriteCache(cache_grid, 'grid-cache.json')
WriteCache(real_grid, 'grid-real.json')
if cached == True:
return cache_grid
else:
return real_grid
def update_cache(cachedata, cachefile, prefix, newstatus):
""" update administravite status """
cachedata['alive'] = newstatus
WriteCache(cachedata, cachefile)
#TODO send mail
logger.info(prefix + 'administratively ' + newstatus)
return cachedata
def WriteCache(src_data, cache_file):
with open(cache_file, 'w') as fcw:
json.dump(src_data, fcw)
fcw.close()
def query_region(region_name):
""" translate region name to region id """
grid_data = readcache()
all_regions = []
for element in grid_data:
try:
if str(element) == grid_data[element]['id']:
all_regions.append(element)
except:
continue
for region in all_regions:
if grid_data[region]['region'] == region_name:
logger.info('grid> region ' + region_name + ' found')
return grid_data[region]['id']
break
logger.error('grid> cant find region ' + region_name)
return "-1"
def query_happiness(region_id):
""" analyzes grid data for the reuqested region and returns proposed slave_id,
based on a "happiness" factor. happiness means alive and free :) """
grid_data = readcache()
grid_data = grid_data[str(region_id)]
all_slaves = []
for element in grid_data:
try:
if str(element) == grid_data[element]['id']:
all_slaves.append(element)
except:
continue
all_slaves = [ int(x) for x in all_slaves ] #convert values from str to int
alive_slaves = []
for slaveid in all_slaves:
if str(grid_data[str(slaveid)]['alive']) == 'up':
alive_slaves.append(slaveid)
logger.info('grid> alive slaves ' + str(alive_slaves))
#happy_slave = random.choice(alive_slaves)
    if len(alive_slaves) < 1:
        logger.error('grid> grid is full. add more slaves')
        return None
    happy_slave = 1 #TODO: analyze slaves and make informed decision.
    logger.info('grid> ' + str(happy_slave) + ' selected')
    return happy_slave
def generate_ipv4(region_id, how_many=1):
""" analyzes cached grid data and returns ip addresses for new machines. """
grid_data = readcache()
ip_range_min = grid_data[str(region_id)]['ipv4_min']
ip_range_max = grid_data[str(region_id)]['ipv4_max']
region_ipset = netaddr.IPSet(netaddr.IPRange(ip_range_min, ip_range_max))
region_ips = []
for ip in region_ipset:
region_ips.append(ip)
ip_min = 0
ip_max = len(region_ips) - 1
tested_ips = [] #initialize ip cache
requested_ips = []
all_ips = utils.get_rec(grid_data, 'ipaddr')
for ips in range(int(how_many)):
counter = 0
while True:
if counter == 50:
logger.error('grid> ip range full')
return None
else:
counter += 1
requested_ip_index = random.randint(ip_min, ip_max)
requested_ip = str(region_ips[requested_ip_index])
if requested_ip in tested_ips:
logger.warning('grid> ip address ' + str(requested_ip) + ' already tested. cache: ' + str(tested_ips))
continue
if requested_ip in requested_ips:
logger.warning('grid> ip address ' + str(requested_ip) + ' already generated')
tested_ips.append(requested_ip)
continue
if requested_ip in all_ips:
                    position = all_ips.index(requested_ip)
logger.warning('grid> ip address ' + str(requested_ip) + ' already exist. location:' + str(position))
tested_ips.append(requested_ip)
continue
else:
tested_ips = [] #clear ip cache
break
logger.info('grid> ip address ' + requested_ip + ' selected')
requested_ips.append(requested_ip)
logger.info('grid> ip addresses ' + str(requested_ips) + ' selected')
return requested_ips
def generate_vmid():
""" analyzes cached grid data and return proposed vmid for new machines """
grid_data = readcache()
tested_vmids = [] #initialize id cache
id_min = grid_data['vmid_min']
id_max = grid_data['vmid_max']
all_vmid = utils.get_rec(grid_data, 'vmid') #get all vmid values from the nested grid
all_vmid = [ int(x) for x in all_vmid ] #convert values from str to int (its a vmid right?)
counter = 0
while True:
if counter == 50:
            logger.error('grid> vmid range full')
return None
else:
counter += 1
requested_vmid = random.randint(int(id_min), int(id_max)) #max 90k machines
if requested_vmid in tested_vmids:
logger.warning('grid> vmid ' + str(requested_vmid) + ' already tested. cache:' + str(tested_vmids))
continue
if requested_vmid in all_vmid:
position = all_vmid.index(requested_vmid)
logger.warning('grid> vmid ' + str(requested_vmid) + ' already exist. location:' + str(position))
tested_vmids.append(requested_vmid)
else:
tested_vmids = [] #clear tested vmid cache
break
logger.info('grid> vmid ' + str(requested_vmid) + ' selected')
return requested_vmid
def query_slave_data(slave_id):
""" read the cache for the requested slave_id """
grid_data = readcache()
all_regions = []
for element in grid_data:
try:
if str(element) == grid_data[element]['id']:
all_regions.append(element)
except:
continue
try:
for region in all_regions:
cslave = grid_data[region]
result_slave = cslave[str(slave_id)] #select the slave from all regions
if result_slave != None:
return result_slave
break
except:
raise
return {}
def query_vm(req_vmid):
""" returns slave_id and vm_type for the requested vmid """
#read current state (no cache)
sync(False)
grid_data = readreal()
#compare requested vmid to all vmid's from the grid
#TODO: maybe we should also check the owner somehow
all_vmid = utils.get_rec(grid_data, 'vmid')
target = int(0)
for running_vmid in all_vmid:
if str(req_vmid) == str(running_vmid):
            target = req_vmid
break
else:
continue
if target == 0:
        logger.error('grid> vmid {} cannot be found.'.format(req_vmid))
return "-1"
region_id, slave_id = journaldb.getjnode(target)
try:
vm_type = grid_data[str(region_id)][str(slave_id)][str(target)]['type']
except:
raise
#we should know them by now.
return slave_id, vm_type
def findDiff(d1, d2, path=""):
for k in d1.keys():
if not k in d2.keys():
logger.warning('cache> ' + str(k) + ' as key not in d2')
else:
if type(d1[k]) is dict:
if path == "":
path = k
else:
path = path + "->" + k
findDiff(d1[k],d2[k], path)
else:
if d1[k] != d2[k]:
logger.warning('cache> ' + str(k) + ' ' + str(d1[k]) + ' [-]')
logger.warning('cache> ' + str(k) + ' ' + str(d2[k]) + ' [+]')
def readcache():
""" read the last saved cache and return its contents """
try:
with open('grid-cache.json') as gridfile:
grid_data = json.load(gridfile)
gridfile.close()
except:
grid_data = {}
logger.error('cache> cannot read cache file')
return grid_data
def readreal():
""" read the current state and return its contents """
try:
with open('grid-real.json') as gridfile:
grid_data = json.load(gridfile)
gridfile.close()
resulttime = grid_data['synctime']
logger.info('grid> sync for ' + resulttime)
except:
grid_data = {}
logger.error('cache> cannot read temp file')
return grid_data
if __name__ == '__main__':
print(sync())
#print(query_region('Plovdiv, Bulgaria'))
#print(query_happiness(0))
#print(generate_ipv4(0,3))
#print(generate_vmid())
#print(query_slave_data(0))
#print(query_vm(483039))
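For reference, sync() persists the grid as nested dicts keyed by region and slave ids. An illustrative sketch of the grid-real.json shape, written as a Python literal with comments (all values are made up; the real file is plain JSON):
```
grid_real = {
    'name': 'real', 'vmid_min': '1000', 'vmid_max': '999999',
    'synctime': '0:00:01.234567',
    '0': {  # region_id
        'id': '0', 'region': 'CHANGEME',
        'ipv4_min': '192.168.0.4', 'ipv4_max': '192.168.0.254',
        '0': {  # slave_id
            'id': '0', 'slave': 'CHANGEME', 'masterip': '192.168.0.2',
            'password': '<base64>', 'alive': 'up',
            '1000': {'vmid': '1000', 'hostname': 'web01', 'type': 'kvm',
                     'owner': '42', 'uptime': 12345},
        },
    },
}
```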

21
ioconfig.py Normal file

@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
#
# ioconfig.py - read/write config/log etc.
import configparser
import logging
""" config reader """
parser = configparser.ConfigParser()
parser.read('config.ini')
""" log writer """
logger = logging.getLogger('proxmaster')
logging.captureWarnings(True)
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler(parser.get('general', 'logfile'))
handler.setLevel(logging.DEBUG)
#formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
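The other modules import this module for both the parsed config and the shared logger, e.g.:
```
import ioconfig

ioconfig.logger.info('example> hello')
apipass = ioconfig.parser.get('general', 'apipass')
```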

59
journaldb.py Normal file

@@ -0,0 +1,59 @@
# -*- coding: utf-8
#
# manage journaldb.json which is a table of vmid's as indexes update on vmcreate and
# values of region_id and slave_id. should support deletion of unused id's and be
# properly updated on vm migrations
#site
import json
#local
import ioconfig
import utils
def createjnode(vmid, regionid, slaveid):
""" create new record into the journal. invoked on vm creation """
journaldb = readjournal()
if str(vmid) in journaldb:
ioconfig.logger.warning('journal> overwriting id[{}] !'.format(vmid))
else:
jnode = { str(vmid):{} }
journaldb.update(jnode)
    ioconfig.logger.info('journal> r[{}] s[{}] -> id[{}]'.format(regionid, slaveid, vmid))
jdata = { 'vmid':str(vmid), 'slaveid':str(slaveid), 'regionid':str(regionid) }
journaldb[str(vmid)] = jdata
writedb(journaldb)
def getjnode(vmid):
""" query the database for records with requested vmid. invoked on user commands """
journaldb = readjournal()
try:
regionid = journaldb[str(vmid)]['regionid']
slaveid = journaldb[str(vmid)]['slaveid']
ioconfig.logger.info('journal> read: id[{}] -> r[{}] s[{}]'.format(vmid, regionid, slaveid))
    except:
        ioconfig.logger.error('journal> invalid id[{}] !'.format(vmid))
        raise
else:
return regionid, slaveid
def readjournal():
""" read journal """
try:
with open('journal.json') as dbr:
journaldb = json.load(dbr)
dbr.close()
except:
journaldb = {}
ioconfig.logger.warning('journal> initializing...')
return journaldb
def writedb(journaldb):
""" write journal """
with open('journal.json', 'w') as dbw:
json.dump(journaldb, dbw)
dbw.close()
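The journal maps each vmid back to its region and slave, which grid.query_vm() relies on. A minimal round trip (the ids are made up):
```
import journaldb

journaldb.createjnode('1000', '0', '1')  # vmid 1000 lives on slave 1 in region 0
regionid, slaveid = journaldb.getjnode('1000')
```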

10
nginx_example_vhost.txt Normal file

@@ -0,0 +1,10 @@
server {
listen 80;
server_name EXAMPLE.com;
location / {
uwsgi_pass 127.0.0.1:5117;
include uwsgi_params;
uwsgi_param UWSGI_SCRIPT proxmaster;
uwsgi_param UWSGI_PYHOME /home/USER/proxmaster;
}
}

44
novnc.py Normal file

@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
#
# novnc daemon spawner
#import site packages
import shlex, subprocess
import threading
import websockify
def spawn(target, options):
""" spawn """
vnctarget = '{}:{} {}:{} '.format(target['listen_host'], target['listen_port'], target['target_host'], target['target_port'])
a_options = ''
for key, value in options.items():
if value == True:
c_option = '--{} '.format(key)
else:
c_option = '--{} {} '.format(key, value)
a_options += c_option
command_line = 'python3 runwebsockify.py ' + a_options + vnctarget
args = shlex.split(command_line)
p = subprocess.Popen(args)
print('spawn!')
def spawn2(options):
""" spawn novnc daemon """
print('daemon spawned')
novncd = threading.Thread(name='novncd', target=start_websockify, args=(options,))
novncd.setDaemon(True) #daemonic ]:>
novncd.start()
    print('status', novncd.isAlive())
def start_websockify(options):
""" spawn websockify process """
print(options)
server = websockify.WebSocketProxy(**options)
server.start_server()
print('daemon exited')
#while True:
# print('daemon')
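plugin.vmvnc() builds the two dicts that spawn() expects; shown standalone here with placeholder addresses (the option keys mirror websockify's command-line flags):
```
import novnc

target = {'listen_host': '203.0.113.1', 'listen_port': '6900',
          'target_host': '192.168.0.2', 'target_port': '5900'}
options = {'idle-timeout': 20, 'verbose': True, 'run-once': True}

# runs: python3 runwebsockify.py --idle-timeout 20 --verbose --run-once <listen> <target>
novnc.spawn(target, options)
```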

301
plugin.py Normal file

@@ -0,0 +1,301 @@
# -*- coding: utf-8 -*-
# required proxmox permissions: PVESysAdmin, PVEVMAdmin
#
# afx 2015-2016
# site
from proxmoxer import ProxmoxAPI
import base64
import json
import time
import socket
#local
import grid
import clientsdb
import journaldb
import utils
import ioconfig
import novnc
def auth(slave_id, masterip=None, enc_password=None):
""" captures slave we want to auth from the cache and extract the credentials """
adminuser = ioconfig.parser.get('general', 'adminuser')
if masterip is None:
result_slave = grid.query_slave_data(slave_id)
masterip = result_slave['masterip']
enc_password = result_slave['password']
adminpassword = base64.b64decode(enc_password).decode('ascii')
#vendor specific
#connection = lib_proxmoxia.Connector(masterip)
#auth_token = connection.get_auth_token(adminuser, adminpassword)
#proxobject = lib_proxmoxia.Proxmox(connection)
proxobject = ProxmoxAPI(masterip, user=adminuser, password=adminpassword, verify_ssl=False)
return proxobject
def vmlist(proxobject):
""" get vmlist """
#we keep a single node proxmoxes so node id = 0
#slave_name = proxobject.get('cluster/status')#'name']
slave_name = proxobject.cluster.status.get()[0]['name']
#query_kvm = proxobject.get('nodes/%s/qemu' % slave_name)
query_kvm = proxobject.nodes(slave_name).qemu.get()
query_lxc = proxobject.nodes(slave_name).lxc.get()
for kvm_dict in query_kvm:
kvm_dict['vmtype'] = 'kvm'
for lxc_dict in query_lxc:
lxc_dict['vmtype'] = 'lxc'
vmlist = query_kvm + query_lxc #merge machine list
return vmlist
def vmcreate(req):
""" create vm. returns JSON with data for whmcs """
grid.sync()
region_id = grid.query_region(req['region'])
if region_id == "-1":
        ioconfig.logger.error('grid> no region found')
response = 'NO REGION FOUND'
return response
slave_id = str(grid.query_happiness(region_id))
vm_id = str(grid.generate_vmid())
vm_ipv4 = grid.generate_ipv4(region_id, req['vps_ipv4'])
ipv4_dict = {}
ipidx = 0
vm_name = req['hostname']
client_id = req['clientid']
client_name = req['clientname']
proxobject = auth(slave_id) #we dont know the ip of slave_id so we leave the auth function to find it itself.
slave_name = proxobject.cluster.status.get()[0]['name']
#ioconfig.logger.info('grid[' + slave_name + ']> recieved data: %s, %s, %s, %s, %s', region_id, slave_id, vm_id, vm_ipv4, req)
for ip in vm_ipv4:
ipv4_dict[str(ipidx)] = str(ip)
ipidx += 1
response = { 'status':'CREATING', 'vmid':vm_id, 'name':vm_name, 'password':'TODO', 'ipv4_0':vm_ipv4[0] }
disk_filename = 'vm-' + vm_id + '-disk-1'
description = vm_name + ' (' + vm_id + ')\n'
description += 'owned by ' + client_name + ' (' + client_id + ')\n'
description += 'master ip: ' + vm_ipv4[0]
#create partition
image_name = 'vm-' + vm_id + '-disk-0'
local_storage = proxobject.nodes(slave_name).storage('lvm')
local_storage.content.post(vmid=vm_id,
filename=image_name,
size=req['vps_disk'] + 'G')
if req['vps_type'] == 'KVM':
create_result = proxobject.nodes(slave_name).qemu.post(vmid=vm_id,
name=vm_name,
sockets=1,
cores=req['vps_cpu'],
memory=req['vps_ram'],
virtio0='lvm:' + image_name,
ide1='skyblue:iso/' + req['vps_os'] + ',media=cdrom',
net0='e1000,bridge=pub',
onboot=1,
description=description)
if req['vps_type'] == 'LXC':
create_result = proxobject.nodes(slave_name).lxc.post(vmid=vm_id,
hostname=vm_name,
password=req['password'],
sockets=1,
cores=req['vps_cpu'],
memory=req['vps_ram'],
virtio0='lvm:' + image_name,
ip_address=vm_ipv4[0],
onboot=1,
description=description)
#populate the client db and vm journal
client_id = req['clientid']
client_name = req['clientname']
clientsdb.addclient(vm_id, vm_name, client_id, client_name)
journaldb.createjnode(vm_id, region_id, slave_id)
    #start the machine
time.sleep(7) #wait few seconds for the slave to prepare the machine for initial run
vmstart(vm_id)
return response
def vmstatus(vm_id):
""" returns the status of the machine """
slave_id, vm_type = grid.query_vm(vm_id)
proxobject = auth(slave_id)
vm_type = vm_type.lower()
slave_name = proxobject.cluster.status.get()[0]['name']
ioconfig.logger.info('grid[%s]> get status of %s %s' % (slave_name, vm_type, vm_id))
if vm_type == 'kvm':
result = proxobject.nodes(slave_name).qemu(vm_id).status.current.get()
if vm_type == 'lxc':
result = proxobject.nodes(slave_name).lxc(vm_id).status.current.get()
return result
def vmstart(vm_id):
""" starts a machine """
slave_id, vm_type = grid.query_vm(vm_id)
proxobject = auth(slave_id)
vm_type = vm_type.lower()
slave_name = proxobject.cluster.status.get()[0]['name']
ioconfig.logger.info('grid[%s]> starting %s %s' % (slave_name, vm_type, vm_id))
if vm_type == 'kvm':
result = proxobject.nodes(slave_name).qemu(vm_id).status.start.post()
if vm_type == 'lxc':
result = proxobject.nodes(slave_name).lxc(vm_id).status.start.post()
#ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result))
response = { 'status':'STARTING' }
return response
def vmstop(vm_id):
""" poweroff the machine.. """
slave_id, vm_type = grid.query_vm(vm_id)
proxobject = auth(slave_id)
vm_type = vm_type.lower()
slave_name = proxobject.cluster.status.get()[0]['name']
ioconfig.logger.info('grid[%s]> power off %s %s' % (slave_name, vm_type, vm_id))
if vm_type == 'kvm':
result = proxobject.nodes(slave_name).qemu(vm_id).status.stop.post()
if vm_type == 'lxc':
result = proxobject.nodes(slave_name).lxc(vm_id).status.stop.post()
#ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result))
response = { 'status':'STOPPING', 'vmid':vm_id }
return response
def vmshutdown(vm_id):
""" graceful stop """
slave_id, vm_type = grid.query_vm(vm_id)
proxobject = auth(slave_id)
vm_type = vm_type.lower()
slave_name = proxobject.cluster.status.get()[0]['name']
ioconfig.logger.info('grid[%s]> acpi shutdown sent to %s %s' % (slave_name, vm_type, vm_id))
if vm_type == 'kvm':
result = proxobject.nodes(slave_name).qemu(vm_id).status.shutdown.post()
if vm_type == 'lxc':
result = proxobject.nodes(slave_name).lxc(vm_id).status.shutdown.post()
#ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result))
response = { 'status':'ACPI SHUTDOWN', 'vmid':vm_id }
return response
def vmsuspend(vm_id):
""" suspend machine """
slave_id, vm_type = grid.query_vm(vm_id)
proxobject = auth(slave_id)
vm_type = vm_type.lower()
slave_name = proxobject.cluster.status.get()[0]['name']
ioconfig.logger.info('grid[%s]> suspending %s %s' % (slave_name, vm_type, vm_id))
if vm_type == 'kvm':
result = proxobject.nodes(slave_name).qemu(vm_id).status.suspend.post()
if vm_type == 'lxc':
result = proxobject.nodes(slave_name).lxc(vm_id).status.suspend.post()
#ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result))
response = { 'status':'SUSPEND', 'vmid':vm_id }
return response
def vmresume(vm_id):
""" resume machine """
slave_id, vm_type = grid.query_vm(vm_id)
proxobject = auth(slave_id)
vm_type = vm_type.lower()
slave_name = proxobject.cluster.status.get()[0]['name']
ioconfig.logger.info('grid[%s]> resuming %s %s' % (slave_name, vm_type, vm_id))
if vm_type == 'kvm':
result = proxobject.nodes(slave_name).qemu(vm_id).status.resume.post()
if vm_type == 'lxc':
result = proxobject.nodes(slave_name).lxc(vm_id).status.resume.post()
#ioconfig.logger.info('grid[{}]> {}'.format(slave_name, result))
response = { 'status':'RESUME', 'vmid':vm_id }
return response
def vmvnc(vm_id):
""" invoke vnc ticket """
slave_id, vm_type = grid.query_vm(vm_id)
proxobject = auth(slave_id)
vm_type = vm_type.lower()
slave_name = proxobject.cluster.status.get()[0]['name']
ioconfig.logger.info('grid[%s]> invoking vnc ticket for %s %s' % (slave_name, vm_type, vm_id))
if vm_type == 'kvm':
ticket = proxobject.nodes(slave_name).qemu(vm_id).vncproxy.post(websocket=1)
#socket = proxobject.nodes(slave_name).qemu(vm_id).vncwebsocket.get(port=ticket['port'],
# vncticket=ticket['ticket'])
if vm_type == 'lxc':
ticket = proxobject.nodes(slave_name).lxc(vm_id).vncproxy.post()
#socket = proxobject.nodes(slave_name).lxc(vm_id).vncwebsocket.get(port=ticket['port'],
# vncticket=ticket['ticket'])
slaveip = grid.query_slave_data(slave_id)['masterip']
#slaveport = socket['port']
slaveport = ticket['port']
listenport = str(int(slaveport) + 1000 + (int(slave_id) * 100)) #TODO: max 100 parallel connections/slave.
myip = getmyip()
vnc_target = { 'target_host': slaveip,
'target_port': slaveport,
'listen_host': myip,
'listen_port': listenport }
vnc_options = { 'idle-timeout': 20,
'verbose': True,
'run-once': True }
novnc.spawn(vnc_target, vnc_options)
external_url = ioconfig.parser.get('general', 'novnc_url')
prefix = external_url + "/?host=" + myip + "&port=" + listenport + "&encrypt=0&true_color=1&password="
ioconfig.logger.info('grid[{}]> {}'.format(slave_name, prefix + ticket['ticket']))
response = { 'status':'VNC', 'vmid':vm_id }
return response
def getmyip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("gmail.com",80))
myip = s.getsockname()[0]
    s.close()
return myip
if __name__ == '__main__':
#internal module tests
time.sleep(30)
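vmcreate() reads the request parameters shown above; a hypothetical payload for it (all values are illustrative, and vps_os must name an ISO available on the 'skyblue' storage):
```
import plugin

req = {
    'region': 'CHANGEME',        # must match a [region_N] name from config.ini
    'hostname': 'web01',
    'clientid': '42',
    'clientname': 'Example Client',
    'vps_type': 'KVM',           # or 'LXC'
    'vps_cpu': '2',
    'vps_ram': '2048',           # MB
    'vps_disk': '20',            # GB
    'vps_ipv4': '1',             # how many addresses to allocate
    'vps_os': 'debian-8.iso',
    'password': 'secret',        # root password for LXC containers
}
response = plugin.vmcreate(req)
```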

269
proxmaster.py Normal file

@@ -0,0 +1,269 @@
# -*- coding: utf-8 -*-
# required proxmox permissions: PVESysAdmin, PVEVMAdmin
#
# afx 2015-2016
# import site packages
import logging
import falcon
import sys
import json
import urllib.parse
#import local packages
import ioconfig
import grid
import plugin
config = ioconfig.parser
logger = ioconfig.logger
def welcome():
"""displays motd in log as welcome message"""
logger.info('###################################')
logger.info('# proxmaster ][ (c) 2015-2016 afx #')
logger.info('###################################')
def apicheck(params):
""" compares request params for api key with the config file"""
try:
if params['apipass'] == config.get('general', 'apipass'):
status = True
response = 'OK'
else:
status = False
response = 'GET KEY DENIED'
logger.error('grid> read access denied. key mismatch')
except:
#raise
status = False
response = 'GET URL DENIED'
logger.error('grid> read access denied. url error?')
finally:
return (status, response)
#API methods
class ClusterResource(object):
def on_get(self, req, resp):
"""TEST ONLY. List cluster nodes. TEST ONLY"""
logger.info('grid> cache status')
apicheck_stat, apicheck_resp = apicheck(req.params)
if apicheck_stat:
resp.status = falcon.HTTP_200
resp.body = str(grid.sync())
else:
resp.status = falcon.HTTP_403
resp.body = apicheck_resp
def on_post(self, req, resp):
"""Create a cluster node, returns array of: status, vmid, pass, ipv4, """
logger.info('grid> create ' + str(req.params))
apicheck_stat, apicheck_resp = apicheck(req.params)
if apicheck_stat:
resp.status = falcon.HTTP_200
try:
resp.body = urllib.parse.urlencode(plugin.vmcreate(req.params))
except:
logger.error('grid> create function cancelled')
                #raise
resp.status = falcon.HTTP_403
response = 'CREATE ERR'
resp.body = response
else:
resp.status = falcon.HTTP_403
resp.body = apicheck_resp
class StatusResource(object):
def on_get(self, req, resp, vmid):
""" check vm status """
logger.info('grid> status ' + str(vmid))
apicheck_stat, apicheck_resp = apicheck(req.params)
if apicheck_stat:
resp.status = falcon.HTTP_200
try:
resp.body = urllib.parse.urlencode(plugin.vmstatus(vmid))
except:
logger.error('grid> status error')
                #raise
resp.status = falcon.HTTP_403
response = 'STATUS ERR'
resp.body = response
else:
resp.status = falcon.HTTP_403
resp.body = apicheck_resp
class DeleteResource(object):
def on_post(self, req, resp, vmid):
""" delete machine completely"""
logger.info('grid> delete ' + str(vmid))
apicheck_stat, apicheck_resp = apicheck(req.params)
if apicheck_stat:
try:
resp.body = urllib.parse.urlencode(plugin.vmdelete(vmid))
except:
logger.error('grid> delete error')
                #raise
resp.status = falcon.HTTP_403
response = 'DELETE ERR'
resp.body = response
else:
resp.status = falcon.HTTP_403
resp.body = apicheck_resp
class ArchivateResource(object):
def on_post(self, req, resp, vmid):
""" Temporary suspend the instance """
logger.info('grid> suspend ' + str(vmid))
apicheck_stat, apicheck_resp = apicheck(req.params)
if apicheck_stat:
resp.status = falcon.HTTP_200
try:
resp.body = urllib.parse.urlencode(plugin.vmsuspend(vmid))
except:
logger.error('grid> pause error')
                #raise
resp.status = falcon.HTTP_403
response = 'PAUSE ERR'
resp.body = response
else:
resp.status = falcon.HTTP_403
resp.body = apicheck_resp
class UnArchiveResource(object):
def on_post(self, req, resp, vmid):
""" Unuspend the instance """
logger.info('grid> resume ' + str(vmid))
apicheck_stat, apicheck_resp = apicheck(req.params)
if apicheck_stat:
resp.status = falcon.HTTP_200
try:
resp.body = urllib.parse.urlencode(plugin.vmresume(vmid))
except:
logger.error('grid> resume error')
                #raise
resp.status = falcon.HTTP_403
response = 'RESUME ERR'
resp.body = response
else:
resp.status = falcon.HTTP_403
resp.body = apicheck_resp
class StartResource(object):
def on_post(self, req, resp, vmid):
""" Start the instance """
logger.info('grid> start ' + str(vmid))
apicheck_stat, apicheck_resp = apicheck(req.params)
if apicheck_stat:
resp.status = falcon.HTTP_200
try:
resp.body = urllib.parse.urlencode(plugin.vmstart(vmid))
except:
logger.error('grid> start error')
#raise
resp.status = falcon.HTTP_403
response = 'START ERR'
resp.body = response
else:
resp.status = falcon.HTTP_403
resp.body = apicheck_resp
class ShutdownResource(object):
def on_post(self, req, resp, vmid):
""" ACPI Shutdown the instance """
logger.info('grid> shutdown ' + str(vmid))
apicheck_stat, apicheck_resp = apicheck(req.params)
if apicheck_stat:
resp.status = falcon.HTTP_200
try:
resp.body = urllib.parse.urlencode(plugin.vmshutdown(vmid))
#TODO: Try few times and then return proper status message
except:
logger.error('grid> shutdown error')
#raise
resp.status = falcon.HTTP_403
response = 'SHUTDOWN ERR'
resp.body = response
else:
resp.status = falcon.HTTP_403
resp.body = apicheck_resp
class StopResource(object):
def on_post(self, req, resp, vmid):
""" Stop the instance """
logger.info('grid> stop ' + str(vmid))
apicheck_stat, apicheck_resp = apicheck(req.params)
if apicheck_stat:
resp.status = falcon.HTTP_200
try:
resp.body = urllib.parse.urlencode(plugin.vmstop(vmid))
except:
logger.error('grid> stop error')
#raise
resp.status = falcon.HTTP_403
response = 'STOP ERR'
resp.body = response
else:
resp.status = falcon.HTTP_403
resp.body = apicheck_resp
class VNCResource(object):
def on_post(self, req, resp, vmid):
""" Create a VNC link to the instance """
apicheck_stat, apicheck_resp = apicheck(req.params)
logger.info('grid> vnc ' + str(vmid))
if apicheck_stat:
try:
resp.status = falcon.HTTP_200
resp.body = urllib.parse.urlencode(plugin.vmvnc(vmid))
except:
logger.error('grid> vnc error')
                #raise
resp.status = falcon.HTTP_403
response = 'VNC ERR'
resp.body = response
else:
resp.status = falcon.HTTP_403
resp.body = apicheck_resp
if __name__ == '__main__':
sys.exit("invoke proxmaster via uwsgi. thanks. bye. o/")
#setup routes
wsgi_app = api = application = falcon.API()
#display motd
welcome()
#logger.info('grid> sync')
#grid.sync()
# setup routes
res_cluster = ClusterResource()
api.add_route('/instance', res_cluster)
res_status = StatusResource()
api.add_route('/instance/{vmid}', res_status)
res_delete = DeleteResource()
api.add_route('/instance/delete/{vmid}', res_delete)
res_archivate = ArchivateResource()
api.add_route('/instance/archivate/{vmid}', res_archivate)
res_unarchive = UnArchiveResource()
api.add_route('/instance/unarchive/{vmid}', res_unarchive)
res_start = StartResource()
api.add_route('/instance/start/{vmid}', res_start)
res_shutdown = ShutdownResource()
api.add_route('/instance/shutdown/{vmid}', res_shutdown)
res_stop = StopResource()
api.add_route('/instance/stop/{vmid}', res_stop)
res_vnc = VNCResource()
api.add_route('/instance/vnc/{vmid}', res_vnc)
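With the routes above, lifecycle operations are plain POSTs carrying the API key as a query parameter. A sketch using the requests package against a local deployment (vmid 1000 is illustrative):
```
import requests

base = 'http://localhost/instance'
key = {'apipass': 'CHANGEME'}

requests.post(base + '/start/1000', params=key)     # start
requests.post(base + '/shutdown/1000', params=key)  # acpi shutdown
requests.post(base + '/vnc/1000', params=key)       # request a vnc session
```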

8
requirements.txt Normal file

@@ -0,0 +1,8 @@
uwsgi
pyOpenSSL
requests
falcon
netaddr
proxmoxer
websockify

5
runwebsockify.py Normal file

@@ -0,0 +1,5 @@
#!/usr/bin/env python
import websockify
websockify.websocketproxy.websockify_init()

18
start.sh Executable file

@@ -0,0 +1,18 @@
#!/bin/bash
# Log rotation
LOG_DIR=${HOME}/proxmaster/log
LOG_FILE="${LOG_DIR}/proxmaster.log"
mkdir -p $LOG_DIR
TIME=`date -u +%s`
if [ -e $LOG_FILE ] ; then
mv ${LOG_FILE} ${LOG_FILE}.${TIME} && touch ${LOG_FILE}
else
touch ${LOG_FILE}
fi
# start uwsgi instance
uwsgi config.ini

73
utils.py Normal file

@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
#
# helper functions
from copy import deepcopy
import functools
def dict_merge(target, *args):
""" Recursively merges mutiple dicts """
# Merge multiple dicts
if len(args) > 1:
for obj in args:
dict_merge(target, obj)
return target
# Recursively merge dicts and set non-dict values
obj = args[0]
if not isinstance(obj, dict):
return obj
for k, v in obj.items():
if k in target and isinstance(target[k], dict):
dict_merge(target[k], v)
else:
target[k] = deepcopy(v)
return target
def get_rec(search_dict, field):
"""
Takes a dict with nested lists and dicts,
and searches all dicts for a key of the field
provided.
"""
fields_found = []
for key, value in search_dict.items():
if key == field:
fields_found.append(value)
elif isinstance(value, dict):
results = get_rec(value, field)
for result in results:
fields_found.append(result)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
                    more_results = get_rec(item, field)
for another_result in more_results:
fields_found.append(another_result)
return fields_found
def gen_dict_extract(key, var):
    if hasattr(var, 'items'):
        for k, v in var.items():
if k == key:
yield v
if isinstance(v, dict):
for result in gen_dict_extract(key, v):
yield result
elif isinstance(v, list):
for d in v:
for result in gen_dict_extract(key, d):
yield result
def chained_get(dct, *keys):
SENTRY = object()
def getter(level, key):
return 'NA' if level is SENTRY else level.get(key, SENTRY)
return functools.reduce(getter, keys, dct)
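grid.py uses these helpers for cache merging and nested lookups; a small demonstration of the three entry points:
```
import utils

a = {'0': {'alive': 'up'}}
b = {'0': {'uptime': 99}}
merged = utils.dict_merge({}, a, b)    # {'0': {'alive': 'up', 'uptime': 99}}

vmids = utils.get_rec({'0': {'vmid': '1000'}}, 'vmid')  # ['1000']
state = utils.chained_get(merged, '0', 'alive')         # 'up'
```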