homes/common/.local/lib/python2.7/site-packages/powerline/segments/common.py
2013-05-02 18:11:33 +02:00

1063 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# vim:fileencoding=utf-8:noet
from __future__ import absolute_import
import os
import sys
from datetime import datetime
import socket
from multiprocessing import cpu_count as _cpu_count
from powerline.lib import add_divider_highlight_group
from powerline.lib.url import urllib_read, urllib_urlencode
from powerline.lib.vcs import guess
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment, with_docstring
from powerline.lib.monotonic import monotonic
from powerline.lib.humanize_bytes import humanize_bytes
from powerline.theme import requires_segment_info
from collections import namedtuple
cpu_count = None
@requires_segment_info
def hostname(pl, segment_info, only_if_ssh=False, exclude_domain=False):
'''Return the current hostname.
:param bool only_if_ssh:
only return the hostname if currently in an SSH session
:param bool exclude_domain:
return the hostname without domain if there is one
'''
if only_if_ssh and not segment_info['environ'].get('SSH_CLIENT'):
return None
if exclude_domain:
return socket.gethostname().split('.')[0]
return socket.gethostname()
@requires_segment_info
class RepositorySegment(KwThreadedSegment):
def __init__(self):
super(RepositorySegment, self).__init__()
self.directories = {}
@staticmethod
def key(segment_info, **kwargs):
return os.path.abspath(segment_info['getcwd']())
def update(self, *args):
# .compute_state() is running only in this method, and only in one
# thread, thus operations with .directories do not need write locks
# (.render() method is not using .directories). If this is changed
# .directories needs redesigning
self.directories.clear()
return super(RepositorySegment, self).update(*args)
def compute_state(self, path):
repo = guess(path=path)
if repo:
if repo.directory in self.directories:
return self.directories[repo.directory]
else:
r = self.process_repo(repo)
self.directories[repo.directory] = r
return r
class RepositoryStatusSegment(RepositorySegment):
interval = 2
@staticmethod
def process_repo(repo):
return repo.status()
repository_status = with_docstring(RepositoryStatusSegment(),
'''Return the status for the current VCS repository.''')
class BranchSegment(RepositorySegment):
interval = 0.2
started_repository_status = False
@staticmethod
def process_repo(repo):
return repo.branch()
@staticmethod
def render_one(branch, status_colors=False, **kwargs):
if branch and status_colors:
return [{
'contents': branch,
'highlight_group': ['branch_dirty' if repository_status(**kwargs) else 'branch_clean', 'branch'],
}]
else:
return branch
def startup(self, status_colors=False, **kwargs):
super(BranchSegment, self).startup(**kwargs)
if status_colors:
self.started_repository_status = True
repository_status.startup(**kwargs)
def shutdown(self):
if self.started_repository_status:
repository_status.shutdown()
super(BranchSegment, self).shutdown()
branch = with_docstring(BranchSegment(),
'''Return the current VCS branch.
:param bool status_colors:
determines whether repository status will be used to determine highlighting. Default: True.
Highlight groups used: ``branch_clean``, ``branch_dirty``, ``branch``.
''')
@requires_segment_info
def cwd(pl, segment_info, dir_shorten_len=None, dir_limit_depth=None, use_path_separator=False):
'''Return the current working directory.
Returns a segment list to create a breadcrumb-like effect.
:param int dir_shorten_len:
shorten parent directory names to this length (e.g. :file:`/long/path/to/powerline` → :file:`/l/p/t/powerline`)
:param int dir_limit_depth:
limit directory depth to this number (e.g. :file:`/long/path/to/powerline` → :file:`⋯/to/powerline`)
:param bool use_path_separator:
Use path separator in place of soft divider.
Divider highlight group used: ``cwd:divider``.
Highlight groups used: ``cwd:current_folder`` or ``cwd``. It is recommended to define all highlight groups.
'''
import re
try:
cwd = segment_info['getcwd']()
except OSError as e:
if e.errno == 2:
# user most probably deleted the directory
# this happens when removing files from Mercurial repos for example
pl.warn('Current directory not found')
cwd = "[not found]"
else:
raise
home = segment_info['home']
if home:
cwd = re.sub('^' + re.escape(home), '~', cwd, 1)
cwd_split = cwd.split(os.sep)
cwd_split_len = len(cwd_split)
if dir_limit_depth and cwd_split_len > dir_limit_depth + 1:
del(cwd_split[0:-dir_limit_depth])
cwd_split.insert(0, '')
cwd = [i[0:dir_shorten_len] if dir_shorten_len and i else i for i in cwd_split[:-1]] + [cwd_split[-1]]
ret = []
if not cwd[0]:
cwd[0] = '/'
draw_inner_divider = not use_path_separator
for part in cwd:
if not part:
continue
if use_path_separator:
part += os.sep
ret.append({
'contents': part,
'divider_highlight_group': 'cwd:divider',
'draw_inner_divider': draw_inner_divider,
})
ret[-1]['highlight_group'] = ['cwd:current_folder', 'cwd']
if use_path_separator:
ret[-1]['contents'] = ret[-1]['contents'][:-1]
return ret
def date(pl, format='%Y-%m-%d', istime=False):
'''Return the current date.
:param str format:
strftime-style date format string
Divider highlight group used: ``time:divider``.
Highlight groups used: ``time`` or ``date``.
'''
return [{
'contents': datetime.now().strftime(format),
'highlight_group': (['time'] if istime else []) + ['date'],
'divider_highlight_group': 'time:divider' if istime else None,
}]
def fuzzy_time(pl):
'''Display the current time as fuzzy time, e.g. "quarter past six".'''
hour_str = ['twelve', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine', 'ten', 'eleven']
minute_str = {
5: 'five past',
10: 'ten past',
15: 'quarter past',
20: 'twenty past',
25: 'twenty-five past',
30: 'half past',
35: 'twenty-five to',
40: 'twenty to',
45: 'quarter to',
50: 'ten to',
55: 'five to',
}
special_case_str = {
(23, 58): 'round about midnight',
(23, 59): 'round about midnight',
(0, 0): 'midnight',
(0, 1): 'round about midnight',
(0, 2): 'round about midnight',
(12, 0): 'noon',
}
now = datetime.now()
try:
return special_case_str[(now.hour, now.minute)]
except KeyError:
pass
hour = now.hour
if now.minute > 32:
if hour == 23:
hour = 0
else:
hour += 1
if hour > 11:
hour = hour - 12
hour = hour_str[hour]
minute = int(round(now.minute / 5.0) * 5)
if minute == 60 or minute == 0:
return ' '.join([hour, 'o\'clock'])
else:
minute = minute_str[minute]
return ' '.join([minute, hour])
def _external_ip(query_url='http://ipv4.icanhazip.com/'):
return urllib_read(query_url).strip()
class ExternalIpSegment(ThreadedSegment):
interval = 300
def set_state(self, query_url='http://ipv4.icanhazip.com/', **kwargs):
self.query_url = query_url
super(ExternalIpSegment, self).set_state(**kwargs)
def update(self, old_ip):
return _external_ip(query_url=self.query_url)
def render(self, ip, **kwargs):
if not ip:
return None
return [{'contents': ip, 'divider_highlight_group': 'background:divider'}]
external_ip = with_docstring(ExternalIpSegment(),
'''Return external IP address.
Suggested URIs:
* http://ipv4.icanhazip.com/
* http://ipv6.icanhazip.com/
* http://icanhazip.com/ (returns IPv6 address if available, else IPv4)
:param str query_url:
URI to query for IP address, should return only the IP address as a text string
Divider highlight group used: ``background:divider``.
''')
# Weather condition code descriptions available at
# http://developer.yahoo.com/weather/#codes
weather_conditions_codes = (
('tornado', 'stormy'), # 0
('tropical_storm', 'stormy'), # 1
('hurricane', 'stormy'), # 2
('severe_thunderstorms', 'stormy'), # 3
('thunderstorms', 'stormy'), # 4
('mixed_rain_and_snow', 'rainy' ), # 5
('mixed_rain_and_sleet', 'rainy' ), # 6
('mixed_snow_and_sleet', 'snowy' ), # 7
('freezing_drizzle', 'rainy' ), # 8
('drizzle', 'rainy' ), # 9
('freezing_rain', 'rainy' ), # 10
('showers', 'rainy' ), # 11
('showers', 'rainy' ), # 12
('snow_flurries', 'snowy' ), # 13
('light_snow_showers', 'snowy' ), # 14
('blowing_snow', 'snowy' ), # 15
('snow', 'snowy' ), # 16
('hail', 'snowy' ), # 17
('sleet', 'snowy' ), # 18
('dust', 'foggy' ), # 19
('fog', 'foggy' ), # 20
('haze', 'foggy' ), # 21
('smoky', 'foggy' ), # 22
('blustery', 'foggy' ), # 23
('windy', ), # 24
('cold', 'day' ), # 25
('clouds', 'cloudy'), # 26
('mostly_cloudy_night', 'cloudy'), # 27
('mostly_cloudy_day', 'cloudy'), # 28
('partly_cloudy_night', 'cloudy'), # 29
('partly_cloudy_day', 'cloudy'), # 30
('clear_night', 'night' ), # 31
('sun', 'sunny' ), # 32
('fair_night', 'night' ), # 33
('fair_day', 'day' ), # 34
('mixed_rain_and_hail', 'rainy' ), # 35
('hot', 'sunny' ), # 36
('isolated_thunderstorms', 'stormy'), # 37
('scattered_thunderstorms', 'stormy'), # 38
('scattered_thunderstorms', 'stormy'), # 39
('scattered_showers', 'rainy' ), # 40
('heavy_snow', 'snowy' ), # 41
('scattered_snow_showers', 'snowy' ), # 42
('heavy_snow', 'snowy' ), # 43
('partly_cloudy', 'cloudy'), # 44
('thundershowers', 'rainy' ), # 45
('snow_showers', 'snowy' ), # 46
('isolated_thundershowers', 'rainy' ), # 47
)
# ('day', (25, 34)),
# ('rainy', (5, 6, 8, 9, 10, 11, 12, 35, 40, 45, 47)),
# ('cloudy', (26, 27, 28, 29, 30, 44)),
# ('snowy', (7, 13, 14, 15, 16, 17, 18, 41, 42, 43, 46)),
# ('stormy', (0, 1, 2, 3, 4, 37, 38, 39)),
# ('foggy', (19, 20, 21, 22, 23)),
# ('sunny', (32, 36)),
# ('night', (31, 33))):
weather_conditions_icons = {
'day': '',
'blustery': '',
'rainy': '',
'cloudy': '',
'snowy': '',
'stormy': '',
'foggy': '',
'sunny': '',
'night': '',
'windy': '',
'not_available': '<EFBFBD>',
'unknown': '',
}
temp_conversions = {
'C': lambda temp: temp,
'F': lambda temp: (temp * 9 / 5) + 32,
'K': lambda temp: temp + 273.15,
}
# Note: there are also unicode characters for units: ℃, ℉ and
temp_units = {
'C': '°C',
'F': '°F',
'K': 'K',
}
class WeatherSegment(ThreadedSegment):
interval = 600
def set_state(self, location_query=None, **kwargs):
self.location = location_query
self.url = None
super(WeatherSegment, self).set_state(**kwargs)
def update(self, old_weather):
import json
if not self.url:
# Do not lock attribute assignments in this branch: they are used
# only in .update()
if not self.location:
location_data = json.loads(urllib_read('http://freegeoip.net/json/' + _external_ip()))
self.location = ','.join([location_data['city'],
location_data['region_name'],
location_data['country_name']])
query_data = {
'q':
'use "http://github.com/yql/yql-tables/raw/master/weather/weather.bylocation.xml" as we;'
'select * from we where location="{0}" and unit="c"'.format(self.location).encode('utf-8'),
'format': 'json',
}
self.url = 'http://query.yahooapis.com/v1/public/yql?' + urllib_urlencode(query_data)
raw_response = urllib_read(self.url)
if not raw_response:
self.error('Failed to get response')
return
response = json.loads(raw_response)
condition = response['query']['results']['weather']['rss']['channel']['item']['condition']
condition_code = int(condition['code'])
temp = float(condition['temp'])
try:
icon_names = weather_conditions_codes[condition_code]
except IndexError:
if condition_code == 3200:
icon_names = ('not_available',)
self.warn('Weather is not available for location {0}', self.location)
else:
icon_names = ('unknown',)
self.error('Unknown condition code: {0}', condition_code)
return (temp, icon_names)
def render(self, weather, icons=None, unit='C', temp_format=None, temp_coldest=-30, temp_hottest=40, **kwargs):
if not weather:
return None
temp, icon_names = weather
for icon_name in icon_names:
if icons:
if icon_name in icons:
icon = icons[icon_name]
break
else:
icon = weather_conditions_icons[icon_names[-1]]
temp_format = temp_format or ('{temp:.0f}' + temp_units[unit])
converted_temp = temp_conversions[unit](temp)
if temp <= temp_coldest:
gradient_level = 0
elif temp >= temp_hottest:
gradient_level = 100
else:
gradient_level = (temp - temp_coldest) * 100.0 / (temp_hottest - temp_coldest)
groups = ['weather_condition_' + icon_name for icon_name in icon_names] + ['weather_conditions', 'weather']
return [
{
'contents': icon + ' ',
'highlight_group': groups,
'divider_highlight_group': 'background:divider',
},
{
'contents': temp_format.format(temp=converted_temp),
'highlight_group': ['weather_temp_gradient', 'weather_temp', 'weather'],
'divider_highlight_group': 'background:divider',
'gradient_level': gradient_level,
},
]
weather = with_docstring(WeatherSegment(),
'''Return weather from Yahoo! Weather.
Uses GeoIP lookup from http://freegeoip.net/ to automatically determine
your current location. This should be changed if you're in a VPN or if your
IP address is registered at another location.
Returns a list of colorized icon and temperature segments depending on
weather conditions.
:param str unit:
temperature unit, can be one of ``F``, ``C`` or ``K``
:param str location_query:
location query for your current location, e.g. ``oslo, norway``
:param dict icons:
dict for overriding default icons, e.g. ``{'heavy_snow' : u''}``
:param str temp_format:
format string, receives ``temp`` as an argument. Should also hold unit.
:param float temp_coldest:
coldest temperature. Any temperature below it will have gradient level equal
to zero.
:param float temp_hottest:
hottest temperature. Any temperature above it will have gradient level equal
to 100. Temperatures between ``temp_coldest`` and ``temp_hottest`` receive
gradient level that indicates relative position in this interval
(``100 * (cur-coldest) / (hottest-coldest)``).
Divider highlight group used: ``background:divider``.
Highlight groups used: ``weather_conditions`` or ``weather``, ``weather_temp_gradient`` (gradient) or ``weather``.
Also uses ``weather_conditions_{condition}`` for all weather conditions supported by Yahoo.
''')
def system_load(pl, format='{avg:.1f}', threshold_good=1, threshold_bad=2, track_cpu_count=False):
'''Return system load average.
Highlights using ``system_load_good``, ``system_load_bad`` and
``system_load_ugly`` highlighting groups, depending on the thresholds
passed to the function.
:param str format:
format string, receives ``avg`` as an argument
:param float threshold_good:
threshold for gradient level 0: any normalized load average below this
value will have this gradient level.
:param float threshold_bad:
threshold for gradient level 100: any normalized load average above this
value will have this gradient level. Load averages between
``threshold_good`` and ``threshold_bad`` receive gradient level that
indicates relative position in this interval:
(``100 * (cur-good) / (bad-good)``).
Note: both parameters are checked against normalized load averages.
:param bool track_cpu_count:
if True powerline will continuously poll the system to detect changes
in the number of CPUs.
Divider highlight group used: ``background:divider``.
Highlight groups used: ``system_load_gradient`` (gradient) or ``system_load``.
'''
global cpu_count
try:
cpu_num = cpu_count = _cpu_count() if cpu_count is None or track_cpu_count else cpu_count
except NotImplementedError:
pl.warn('Unable to get CPU count: method is not implemented')
return None
ret = []
for avg in os.getloadavg():
normalized = avg / cpu_num
if normalized < threshold_good:
gradient_level = 0
elif normalized < threshold_bad:
gradient_level = (normalized - threshold_good) * 100.0 / (threshold_bad - threshold_good)
else:
gradient_level = 100
ret.append({
'contents': format.format(avg=avg),
'highlight_group': ['system_load_gradient', 'system_load'],
'divider_highlight_group': 'background:divider',
'gradient_level': gradient_level,
})
ret[0]['contents'] += ' '
ret[1]['contents'] += ' '
return ret
try:
import psutil
def _get_bytes(interface):
io_counters = psutil.network_io_counters(pernic=True)
if_io = io_counters.get(interface)
if not if_io:
return None
return if_io.bytes_recv, if_io.bytes_sent
def _get_interfaces():
io_counters = psutil.network_io_counters(pernic=True)
for interface, data in io_counters.items():
if data:
yield interface, data.bytes_recv, data.bytes_sent
def _get_user(segment_info):
return psutil.Process(os.getpid()).username
class CPULoadPercentSegment(ThreadedSegment):
interval = 1
def update(self, old_cpu):
return psutil.cpu_percent(interval=None)
def run(self):
while not self.shutdown_event.is_set():
try:
self.update_value = psutil.cpu_percent(interval=self.interval)
except Exception as e:
self.exception('Exception while calculating cpu_percent: {0}', str(e))
def render(self, cpu_percent, format='{0:.0f}%', **kwargs):
if not cpu_percent:
return None
return [{
'contents': format.format(cpu_percent),
'gradient_level': cpu_percent,
'highlight_group': ['cpu_load_percent_gradient', 'cpu_load_percent'],
}]
except ImportError:
def _get_bytes(interface): # NOQA
with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj:
rx = int(file_obj.read())
with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj:
tx = int(file_obj.read())
return (rx, tx)
def _get_interfaces(): # NOQA
for interface in os.listdir('/sys/class/net'):
x = _get_bytes(interface)
if x is not None:
yield interface, x[0], x[1]
def _get_user(segment_info): # NOQA
return segment_info['environ'].get('USER', None)
class CPULoadPercentSegment(ThreadedSegment): # NOQA
interval = 1
@staticmethod
def startup(**kwargs):
pass
@staticmethod
def start():
pass
@staticmethod
def shutdown():
pass
@staticmethod
def render(cpu_percent, pl, format='{0:.0f}%', **kwargs):
pl.warn('psutil package is not installed, thus CPU load is not available')
return None
cpu_load_percent = with_docstring(CPULoadPercentSegment(),
'''Return the average CPU load as a percentage.
Requires the ``psutil`` module.
:param str format:
Output format. Accepts measured CPU load as the first argument.
Highlight groups used: ``cpu_load_percent_gradient`` (gradient) or ``cpu_load_percent``.
''')
username = False
# os.geteuid is not available on windows
_geteuid = getattr(os, 'geteuid', lambda: 1)
def user(pl, segment_info=None):
'''Return the current user.
Highlights the user with the ``superuser`` if the effective user ID is 0.
Highlight groups used: ``superuser`` or ``user``. It is recommended to define all highlight groups.
'''
global username
if username is False:
username = _get_user(segment_info)
if username is None:
pl.warn('Failed to get username')
return None
euid = _geteuid()
return [{
'contents': username,
'highlight_group': 'user' if euid != 0 else ['superuser', 'user'],
}]
if 'psutil' not in globals():
user = requires_segment_info(user)
if os.path.exists('/proc/uptime'):
def _get_uptime():
with open('/proc/uptime', 'r') as f:
return int(float(f.readline().split()[0]))
elif 'psutil' in globals():
from time import time
def _get_uptime(): # NOQA
# psutil.BOOT_TIME is not subject to clock adjustments, but time() is.
# Thus it is a fallback to /proc/uptime reading and not the reverse.
return int(time() - psutil.BOOT_TIME)
else:
def _get_uptime(): # NOQA
raise NotImplementedError
@add_divider_highlight_group('background:divider')
def uptime(pl, days_format='{days:d}d', hours_format=' {hours:d}h', minutes_format=' {minutes:d}m', seconds_format=' {seconds:d}s', shorten_len=3):
'''Return system uptime.
:param str days_format:
day format string, will be passed ``days`` as the argument
:param str hours_format:
hour format string, will be passed ``hours`` as the argument
:param str minutes_format:
minute format string, will be passed ``minutes`` as the argument
:param str seconds_format:
second format string, will be passed ``seconds`` as the argument
:param int shorten_len:
shorten the amount of units (days, hours, etc.) displayed
Divider highlight group used: ``background:divider``.
'''
try:
seconds = _get_uptime()
except NotImplementedError:
pl.warn('Unable to get uptime. You should install psutil package')
return None
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
time_formatted = list(filter(None, [
days_format.format(days=days) if days and days_format else None,
hours_format.format(hours=hours) if hours and hours_format else None,
minutes_format.format(minutes=minutes) if minutes and minutes_format else None,
seconds_format.format(seconds=seconds) if seconds and seconds_format else None,
]))[0:shorten_len]
return ''.join(time_formatted).strip()
class NetworkLoadSegment(KwThreadedSegment):
import re
interfaces = {}
replace_num_pat = re.compile(r'[a-zA-Z]+')
@staticmethod
def key(interface='detect', **kwargs):
return interface
def compute_state(self, interface):
if interface == 'detect':
proc_exists = getattr(self, 'proc_exists', None)
if proc_exists is None:
proc_exists = self.proc_exists = os.path.exists('/proc/net/route')
if proc_exists:
# Look for default interface in routing table
with open('/proc/net/route', 'rb') as f:
for line in f.readlines():
parts = line.split()
if len(parts) > 1:
iface, destination = parts[:2]
if not destination.replace(b'0', b''):
interface = iface.decode('utf-8')
break
if interface == 'detect':
# Choose interface with most total activity, excluding some
# well known interface names
interface, total = 'eth0', -1
for name, rx, tx in _get_interfaces():
base = self.replace_num_pat.match(name)
if None in (base, rx, tx) or base.group() in ('lo', 'vmnet', 'sit'):
continue
activity = rx + tx
if activity > total:
total = activity
interface = name
if interface in self.interfaces:
idata = self.interfaces[interface]
try:
idata['prev'] = idata['last']
except KeyError:
pass
else:
idata = {}
if self.run_once:
idata['prev'] = (monotonic(), _get_bytes(interface))
self.shutdown_event.wait(self.interval)
self.interfaces[interface] = idata
idata['last'] = (monotonic(), _get_bytes(interface))
return idata
def render_one(self, idata, recv_format='{value:>8}', sent_format='{value:>8}', suffix='B/s', si_prefix=False, **kwargs):
if not idata or 'prev' not in idata:
return None
t1, b1 = idata['prev']
t2, b2 = idata['last']
measure_interval = t2 - t1
if None in (b1, b2):
return None
if measure_interval == 0:
self.error('Measure interval is zero. This should not happen')
return None
r = []
for i, key in zip((0, 1), ('recv', 'sent')):
format = locals()[key + '_format']
value = (b2[i] - b1[i]) / measure_interval
max_key = key + '_max'
is_gradient = max_key in kwargs
hl_groups = ['network_load_' + key, 'network_load']
if is_gradient:
hl_groups[:0] = (group + '_gradient' for group in hl_groups)
r.append({
'contents': format.format(value=humanize_bytes(value, suffix, si_prefix)),
'divider_highlight_group': 'background:divider',
'highlight_group': hl_groups,
})
if is_gradient:
max = kwargs[max_key]
if value >= max:
r[-1]['gradient_level'] = 100
else:
r[-1]['gradient_level'] = value * 100.0 / max
return r
network_load = with_docstring(NetworkLoadSegment(),
'''Return the network load.
Uses the ``psutil`` module if available for multi-platform compatibility,
falls back to reading
:file:`/sys/class/net/{interface}/statistics/{rx,tx}_bytes`.
:param str interface:
network interface to measure (use the special value "detect" to have powerline try to auto-detect the network interface)
:param str suffix:
string appended to each load string
:param bool si_prefix:
use SI prefix, e.g. MB instead of MiB
:param str recv_format:
format string, receives ``value`` as argument
:param str sent_format:
format string, receives ``value`` as argument
:param float recv_max:
maximum number of received bytes per second. Is only used to compute
gradient level
:param float sent_max:
maximum number of sent bytes per second. Is only used to compute gradient
level
Divider highlight group used: ``background:divider``.
Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_load_recv_gradient`` (gradient) or ``network_load_gradient`` (gradient), ``network_load_sent`` or ``network_load_recv`` or ``network_load``.
''')
@requires_segment_info
def virtualenv(pl, segment_info):
'''Return the name of the current Python virtualenv.'''
return os.path.basename(segment_info['environ'].get('VIRTUAL_ENV', '')) or None
_IMAPKey = namedtuple('Key', 'username password server port folder')
class EmailIMAPSegment(KwThreadedSegment):
interval = 60
@staticmethod
def key(username, password, server='imap.gmail.com', port=993, folder='INBOX', **kwargs):
return _IMAPKey(username, password, server, port, folder)
def compute_state(self, key):
if not key.username or not key.password:
self.warn('Username and password are not configured')
return None
try:
import imaplib
import re
mail = imaplib.IMAP4_SSL(key.server, key.port)
mail.login(key.username, key.password)
rc, message = mail.status(key.folder, '(UNSEEN)')
unread_str = message[0].decode('utf-8')
unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1))
except imaplib.IMAP4.error as e:
unread_count = str(e)
return unread_count
@staticmethod
def render_one(unread_count, max_msgs=None, **kwargs):
if not unread_count:
return None
elif type(unread_count) != int or not max_msgs:
return [{
'contents': str(unread_count),
'highlight_group': 'email_alert',
}]
else:
return [{
'contents': str(unread_count),
'highlight_group': ['email_alert_gradient', 'email_alert'],
'gradient_level': min(unread_count * 100.0 / max_msgs, 100),
}]
email_imap_alert = with_docstring(EmailIMAPSegment(),
'''Return unread e-mail count for IMAP servers.
:param str username:
login username
:param str password:
login password
:param str server:
e-mail server
:param int port:
e-mail server port
:param str folder:
folder to check for e-mails
:param int max_msgs:
Maximum number of messages. If there are more messages then max_msgs then it
will use gradient level equal to 100, otherwise gradient level is equal to
``100 * msgs_num / max_msgs``. If not present gradient is not computed.
Highlight groups used: ``email_alert_gradient`` (gradient), ``email_alert``.
''')
class NowPlayingSegment(object):
STATE_SYMBOLS = {
'fallback': '',
'play': '',
'pause': '▮▮',
'stop': '',
}
def __call__(self, player='mpd', format='{state_symbol} {artist} - {title} ({total})', **kwargs):
player_func = getattr(self, 'player_{0}'.format(player))
stats = {
'state': None,
'state_symbol': self.STATE_SYMBOLS['fallback'],
'album': None,
'artist': None,
'title': None,
'elapsed': None,
'total': None,
}
func_stats = player_func(**kwargs)
if not func_stats:
return None
stats.update(func_stats)
return format.format(**stats)
@staticmethod
def _run_cmd(cmd):
from subprocess import Popen, PIPE
try:
p = Popen(cmd, stdout=PIPE)
stdout, err = p.communicate()
except OSError as e:
sys.stderr.write('Could not execute command ({0}): {1}\n'.format(e, cmd))
return None
return stdout.strip()
@staticmethod
def _convert_state(state):
state = state.lower()
if 'play' in state:
return 'play'
if 'pause' in state:
return 'pause'
if 'stop' in state:
return 'stop'
@staticmethod
def _convert_seconds(seconds):
return '{0:.0f}:{1:02.0f}'.format(*divmod(float(seconds), 60))
def player_cmus(self, pl):
'''Return cmus player information.
cmus-remote -Q returns data with multi-level information i.e.
status playing
file <file_name>
tag artist <artist_name>
tag title <track_title>
tag ..
tag n
set continue <true|false>
set repeat <true|false>
set ..
set n
For the information we are looking for we don't really care if we're on
the tag level or the set level. The dictionary comprehension in this
method takes anything in ignore_levels and brings the key inside that
to the first level of the dictionary.
'''
now_playing_str = self._run_cmd(['cmus-remote', '-Q'])
if not now_playing_str:
return
ignore_levels = ('tag', 'set',)
now_playing = dict(((token[0] if token[0] not in ignore_levels else token[1],
(' '.join(token[1:]) if token[0] not in ignore_levels else
' '.join(token[2:]))) for token in [line.split(' ') for line in now_playing_str.split('\n')[:-1]]))
state = self._convert_state(now_playing.get('status'))
return {
'state': state,
'state_symbol': self.STATE_SYMBOLS.get(state),
'album': now_playing.get('album'),
'artist': now_playing.get('artist'),
'title': now_playing.get('title'),
'elapsed': self._convert_seconds(now_playing.get('position', 0)),
'total': self._convert_seconds(now_playing.get('duration', 0)),
}
def player_mpd(self, pl, host='localhost', port=6600):
try:
import mpd
client = mpd.MPDClient()
client.connect(host, port)
now_playing = client.currentsong()
if not now_playing:
return
status = client.status()
client.close()
client.disconnect()
return {
'state': status.get('state'),
'state_symbol': self.STATE_SYMBOLS.get(status.get('state')),
'album': now_playing.get('album'),
'artist': now_playing.get('artist'),
'title': now_playing.get('title'),
'elapsed': self._convert_seconds(now_playing.get('elapsed', 0)),
'total': self._convert_seconds(now_playing.get('time', 0)),
}
except ImportError:
now_playing = self._run_cmd(['mpc', 'current', '-f', '%album%\n%artist%\n%title%\n%time%', '-h', str(host), '-p', str(port)])
if not now_playing:
return
now_playing = now_playing.split('\n')
return {
'album': now_playing[0],
'artist': now_playing[1],
'title': now_playing[2],
'total': now_playing[3],
}
def player_spotify(self, pl):
try:
import dbus
except ImportError:
sys.stderr.write('Could not add Spotify segment: Requires python-dbus.\n')
return
bus = dbus.SessionBus()
DBUS_IFACE_PROPERTIES = 'org.freedesktop.DBus.Properties'
DBUS_IFACE_PLAYER = 'org.freedesktop.MediaPlayer2'
try:
player = bus.get_object('com.spotify.qt', '/')
iface = dbus.Interface(player, DBUS_IFACE_PROPERTIES)
info = iface.Get(DBUS_IFACE_PLAYER, 'Metadata')
status = iface.Get(DBUS_IFACE_PLAYER, 'PlaybackStatus')
except dbus.exceptions.DBusException:
return
if not info:
return
state = self._convert_state(status)
return {
'state': state,
'state_symbol': self.STATE_SYMBOLS.get(state),
'album': info.get('xesam:album'),
'artist': info.get('xesam:artist')[0],
'title': info.get('xesam:title'),
'total': self._convert_seconds(info.get('mpris:length') / 1e6),
}
def player_rhythmbox(self, pl):
now_playing = self._run_cmd(['rhythmbox-client', '--no-start', '--no-present', '--print-playing-format', '%at\n%aa\n%tt\n%te\n%td'])
if not now_playing:
return
now_playing = now_playing.split('\n')
return {
'album': now_playing[0],
'artist': now_playing[1],
'title': now_playing[2],
'elapsed': now_playing[3],
'total': now_playing[4],
}
now_playing = NowPlayingSegment()