Module odinson.ruleutils.queryparser

Expand source code
from typing import Text
from pyparsing import *
from odinson.ruleutils import config
from odinson.ruleutils.queryast import *

__all__ = [
    "parse_odinson_query",
    "parse_surface",
    "parse_traversal",
]

# punctuation
comma = Literal(",").suppress()
equals = Literal("=").suppress()
vbar = Literal("|").suppress()
lt = Literal("<").suppress()
gt = Literal(">").suppress()
at = Literal("@").suppress()
ampersand = Literal("&").suppress()
open_curly = Literal("{").suppress()
close_curly = Literal("}").suppress()
open_parens = Literal("(").suppress()
close_parens = Literal(")").suppress()
open_bracket = Literal("[").suppress()
close_bracket = Literal("]").suppress()

# literal values
surface_hole = config.SURFACE_HOLE_GLYPH
traversal_hole = config.TRAVERSAL_HOLE_GLYPH
query_hole = config.QUERY_HOLE_GLYPH
number = Word(nums).setParseAction(lambda t: int(t[0]))
identifier = Word(alphas + "_", alphanums + "_")
single_quoted_string = QuotedString("'", unquoteResults=True, escChar="\\")
double_quoted_string = QuotedString('"', unquoteResults=True, escChar="\\")
quoted_string = single_quoted_string | double_quoted_string
string = identifier | quoted_string

# number to the left of the comma {n,}
quant_range_left = open_curly + number + comma + close_curly
quant_range_left.setParseAction(lambda t: (t[0], None))
# number to the right of the comma {,m}
quant_range_right = open_curly + comma + number + close_curly
quant_range_right.setParseAction(lambda t: (0, t[0]))
# numbers on both sides of the comma {n,m}
quant_range_both = open_curly + number + comma + number + close_curly
quant_range_both.setParseAction(lambda t: (t[0], t[1]))
# no number either side of the comma {,}
quant_range_neither = open_curly + comma + close_curly
quant_range_neither.setParseAction(lambda t: (0, None))
# range {n,m}
quant_range = (
    quant_range_left | quant_range_right | quant_range_both | quant_range_neither
)
# repetition {n}
quant_rep = open_curly + number + close_curly
quant_rep.setParseAction(lambda t: (t[0], t[0]))
# quantifier operator
quant_op = oneOf("? * +")
quant_op.setParseAction(
    lambda t: (0, 1) if t[0] == "?" else (0, None) if t[0] == "*" else (1, None)
)
# any quantifier
quantifier = quant_op | quant_range | quant_rep

# a hole that can take the place of a matcher
hole_matcher = Literal(surface_hole).setParseAction(lambda t: HoleMatcher())
# a matcher that compares tokens to a string (t[0])
exact_matcher = string.setParseAction(lambda t: ExactMatcher(t[0]))
# any matcher
matcher = hole_matcher | exact_matcher

# a hole that can take the place of a token constraint
hole_constraint = Literal(surface_hole).setParseAction(lambda t: HoleConstraint())

# a constraint of the form `f=v` means that only tokens
# that have a field `f` with a corresponding value of `v`
# can be accepted
field_constraint = matcher + equals + matcher
field_constraint.setParseAction(lambda t: FieldConstraint(*t))

# forward declaration, defined below
or_constraint = Forward()

# an expression that represents a single constraint
atomic_constraint = (
    field_constraint | hole_constraint | open_parens + or_constraint + close_parens
)

# a constraint that may or may not be negated
not_constraint = Optional("!") + atomic_constraint
not_constraint.setParseAction(lambda t: NotConstraint(t[1]) if len(t) > 1 else t[0])

# one or two constraints ANDed together
and_constraint = Forward()
and_constraint << (not_constraint + Optional(ampersand + and_constraint))
and_constraint.setParseAction(lambda t: AndConstraint(*t) if len(t) == 2 else t[0])

# one or two constraints ORed together
or_constraint << (and_constraint + Optional(vbar + or_constraint))
or_constraint.setParseAction(lambda t: OrConstraint(*t) if len(t) == 2 else t[0])

# a hole that can take the place of a surface query
hole_surface = Literal(surface_hole).setParseAction(lambda t: HoleSurface())

# a token constraint surrounded by square brackets
token_constraint = open_bracket + or_constraint + close_bracket
token_constraint.setParseAction(lambda t: TokenSurface(t[0]))

# an unconstrained token
token_wildcard = open_bracket + close_bracket
token_wildcard.setParseAction(lambda t: WildcardSurface())

# a token pattern
token_surface = token_wildcard | token_constraint

# forward declaration, defined below
or_surface = Forward()

