server transactions: channel based sync for each account
[epoint] / pkg / server / server.go
1 package server
2
3 // main transfer logic
4
5 import (
6         "bytes"
7         "crypto/openpgp"
8         "epoint/document"
9         "epoint/key"
10         "epoint/store"
11         "fmt"
12         "time"
13 )
14
15 // TODO: do in docs?
16 const IntLimit = 1e15
17
18 // transaction: either draft or debit cert
19 type work struct {
20         next    *work
21         docid   string
22         account string
23         in      []byte
24         out     []byte
25         err     error
26         signed  *document.Signed
27         draft   *document.Draft
28         debit   *document.DebitCert
29         sync    chan int
30 }
31
32 var db *store.Conn
33 var serverkey *openpgp.Entity
34 var newchan chan *work
35 var delchan chan string
36
37 type worklist struct {
38         head *work
39         tail *work
40 }
41
42 func pushwork(ws *worklist, w *work) {
43         if ws.tail == nil {
44                 ws.head = w
45                 ws.tail = w
46         } else {
47                 ws.tail.next = w
48                 ws.tail = w
49         }
50 }
51
52 func popwork(ws *worklist) *work {
53         w := ws.head
54         if w == nil {
55                 return nil
56         }
57         ws.head = w.next
58         if ws.head == nil {
59                 ws.tail = nil
60         }
61         return w
62 }
63
64 func storekey() (err error) {
65         b := new(bytes.Buffer)
66         err = serverkey.Serialize(b)
67         if err != nil {
68                 return
69         }
70         err = db.Set("key", key.Id(serverkey), b.Bytes())
71         if err != nil {
72                 return
73         }
74         err = db.Set("", "serverkey", b.Bytes())
75         return
76 }
77
78 func GetKeys(fpr string) (es openpgp.EntityList, err error) {
79         b, err := db.Get("key", fpr)
80         if err != nil {
81                 return
82         }
83         es, err = openpgp.ReadKeyRing(bytes.NewBuffer(b))
84         if err != nil {
85                 // internal error: pubkey cannot be parsed
86                 return
87         }
88         return
89 }
90
91 func AddKeys(d []byte) (err error) {
92         entities, err := openpgp.ReadArmoredKeyRing(bytes.NewBuffer(d))
93         if err != nil {
94                 return
95         }
96         // TODO: allow multiple key uploads at once?
97         if len(entities) > 100 {
98                 err = fmt.Errorf("expected at most 100 keys; got %d", len(entities))
99                 return
100         }
101         for _, e := range entities {
102                 // TODO: various checks..
103                 // TODO: collect errors instead of aborting addkeys
104                 isIssuer, issuer, denom, err1 := key.Check(e)
105                 err = err1
106                 if err != nil {
107                         return
108                 }
109                 if !isIssuer {
110                         es, err1 := GetKeys(issuer)
111                         err = err1
112                         if err != nil {
113                                 return
114                         }
115                         ok, _, den, err1 := key.Check(es[0])
116                         err = err1
117                         if err != nil {
118                                 // internal error
119                                 return
120                         }
121                         if !ok || den != denom {
122                                 err = fmt.Errorf("Issuer key check failed")
123                                 return
124                         }
125                 }
126                 b := new(bytes.Buffer)
127                 err = e.Serialize(b)
128                 if err != nil {
129                         return
130                 }
131                 fpr := key.Id(e)
132                 err = db.Insert("key", fpr, b.Bytes())
133                 if err != nil {
134                         return
135                 }
136                 err = db.Append("keysby/64", fpr[len(fpr)-16:], []byte(fpr))
137                 if err != nil {
138                         return
139                 }
140                 err = db.Append("keysby/32", fpr[len(fpr)-8:], []byte(fpr))
141                 if err != nil {
142                         return
143                 }
144         }
145         return
146 }
147
148 // Get cert through the named store
149 func GetCert(name, id string) (d []byte, err error) {
150         certid, err := db.Get(name, id)
151         if err != nil {
152                 // ok if notfound
153                 return
154         }
155         d, err = db.Get("cert", string(certid))
156         if err != nil {
157                 // internal error: cert is not available
158                 return
159         }
160         return
161 }
162
163 func EvalDraft(d []byte) (c []byte, err error) {
164         iv, signed, err := document.Parse(d)
165         if err != nil {
166                 return
167         }
168         draft, ok := iv.(*document.Draft)
169         if !ok {
170                 err = fmt.Errorf("EvalDraft: expected a draft document")
171                 return
172         }
173         k, err := db.Get("key", draft.Drawer)
174         if err != nil {
175                 return
176         }
177         // TODO: key.Parse
178         kr, err := openpgp.ReadKeyRing(bytes.NewBuffer(k))
179         if err != nil {
180                 // internal error: pubkey cannot be parsed
181                 return
182         }
183         err = document.Verify(signed, kr)
184         if err != nil {
185                 return
186         }
187         //      _, issuer, denom, err := key.Check(kr[0])
188         //      if err != nil {
189         //              return
190         //      }
191         // TODO: do various format checks (AuthorizedBy check etc)
192         if draft.Amount <= 0 || draft.Amount >= IntLimit {
193                 err = fmt.Errorf("draft amount is invalid: %d", draft.Amount)
194                 return
195         }
196         k, err = db.Get("key", draft.Beneficiary)
197         if err != nil {
198                 return
199         }
200         kr, err = openpgp.ReadKeyRing(bytes.NewBuffer(k))
201         if err != nil {
202                 // internal error: pubkey cannot be parsed
203                 return
204         }
205         return addDraft(d, signed, draft)
206 }
207
208 func EvalDebitCert(d []byte) (c []byte, err error) {
209         iv, signed, err := document.Parse(d)
210         if err != nil {
211                 return
212         }
213         cert, ok := iv.(*document.DebitCert)
214         if !ok {
215                 err = fmt.Errorf("ParseDebitCert: expected a debit docuent")
216                 return
217         }
218
219         kr := openpgp.EntityList{serverkey}
220         if key.Id(serverkey) != cert.AuthorizedBy {
221                 // TODO: ...
222                 k := []byte(nil)
223                 k, err = db.Get("key", cert.AuthorizedBy)
224                 if err != nil {
225                         return
226                 }
227                 kr, err = openpgp.ReadKeyRing(bytes.NewBuffer(k))
228                 if err != nil {
229                         // internal error: pubkey cannot be parsed
230                         return
231                 }
232         }
233         // must clean up to make sure the hash is ok
234         err = document.Verify(signed, kr)
235         if err != nil {
236                 return
237         }
238         return addDebit(d, signed, cert)
239 }
240
241 func addDraft(d []byte, signed *document.Signed, draft *document.Draft) (c []byte, err error) {
242         w := new(work)
243         w.docid = document.Id(signed)
244         w.account = fmt.Sprintf("%s.%s", draft.Drawer, draft.Issuer)
245         w.in = d
246         w.signed = signed
247         w.draft = draft
248         w.sync = make(chan int)
249         newchan <- w
250         <-w.sync
251         return w.out, w.err
252 }
253
254 func addDebit(d []byte, signed *document.Signed, cert *document.DebitCert) (c []byte, err error) {
255         w := new(work)
256         w.docid = document.Id(signed)
257         w.account = fmt.Sprintf("%s.%s", cert.Holder, cert.Issuer)
258         w.in = d
259         w.signed = signed
260         w.debit = cert
261         w.sync = make(chan int)
262         newchan <- w
263         <-w.sync
264         return w.out, w.err
265 }
266
267 func dispatch() {
268         works := make(map[string]*worklist)
269         for {
270                 select {
271                 case w := <-newchan:
272                         ws := works[w.account]
273                         if ws == nil {
274                                 // TODO: unnecessary alloc
275                                 works[w.account] = new(worklist)
276                                 go handle(w)
277                         } else {
278                                 pushwork(ws, w)
279                         }
280                 case account := <-delchan:
281                         ws := works[account]
282                         w := popwork(ws)
283                         if w == nil {
284                                 delete(works, account)
285                         } else {
286                                 go handle(w)
287                         }
288                 }
289         }
290 }
291
292 func handle(w *work) {
293         if w.debit != nil {
294                 handleDebit(w)
295         } else if w.draft != nil {
296                 handleDraft(w)
297         } else {
298                 panic("unreachable")
299         }
300         delchan <- w.account
301         w.sync <- 0
302 }
303
304 func handleDraft(w *work) {
305         nonce := fmt.Sprintf("%s.%s", w.account, w.draft.Nonce)
306         oldid, err := db.Get("draftby/key.issuer.nonce", nonce)
307         if err == nil {
308                 if string(oldid) != w.docid {
309                         w.err = fmt.Errorf("draft nonce is not unique (see draft %s)", oldid)
310                 } else {
311                         w.out, w.err = GetCert("certby/draft", w.docid)
312                 }
313                 return
314         } else if _, ok := err.(store.NotFoundError); !ok {
315                 w.err = err
316                 return
317         }
318
319         err = db.Begin(w.docid)
320         if err != nil {
321                 w.err = err
322                 return
323         }
324         err = db.Set("draft", w.docid, w.in)
325         if err != nil {
326                 w.err = err
327                 return
328         }
329         err = db.Set("draftby/key.issuer.nonce", nonce, []byte(w.docid))
330         if err != nil {
331                 w.err = err
332                 return
333         }
334         cert, err := newDebitCert(w)
335         if err != nil {
336                 // probably client error
337                 db.End(w.docid)
338                 return
339         }
340         c, signed, err := document.Format(cert, serverkey)
341         if err != nil {
342                 return
343         }
344         certid := document.Id(signed)
345         w.out = c
346         err = db.Set("cert", certid, c)
347         if err != nil {
348                 // internal error
349                 return
350         }
351         err = db.Set("certby/draft", w.docid, []byte(certid))
352         if err != nil {
353                 // internal error
354                 return
355         }
356         // TODO: append?
357         err = db.Set("certby/key.issuer.serial", fmt.Sprintf("%s.%09d", w.account, cert.Serial), []byte(certid))
358         if err != nil {
359                 // internal error
360                 return
361         }
362         err = db.Set("certby/key.issuer", w.account, []byte(certid))
363         if err != nil {
364                 // internal error
365                 return
366         }
367         // TODO: run journal cleanup in case of client errors
368         err = db.End(w.docid)
369         return
370 }
371
372 func handleDebit(w *work) {
373         c, err := GetCert("certby/debit", w.docid)
374         if err == nil {
375                 w.out = c
376                 return
377         } else if _, ok := err.(store.NotFoundError); !ok {
378                 // internal error
379                 w.err = err
380                 return
381         }
382         err = db.Begin(w.docid)
383         if err != nil {
384                 w.err = err
385                 return
386         }
387         err = db.Set("cert", w.docid, w.in)
388         if err != nil {
389                 w.err = err
390                 return
391         }
392         // TODO: check pubkey etc
393         cert, err := newCreditCert(w)
394         if err != nil {
395                 // internal error
396                 return
397         }
398         c, signed, err := document.Format(cert, serverkey)
399         if err != nil {
400                 // internal error
401                 return
402         }
403         certid := document.Id(signed)
404         err = db.Set("cert", certid, c)
405         if err != nil {
406                 // internal error
407                 return
408         }
409         err = db.Set("certby/debit", w.docid, []byte(certid))
410         if err != nil {
411                 // internal error
412                 return
413         }
414         // TODO: append?
415         err = db.Set("certby/key.issuer.serial", fmt.Sprintf("%s.%09d", w.account, cert.Serial), []byte(certid))
416         if err != nil {
417                 // internal error
418                 return
419         }
420         err = db.Set("certby/key.issuer", w.account, []byte(certid))
421         if err != nil {
422                 // internal error
423                 return
424         }
425         db.End(w.docid)
426         return
427 }
428
429 func newDebitCert(w *work) (*document.DebitCert, error) {
430         cert := new(document.DebitCert)
431         cert.Holder = w.draft.Drawer
432         cert.Date = time.Now().Unix()
433         cert.Denomination = "epoint"
434         cert.Issuer = w.draft.Issuer
435         cert.AuthorizedBy = w.draft.AuthorizedBy
436         cert.Difference = -w.draft.Amount
437         cert.Draft = w.docid
438         cert.Beneficiary = w.draft.Beneficiary
439
440         oid, err := db.Get("certby/key.issuer", w.account)
441         oldcertid := string(oid)
442         if err != nil {
443                 // first cert: drawer is issuer
444                 if w.draft.Drawer != w.draft.Issuer {
445                         return nil, fmt.Errorf("drawer must be the issuer when drawing an empty account (%s != %s)", w.draft.Drawer, w.draft.Issuer)
446                 }
447                 cert.Serial = 1
448                 cert.Balance = cert.Difference
449                 cert.LastDebitSerial = 0
450                 cert.LastCreditSerial = 0
451         } else {
452                 d, err := db.Get("cert", oldcertid)
453                 if err != nil {
454                         return nil, err
455                 }
456                 iv, _, err := document.Parse(d)
457                 if err != nil {
458                         // internal error
459                         return nil, err
460                 }
461                 // TODO: this is a hack
462                 oldcert, err := document.ToCert(iv)
463                 if err != nil {
464                         // internal error
465                         return nil, err
466                 }
467                 // TODO: sanity checks? oldcert.Holder == draft.Drawer
468                 cert.Serial = oldcert.Serial + 1
469                 cert.Balance = oldcert.Balance + cert.Difference
470                 if cert.Balance <= -IntLimit {
471                         return nil, fmt.Errorf("balance limit exceeded: %d", cert.Balance)
472                 }
473                 if oldcert.Balance > 0 && cert.Balance < 0 {
474                         return nil, fmt.Errorf("insufficient funds: %d", oldcert.Balance)
475                 }
476                 cert.LastDebitSerial = oldcert.LastDebitSerial
477                 cert.LastCreditSerial = oldcert.LastCreditSerial
478                 if _, ok := iv.(*document.DebitCert); ok {
479                         cert.LastDebitSerial = oldcert.Serial
480                 } else {
481                         cert.LastCreditSerial = oldcert.Serial
482                 }
483                 cert.LastCert = &oldcertid
484         }
485         return cert, nil
486 }
487
488 func newCreditCert(w *work) (*document.CreditCert, error) {
489         cert := new(document.CreditCert)
490         // TODO: get from old cert instead?
491         cert.Holder = w.debit.Beneficiary
492         cert.Date = time.Now().Unix()
493         // TODO: get these from the cert holder pubkey
494         cert.Denomination = "epoint"
495         cert.Issuer = w.debit.Issuer
496         cert.AuthorizedBy = w.debit.AuthorizedBy // TODO: draft vs dcert vs serverside decision
497         cert.Difference = -w.debit.Difference
498         cert.Draft = w.debit.Draft
499         cert.Drawer = w.debit.Holder
500         cert.DebitCert = w.docid
501
502         oid, err := db.Get("certby/key", w.debit.Beneficiary)
503         oldcertid := string(oid)
504         if err != nil {
505                 // this is the first cert
506                 cert.Serial = 1
507                 cert.Balance = cert.Difference
508                 cert.LastDebitSerial = 0
509                 cert.LastCreditSerial = 0
510         } else {
511                 d, err := db.Get("cert", oldcertid)
512                 if err != nil {
513                         // internal error
514                         return nil, err
515                 }
516                 iv, _, err := document.Parse(d)
517                 if err != nil {
518                         // internal error
519                         return nil, err
520                 }
521                 // TODO: this is a hack
522                 oldcert, err := document.ToCert(iv)
523                 if err != nil {
524                         // internal error
525                         return nil, err
526                 }
527                 cert.Serial = oldcert.Serial + 1
528                 cert.Balance = oldcert.Balance + cert.Difference
529                 if cert.Balance >= IntLimit {
530                         return nil, fmt.Errorf("balance limit exceeded: %d", cert.Balance)
531                 }
532                 cert.LastDebitSerial = oldcert.LastDebitSerial
533                 cert.LastCreditSerial = oldcert.LastCreditSerial
534                 if _, ok := iv.(*document.DebitCert); ok {
535                         cert.LastDebitSerial = oldcert.Serial
536                 } else {
537                         cert.LastCreditSerial = oldcert.Serial
538                 }
539                 cert.LastCert = &oldcertid
540         }
541         return cert, nil
542 }
543
544 func Init(rootdir string, sk *openpgp.Entity) (err error) {
545         db, err = store.Open(rootdir)
546         if err != nil {
547                 return
548         }
549         err = db.Ensure("key")
550         if err != nil {
551                 return
552         }
553         err = db.Ensure("cert")
554         if err != nil {
555                 return
556         }
557         err = db.Ensure("draft")
558         if err != nil {
559                 return
560         }
561         err = db.Ensure("certby/draft")
562         if err != nil {
563                 return
564         }
565         err = db.Ensure("certby/debit")
566         if err != nil {
567                 return
568         }
569         err = db.Ensure("certby/key.issuer")
570         if err != nil {
571                 return
572         }
573         err = db.Ensure("certby/key.issuer.serial")
574         if err != nil {
575                 return
576         }
577         err = db.Ensure("draftby/key.issuer.nonce")
578         if err != nil {
579                 return
580         }
581         err = db.Ensure("keysby/64")
582         if err != nil {
583                 return
584         }
585         err = db.Ensure("keysby/32")
586         if err != nil {
587                 return
588         }
589         serverkey = sk
590         err = storekey()
591         if err != nil {
592                 return
593         }
594         go dispatch()
595         return
596 }