Source code for eta.core.eta

"""The core executable for the Eta Dialogue Manager.

Running this module will read an agent and user config from eta.config and user_config,
respectively, before spawning the core multiprocess loops that handle interpretation, reasoning,
planning, and execution.

Note that the agent config is provided as a Python module that exports a 'config' function,
returning a dict of config parameters, since transducers and other objects may be created within
the config. On the other hand, the user config is supplied in JSON format.

Dialogue state is maintained entirely through a DialogueState class which is shared between each
core process using a ProcessManager, and using mutex locks to prevent data races on the underlying
dialogue state attributes.

Additionally, a set of buffers (i.e., priority queues) are used for the data being processed
over the dialogue session, which is necessary for:

  1. maintaining synchronicity between the processes in the relevant aspects (e.g., completing a
     series of modifications to the plan before attempting to execute a step).
  2. ensuring that transducers, which may incur some monetary cost to apply (e.g., in the case of
     GPT-based transduction), are only applied after a modification to the relevant data.
"""

import argparse
from importlib import import_module

from multiprocessing import Process
from multiprocessing import Lock
from multiprocessing.managers import BaseManager

from eta.constants import *
from eta.util.general import gentemp, clear_symtab, remove_duplicates, remove_nil, append, variablep, episode_name
import eta.util.file as file
import eta.util.time as time
import eta.util.buffer as buffer
from eta.lf import (equal_prop_p, not_prop_p, and_prop_p, or_prop_p, characterizes_prop_p, expectation_p,
                    from_lisp_dirs, list_to_s_expr)
from eta.discourse import get_prior_words
from eta.memory import MemoryStorage
from eta.schema import SchemaLibrary
from eta.plan import init_plan_from_eventualities

from eta.core.perception import perception_loop
from eta.core.reasoning import reasoning_loop
from eta.core.planning import planning_loop
from eta.core.execution import execution_loop