# an entity or event mention
mention_surface = at + matcher
mention_surface.setParseAction(lambda t: MentionSurface(t[0]))

# an expression that represents a single query
atomic_surface = (
    hole_surface
    | token_surface
    | mention_surface
    | open_parens + or_surface + close_parens
)

# a query with an optional quantifier
repeat_surface = atomic_surface + Optional(quantifier)
repeat_surface.setParseAction(
    lambda t: RepeatSurface(t[0], *t[1]) if len(t) > 1 else t[0]
)

# one or two queries that must match consecutively
concat_surface = Forward()
concat_surface << (repeat_surface + Optional(concat_surface))
concat_surface.setParseAction(lambda t: ConcatSurface(*t) if len(t) == 2 else t[0])

# one or two queries ORed together
or_surface << (concat_surface + Optional(vbar + or_surface))
or_surface.setParseAction(lambda t: OrSurface(*t) if len(t) == 2 else t[0])

# a hole that can take the place of a traversal
hole_traversal = Literal(traversal_hole).setParseAction(lambda t: HoleTraversal())

# labeled incoming edge
incoming_label = lt + matcher
incoming_label.setParseAction(lambda t: IncomingLabelTraversal(t[0]))

# any incoming edge
incoming_wildcard = Literal("<<")
incoming_wildcard.setParseAction(lambda t: IncomingWildcardTraversal())

# an incoming edge
incoming_traversal = incoming_label | incoming_wildcard

# labeled outgoing edge
outgoing_label = gt + matcher
outgoing_label.setParseAction(lambda t: OutgoingLabelTraversal(t[0]))

# any outgoing edge
outgoing_wildcard = Literal(">>")
outgoing_wildcard.setParseAction(lambda t: OutgoingWildcardTraversal())

# an outgoing edge
outgoing_traversal = outgoing_label | outgoing_wildcard

# forward declaration, defined below
or_traversal = Forward()

# an expression that represents a single traversal
atomic_traversal = (
    hole_traversal
    | incoming_traversal
    | outgoing_traversal
    | open_parens + or_traversal + close_parens
)

# a traversal with an optional quantifier
repeat_traversal = atomic_traversal + Optional(quantifier)
repeat_traversal.setParseAction(
    lambda t: RepeatTraversal(t[0], *t[1]) if len(t) > 1 else t[0]
)

# one or two traversals that must match consecutively
concat_traversal = Forward()
concat_traversal << (repeat_traversal + Optional(concat_traversal))
concat_traversal.setParseAction(lambda t: ConcatTraversal(*t) if len(t) == 2 else t[0])

# one or two traversals ORed together
or_traversal << (concat_traversal + Optional(vbar + or_traversal))
or_traversal.setParseAction(lambda t: OrTraversal(*t) if len(t) == 2 else t[0])

# a hole that can take the place of a hybrid query
hole_query = Literal(query_hole).setParseAction(lambda t: HoleQuery())

# forward declaration, defined below
odinson_query = Forward()

# a single surface or a hybrid (surface, traversal, surface)
hybrid_query = Forward()
hybrid_query << (or_surface + Optional(or_traversal + odinson_query))
hybrid_query.setParseAction(lambda t: HybridQuery(*t) if len(t) == 3 else t[0])

odinson_query << (hole_query | hybrid_query)

# the top symbol of our grammar
top = LineStart() + odinson_query + LineEnd()


def parse_odinson_query(pattern: Text) -> AstNode:
    """Gets a string and returns the corresponding AST."""
    return top.parseString(pattern)[0]


def parse_surface(pattern: Text) -> Surface:
    """Gets a string and returns the corresponding surface pattern."""
    return or_surface.parseString(pattern)[0]


def parse_traversal(pattern: Text) -> Traversal:
    """Gets a string and returns the corresponding graph traversal."""
    return or_traversal.parseString(pattern)[0]

Functions

def parse_odinson_query(pattern: str) ‑> AstNode

Gets a string and returns the corresponding AST.

Expand source code
def parse_odinson_query(pattern: Text) -> AstNode:
    """Gets a string and returns the corresponding AST."""
    return top.parseString(pattern)[0]
def parse_surface(pattern: str) ‑> Surface

Gets a string and returns the corresponding surface pattern.

Expand source code
def parse_surface(pattern: Text) -> Surface:
    """Gets a string and returns the corresponding surface pattern."""
    return or_surface.parseString(pattern)[0]
def parse_traversal(pattern: str) ‑> Traversal

Gets a string and returns the corresponding graph traversal.

Expand source code
def parse_traversal(pattern: Text) -> Traversal:
    """Gets a string and returns the corresponding graph traversal."""
    return or_traversal.parseString(pattern)[0]