# This files contains your custom actions which can be used to run # custom Python code. # # See this guide on how to implement these action: # https://rasa.com/docs/rasa/custom-actions # This is a simple example for a custom action which utters "Hello World!" from typing import Any, Text, Dict, List from rasa_sdk import Action, Tracker from rasa_sdk.executor import CollectingDispatcher from flask import request import json class ActionService(Action): def name(self) -> Text: return "action_service" def run(self, dispatcher: CollectingDispatcher, tracker: Tracker, domain: Dict[Text, Any]) -> List[Dict[Text, Any]]: buttons=[ {"payload":'/docs{"content_type":"blogs"}',"title":"yes"}, {"payload":'/video{"content_type":"video"}',"title":"no"}, ] dispatcher.utter_message(text="Are you satisfied with above solution",buttons=buttons) return [] class ActionService(Action): def name(self) -> Text: return "action_suport_service" def run(self, dispatcher: CollectingDispatcher, tracker: Tracker, domain: Dict[Text, Any]) -> List[Dict[Text, Any]]: buttons=[ {"payload":'/connect_support{"content_type":"text"}',"title":"yes"}, {"payload":'/noconnect_support{"content_type":"text"}',"title":"no"}, ] dispatcher.utter_message(text="do you want me to connect to support team",buttons=buttons) return [] question=[] answer=[] class ActionSaveConversation(Action): def name(self) -> Text: return "action_save_conersation" def run(self, dispatcher: CollectingDispatcher, tracker: Tracker, domain: Dict[Text, Any]) -> List[Dict[Text, Any]]: sender_id = tracker.sender_id #print(tracker.events) conversation=tracker.events import os if not os. path.isfile('chats.txt'): with open('chats.txt ','w') as file: file.write("user_input $$$$ bot_reply\n") chat_data='' for i in conversation: if i['event'] == 'user': chat_data+=i['parse_data']['intent']['name'] print('user: {}'.format(i['text'])) a='user{}: {}'.format(sender_id,i['text']) question.append(a) if len(i['parse_data']['entities']) > 0: chat_data+=i['parse_data']['entities'][0]['entity']+', '+i['parse_data']['entities' ][0]['value']+',' print('extra data:', i['parse_data']['entities'][0]['entity'], '=', i['parse_data']['entities'][0]['value']) elif i['event'] == 'bot': print('{}'.format(i['text'])) b='Bot{}: {}'.format(sender_id,i['text']) question.append(b) # try: # chat_data+=i['metadata']['utter_action']+','+i['text']+'\n' # except KeyError: # pass chat_data+=i['text']+'\n' else: with open('chats.txt','a') as file: file.write(chat_data) #print(question) import pandas as pd df=pd.DataFrame(question) print(df) conversation=None question.clear() print(conversation) print(question) print(sender_id) # print(answer) # a=question # b=answer # import numpy as np # import pandas as pd # if len(a) > len(b): # b = np.pad(b, (0, len(a) - len(b)), 'constant') # else: # a = np.pad(a, (0, len(b) - len(a)), 'constant') # df=pd.DataFrame({'question':a,"answer":b}) # print(df) dispatcher.utter_message(text="All Chats saved") return [] class ActionSaveConversation(Action): def name(self) -> Text: return "action_save_conersation" def run(self, dispatcher: CollectingDispatcher, tracker: Tracker, domain: Dict[Text, Any]) -> List[Dict[Text, Any]]: # Get sender_id sender_id = tracker.sender_id conversation = tracker.events chat_data = [] for i in conversation: if i['event'] == 'user': text = i['text'].replace('/connect_support{"content_type":"text"}', 'yes').replace('/noconnect_support{"content_type":"text"}','No').replace('/docs{"content_type":"blogs"}','yes').replace('/video{"content_type":"video"}','No') chat_data.append({'sender': f'user_{sender_id}', 'message': text}) print(f'user_{sender_id}: {i["text"]}') if len(i['parse_data']['entities']) > 0: print('extra data:', i['parse_data']['entities'][0]['entity'], '=', i['parse_data']['entities'][0]['value']) elif i['event'] == 'bot': print(f'bot_{sender_id}: {i["text"]}') chat_data.append({'sender': f'bot_{sender_id}', 'message': i['text']}) with open('chats.json', 'w') as file: json.dump({'conversation': chat_data, 'sender_id': sender_id}, file) file.write('\n') file.close() print(json.dumps({'conversation': chat_data, 'sender_id': sender_id}, indent=4)) conversation = None return [] class ActionClearConversation(Action): def name(self) -> Text: return "action_clear_conversation" def run(self, dispatcher: CollectingDispatcher, tracker: Tracker, domain: Dict[Text, Any]) -> List[Dict[Text, Any]]: sender_id = tracker.sender_id tracker.events = [] return []