mt_clockwheel/mt_clockwheel.py

202 lines
7.0 KiB
Python

import argparse
import datetime
from glob import glob
import json
from listdict import ListDict
import os
import subprocess
# globals
mediadir = "/var/lib/ffplayout/tv-media"
playlistdir = "/var/lib/ffplayout/playlists"
channel = "newellijaytelevision"
verbose = False
from_date = datetime.date.today() + datetime.timedelta(days=1) # tomorrow by default
to_date = datetime.date.today() + datetime.timedelta(days=1) # tomorrow by default
playlist_duration = 6 * 60 * 60 # six hours, in seconds
mediadirs = ListDict()
def main():
parse_config()
get_media_dirs(mediadir)
for single_date in daterange(from_date, to_date):
if verbose:
print("Processing date", single_date)
daylist = build_day(single_date)
daylist_path = playlistdir + "/" + channel + "/" + single_date.strftime("%Y") + "/" + single_date.strftime("%m") + "/" + single_date.strftime("%d") + "/"
if not os.path.isdir(daylist_path):
os.makedirs(daylist_path)
daylist_file = single_date.strftime("%Y") + "-" + single_date.strftime("%m") + "-" + single_date.strftime("%d") + ".json"
with open(daylist_path + daylist_file, "w") as file:
file.write(daylist)
if verbose:
print('wrote playlist file', daylist_path + daylist_file)
def parse_config():
global mediadir, playlistdir, channel, verbose, from_date, to_date, playlist_duration
parser = argparse.ArgumentParser(
prog="mt_clockwheel",
description="a simple clockwheel playlist generator for ffplayout",
epilog="brought to you by your friends at Mountain Town Technology and Community Media Network"
)
parser.add_argument('-d', '--directory', dest='mediadir', help='root directory for media library')
parser.add_argument('-p', '--playlists', dest='playlistdir', help='root directory for playlists')
parser.add_argument('-c', '--channel', dest='channel', help='channel name to generate playlists for')
parser.add_argument('-v', '--verbose', action='store_true', help='provide verbose output logging')
parser.add_argument('-f', '--from', dest='from_date', help='first date to generate playlist for')
parser.add_argument('-t', '--to', dest='to_date', help='last date to generate playlist for')
parser.add_argument('-l', '--length', dest='playlist_duration', help='duration (in seconds) for each playlist - if a factor of 24 * 60 * 60 then playlist will be duplicated to fill 24 hours')
args = parser.parse_args()
if args.verbose:
verbose = True
if args.mediadir:
mediadir = args.mediadir
if args.playlistdir:
playlistdir = args.playlistdir
if args.channel:
channel = args.channel
if args.from_date:
from_date = datetime.datetime.strptime(args.from_date, "%Y-%m-%d").date()
if args.to_date:
to_date = datetime.datetime.strptime(args.to_date, "%Y-%m-%d").date()
if args.playlist_duration:
playlist_duration = args.playlist_duration
if verbose:
print("Arguments parsed, config:")
print("mediadir:", mediadir)
print("playlistdir:", playlistdir)
print("channel:", channel)
print("from_date:", from_date)
print("to_date:", to_date)
def get_media_dirs(rootdir):
for file in os.listdir(rootdir):
if file == "Commercials":
pass
elif file == "Television":
get_media_dirs(os.path.join(rootdir, file))
else:
d = os.path.join(rootdir, file)
if os.path.isdir(d):
if verbose:
print("adding " + d + " to media dirs")
mediadirs.add_item(d)
def build_day(this_date):
d = {} # empty dict that will become our JSON output
d["channel"] = channel
d["date"] = this_date.strftime("%Y-%m-%d")
d["program"] = [] # empty list to populate with programs
total_time = 0
get_commercial = False
while total_time < playlist_duration:
entry, length = get_playlist_entry(get_commercial)
d["program"].append(entry)
total_time += length
if length > 0:
get_commercial = not get_commercial
if verbose:
print(' added program:', json.dumps(entry), length)
day_length = 24 * 60 * 60
if day_length % playlist_duration == 0:
if verbose:
print("looping playlist...")
iterations = (day_length / playlist_duration)
iteration = d["program"] # snapshot of the unit of iteration
looped = []
if verbose:
print("iteration: ", json.dumps(iteration, indent=4))
print("iterations: ", iterations)
while iterations > 0:
looped.extend(iteration)
iterations -= 1
d["program"] = looped
playlist = json.dumps(d)
if verbose:
print("playlist json:", playlist)
return playlist
def get_playlist_entry(get_commercial):
exts = ['*.mp4', '*.webm', '*.mov']
entry_dir = mediadir + '/Commercials' if get_commercial else mediadirs.choose_random_item()
media_files = ListDict()
found_media = [f for ext in exts
for f in glob(os.path.join(entry_dir, '**', ext), recursive=True)]
for d in found_media:
media_files.add_item(d)
this_file = media_files.choose_random_item() if media_files.length() > 0 else ""
if this_file == "":
if verbose:
print('cannot select file from empty folder')
return {}, 0 # get out of this oopsie situation and try again
if verbose:
print('selected file: ' + this_file)
# probe file for details
metadata = json.loads(json.dumps(probe(this_file), sort_keys=True, indent=4))
if verbose:
print('file metadata: ', metadata)
print('format: ', metadata['format'])
duration = int(float(metadata['format']['duration_secs'])) if "duration_secs" in metadata['format'].keys() else int(float(metadata["format"]["duration"])) if "duration" in metadata['format'].keys() else 0
if duration == 0:
if verbose:
print('cannot parse duration of ' + this_file)
return {}, 0 # cannot parse duration, so skip this file
entry = {
"in": 0,
"out": duration,
"duration": duration,
"source": this_file
}
if get_commercial:
entry["category"] = "advertisement"
length = entry["duration"]
return entry, length
def probe(file):
cmd = ['ffprobe', '-v', 'quiet', '-print_format', 'json', '-show_format', '-show_streams', f"{file}"]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if len(out) > 0:
result = json.loads(out)
else:
result = {}
if err:
print("===err===")
print(err)
if verbose:
print('ffprobe result: ', result)
return result
# generator function to yield single dates from a date range
def daterange(start_date, end_date):
for n in range(int((end_date - start_date).days) + 1): # adding 1 to make the range inclusive
yield start_date + datetime.timedelta(n)
if __name__ == "__main__":
main()