main.mx raw

   1  package main
   2  
   3  import (
   4  	"fmt"
   5  	"net"
   6  	"os"
   7  	"time"
   8  
   9  	"git.smesh.lol/iskra"
  10  	"git.smesh.lol/iskradb/lattice"
  11  	"moxie"
  12  )
  13  
  14  const (
  15  	defaultTCP  = ":4847"
  16  	defaultUnix = "/tmp/iskra.sock"
  17  	flushEvery  = 64
  18  )
  19  
  20  func main() {
  21  	if len(os.Args) < 2 {
  22  		fmt.Fprintf(os.Stderr(), "usage: iskra-server <lattice-path> [tcp-addr] [unix-path]\n")
  23  		os.Exit(1)
  24  	}
  25  	path := os.Args[1]
  26  	tcpAddr := defaultTCP
  27  	unixPath := defaultUnix
  28  	if len(os.Args) > 2 {
  29  		tcpAddr = os.Args[2]
  30  	}
  31  	if len(os.Args) > 3 {
  32  		unixPath = os.Args[3]
  33  	}
  34  
  35  	tree, err := loadTree(path)
  36  	if err != nil {
  37  		fmt.Fprintf(os.Stderr(), "load: %v\n", err)
  38  		os.Exit(1)
  39  	}
  40  	fmt.Fprintf(os.Stderr(), "loaded: %d records, %d bigram entries\n",
  41  		tree.EntryCount(), len(tree.BigramIdx))
  42  
  43  	// Spawn network domains - one per listener.
  44  	// Each handles connections sequentially (one active at a time per listener).
  45  	// Channels carry raw frame bytes across the spawn boundary.
  46  	tcpReq := chan moxie.Bytes{}
  47  	tcpResp := chan moxie.Bytes{}
  48  	unixReq := chan moxie.Bytes{}
  49  	unixResp := chan moxie.Bytes{}
  50  
  51  	spawn(netDomain, moxie.Bytes([]byte(tcpAddr)), moxie.Bytes([]byte("tcp")), tcpReq, tcpResp)
  52  	spawn(netDomain, moxie.Bytes([]byte(unixPath)), moxie.Bytes([]byte("unix")), unixReq, unixResp)
  53  
  54  	fmt.Fprintf(os.Stderr(), "listening: tcp=%s unix=%s\n", tcpAddr, unixPath)
  55  
  56  	writesSinceFlush := 0
  57  	lastFlush := time.Now()
  58  
  59  	for {
  60  		select {
  61  		case frame := <-tcpReq:
  62  			resp := dispatch(tree, []byte(frame), &writesSinceFlush)
  63  			tcpResp <- moxie.Bytes(resp)
  64  		case frame := <-unixReq:
  65  			resp := dispatch(tree, []byte(frame), &writesSinceFlush)
  66  			unixResp <- moxie.Bytes(resp)
  67  		}
  68  		if writesSinceFlush >= flushEvery || (writesSinceFlush > 0 && time.Since(lastFlush) > 5*time.Second) {
  69  			flushTree(tree, path)
  70  			writesSinceFlush = 0
  71  			lastFlush = time.Now()
  72  		}
  73  	}
  74  }
  75  
  76  // netDomain is spawned per listener. It owns the socket, accepts connections,
  77  // and services them sequentially. Each request frame is sent to the main domain
  78  // via the req channel; the response comes back on resp.
  79  func netDomain(addr, network moxie.Bytes, req chan moxie.Bytes, resp chan moxie.Bytes) {
  80  	addrStr := string([]byte(addr))
  81  	netStr := string([]byte(network))
  82  
  83  	if netStr == "unix" {
  84  		os.Remove(addrStr)
  85  	}
  86  
  87  	ln, err := net.Listen(netStr, addrStr)
  88  	if err != nil {
  89  		fmt.Fprintf(os.Stderr(), "net: listen %s %s: %v\n", netStr, addrStr, err)
  90  		return
  91  	}
  92  
  93  	for {
  94  		conn, err := ln.Accept()
  95  		if err != nil {
  96  			continue
  97  		}
  98  		serviceConn(conn, req, resp)
  99  		conn.Close()
 100  	}
 101  }
 102  
 103  // serviceConn reads request frames from conn, forwards to main domain via req/resp
 104  // channels, and writes responses back. Returns when the connection errors or closes.
 105  func serviceConn(conn net.Conn, req chan moxie.Bytes, resp chan moxie.Bytes) {
 106  	for {
 107  		frame, err := iskra.ReadFrame(conn)
 108  		if err != nil {
 109  			return
 110  		}
 111  		if len(frame) == 0 {
 112  			return
 113  		}
 114  		req <- moxie.Bytes(frame)
 115  		response := <-resp
 116  		if err := iskra.WriteFrame(conn, []byte(response)); err != nil {
 117  			return
 118  		}
 119  	}
 120  }
 121  
 122  func dispatch(tree *iskra.Tree, frame []byte, writes *int32) []byte {
 123  	if len(frame) < 1 {
 124  		return iskra.EncodeResponse(iskra.StatusError, []byte("empty"))
 125  	}
 126  	op := frame[0]
 127  	switch op {
 128  	case iskra.OpBigramWeight:
 129  		return handleBigramWeight(tree, frame[1:])
 130  	case iskra.OpBigramWeightRelaxed:
 131  		return handleBigramWeightRelaxed(tree, frame[1:])
 132  	case iskra.OpIngestBigram:
 133  		*writes++
 134  		return handleIngestBigram(tree, frame[1:])
 135  	case iskra.OpIngestText:
 136  		*writes++
 137  		return handleIngestText(tree, frame[1:])
 138  	case iskra.OpWalkStep:
 139  		return handleWalkStep(tree, frame[1:])
 140  	case iskra.OpTranslate:
 141  		return handleTranslate(tree, frame[1:])
 142  	case iskra.OpBeamRun:
 143  		return handleBeamRun(tree, frame[1:])
 144  	case iskra.OpFlush:
 145  		*writes = 0
 146  		return iskra.EncodeResponse(iskra.StatusOK, nil)
 147  	}
 148  	return iskra.EncodeResponse(iskra.StatusError, []byte("unknown op"))
 149  }
 150  
 151  func handleBigramWeight(tree *iskra.Tree, p []byte) []byte {
 152  	if len(p) < 9 {
 153  		return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
 154  	}
 155  	domain := p[0]
 156  	coord, _ := iskra.DecodeUint64(p, 1)
 157  	prev, n1 := iskra.DecodeString(p, 9)
 158  	next, _ := iskra.DecodeString(p, 9+n1)
 159  	w := iskra.BigramWeight(tree, domain, coord, prev, next)
 160  	return iskra.EncodeUint32Response(w)
 161  }
 162  
 163  func handleBigramWeightRelaxed(tree *iskra.Tree, p []byte) []byte {
 164  	if len(p) < 9 {
 165  		return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
 166  	}
 167  	domain := p[0]
 168  	coord, _ := iskra.DecodeUint64(p, 1)
 169  	prev, n1 := iskra.DecodeString(p, 9)
 170  	next, _ := iskra.DecodeString(p, 9+n1)
 171  	w, matched := iskra.BigramWeightRelaxed(tree, domain, coord, prev, next)
 172  	return iskra.EncodeWeightRelaxedResponse(w, matched)
 173  }
 174  
 175  func handleIngestBigram(tree *iskra.Tree, p []byte) []byte {
 176  	if len(p) < 9 {
 177  		return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
 178  	}
 179  	domain := p[0]
 180  	coord, _ := iskra.DecodeUint64(p, 1)
 181  	prev, n1 := iskra.DecodeString(p, 9)
 182  	next, _ := iskra.DecodeString(p, 9+n1)
 183  	ri := iskra.IngestBigram(tree, domain, coord, prev, next)
 184  	return iskra.EncodeUint32Response(ri)
 185  }
 186  
 187  func handleIngestText(tree *iskra.Tree, p []byte) []byte {
 188  	if len(p) < 11 {
 189  		return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
 190  	}
 191  	domain := p[0]
 192  	coord, _ := iskra.DecodeUint64(p, 1)
 193  	count := int32(uint16(p[9]) | uint16(p[10])<<8)
 194  	off := 11
 195  	tokens := []string{:0:count}
 196  	for i := 0; i < count; i++ {
 197  		s, n := iskra.DecodeString(p, off)
 198  		if n == 0 {
 199  			break
 200  		}
 201  		tokens = append(tokens, s)
 202  		off += n
 203  	}
 204  	iskra.IngestText(tree, domain, coord, tokens)
 205  	return iskra.EncodeResponse(iskra.StatusOK, nil)
 206  }
 207  
 208  func handleWalkStep(tree *iskra.Tree, p []byte) []byte {
 209  	if len(p) < 9 {
 210  		return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
 211  	}
 212  	domain := p[0]
 213  	coord, _ := iskra.DecodeUint64(p, 1)
 214  	word, n1 := iskra.DecodeString(p, 9)
 215  	off := 9 + n1
 216  	maxCandidates := 10
 217  	if off+2 <= len(p) {
 218  		maxCandidates = int32(uint16(p[off]) | uint16(p[off+1])<<8)
 219  	}
 220  	state := iskra.WalkState{
 221  		Domain: domain,
 222  		Coord:  coord,
 223  		Word:   word,
 224  	}
 225  	candidates := iskra.WalkStep(tree, state, maxCandidates)
 226  	return iskra.EncodeWalkCandidatesResponse(candidates)
 227  }
 228  
 229  func handleTranslate(tree *iskra.Tree, p []byte) []byte {
 230  	if len(p) < 10 {
 231  		return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
 232  	}
 233  	srcDomain := p[0]
 234  	dstDomain := p[1]
 235  	coord, _ := iskra.DecodeUint64(p, 2)
 236  	word, _ := iskra.DecodeString(p, 10)
 237  	src := iskra.NewSubLattice(tree.DB(), tree.Pool(), srcDomain, tree.Reg)
 238  	dst := iskra.NewSubLattice(tree.DB(), tree.Pool(), dstDomain, tree.Reg)
 239  	result := iskra.Translate(src, dst, word, coord)
 240  	if result == "" {
 241  		return iskra.EncodeResponse(iskra.StatusNotFound, nil)
 242  	}
 243  	return iskra.EncodeStringResponse(result)
 244  }
 245  
 246  func handleBeamRun(tree *iskra.Tree, p []byte) []byte {
 247  	if len(p) < 9 {
 248  		return iskra.EncodeResponse(iskra.StatusError, []byte("short"))
 249  	}
 250  	domain := p[0]
 251  	coord, _ := iskra.DecodeUint64(p, 1)
 252  	word, n1 := iskra.DecodeString(p, 9)
 253  	off := 9 + n1
 254  	width := 4
 255  	maxSteps := 5
 256  	kind := iskra.WalkTranslate
 257  	if off+5 <= len(p) {
 258  		width = int32(uint16(p[off]) | uint16(p[off+1])<<8)
 259  		maxSteps = int32(uint16(p[off+2]) | uint16(p[off+3])<<8)
 260  		kind = iskra.WalkKind(p[off+4])
 261  	}
 262  	initial := iskra.WalkState{
 263  		Domain: domain,
 264  		Coord:  coord,
 265  		Word:   word,
 266  	}
 267  	beam := iskra.NewBeam(initial, width, kind)
 268  	result := beam.Run(tree, maxSteps)
 269  	return iskra.EncodeBeamResponse(result)
 270  }
 271  
 272  func loadTree(path string) (*iskra.Tree, error) {
 273  	tree, err := iskra.StorageOpen(path)
 274  	if err != nil {
 275  		db := lattice.NewTree(1024)
 276  		tree = iskra.NewTree(db)
 277  	}
 278  	return tree, nil
 279  }
 280  
 281  func flushTree(tree *iskra.Tree, path string) {
 282  	err := iskra.StorageFlushPath(tree, path)
 283  	if err != nil {
 284  		fmt.Fprintf(os.Stderr(), "flush: %v\n", err)
 285  	}
 286  }
 287