[docs] class DialogueState(): """A representation of the current dialogue state. Parameters ---------- config_agent : dict The config parameters for the agent. config_user : dict The config parameters for the user. Attributes ---------- id : str A unique ID for this session. config_agent : dict The config parameters for the agent. config_user : dict The config parameters for the user. start_schema : str The schema predicate used to begin the dialogue session. start_time : TimePoint The starting time of this dialogue session. io_path : str The path used to read and write input/output for this session. me : str The name of the agent for this session. you : str The name of the user for this session. output_buffer : list[Utterance] A buffer of output utterances that are accumulated until the system is ready to write them and listen for user input. step_failure_timer : float A POSIX time record used to track time before failing an expected event. quit_conversation : bool Whether to quit the current session. transducers : dict[str, Transducer or list[Transducer]] A dict associating Transducer object(s) with named mapping functions (e.g., 'gist'). embedder : Embedder An embedder object used to compute object embeddings and perform retrieval. schemas : SchemaLibrary A library of dialogue, episode, and object schemas that form the agent's generic knowledge. concept_aliases : None TODO concept_sets : None TODO init_knowledge : list[Eventuality] A list of initial facts in the agent's semantic memory. schema_instances : dict A dict containing all schema instances (keyed on their unique IDs). plan : PlanNode The "currently due" node in the agent's plan, initialized from the `start_schema`. buffers : dict[str, list] The named buffers (i.e., priority queues) for each type of processed data. reference_list : list TODO equality_sets : dict TODO conversation_log : list[DialogueTurn] A list of turns in the dialogue history. memory : MemoryStorage The episodic memory of the agent, initialized from `init_knowledge`. timegraph : None TODO """ def __init__(self, config_agent, config_user): self._lock = Lock() # session variables self.id = gentemp('SESSION') self.config_agent = config_agent self.config_user = config_user self.start_schema = config_agent['start_schema'] if 'start_schema' in config_agent else DEFAULT_START self.start_time = time.TimePoint() self.io_path = IO_PATH + config_agent['agent'] + '/' + config_user['user_id'] + '/' self.log_path = LOG_PATH + self.start_time.format_date() + '/' self.me = config_agent['agent_name'] self.you = config_user['user_name'] self.output_buffer = [] self.step_failure_timer = time.now() self.quit_conversation = False # internal mechanisms self.transducers = self.config_agent.pop('transducers') self.embedder = None if 'embedder' in config_agent: self.embedder = self.config_agent.pop('embedder') # static knowledge self.schemas = SchemaLibrary(self.embedder).from_lisp_dirs(config_agent['schema_dirs']) self.concept_aliases = None # TODO self.concept_sets = None # TODO self.init_knowledge = [] if 'knowledge_dirs' in config_agent: self.init_knowledge = from_lisp_dirs(config_agent['knowledge_dirs']) # dialogue variables if not self.start_schema in self.schemas.dial: raise Exception('Start schema for session not found.') self.schema_instances = {} self.plan = self.init_plan_from_schema(self.start_schema) self.buffers = self._make_buffers() self.reference_list = [] self.equality_sets = {} self.conversation_log = [] importance_threshold = (config_agent['importance_threshold'] if 'importance_threshold' in config_agent else DEFAULT_IMPORTANCE_THRESHOLD) self.memory = MemoryStorage(self.embedder, importance_threshold=importance_threshold) self.add_to_memory(self.init_knowledge, importance=[1. for _ in self.init_knowledge]) self.timegraph = self._make_timegraph() self._create_session_io_files() self._create_session_log_files() # ----------------- # session functions # -----------------
[docs] def get_io_path(self, fname=''): """Get the path for an IO file (if given) or the IO directory for this session.""" return self.io_path + fname
[docs] def get_log_path(self, fname=''): """Get the path for a log file (if given) or the log directory for this session.""" return self.log_path + fname
[docs] def get_perception_servers(self): """Get the registered perception servers for this session.""" return self.config_agent['perception_servers']
[docs] def get_specialist_servers(self): """Get the registered specialist servers for this session.""" return self.config_agent['specialist_servers']
[docs] def get_user_id(self): """Get the user ID for this session.""" return self.config_user['id']
[docs] def get_step_failure_timer(self): """Get the current value of the step failure timer.""" with self._lock: return self.step_failure_timer
[docs] def reset_step_failure_timer(self): """Reset the step failure timer to the current time.""" with self._lock: self.step_failure_timer = time.now()
[docs] def get_quit_conversation(self): """Check whether to quit the conversation.""" with self._lock: return self.quit_conversation
[docs] def set_quit_conversation(self, quit): """Set whether to quit the conversation.""" with self._lock: self.quit_conversation = quit
# ---------------- # schema functions # ----------------
[docs] def is_schema(self, predicate, type=None): """Check whether a given predicate exists in the agent's schema library.""" with self._lock: return self.schemas.is_schema(predicate, type=type)
# -------------- # plan functions # --------------
[docs] def has_plan(self): """Check whether the agent currently has a plan.""" with self._lock: return self.plan is not None
[docs] def get_plan(self): """Get the agent's plan.""" with self._lock: return self.plan
[docs] def set_plan(self, plan): """Set the agent's plan to a new plan node.""" if plan is None: return with self._lock: self.plan = plan
[docs] def do_continue(self): """Check whether to continue with the current dialogue.""" return self.has_plan() and not self.get_quit_conversation()
[docs] def init_plan_from_schema(self, predicate, args=[]): """Initialize a plan from a given schema predicate along with a list of arguments. This instantiates the generic schema as a schema instance, binding variables occurring in the header to the supplied arguments, if any. It then instantiates a plan structure from the episodes list of that schema. Notes ----- TODO: the plan structure created when instantiating a schema is currently "flat" - in the future, we might want to add support for annotating abstraction hierarchies in the schema, in which case these would be added as supersteps to the steps of the plan-nodes that are created. TODO: by default, we assume that the episodes of the schema define sequential steps. However, the episode-relations in the schema should be used to impose a different ordering on the resulting plan. TODO: immediately following schema instantiation, we should also attempt to (a) bind variables in the schema occurring within the :types section to possible values in the dialogue context, as well as inferring the facts from the other schema sections, i.e., adding them to the context. """ with self._lock: if predicate not in self.schemas.dial: raise Exception(f'Attempting to instantiate a dialogue schema, {predicate}, that does not exist.') schema = self.schemas.dial[predicate] if not schema.get_section('episodes'): raise Exception(f'Attempting to initialize a plan from a schema, {predicate}, that has no episodes.') schema_instance = schema.instantiate(args) self.schema_instances[schema_instance.id] = schema_instance return init_plan_from_eventualities(schema_instance.get_section('episodes'), schema=schema_instance)
[docs] def advance_plan(self): """Advance the plan to the next step (or signal to quit the conversation if none exists).""" with self._lock: if self.plan is not None and self.plan.next: self.plan = self.plan.next else: self.quit_conversation = True
[docs] def instantiate_curr_step(self): """Instantiate the current plan step. This binds the episode variable for the current plan step with a newly created episode constant throughout the dialogue state. The event is also added to context, unless it is an expectation step, in which case it would have already been matched to an event in context. This process recurs for any superstep such that the current step "completes" that superstep. """ def instantiate_step_recur(step): event = step.event ep_var = event.get_ep() if not variablep(ep_var): return [] substeps = step.substeps # Only instantiate if no substeps, or all substeps have already been instantiated if all([not variablep(substep.event.get_ep()) for substep in substeps]): # If single substep, share same episode name if substeps and len(substeps) == 1: ep = substeps[0].event.get_ep() ep = episode_name() if variablep(ep) else ep else: ep = episode_name() event.bind(ep_var, ep) self.bind(ep_var, ep) if substeps or not expectation_p(event): self.add_to_context(event) # Recur for supersteps return [instantiate_step_recur(superstep) for superstep in step.supersteps] instantiate_step_recur(self.plan.step) return self.plan.step
# ---------------- # buffer functions # ----------------
[docs] def add_to_buffer(self, x, type): """Add an element to the buffer of the given type.""" if x is None: return with self._lock: buffer.enqueue(x, self.buffers[type])
[docs] def add_to_buffer_if_empty(self, x, type): """Add an element to the buffer of the given type iff that buffer is currently empty.""" if x is None: return with self._lock: if buffer.is_empty(self.buffers[type]): buffer.enqueue(x, self.buffers[type])
[docs] def add_all_to_buffer(self, xs, type): """Add all elements in a list to the buffer of the given type.""" with self._lock: buffer.enqueue_ordered(xs, self.buffers[type])
[docs] def replace_buffer(self, x, type): """Replace the buffer of the given type with a single element.""" if x is None: return with self._lock: buffer.pop_all(self.buffers[type]) buffer.enqueue(x, self.buffers[type])
[docs] def replace_all_buffer(self, xs, type): """Replace the buffer of the given type with a list of elements.""" with self._lock: buffer.pop_all(self.buffers[type]) buffer.enqueue_ordered(xs, self.buffers[type])
[docs] def get_buffer(self, type): """Get the buffer of the given type.""" with self._lock: return buffer.iterate(self.buffers[type])
[docs] def buffer_empty(self, type): """Check whether the buffer of the given type is empty.""" with self._lock: return buffer.is_empty(self.buffers[type])
[docs] def pop_buffer(self, type): """Pop an element from the buffer of the given type.""" with self._lock: return buffer.pop_item(self.buffers[type])
[docs] def pop_all_buffer(self, type): """Pop all elements from the buffer of the given type.""" with self._lock: return buffer.pop_all(self.buffers[type])
# ------------------------ # reference list functions # ------------------------ # TODO # ---------------------- # equality set functions # ---------------------- # TODO # -------------------------- # conversation log functions # --------------------------
[docs] def get_conversation_log(self): """Get the current conversation history.""" with self._lock: return self.conversation_log
[docs] def log_turn(self, turn): """Log and write a given DialogueTurn in the conversation log.""" with self._lock: self.conversation_log.append(turn) self._write_turn(turn)
# ---------------- # memory functions # ----------------
[docs] def add_to_memory(self, fact, importance=DEFAULT_IMPORTANCE): """Add a fact to the memory.""" with self._lock: self.memory.instantiate(fact, importance=importance, context=False)
[docs] def add_to_context(self, fact, importance=DEFAULT_IMPORTANCE): """Add a fact to the context.""" with self._lock: self.memory.instantiate(fact, importance=importance)
[docs] def remove_from_context(self, fact): """Remove a fact from the context.""" with self._lock: self.memory.remove_matching_from_context(fact)
[docs] def access_from_context(self, pred_patt): """Access facts from context matching a given predicate pattern.""" with self._lock: return self.memory.get_from_context(pred_patt, access=True)
[docs] def flush_context(self): """Flush the dialogue context of "instantaneous" events.""" with self._lock: self.memory.flush_context()
[docs] def get_memory(self): """Get the memory storage object.""" with self._lock: return self.memory
[docs] def eval_truth_value(self, wff): """Evaluate the truth value of a wff in memory/context.""" with self._lock: def eval_truth_value_recur(wff): # (wff1 = wff2) if equal_prop_p(wff): return wff[0] == wff[2] # (not wff1) elif not_prop_p(wff): return not eval_truth_value_recur(wff[1]) # (wff1 and wff2) elif and_prop_p(wff): return eval_truth_value_recur(wff[0]) and eval_truth_value_recur(wff[2]) # (wff1 or wff2) elif or_prop_p(wff): return eval_truth_value_recur(wff[0]) or eval_truth_value_recur(wff[2]) # (wff1 ** e) elif characterizes_prop_p(wff): return self.memory.does_characterize_episode(wff[0], wff[2]) # Otherwise, check to see if wff is true in context return True if self.memory.get_from_context(wff) else False return eval_truth_value_recur(wff)
# ------------------- # timegraph functions # ------------------- # TODO # -------------------- # transducer functions # --------------------
[docs] def apply_transducer(self, type, *args): """Apply the transducer(s) of the given type to a list of arguments.""" with self._lock: if type not in self.transducers: return [] if isinstance(self.transducers[type], list): return remove_nil(remove_duplicates(append([t(*args) for t in self.transducers[type]]), order=True)) else: return self.transducers[type](*args)
# --------------- # other functions # ---------------
[docs] def bind(self, var, val): """Bind the given variable symbol to the given value throughout the dialogue state.""" with self._lock: if self.plan: self.plan.bind(var, val)
[docs] def unbind(self, var): """Unbind the given variable symbol throughout the dialogue state.""" with self._lock: if self.plan: self.plan.unbind(var)
[docs] def cost(self): """Compute the accumulated (monetary) cost of each transducer for this session.""" cost = 0. with self._lock: for t in self.transducers.values(): if isinstance(t, list): cost += sum([t1.cost() for t1 in t]) else: cost += t.cost() return cost
[docs] def retrieve_facts(self, query=None, n_schema=1, n_schema_facts=3, n_memory=3): """Retrieve and combine facts from the current dialogue schema, relevant episode schemas, and memory. The facts are divided into two subsets for facts that are "backgrounded" and those that are "foregrounded", which may be leveraged by downstream tasks (e.g., in generation, we may assume that foregrounded facts should be used in the generated response in some way, whereas backgrounded facts may condition the generation without being directly used). Currently, it is assumed that background facts correspond to the current dialogue schema conditions, whereas foregrounded facts are retrieved from relevant episode schemas and memory. Parameters ---------- query : str, optional The query string to use for retrieval. If not given, the previous user turn will be used. n_schema : int, default=1 The number of schemas to retrieve. n_schema_facts : int, default=3 The number of facts to use from the retrieved schemas (in addition to schema headers). n_memories : int, default=3 The number of facts to use from memory. Returns ------- facts_bg : list[Eventuality] The retrieved background facts. facts_fg : list[Eventuality] The retrieved foreground facts. """ with self._lock: facts_bg = [] facts_fg = [] for sec in ['rigid-conds', 'static-conds', 'preconds', 'goals']: facts_bg += append([schema.get_section(sec) for schema in self.plan.get_schemas()]) if not query: query = get_prior_words(self.conversation_log, YOU) facts_fg += self.schemas.retrieve_knowledge('epi', query=query, m=n_schema, n=n_schema_facts) facts_fg += [m.event for m in self.memory.retrieve(query=query, n=n_memory)] return facts_bg, facts_fg
[docs] def write_output_buffer(self): """Write the output buffer (a list of Utterances) to output files.""" with self._lock: if not self.output_buffer: return output = ' '.join([utt.words for utt in self.output_buffer]) affects = [utt.affect for utt in self.output_buffer if utt.affect != 'neutral'] affect = affects[0] if affects else 'neutral' file.write_file(self.get_io_path('turn-output.txt'), output) file.write_file(self.get_io_path('turn-affect.txt'), affect) self.output_buffer = []
[docs] def push_output_buffer(self, utt): """Push an utterance onto the output buffer.""" with self._lock: self.output_buffer.append(utt)
[docs] def print_schema_instances(self, no_bind=False): """Print all current schema instances.""" for schema in self.schema_instances.values(): print(schema.format(no_bind))
# ---------------- # helper functions # ---------------- def _make_buffers(self): return { 'observations' : [], 'inferences' : [], 'actions' : [], 'plans' : [] } def _make_timegraph(self): return None def _create_session_io_files(self): file.ensure_dir_exists(self.get_io_path()) file.ensure_dir_exists(self.get_io_path(IO_IN_DIR)) file.ensure_dir_exists(self.get_io_path(IO_OUT_DIR)) file.ensure_dir_exists(self.get_io_path(IO_CLOG_DIR)) for system in self.get_perception_servers()+self.get_specialist_servers(): file.ensure_file_exists(self.get_io_path(f'{IO_IN_DIR}{system}.txt')) file.ensure_file_exists(self.get_io_path(f'{IO_OUT_DIR}{system}.txt')) file.ensure_file_exists(self.get_io_path('turn-output.txt')) file.ensure_file_exists(self.get_io_path('turn-affect.txt')) for fname in CLOG_FILES: file.ensure_file_exists(self.get_io_path(f'{IO_CLOG_DIR}{fname}.txt')) def _create_session_log_files(self): file.ensure_dir_exists(self.get_log_path()) for fname in CLOG_FILES: file.ensure_file_exists(self.get_log_path(f'{fname}.txt')) file.write_json(self.get_log_path('config-agent.json'), self.config_agent, pretty=True) file.write_json(self.get_log_path('config-user.json'), self.config_user, pretty=True) def _write_turn(self, turn): text = turn.utterance.words affect = turn.utterance.affect gist = " ".join(['"'+list_to_s_expr(t)+'"' for t in turn.gists]) if turn.gists else 'NIL' semantics = " ".join([list_to_s_expr(t) for t in turn.semantics]) if turn.semantics else 'NIL' pragmatics = " ".join([list_to_s_expr(t) for t in turn.pragmatics]) if turn.pragmatics else 'NIL' obligations = " ".join([list_to_s_expr(t) for t in turn.obligations]) if turn.obligations else 'NIL' plan_step = self.plan.step step = self.plan.step.format(schemas=True) outputs = { 'text' : f'{turn.agent} : {text}\n', 'affect' : f'{turn.agent} : {affect}\n', 'gist' : f'{turn.agent} : {gist}\n', 'semantic' : f'{turn.agent} : {semantics}\n', 'pragmatic' : f'{pragmatics}\n', 'obligations' : f'{obligations}\n', 'step' : f'{step}\n' } for fname in CLOG_FILES: file.append_file(self.get_io_path(f'{IO_CLOG_DIR}{fname}.txt'), outputs[fname]) file.append_file(self.get_log_path(f'{fname}.txt'), outputs[fname])
[docs] class ProcessManager(BaseManager): """Manager to handle multiprocessing.""" pass
[docs] def eta(config_agent, config_user): """Initialize the dialogue state for a new session and spawn each core process. Parameters ---------- config_agent : dict A dict of config parameters for the agent. config_user : dict A dict of config parameters for the user. """ ProcessManager.register('DialogueState', DialogueState) with ProcessManager() as manager: ds = manager.DialogueState(config_agent, config_user) perception = Process(target=perception_loop, args=(ds,)) reasoning = Process(target=reasoning_loop, args=(ds,)) planning = Process(target=planning_loop, args=(ds,)) execution = Process(target=execution_loop, args=(ds,)) perception.start() reasoning.start() planning.start() execution.start() perception.join() reasoning.join() planning.join() execution.join() # Write any remaining output ds.write_output_buffer() print(ds.get_memory()) print() print(ds.get_plan()) print(f'total cost of session: ${ds.cost()}')
[docs] def main(agent_config_name, user_config_name): """Clear the symbol table, read the agent and user configs, and start Eta.""" clear_symtab() agent_config = import_module(f'eta.config.{agent_config_name}').config() user_config = file.load_json(f'user_config/{user_config_name}.json') eta(agent_config, user_config)
if __name__ == "__main__": parser = argparse.ArgumentParser( prog='eta', description='Starts the Eta dialogue manager') parser.add_argument('--agent', type=str, default='sophie_offline', help='The name of an agent config in eta.config') parser.add_argument('--user', type=str, default='test', help='The name of a user config in ./user_config/') args = parser.parse_args() main(args.agent, args.user)