server transactions: channel based sync for each account
authornsz <nsz@port70.net>
Fri, 13 Jan 2012 12:47:36 +0000 (13:47 +0100)
committernsz <nsz@port70.net>
Fri, 13 Jan 2012 12:47:36 +0000 (13:47 +0100)
cmd/epoint-server/epoint-server.go
pkg/server/server.go
pkg/store/store.go

index ee6bf58..cf29fa7 100644 (file)
@@ -16,12 +16,10 @@ const (
        seckey  = "./key.sec"
 )
 
-var serverkey *openpgp.Entity
-
 // todo: http header limit: 64K, body limit: 64K
 
-// Dummy initialization of serverkey
-func initkey() (err error) {
+// TODO: generate
+func initkey() (sk *openpgp.Entity, err error) {
        f, err := os.Open(seckey)
        if err != nil {
                return
@@ -35,25 +33,7 @@ func initkey() (err error) {
        if err != nil {
                return
        }
-       serverkey = keys[0]
-       err = os.MkdirAll(rootdir, 0755)
-       if err != nil {
-               return
-       }
-       f, err = os.Create(rootdir + "/serverkey")
-       if err != nil {
-               return
-       }
-       err = serverkey.Serialize(f)
-       if err != nil {
-               return
-       }
-       // TODO: make sure pubkey is replicated and available
-       err = f.Sync()
-       if err != nil {
-               return
-       }
-       err = f.Close()
+       sk = keys[0]
        return
 }
 
@@ -89,7 +69,7 @@ func submitHandler(w http.ResponseWriter, r *http.Request) {
        key := r.FormValue("key")
        switch {
        case draft != "":
-               cert, err := server.EvalDraft([]byte(draft), serverkey)
+               cert, err := server.EvalDraft([]byte(draft))
                if err != nil {
                        msg := fmt.Sprintf("eval draft failed: %s", err)
                        httpError(w, 404, msg)
@@ -97,7 +77,7 @@ func submitHandler(w http.ResponseWriter, r *http.Request) {
                        w.Write(cert)
                }
        case debit != "":
-               cert, err := server.EvalDebitCert([]byte(debit), serverkey)
+               cert, err := server.EvalDebitCert([]byte(debit))
                if err != nil {
                        msg := fmt.Sprintf("eval debit failed: %s", err)
                        httpError(w, 404, msg)
@@ -119,15 +99,11 @@ func submitHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func main() {
-       err := initkey()
-       if err != nil {
-               log.Fatal(err)
-       }
-       err = server.Init(rootdir)
+       serverkey, err := initkey()
        if err != nil {
                log.Fatal(err)
        }
-       err = server.StoreSk(serverkey)
+       err = server.Init(rootdir, serverkey)
        if err != nil {
                log.Fatal(err)
        }
index 53ef664..2a04bb8 100644 (file)
@@ -15,16 +15,64 @@ import (
 // TODO: do in docs?
 const IntLimit = 1e15
 
+// transaction: either draft or debit cert
+type work struct {
+       next    *work
+       docid   string
+       account string
+       in      []byte
+       out     []byte
+       err     error
+       signed  *document.Signed
+       draft   *document.Draft
+       debit   *document.DebitCert
+       sync    chan int
+}
+
 var db *store.Conn
+var serverkey *openpgp.Entity
+var newchan chan *work
+var delchan chan string
+
+type worklist struct {
+       head *work
+       tail *work
+}
+
+func pushwork(ws *worklist, w *work) {
+       if ws.tail == nil {
+               ws.head = w
+               ws.tail = w
+       } else {
+               ws.tail.next = w
+               ws.tail = w
+       }
+}
 
-func StoreSk(sk *openpgp.Entity) (err error) {
-       // TODO: initkey should save serverkey in db
+func popwork(ws *worklist) *work {
+       w := ws.head
+       if w == nil {
+               return nil
+       }
+       ws.head = w.next
+       if ws.head == nil {
+               ws.tail = nil
+       }
+       return w
+}
+
+func storekey() (err error) {
        b := new(bytes.Buffer)
-       err = sk.Serialize(b)
+       err = serverkey.Serialize(b)
+       if err != nil {
+               return
+       }
+       err = db.Set("key", key.Id(serverkey), b.Bytes())
        if err != nil {
                return
        }
-       return db.Set("key", key.Id(sk), b.Bytes())
+       err = db.Set("", "serverkey", b.Bytes())
+       return
 }
 
 func GetKeys(fpr string) (es openpgp.EntityList, err error) {
@@ -97,141 +145,304 @@ func AddKeys(d []byte) (err error) {
        return
 }
 
-func CertByDraft(draftid string) (d []byte, err error) {
-       certid, err := db.Get("certby/draft", draftid)
+// Get cert through the named store
+func GetCert(name, id string) (d []byte, err error) {
+       certid, err := db.Get(name, id)
        if err != nil {
-               // TODO: we have the draft but the cert is not ready
+               // ok if notfound
                return
        }
        d, err = db.Get("cert", string(certid))
        if err != nil {
-               // shouldn't happen, cert is not available
+               // internal error: cert is not available
                return
        }
        return
 }
 
-func CertByDebitCert(debitid string) (d []byte, err error) {
-       creditid, err := db.Get("certby/debit", debitid)
+func EvalDraft(d []byte) (c []byte, err error) {
+       iv, signed, err := document.Parse(d)
+       if err != nil {
+               return
+       }
+       draft, ok := iv.(*document.Draft)
+       if !ok {
+               err = fmt.Errorf("EvalDraft: expected a draft document")
+               return
+       }
+       k, err := db.Get("key", draft.Drawer)
        if err != nil {
-               // TODO: we have the debit cert but the credit cert is not ready
                return
        }
-       d, err = db.Get("cert", string(creditid))
+       // TODO: key.Parse
+       kr, err := openpgp.ReadKeyRing(bytes.NewBuffer(k))
        if err != nil {
-               // shouldn't happen, cert is not available
+               // internal error: pubkey cannot be parsed
                return
        }
-       return
+       err = document.Verify(signed, kr)
+       if err != nil {
+               return
+       }
+       //      _, issuer, denom, err := key.Check(kr[0])
+       //      if err != nil {
+       //              return
+       //      }
+       // TODO: do various format checks (AuthorizedBy check etc)
+       if draft.Amount <= 0 || draft.Amount >= IntLimit {
+               err = fmt.Errorf("draft amount is invalid: %d", draft.Amount)
+               return
+       }
+       k, err = db.Get("key", draft.Beneficiary)
+       if err != nil {
+               return
+       }
+       kr, err = openpgp.ReadKeyRing(bytes.NewBuffer(k))
+       if err != nil {
+               // internal error: pubkey cannot be parsed
+               return
+       }
+       return addDraft(d, signed, draft)
 }
 
-// parse clear signed draft and verify it
-func ParseDraft(d []byte) (draft *document.Draft, draftid string, err error) {
+func EvalDebitCert(d []byte) (c []byte, err error) {
        iv, signed, err := document.Parse(d)
        if err != nil {
                return
        }
-       draft, ok := iv.(*document.Draft)
+       cert, ok := iv.(*document.DebitCert)
        if !ok {
-               err = fmt.Errorf("ParseDraft: expected a draft document")
+               err = fmt.Errorf("ParseDebitCert: expected a debit docuent")
                return
        }
-       draftid = document.Id(signed)
 
-       k, err := db.Get("key", draft.Drawer)
+       kr := openpgp.EntityList{serverkey}
+       if key.Id(serverkey) != cert.AuthorizedBy {
+               // TODO: ...
+               k := []byte(nil)
+               k, err = db.Get("key", cert.AuthorizedBy)
+               if err != nil {
+                       return
+               }
+               kr, err = openpgp.ReadKeyRing(bytes.NewBuffer(k))
+               if err != nil {
+                       // internal error: pubkey cannot be parsed
+                       return
+               }
+       }
+       // must clean up to make sure the hash is ok
+       err = document.Verify(signed, kr)
        if err != nil {
                return
        }
-       kr, err := openpgp.ReadKeyRing(bytes.NewBuffer(k))
+       return addDebit(d, signed, cert)
+}
+
+func addDraft(d []byte, signed *document.Signed, draft *document.Draft) (c []byte, err error) {
+       w := new(work)
+       w.docid = document.Id(signed)
+       w.account = fmt.Sprintf("%s.%s", draft.Drawer, draft.Issuer)
+       w.in = d
+       w.signed = signed
+       w.draft = draft
+       w.sync = make(chan int)
+       newchan <- w
+       <-w.sync
+       return w.out, w.err
+}
+
+func addDebit(d []byte, signed *document.Signed, cert *document.DebitCert) (c []byte, err error) {
+       w := new(work)
+       w.docid = document.Id(signed)
+       w.account = fmt.Sprintf("%s.%s", cert.Holder, cert.Issuer)
+       w.in = d
+       w.signed = signed
+       w.debit = cert
+       w.sync = make(chan int)
+       newchan <- w
+       <-w.sync
+       return w.out, w.err
+}
+
+func dispatch() {
+       works := make(map[string]*worklist)
+       for {
+               select {
+               case w := <-newchan:
+                       ws := works[w.account]
+                       if ws == nil {
+                               // TODO: unnecessary alloc
+                               works[w.account] = new(worklist)
+                               go handle(w)
+                       } else {
+                               pushwork(ws, w)
+                       }
+               case account := <-delchan:
+                       ws := works[account]
+                       w := popwork(ws)
+                       if w == nil {
+                               delete(works, account)
+                       } else {
+                               go handle(w)
+                       }
+               }
+       }
+}
+
+func handle(w *work) {
+       if w.debit != nil {
+               handleDebit(w)
+       } else if w.draft != nil {
+               handleDraft(w)
+       } else {
+               panic("unreachable")
+       }
+       delchan <- w.account
+       w.sync <- 0
+}
+
+func handleDraft(w *work) {
+       nonce := fmt.Sprintf("%s.%s", w.account, w.draft.Nonce)
+       oldid, err := db.Get("draftby/key.issuer.nonce", nonce)
+       if err == nil {
+               if string(oldid) != w.docid {
+                       w.err = fmt.Errorf("draft nonce is not unique (see draft %s)", oldid)
+               } else {
+                       w.out, w.err = GetCert("certby/draft", w.docid)
+               }
+               return
+       } else if _, ok := err.(store.NotFoundError); !ok {
+               w.err = err
+               return
+       }
+
+       err = db.Begin(w.docid)
        if err != nil {
-               // internal error: pubkey cannot be parsed
+               w.err = err
                return
        }
-       err = document.Verify(signed, kr)
+       err = db.Set("draft", w.docid, w.in)
        if err != nil {
+               w.err = err
                return
        }
-       _, issuer, denom, err := key.Check(kr[0])
+       err = db.Set("draftby/key.issuer.nonce", nonce, []byte(w.docid))
        if err != nil {
+               w.err = err
                return
        }
-       k, err = db.Get("key", draft.Beneficiary)
+       cert, err := newDebitCert(w)
        if err != nil {
+               // probably client error
+               db.End(w.docid)
                return
        }
-       kr, err = openpgp.ReadKeyRing(bytes.NewBuffer(k))
+       c, signed, err := document.Format(cert, serverkey)
        if err != nil {
-               // internal error: pubkey cannot be parsed
                return
        }
-       _, issuer2, denom2, err := key.Check(kr[0])
+       certid := document.Id(signed)
+       w.out = c
+       err = db.Set("cert", certid, c)
        if err != nil {
+               // internal error
                return
        }
-       if draft.Issuer != issuer ||
-               draft.Issuer != issuer2 ||
-               draft.Denomination != denom ||
-               draft.Denomination != denom2 {
-               err = fmt.Errorf("Issuer or denomination mismatch")
+       err = db.Set("certby/draft", w.docid, []byte(certid))
+       if err != nil {
+               // internal error
                return
        }
-
-       // TODO: do various format checks (AuthorizedBy check etc)
-       if draft.Amount <= 0 || draft.Amount >= IntLimit {
-               err = fmt.Errorf("draft amount is invalid: %d", draft.Amount)
+       // TODO: append?
+       err = db.Set("certby/key.issuer.serial", fmt.Sprintf("%s.%09d", w.account, cert.Serial), []byte(certid))
+       if err != nil {
+               // internal error
                return
        }
+       err = db.Set("certby/key.issuer", w.account, []byte(certid))
+       if err != nil {
+               // internal error
+               return
+       }
+       // TODO: run journal cleanup in case of client errors
+       err = db.End(w.docid)
        return
 }
 
-func ParseDebitCert(d []byte) (cert *document.DebitCert, certid string, err error) {
-       iv, signed, err := document.Parse(d)
+func handleDebit(w *work) {
+       c, err := GetCert("certby/debit", w.docid)
+       if err == nil {
+               w.out = c
+               return
+       } else if _, ok := err.(store.NotFoundError); !ok {
+               // internal error
+               w.err = err
+               return
+       }
+       err = db.Begin(w.docid)
        if err != nil {
+               w.err = err
                return
        }
-       cert, ok := iv.(*document.DebitCert)
-       if !ok {
-               err = fmt.Errorf("ParseDebitCert: expected a debit docuent")
+       err = db.Set("cert", w.docid, w.in)
+       if err != nil {
+               w.err = err
                return
        }
-
-       k, err := db.Get("key", cert.AuthorizedBy)
+       // TODO: check pubkey etc
+       cert, err := newCreditCert(w)
        if err != nil {
+               // internal error
                return
        }
-       // TODO: keep our key at hand
-       kr, err := openpgp.ReadKeyRing(bytes.NewBuffer(k))
+       c, signed, err := document.Format(cert, serverkey)
        if err != nil {
-               // internal error: pubkey cannot be parsed
+               // internal error
                return
        }
-       // must clean up to make sure the hash is ok
-       err = document.Verify(signed, kr)
+       certid := document.Id(signed)
+       err = db.Set("cert", certid, c)
        if err != nil {
+               // internal error
                return
        }
-
-       certid = document.Id(signed)
+       err = db.Set("certby/debit", w.docid, []byte(certid))
+       if err != nil {
+               // internal error
+               return
+       }
+       // TODO: append?
+       err = db.Set("certby/key.issuer.serial", fmt.Sprintf("%s.%09d", w.account, cert.Serial), []byte(certid))
+       if err != nil {
+               // internal error
+               return
+       }
+       err = db.Set("certby/key.issuer", w.account, []byte(certid))
+       if err != nil {
+               // internal error
+               return
+       }
+       db.End(w.docid)
        return
 }
 
-func NewDebitCert(draftid string, draft *document.Draft) (*document.DebitCert, error) {
+func newDebitCert(w *work) (*document.DebitCert, error) {
        cert := new(document.DebitCert)
-       cert.Holder = draft.Drawer
+       cert.Holder = w.draft.Drawer
        cert.Date = time.Now().Unix()
        cert.Denomination = "epoint"
-       cert.Issuer = draft.Issuer
-       cert.AuthorizedBy = draft.AuthorizedBy
-       cert.Difference = -draft.Amount
-       cert.Draft = draftid
-       cert.Beneficiary = draft.Beneficiary
+       cert.Issuer = w.draft.Issuer
+       cert.AuthorizedBy = w.draft.AuthorizedBy
+       cert.Difference = -w.draft.Amount
+       cert.Draft = w.docid
+       cert.Beneficiary = w.draft.Beneficiary
 
-       oid, err := db.Get("certby/key", draft.Drawer)
+       oid, err := db.Get("certby/key.issuer", w.account)
        oldcertid := string(oid)
        if err != nil {
                // first cert: drawer is issuer
-               if draft.Drawer != draft.Issuer {
-                       return nil, fmt.Errorf("drawer must be the issuer when drawing an empty account (%s != %s)", draft.Drawer, draft.Issuer)
+               if w.draft.Drawer != w.draft.Issuer {
+                       return nil, fmt.Errorf("drawer must be the issuer when drawing an empty account (%s != %s)", w.draft.Drawer, w.draft.Issuer)
                }
                cert.Serial = 1
                cert.Balance = cert.Difference
@@ -274,21 +485,21 @@ func NewDebitCert(draftid string, draft *document.Draft) (*document.DebitCert, e
        return cert, nil
 }
 
-func NewCreditCert(draftid string, draft *document.Draft, dcertid string, dcert *document.DebitCert) (*document.CreditCert, error) {
+func newCreditCert(w *work) (*document.CreditCert, error) {
        cert := new(document.CreditCert)
        // TODO: get from old cert instead?
-       cert.Holder = dcert.Beneficiary
+       cert.Holder = w.debit.Beneficiary
        cert.Date = time.Now().Unix()
        // TODO: get these from the cert holder pubkey
        cert.Denomination = "epoint"
-       cert.Issuer = draft.Issuer
-       cert.AuthorizedBy = dcert.AuthorizedBy // TODO: draft vs dcert vs serverside decision
-       cert.Difference = -dcert.Difference
-       cert.Draft = draftid
-       cert.Drawer = dcert.Holder
-       cert.DebitCert = dcertid
+       cert.Issuer = w.debit.Issuer
+       cert.AuthorizedBy = w.debit.AuthorizedBy // TODO: draft vs dcert vs serverside decision
+       cert.Difference = -w.debit.Difference
+       cert.Draft = w.debit.Draft
+       cert.Drawer = w.debit.Holder
+       cert.DebitCert = w.docid
 
-       oid, err := db.Get("certby/key", dcert.Beneficiary)
+       oid, err := db.Get("certby/key", w.debit.Beneficiary)
        oldcertid := string(oid)
        if err != nil {
                // this is the first cert
@@ -330,134 +541,7 @@ func NewCreditCert(draftid string, draft *document.Draft, dcertid string, dcert
        return cert, nil
 }
 
-func EvalDraft(d []byte, sk *openpgp.Entity) (r []byte, err error) {
-       draft, draftid, err := ParseDraft(d)
-       if err != nil {
-               return
-       }
-       _, err = db.Get("draft", draftid)
-       if err == nil {
-               // found
-               // TODO: certby/draft might not be ready even if draft is there
-               return CertByDraft(draftid)
-       }
-       // if draft is ok we save it
-       err = db.Set("draft", draftid, d)
-       if err != nil {
-               // internal error
-               return
-       }
-       // TODO: db.Insert: fails if key exists
-       s := fmt.Sprintf("%s.%s", draft.Drawer, draft.Nonce)
-       _, err = db.Get("draftby/key.nonce", s)
-       if err == nil {
-               err = fmt.Errorf("draft nonce is not unique")
-               return
-       }
-       err = db.Set("draftby/key.nonce", s, d)
-       if err != nil {
-               // internal error
-               return
-       }
-
-       // debit cert
-       cert, err := NewDebitCert(draftid, draft)
-       if err != nil {
-               return
-       }
-       r, signed, err := document.Format(cert, sk)
-       certid := document.Id(signed)
-       err = db.Set("cert", certid, r)
-       if err != nil {
-               // internal error
-               return
-       }
-       err = db.Set("certby/draft", draftid, []byte(certid))
-       if err != nil {
-               // internal error
-               return
-       }
-       err = db.Set("certby/key", cert.Holder, []byte(certid))
-       if err != nil {
-               // internal error
-               return
-       }
-       // TODO: append?
-       err = db.Set("certby/key.serial", fmt.Sprintf("%s.%09d", cert.Holder, cert.Serial), []byte(certid))
-       if err != nil {
-               // internal error
-               return
-       }
-       return
-}
-
-func EvalDebitCert(d []byte, sk *openpgp.Entity) (r []byte, err error) {
-       dcert, dcertid, err := ParseDebitCert(d)
-       if err != nil {
-               return
-       }
-       r, err = CertByDebitCert(dcertid)
-       if err == nil {
-               // found
-               return
-       }
-       // TODO: we only need the draft to know the issuer (+beneficiary)
-       // it should be in the pubkey
-       d, err = db.Get("draft", dcert.Draft)
-       if err != nil {
-               // internal error
-               return
-       }
-       iv, _, err := document.Parse(d)
-       if err != nil {
-               // internal error
-               return
-       }
-       draft, ok := iv.(*document.Draft)
-       if !ok {
-               // internal error
-               err = fmt.Errorf("EvalDebitCert: expected draft from internal db")
-               return
-       }
-
-       // credit side
-       // TODO: check pubkey etc
-       cert, err := NewCreditCert(dcert.Draft, draft, dcertid, dcert)
-       if err != nil {
-               // internal error
-               return
-       }
-       r, signed, err := document.Format(cert, sk)
-       if err != nil {
-               // internal error
-               return
-       }
-       certid := document.Id(signed)
-       err = db.Set("cert", certid, r)
-       if err != nil {
-               // internal error
-               return
-       }
-       err = db.Set("certby/debit", dcertid, []byte(certid))
-       if err != nil {
-               // internal error
-               return
-       }
-       err = db.Set("certby/key", cert.Holder, []byte(certid))
-       if err != nil {
-               // internal error
-               return
-       }
-       // TODO: append?
-       err = db.Set("certby/key.serial", fmt.Sprintf("%s.%09d", cert.Holder, cert.Serial), []byte(certid))
-       if err != nil {
-               // internal error
-               return
-       }
-       return
-}
-
-func Init(rootdir string) (err error) {
+func Init(rootdir string, sk *openpgp.Entity) (err error) {
        db, err = store.Open(rootdir)
        if err != nil {
                return
@@ -482,15 +566,15 @@ func Init(rootdir string) (err error) {
        if err != nil {
                return
        }
-       err = db.Ensure("certby/key")
+       err = db.Ensure("certby/key.issuer")
        if err != nil {
                return
        }
-       err = db.Ensure("certby/key.serial")
+       err = db.Ensure("certby/key.issuer.serial")
        if err != nil {
                return
        }
-       err = db.Ensure("draftby/key.nonce")
+       err = db.Ensure("draftby/key.issuer.nonce")
        if err != nil {
                return
        }
@@ -502,5 +586,11 @@ func Init(rootdir string) (err error) {
        if err != nil {
                return
        }
+       serverkey = sk
+       err = storekey()
+       if err != nil {
+               return
+       }
+       go dispatch()
        return
 }
index 21603fe..7ea1ded 100644 (file)
@@ -40,13 +40,16 @@ func Open(root string) (c *Conn, err error) {
        if err != nil {
                return
        }
-       err = os.MkdirAll(c.path, 0755)
+       fn := filepath.Join(c.path, ".journal")
+       err = os.MkdirAll(fn, 0755)
        if err != nil {
                return
        }
        return
 }
 
+// TODO: list .journal for recovery after a crash
+
 // Get the value of k
 func (c *Conn) Get(name, k string) (v []byte, err error) {
        v, err = ioutil.ReadFile(filepath.Join(c.path, name, k))
@@ -106,6 +109,24 @@ func (c *Conn) Append(name, k string, v []byte) (err error) {
        return
 }
 
+// Begin transaction identified by k
+func (c *Conn) Begin(k string) (err error) {
+       fn := filepath.Join(c.path, ".journal", k)
+       f, err := os.OpenFile(fn, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_SYNC, 0666)
+       if err != nil {
+               return
+       }
+       err = f.Close()
+       return
+}
+
+// End transaction identified by k
+func (c *Conn) End(k string) (err error) {
+       fn := filepath.Join(c.path, ".journal", k)
+       err = os.Remove(fn)
+       return
+}
+
 // Close db connection
 func (c *Conn) Close() (err error) {
        return