diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..da591de --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ + +notifyker/__pycache__/ + +notifyker/callbackNK/__pycache__/ + +notifyker/notifiers/__pycache__/ + +notifyker/config\.py diff --git a/README.md b/README.md index 56b3251..7ce02e1 100644 --- a/README.md +++ b/README.md @@ -1 +1,18 @@ -NotifyKer +# NotifyKer + +*Callback notifier and manager bot for Keras ML library* + +##### Simple to use: + +Set your TOKEN and PROXY settings in **notifyker/config_default.py** and rename to **notifyker/config.py** + +```python +from notifyker import NotifierTelegram, CallbackSimple + + +nfk = NotifierTelegram() +callback = CallbackSimple(notifier=nfk) + +model.fit(... + callbacks=[callback]) +``` \ No newline at end of file diff --git a/bot.gif b/bot.gif new file mode 100644 index 0000000..00471af Binary files /dev/null and b/bot.gif differ diff --git a/notifyker/__init__.py b/notifyker/__init__.py new file mode 100644 index 0000000..866819b --- /dev/null +++ b/notifyker/__init__.py @@ -0,0 +1,3 @@ +from .notifiers import NotifierTelegram +from .callbackNK import CallbackSimple + diff --git a/notifyker/callbackNK/__init__.py b/notifyker/callbackNK/__init__.py new file mode 100644 index 0000000..beb3ddb --- /dev/null +++ b/notifyker/callbackNK/__init__.py @@ -0,0 +1 @@ +from .callback_simple import CallbackSimple diff --git a/notifyker/callbackNK/callback_base.py b/notifyker/callbackNK/callback_base.py new file mode 100644 index 0000000..09375ae --- /dev/null +++ b/notifyker/callbackNK/callback_base.py @@ -0,0 +1,12 @@ +from keras.callbacks import Callback + + +class CallbackBase(Callback): + def __init__(self, notifier=None, custom_metrics=None): + super().__init__() + + if notifier is not None: + self.notifier = notifier + + if custom_metrics is not None: + self.custom_metrics = custom_metrics diff --git a/notifyker/callbackNK/callback_simple.py b/notifyker/callbackNK/callback_simple.py new file mode 100644 index 0000000..62bd53a --- /dev/null +++ b/notifyker/callbackNK/callback_simple.py @@ -0,0 +1,123 @@ +import time + +from .callback_base import CallbackBase + + +class CallbackSimple(CallbackBase): + def __init__(self, verbose=0, notifier=None, custom_metrics=None): + super().__init__() + + if notifier is not None: + self.notifier = notifier + else: + raise ValueError + + if custom_metrics is not None: + self.custom_metrics = custom_metrics + + self.details = {} + self.starting_time = None + self.batch_update_freq = None + self.current_epoch = 1 + + def on_train_begin(self, logs=None): + self.notifier._connect() + self.notifier.flags_batch = [] + self.notifier.flags_epoch = [] + + for i in self.params: + self.details[i] = self.params[i] + + self.starting_time = time.time() + self.batch_update_freq = max(self.details['samples'] // self.details['batch_size'] // 10, 1) + + message = [] + message.append('\nTraining started in {}\n'.format(self.starting_time)) + + if self.notifier.verbose_value == 0: + self.notifier.message(' \n'.join(message)) + + return + + if self.notifier.verbose_value == 2: + message.append('With the following parameters:') + for i in self.details: + if isinstance(self.details[i], list): + value = ', '.join(self.details[i]) + else: + value = str(self.details[i]) + + message.append('{0:25s}: {1:25s}'.format(i, value)) + + self.notifier.message(' \n'.join(message)) + + def on_train_end(self, logs=None): + if 's' in self.notifier.flags_batch: + tag = 'forcibly' + else: + tag = 'Successfully' + self.notifier.message('Training completed {}'.format(tag)) + self.notifier._close_connect() + + def on_batch_end(self, batch, logs=None): + if self.notifier.flags_batch: + self.flags_handler() + + if self.notifier.verbose_value == 0: + return + + if batch % self.batch_update_freq == 0: + message = [] + + message.append('Epoch {} / {}'.format(self.current_epoch, self.details['epochs'])) + + pad_bar = '[{}{}]'.format('++' * (batch // self.batch_update_freq), '==' * (10 - batch // self.batch_update_freq)) + message.append('{} / {} {}'.format(2 * logs['batch'], self.details['samples'], pad_bar)) + + if self.notifier.verbose_value == 2: + for i in self.details['metrics']: + if 'val_' not in i: + message.append('{:15s}: {:15s}'.format(i, str(logs[i]))) + + ack = self.notifier.message(' \n'.join(message), self.notifier.cache_message_id) + self.notifier.cache_message_id = ack.message_id + + def on_epoch_begin(self, epoch, logs=None): + if self.notifier.flags_epoch: + self.flags_handler() + + self.notifier.cache_message_id = None + + def on_epoch_end(self, epoch, logs=None): + if self.notifier.flags_epoch: + self.flags_handler() + + message = [] + + message.append('Epoch {} / {}'.format(self.current_epoch, self.details['epochs'])) + + pad_bar = '[{}]'.format('++' * 10) + message.append('{} / {} {}'.format(self.details['samples'], self.details['samples'], pad_bar)) + for i in logs: + message.append('{:15s}: {:15s}'.format(i, str(logs[i]))) + + self.notifier._status = ' \n'.join(message) + + if self.notifier.verbose_value != 0: + ack = self.notifier.message(' \n'.join(message), self.notifier.cache_message_id) + self.notifier.cache_message_id = ack.message_id + + self.current_epoch += 1 + + def flags_handler(self): + if 'p' in self.notifier.flags_batch: + self.notifier.flags_batch.remove('p') + + while 'c' not in self.notifier.flags_batch and 's' not in self.notifier.flags_batch: + time.sleep(10) + + if 's' in self.notifier.flags_batch: + self.model.stop_training = True + + if 'c' in self.notifier.flags_batch: + self.notifier.flags_batch.remove('c') diff --git a/notifyker/config_default.py b/notifyker/config_default.py new file mode 100644 index 0000000..8d2ca80 --- /dev/null +++ b/notifyker/config_default.py @@ -0,0 +1,2 @@ +TOKEN = 'XXXX:YYYY' +PROXY = {'proxy_url': 'socks5h://ip:port', 'urllib3_proxy_kwargs': {'username': 'username', 'password': 'password'}} diff --git a/notifyker/notifiers/__init__.py b/notifyker/notifiers/__init__.py new file mode 100644 index 0000000..e02b86d --- /dev/null +++ b/notifyker/notifiers/__init__.py @@ -0,0 +1,2 @@ +from .notifier_telegram_menu import NotifierTelegram +from .notifier_telegram import NotifierTelegram as NotifierTelegramSimple diff --git a/notifyker/notifiers/notifier_base.py b/notifyker/notifiers/notifier_base.py new file mode 100644 index 0000000..4a1f94a --- /dev/null +++ b/notifyker/notifiers/notifier_base.py @@ -0,0 +1,34 @@ +class NotifierBase: + """ + Abstract class for notifiers + """ + + def __init__(self): + """ + Initialize mandatory variables of notifier + """ + self.cache_message_id = None + self.flags_batch = [] + self.flags_epoch = [] + self._status = None + + def status(self): + """ + Status message update + """ + if self._status is None: + text = 'Status undefined. Probably, first epoch is still performing' + else: + text = self._status + + self.message(text) + + def message(self, message, message_id=None): + """ + Abstract method of message sending + Method must be redefined with the return variable ack (can be None, used to edit message of batches) + """ + pass + + def _close_connect(self): + pass diff --git a/notifyker/notifiers/notifier_telegram.py b/notifyker/notifiers/notifier_telegram.py new file mode 100644 index 0000000..a773d3e --- /dev/null +++ b/notifyker/notifiers/notifier_telegram.py @@ -0,0 +1,105 @@ +from telegram.ext import CommandHandler, Updater + +from ..config import TOKEN, PROXY +from .notifier_base import NotifierBase + + +class NotifierTelegram(NotifierBase): + """ + Telegram notifier bot + """ + def __init__(self): + """ + Create handlers and chat id for message edits + """ + super().__init__() + self.active = False + self.cache_message_id = None + self.flags_batch = [] + self.flags_epoch = [] + self._status = None + self.verbose_value = 1 + self.chat_id = None + + self._connect() + + def _connect(self): + if not self.active: + self.updater = Updater(TOKEN, request_kwargs=PROXY) + + self.handlers() + self.updater.start_polling() + + self.active = True + + def message(self, message, message_id=None, reply_markup=None): + """ + Telegram specific method of message sending + """ + if message_id is not None: + ack = self.updater.bot.edit_message_text(chat_id=self.chat_id, text=message, message_id=message_id) + else: + ack = self.updater.bot.send_message(chat_id=self.chat_id, text=message, reply_markup=reply_markup) + + return ack + + def handlers(self): + """ + Method of activation of telegram bot handlers + """ + self.updater.dispatcher.add_handler(CommandHandler('start', self.start)) + self.updater.dispatcher.add_handler(CommandHandler('interrupt', self.interrupt)) + self.updater.dispatcher.add_handler(CommandHandler('help', self._help)) + self.updater.dispatcher.add_handler(CommandHandler('status', self.status)) + self.updater.dispatcher.add_handler(CommandHandler('pause', self.pause)) + self.updater.dispatcher.add_handler(CommandHandler('verbose', self.verbose)) + self.updater.dispatcher.add_handler(CommandHandler('continue', self.cont)) + + def start(self, bot, update): + """ + Method of start message processing required to obtain chat_id + """ + self.chat_id = update.message.chat_id + update.message.reply_text('Hello, my friend') + + def _help(self, bot, update): + """ + Method of help command processing + """ + message = 'Welcome! Enter /start to add your chat_id before you start training\n\ + /help - Show available commands\n\ + /status - Show current training status - epoch, metrics\n\ + /pause - Suspend training process (model still in a memory)\n\ + /continue - Continue training process\n\ + /interrupt - Interrupt training process ATTENTION: You will not be able to continue by this bot\n' + self.message(message) + + def pause(self, bot, update): + """ + Method of pause command processing. Suspend the training process + """ + self.flags_batch.append('p') + self.message('Training suspended. Use /stop or /cont now') + + def cont(self, bot, update): + """ + Method of continue command processing. Continue the training process + """ + self.flags_batch.append('c') + self.message('Training continues') + + def verbose(self, bot, update): + self.message('Current verbose: {}'.format(self.verbose_value)) + + def interrupt(self, bot, update): + """ + Method of stop (training) command processing + """ + self.flags_batch.append('s') + self.message('') + + self.message('Training interrupting...') + + def _close_connect(self): + self.updater.stop() + self.active = False diff --git a/notifyker/notifiers/notifier_telegram_menu.py b/notifyker/notifiers/notifier_telegram_menu.py new file mode 100644 index 0000000..2d0383d --- /dev/null +++ b/notifyker/notifiers/notifier_telegram_menu.py @@ -0,0 +1,219 @@ +from telegram.ext import CommandHandler, ConversationHandler, RegexHandler, Updater +from telegram import ReplyKeyboardMarkup, InlineKeyboardButton, ReplyKeyboardRemove + +from ..config import TOKEN, PROXY +from .notifier_base import NotifierBase + + +class NotifierTelegram(NotifierBase): + """ + Telegram notifier bot + """ + def __init__(self): + """ + Create handlers and chat id for message edits + """ + super().__init__() + self.active = False + self.cache_message_id = None + self.flags_batch = [] + self.flags_epoch = [] + self._status = None + self.verbose_value = 1 + self._connect() + + def _connect(self): + if not self.active: + self.updater = Updater(TOKEN, request_kwargs=PROXY) + + self.handlers() + self.updater.start_polling() + + self.active = True + + def message(self, message, message_id=None, reply_markup=None): + """ + Telegram specific method of message sending + """ + if message_id is not None: + ack = self.updater.bot.edit_message_text(chat_id=self.chat_id, text=message, message_id=message_id) + else: + ack = self.updater.bot.send_message(chat_id=self.chat_id, text=message, reply_markup=reply_markup) + + return ack + + def handlers(self): + """ + Method of activation of telegram bot handlers + """ + self.updater.dispatcher.add_handler(CommandHandler('start', self.start)) + self.updater.dispatcher.add_handler(CommandHandler('interrupt', self.interrupt_handler)) + self.updater.dispatcher.add_handler(CommandHandler('help', self._help)) + self.updater.dispatcher.add_handler(CommandHandler('status', self.status)) + self.updater.dispatcher.add_handler(CommandHandler('pause', self.pause)) + self.updater.dispatcher.add_handler(CommandHandler('continue', self.cont)) + + conv_handler = ConversationHandler( + entry_points=[CommandHandler('menu', self.menu)], + states={ + 0: [RegexHandler('^(Status|Verbose|Pause|Continue|Interrupt)$', self.menu_handler)], + 1: [RegexHandler('^(Unchanged|0|1|2)$', self.verbose_handler)], + 2: [RegexHandler('^(Yes|No)$', self.interrupt_handler)], + }, + fallbacks=[CommandHandler('cancel', self.cancel)]) + + self.updater.dispatcher.add_handler(conv_handler) + + def start(self, bot, update): + """ + Method of start message processing required to obtain chat_id + """ + self.chat_id = update.message.chat_id + update.message.reply_text('Hello, my friend. /menu') + + def _help(self, bot, update): + """ + Method of help command processing + """ + self.chat_id = update.message.chat_id + message = 'Welcome! \n\ +/help - Show available commands\n\ +/menu - Activate keyboard menu with following options:\n\ +Status - Show current training status - epoch, metrics\n\ +Pause - Suspend training process (model still in a memory)\n\ +Continue - Continue training process\n\ +Interrupt - Interrupt training process ATTENTION: You will not be able to continue by this bot\n' + self.message(message) + + def pause(self, bot, update): + """ + Method of pause command processing. Suspend the training process + """ + self.flags_batch.append('p') + self.message('Training suspended. Use Interrupt or Continue now') + + def cont(self, bot, update): + """ + Method of continue command processing. Continue the training process + """ + self.flags_batch.append('c') + self.message('Training continues') + + def cancel(self, bot, update): + self.message('Cancel', reply_markup=ReplyKeyboardRemove()) + + return ConversationHandler.END + + def menu(self, bot, update): + """ + Method of menu command processing. Return the keyboard-menu + """ + menu_keyboard = [ + [InlineKeyboardButton("Status", callback_data='status')], + [InlineKeyboardButton("Verbose", callback_data='verbose')], + [InlineKeyboardButton("Pause", callback_data='pause')], + [InlineKeyboardButton("Continue", callback_data='Continue')], + [InlineKeyboardButton("Interrupt", callback_data='interrupt')], + [InlineKeyboardButton("Cancel", callback_data='cancel')]] + + reply_markup = ReplyKeyboardMarkup(menu_keyboard, one_time_keyboard=False) + + self.message('Menu activated', reply_markup=reply_markup) + + return 0 + + def menu_handler(self, bot, update): + """ + """ + option = update.message.text + + if option == 'Status': + self.status() + + elif option == 'Verbose': + return self.verbose(bot, update) + + elif option == 'Pause': + return self.pause(bot, update) + + elif option == 'Continue': + return self.cont(bot, update) + + elif option == 'Interrupt': + return self.interrupt(bot, update) + + elif option == 'Cancel': + return ConversationHandler.END + + def verbose(self, bot, update): + verbose_list = [i for i in range(3) if i != self.verbose_value] + verbose_keyboard = [ + [InlineKeyboardButton("Unchanged", callback_data='none')] + ] + + for item in verbose_list: + verbose_keyboard.append( + [InlineKeyboardButton("{}".format(item), callback_data='verbose_{}'.format(item))]) + + reply_markup = ReplyKeyboardMarkup(verbose_keyboard) + self.message('Current verbose: {}. Set to: '.format(self.verbose_value), reply_markup=reply_markup) + + return 1 + + def verbose_handler(self, bot, update): + option = update.message.text + if option != 'Unchanged': + self.verbose_value = int(option) + + menu_keyboard = [ + [InlineKeyboardButton("Status", callback_data='status')], + [InlineKeyboardButton("Verbose", callback_data='verbose')], + [InlineKeyboardButton("Pause", callback_data='pause')], + [InlineKeyboardButton("Continue", callback_data='Continue')], + [InlineKeyboardButton("Interrupt", callback_data='interrupt')], + [InlineKeyboardButton("Cancel", callback_data='cancel')]] + + reply_markup = ReplyKeyboardMarkup(menu_keyboard, one_time_keyboard=False) + + self.message('Verbose: {}'.format(self.verbose_value), reply_markup=reply_markup) + + return 0 + + def interrupt(self, bot, update): + """ + Method of stop (training) command processing + """ + menu_keyboard = [ + [InlineKeyboardButton("No", callback_data='y')], + [InlineKeyboardButton("Yes", callback_data='n')]] + + reply_markup = ReplyKeyboardMarkup(menu_keyboard, one_time_keyboard=False) + + self.message('Are you sure you want to interrupt?', reply_markup=reply_markup) + + return 2 + + def interrupt_handler(self, bot, update): + menu_keyboard = [ + [InlineKeyboardButton("Status", callback_data='status')], + [InlineKeyboardButton("Verbose", callback_data='verbose')], + [InlineKeyboardButton("Pause", callback_data='pause')], + [InlineKeyboardButton("Continue", callback_data='Continue')], + [InlineKeyboardButton("Interrupt", callback_data='interrupt')], + [InlineKeyboardButton("Cancel", callback_data='cancel')]] + + reply_markup = ReplyKeyboardMarkup(menu_keyboard, one_time_keyboard=False) + + option = update.message.text + if option == 'Yes': + self.flags_batch.append('s') + self.message('Training interrupting...', reply_markup=ReplyKeyboardRemove()) + else: + self.flags_batch.append('c') + self.message('Training continue', reply_markup=reply_markup) + + return 0 + + def _close_connect(self): + # self.updater.stop() + self.active = False