__author__ = "Josh"
__description__ = "Connects commands from the user with responses from the mud."
import random, string, re, time, traceback, sys
from lyntin import exported, engine, utils, ansi
from lyntin.modules import modutils
commands = {}
prompt_filter = None
command_response_driver = None
prompted_sessions = []
def prompt_cmd(ses, args, input):
global prompted_sessions
if ses not in prompted_sessions:
prompted_sessions.append(ses)
newprompt = args["prompt"]
newdisplayprompt = args["displayprompt"]
send = args["send"]
global prompt_filter
if prompt_filter == None:
prompt_filter = PromptFilter()
exported.hook_register("mud_filter_hook", prompt_filter, -5)
global command_response_driver
if command_response_driver == None:
command_response_driver = commandResponseDriver()
exported.hook_register("to_mud_hook", command_response_driver.to_mud_hook_callback)
exported.hook_register("mud_response_hook", command_response_driver.mud_response_callback)
command_response_driver.resetSession(ses)
prompt = prompt_filter.prompt
displayprompt = prompt_filter.displayprompt
if not newprompt:
newprompt = prompt
if not displayprompt:
displayprompt = (prompt or "") + " "
if not newdisplayprompt:
newdisplayprompt = displayprompt
if newprompt:
prompt_filter.setprompt(newprompt,newdisplayprompt,send)
exported.write_message("Prompt is %s and is shown as %s" % (newprompt,newdisplayprompt))
else:
exported.write_message("No prompt set")
commands["prompt"] = (prompt_cmd, "prompt:eval=\"!-prompt->\" displayprompt:eval=\"----------\\\\n\" send:boolean=true")
class PromptFilter:
def __init__(self):
self.prompt = None
self.displayprompt = None
self.promptfound = None
self.parseprompt = None
self.text = ""
self.additionalPromptREs = [re.compile(r"More: \d+-\d+\(\d+\) (?:: ){0,1}\[q,b,\<cr\>\]")]
self.eatprompt = 0
return
def setprompt(self, prompt, displayprompt, send):
if send:
exported.lyntin_command("#raw {prompt %s}" % (ansi.filter_ansi(prompt)), internal=1)
if not send:
self.eatprompt = 1
self.prompt = prompt
self.parseprompt = self.prompt + " "
self.displayprompt = displayprompt
self.promptfound = None
self.text = ""
def __call__(self, args):
session = args["session"]
original = args["data"]
text = args["dataadj"]
if session not in prompted_sessions:
return text
if not self.parseprompt:
return text
outputtext = ""
if self.promptfound:
while len(text) > 0 and self.promptfound != self.parseprompt and text[0] == self.parseprompt[len(self.promptfound)]:
self.promptfound = self.promptfound + text[0:1]
text = text[1:]
if self.promptfound == self.parseprompt:
self.promptfound = None
outputtext = outputtext + self.displayprompt
self.promptFound(session,"")
else:
text = self.promptfound + text
self.promptfound = None
while text.find(self.parseprompt) > -1:
promptindex = text.find(self.parseprompt)
self.promptFound(session,text[0:promptindex])
outputtext = outputtext + text[0:promptindex] + self.displayprompt
text = text[promptindex + len(self.parseprompt):]
for testsize in range(max(1,len(self.parseprompt) - len(text)),len(self.parseprompt)):
if text[-(len(self.parseprompt)-testsize):] + self.parseprompt[-testsize:] == self.parseprompt:
text = text[0:-(len(self.parseprompt)-testsize)]
self.promptfound = self.parseprompt[0:-testsize]
if len(text) > 0:
self.textFound(session,text)
outputtext = outputtext + text
return outputtext
def promptFound(self,session,text):
textblocks = [ansi.filter_ansi(self.text + text)]
self.text = ""
for otherre in self.additionalPromptREs:
oldtextblocks = textblocks
textblocks = []
for text in oldtextblocks:
textblocks += otherre.split(text)
for response in textblocks:
if self.eatprompt > 0:
exported.write_message("Eatting a response")
self.eatprompt -= 1
else:
exported.hook_spam("mud_response_hook", {"session": session, "text": response})
return
def textFound(self,session,text):
self.text = self.text + text
return
class toMudHookCallback:
def __init__(self, driver):
self.driver = driver
def __call__(self, args):
self.driver.to_mud_hook(args)
class mudResponseHookCallback:
def __init__(self, driver):
self.driver = driver
def __call__(self, args):
self.driver.mud_response_hook(args)
class commandResponseCollater:
def __init__(self, session):
self.commands = []
self.responses = []
self.sentalready = 0
self.session = session
self.keep = 100
self.purge = 10
self.skewwarningtime = 10
self.skewtime = 0
self.lastwarning = 0
def checkAndSend(self):
while len(self.commands) > self.sentalready and len(self.responses) > self.sentalready:
self.sentalready = self.sentalready + 1
try:
exported.hook_spam("command_response_hook",
{"session": self.session,
"command": self.commands[self.sentalready-1][0],
"tag": self.commands[self.sentalready-1][1],
"response": self.responses[self.sentalready-1]})
except:
traceback.print_exc()
commandskew = len(self.commands) - self.sentalready
responseskew = len(self.responses) - self.sentalready
skew = max(commandskew, responseskew)
if commandskew > 0:
if self.skewtime:
if (time.time() - self.skewtime > self.skewwarningtime) and time.time() - self.lastwarning > 1:
self.lastwarning = time.time()
exported.write_message("Warning: command/response skew (c:%d r:%d) persisting (use #prompt to reset processor)" % (commandskew,responseskew))
else:
self.skewtime = time.time()
self.lastwarning = 0
elif responseskew > 0:
if time.time() - self.lastwarning > 1:
self.lastwarning = time.time()
exported.write_message("Warning: command/response skew (c:%d r:%d) persisting (use #prompt to reset processor)" % (commandskew,responseskew))
else:
self.skewtime = 0
if self.sentalready > self.keep + self.purge:
toremove = self.sentalready - self.keep
self.commands = self.commands[toremove:]
self.responses = self.responses[toremove:]
self.sentalready = self.sentalready - toremove
def to_mud(self, text, tag):
self.commands.append( (text, tag) )
self.checkAndSend()
def mud_response(self, text):
self.responses.append(text)
self.checkAndSend()
class commandResponseDriver:
def __init__(self):
self.collaters = {}
self.to_mud_hook_callback = toMudHookCallback(self)
self.mud_response_callback = mudResponseHookCallback(self)
def to_mud_hook(self, args):
ses = args["session"]
text = args["data"]
tag = args["tag"]
if ses not in prompted_sessions:
return
if not self.collaters.has_key(ses):
self.resetSession(ses)
self.collaters[ses].to_mud(text, tag)
def mud_response_hook(self, args):
ses = args["session"]
text = args["text"]
if ses not in prompted_sessions:
return
if not self.collaters.has_key(ses):
self.resetSession(ses)
self.collaters[ses].mud_response(text)
def resetSession(self, ses):
self.collaters[ses] = commandResponseCollater(ses)
class ProcessingTag:
def __init__(self):
pass
def process(self, session, command, response):
pass
def command_response_processor(args):
ses = args["session"]
command = args["command"]
tag = args["tag"]
response = args["response"]
if tag and hasattr(tag,"process"):
try:
tag.process(ses,command,response)
except:
traceback.print_exc()
def reload():
global prompted_sessions
for ses in prompted_sessions:
sys.modules["commandresponse"].prompted_sessions = prompted_sessions
def load():
exported.hook_register("command_response_hook", command_response_processor)
modutils.load_commands(commands)
def unload():
global prompt_filter, command_response_driver
if prompt_filter:
exported.hook_unregister("mud_filter_hook", prompt_filter)
prompt_filter = None
if command_response_driver:
exported.hook_unregister("to_mud_hook", command_response_driver.to_mud_hook_callback)
exported.hook_unregister("mud_response_hook", command_response_driver.mud_response_callback)
command_response_driver = None
exported.hook_unregister("command_response_hook", command_response_processor)
modutils.unload_commands(commands)