//! Net document storage backed by sqlite3.
//!
//! We store most objects in sqlite tables, except for very large ones,
//! which we store as "blob" files in a separate directory.
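//!
//! A rough sketch of how `SqliteStore` is used inside this crate (the
//! cache path, `meta`, and `text` below are illustrative placeholders):
//!
//! ```ignore
//! let mut store = SqliteStore::from_path("/var/cache/arti", false)?;
//! // Small documents live in sqlite tables; large ones, like consensus
//! // documents, are written to the blob directory and indexed through
//! // the ExtDocs table.
//! store.store_consensus(&meta, ConsensusFlavor::Microdesc, true, &text)?;
//! ```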

use super::ExpirationConfig;
use crate::docmeta::{AuthCertMeta, ConsensusMeta};
use crate::storage::{InputString, Store};
use crate::{Error, Result};

use tor_netdoc::doc::authcert::AuthCertKeyIds;
use tor_netdoc::doc::microdesc::MdDigest;
use tor_netdoc::doc::netstatus::{ConsensusFlavor, Lifetime};
#[cfg(feature = "routerdesc")]
use tor_netdoc::doc::routerdesc::RdDigest;

use std::collections::HashMap;
use std::convert::TryInto;
use std::path::{self, Path, PathBuf};
use std::time::SystemTime;

use rusqlite::{params, OpenFlags, OptionalExtension, Transaction};
use time::OffsetDateTime;
use tracing::trace;

#[cfg(target_family = "unix")]
use std::os::unix::fs::DirBuilderExt;

/// Local directory cache using a Sqlite3 connection.
pub(crate) struct SqliteStore {
    /// Connection to the sqlite3 database.
    conn: rusqlite::Connection,
    /// Location for the sqlite3 database; used to reopen it.
    sql_path: Option<PathBuf>,
    /// Location to store blob files.
    path: PathBuf,
    /// Lockfile to prevent concurrent write attempts from different
    /// processes.
    ///
    /// If this is None we aren't using a lockfile.  Watch out!
    ///
    /// (sqlite supports that with connection locking, but we want to
    /// be a little more coarse-grained here)
    lockfile: Option<fslock::LockFile>,
}

impl SqliteStore {
    /// Construct or open a new SqliteStore at some location on disk.
    /// The provided location must be a directory, or a possible
    /// location for a directory: the directory will be created if
    /// necessary.
    ///
    /// If readonly is true, the result will be a read-only store.
    /// Otherwise, when readonly is false, the result may be
    /// read-only or read-write, depending on whether we can acquire
    /// the lock.
    ///
    /// # Limitations
    ///
    /// The file locking that we use to ensure that only one dirmgr is
    /// writing to a given storage directory at a time is currently
    /// _per process_. Therefore, you might get unexpected results if
    /// two SqliteStores are created in the same process with the
    /// same path.
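    ///
    /// A usage sketch (the cache location here is illustrative):
    ///
    /// ```ignore
    /// let mut store = SqliteStore::from_path("/var/cache/arti", false)?;
    /// if store.is_readonly() {
    ///     // Another process holds `dir.lock`; we fell back to read-only.
    /// }
    /// ```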
    pub(crate) fn from_path<P: AsRef<Path>>(path: P, mut readonly: bool) -> Result<Self> {
        let path = path.as_ref();
        let sqlpath = path.join("dir.sqlite3");
        let blobpath = path.join("dir_blobs/");
        let lockpath = path.join("dir.lock");

        if !readonly {
            let mut builder = std::fs::DirBuilder::new();
            #[cfg(target_family = "unix")]
            builder.mode(0o700);
            builder.recursive(true).create(&blobpath).map_err(|err| {
                Error::StorageError(format!("Creating directory at {:?}: {}", &blobpath, err))
            })?;
        }

        let mut lockfile = fslock::LockFile::open(&lockpath)?;
        if !readonly && !lockfile.try_lock()? {
            readonly = true; // we couldn't get the lock!
        };
        let flags = if readonly {
            OpenFlags::SQLITE_OPEN_READ_ONLY
        } else {
            OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
        };
        let conn = rusqlite::Connection::open_with_flags(&sqlpath, flags)?;
        let mut store = SqliteStore::from_conn(conn, &blobpath)?;
        store.sql_path = Some(sqlpath);
        store.lockfile = Some(lockfile);
        Ok(store)
    }

    /// Construct a new SqliteStore from a database connection and a location
    /// for blob files.
    ///
    /// Used for testing with a memory-backed database.
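    ///
    /// A sketch of that testing pattern (`blob_dir` stands in for whatever
    /// temporary directory the test supplies):
    ///
    /// ```ignore
    /// let conn = rusqlite::Connection::open_in_memory()?;
    /// let store = SqliteStore::from_conn(conn, blob_dir)?;
    /// ```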
    pub(crate) fn from_conn<P>(conn: rusqlite::Connection, path: P) -> Result<Self>
    where
        P: AsRef<Path>,
    {
        let path = path.as_ref().to_path_buf();
        let mut result = SqliteStore {
            conn,
            path,
            lockfile: None,
            sql_path: None,
        };

        result.check_schema()?;

        Ok(result)
    }

    /// Check whether this database has a schema format we can read, and
    /// install or upgrade the schema if necessary.
    fn check_schema(&mut self) -> Result<()> {
        let tx = self.conn.transaction()?;
        let db_n_tables: u32 = tx.query_row(
            "SELECT COUNT(name) FROM sqlite_master
             WHERE type='table'
             AND name NOT LIKE 'sqlite_%'",
            [],
            |row| row.get(0),
        )?;
        let db_exists = db_n_tables > 0;

        if !db_exists {
            tx.execute_batch(INSTALL_V0_SCHEMA)?;
            tx.execute_batch(UPDATE_SCHEMA_V0_TO_V1)?;
            tx.commit()?;
            return Ok(());
        }

        let (version, readable_by): (u32, u32) = tx.query_row(
            "SELECT version, readable_by FROM TorSchemaMeta
             WHERE name = 'TorDirStorage'",
            [],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )?;

        if version < SCHEMA_VERSION {
            // Update the schema.
            tx.execute_batch(UPDATE_SCHEMA_V0_TO_V1)?;
            tx.commit()?;
            return Ok(());
        } else if readable_by > SCHEMA_VERSION {
            return Err(Error::UnrecognizedSchema);
        }

        // Dropping the transaction rolls it back, but that's fine: we made
        // no changes.
        Ok(())
    }

    /// Return the correct filename for a given blob, based on the filename
    /// from the ExtDocs table.
    fn blob_fname<P>(&self, path: P) -> Result<PathBuf>
    where
        P: AsRef<Path>,
    {
        let path = path.as_ref();
        if !path
            .components()
            .all(|c| matches!(c, path::Component::Normal(_)))
        {
            return Err(Error::CacheCorruption("Invalid path in database"));
        }

        let mut result = self.path.clone();
        result.push(path);
        Ok(result)
    }

    /// Read a blob from disk, mapping it if possible.
    fn read_blob<P>(&self, path: P) -> Result<InputString>
    where
        P: AsRef<Path>,
    {
        let path = path.as_ref();
        let full_path = self.blob_fname(path)?;
        InputString::load(&full_path).map_err(|err| {
            Error::StorageError(format!(
                "Loading blob {:?} from storage at {:?}: {}",
                path, full_path, err
            ))
        })
    }

    /// Write a file to disk as a blob, and record it in the ExtDocs table.
    ///
    /// Return a SavedBlobHandle that describes where the blob is, and which
    /// can be used either to commit the blob or delete it.
    fn save_blob_internal(
        &mut self,
        contents: &[u8],
        doctype: &str,
        dtype: &str,
        digest: &[u8],
        expires: OffsetDateTime,
    ) -> Result<SavedBlobHandle<'_>> {
        let digest = hex::encode(digest);
        let digeststr = format!("{}-{}", dtype, digest);
        let fname = format!("{}_{}", doctype, digeststr);
        let full_path = self.blob_fname(&fname)?;

        let unlinker = Unlinker::new(&full_path);
        std::fs::write(full_path, contents)?;

        let tx = self.conn.unchecked_transaction()?;
        tx.execute(INSERT_EXTDOC, params![digeststr, expires, dtype, fname])?;

        Ok(SavedBlobHandle {
            tx,
            fname,
            digeststr,
            unlinker,
        })
    }

    /// Save a blob to disk and commit it.
    #[cfg(test)]
    fn save_blob(
        &mut self,
        contents: &[u8],
        doctype: &str,
        dtype: &str,
        digest: &[u8],
        expires: OffsetDateTime,
    ) -> Result<String> {
        let h = self.save_blob_internal(contents, doctype, dtype, digest, expires)?;
        let SavedBlobHandle {
            tx,
            digeststr,
            fname,
            unlinker,
        } = h;
        let _ = digeststr;
        tx.commit()?;
        unlinker.forget();
        Ok(fname)
    }

    /// Return the valid-after time for the latest non-pending consensus.
    #[cfg(test)]
    // We should revise the tests to use latest_consensus_meta instead.
    fn latest_consensus_time(&self, flavor: ConsensusFlavor) -> Result<Option<OffsetDateTime>> {
        Ok(self
            .latest_consensus_meta(flavor)?
            .map(|m| m.lifetime().valid_after().into()))
    }
}

impl Store for SqliteStore {
    fn is_readonly(&self) -> bool {
        match &self.lockfile {
            Some(f) => !f.owns_lock(),
            None => false,
        }
    }
    fn upgrade_to_readwrite(&mut self) -> Result<bool> {
        if self.is_readonly() && self.sql_path.is_some() {
            let lf = self
                .lockfile
                .as_mut()
                .expect("No lockfile open; cannot upgrade to read-write storage");
            if !lf.try_lock()? {
                // Somebody else has the lock.
                return Ok(false);
            }
            // Unwrap should be safe due to parent `.is_some()` check
            #[allow(clippy::unwrap_used)]
            match rusqlite::Connection::open(self.sql_path.as_ref().unwrap()) {
                Ok(conn) => {
                    self.conn = conn;
                }
                Err(e) => {
                    let _ignore = lf.unlock();
                    return Err(e.into());
                }
            }
        }
        Ok(true)
    }
    fn expire_all(&mut self, expiration: &ExpirationConfig) -> Result<()> {
        let tx = self.conn.transaction()?;
        // Find the filenames of the expired blobs _before_ we delete their
        // rows from ExtDocs, so that we still know which files to unlink.
        let expired_blobs: Vec<String> = {
            let mut stmt = tx.prepare(FIND_EXPIRED_EXTDOCS)?;
            let names = stmt
                .query_map([], |row| row.get::<_, String>(0))?
                .filter_map(std::result::Result::ok)
                .collect();
            names
        };

        let now = OffsetDateTime::now_utc();
        tx.execute(DROP_OLD_EXTDOCS, [])?;
        tx.execute(DROP_OLD_MICRODESCS, [now - expiration.microdescs])?;
        tx.execute(DROP_OLD_AUTHCERTS, [now - expiration.authcerts])?;
        tx.execute(DROP_OLD_CONSENSUSES, [now - expiration.consensuses])?;
        tx.execute(DROP_OLD_ROUTERDESCS, [now - expiration.router_descs])?;
        tx.commit()?;
        for name in expired_blobs {
            let fname = self.blob_fname(name);
            if let Ok(fname) = fname {
                let _ignore = std::fs::remove_file(fname);
            }
        }
        Ok(())
    }

    fn latest_consensus(
        &self,
        flavor: ConsensusFlavor,
        pending: Option<bool>,
    ) -> Result<Option<InputString>> {
        trace!(?flavor, ?pending, "Loading latest consensus from cache");
        let rv: Option<(OffsetDateTime, OffsetDateTime, String)> = match pending {
            None => self
                .conn
                .query_row(FIND_CONSENSUS, params![flavor.name()], |row| row.try_into())
                .optional()?,
            Some(pending_val) => self
                .conn
                .query_row(
                    FIND_CONSENSUS_P,
                    params![pending_val, flavor.name()],
                    |row| row.try_into(),
                )
                .optional()?,
        };

        if let Some((_va, _vu, filename)) = rv {
            self.read_blob(filename).map(Option::Some)
        } else {
            Ok(None)
        }
    }
    fn latest_consensus_meta(&self, flavor: ConsensusFlavor) -> Result<Option<ConsensusMeta>> {
        let mut stmt = self.conn.prepare(FIND_LATEST_CONSENSUS_META)?;
        let mut rows = stmt.query(params![flavor.name()])?;
        if let Some(row) = rows.next()? {
            Ok(Some(cmeta_from_row(row)?))
        } else {
            Ok(None)
        }
    }
    fn consensus_by_meta(&self, cmeta: &ConsensusMeta) -> Result<InputString> {
        if let Some((text, _)) =
            self.consensus_by_sha3_digest_of_signed_part(cmeta.sha3_256_of_signed())?
        {
            Ok(text)
        } else {
            Err(Error::CacheCorruption(
                "couldn't find a consensus we thought we had.",
            ))
        }
    }
    fn consensus_by_sha3_digest_of_signed_part(
        &self,
        d: &[u8; 32],
    ) -> Result<Option<(InputString, ConsensusMeta)>> {
        let digest = hex::encode(d);
        let mut stmt = self
            .conn
            .prepare(FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED)?;
        let mut rows = stmt.query(params![digest])?;
        if let Some(row) = rows.next()? {
            let meta = cmeta_from_row(row)?;
            let fname: String = row.get(5)?;
            let text = self.read_blob(&fname)?;
            Ok(Some((text, meta)))
        } else {
            Ok(None)
        }
    }
    fn store_consensus(
        &mut self,
        cmeta: &ConsensusMeta,
        flavor: ConsensusFlavor,
        pending: bool,
        contents: &str,
    ) -> Result<()> {
        let lifetime = cmeta.lifetime();
        let sha3_of_signed = cmeta.sha3_256_of_signed();
        let sha3_of_whole = cmeta.sha3_256_of_whole();
        let valid_after: OffsetDateTime = lifetime.valid_after().into();
        let fresh_until: OffsetDateTime = lifetime.fresh_until().into();
        let valid_until: OffsetDateTime = lifetime.valid_until().into();

        /// How long to keep a consensus around after it has expired
        const CONSENSUS_LIFETIME: time::Duration = time::Duration::days(4);

        // After a few days have passed, a consensus is no good for
        // anything at all, not even diffs.
        let expires = valid_until + CONSENSUS_LIFETIME;

        let doctype = format!("con:{}", flavor.name());

        let h = self.save_blob_internal(
            contents.as_bytes(),
            &doctype,
            "sha3-256",
            &sha3_of_whole[..],
            expires,
        )?;
        h.tx.execute(
            INSERT_CONSENSUS,
            params![
                valid_after,
                fresh_until,
                valid_until,
                flavor.name(),
                pending,
                hex::encode(&sha3_of_signed),
                h.digeststr
            ],
        )?;
        h.tx.commit()?;
        h.unlinker.forget();
        Ok(())
    }
    fn mark_consensus_usable(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
        let d = hex::encode(cmeta.sha3_256_of_whole());
        let digest = format!("sha3-256-{}", d);

        let tx = self.conn.transaction()?;
        let n = tx.execute(MARK_CONSENSUS_NON_PENDING, params![digest])?;
        trace!("Marked {} consensuses usable", n);
        tx.commit()?;

        Ok(())
    }
    fn delete_consensus(&mut self, cmeta: &ConsensusMeta) -> Result<()> {
        let d = hex::encode(cmeta.sha3_256_of_whole());
        let digest = format!("sha3-256-{}", d);

        // TODO: We should probably remove the blob as well, but for now
        // this is enough.
        let tx = self.conn.transaction()?;
        tx.execute(REMOVE_CONSENSUS, params![digest])?;
        tx.commit()?;

        Ok(())
    }

    fn authcerts(&self, certs: &[AuthCertKeyIds]) -> Result<HashMap<AuthCertKeyIds, String>> {
        let mut result = HashMap::new();
        // TODO(nickm): Do I need to get a transaction here for performance?
        let mut stmt = self.conn.prepare(FIND_AUTHCERT)?;

        for ids in certs {
            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
            if let Some(contents) = stmt
                .query_row(params![id_digest, sk_digest], |row| row.get::<_, String>(0))
                .optional()?
            {
                result.insert(*ids, contents);
            }
        }

        Ok(result)
    }
    fn store_authcerts(&mut self, certs: &[(AuthCertMeta, &str)]) -> Result<()> {
        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(INSERT_AUTHCERT)?;
        for (meta, content) in certs {
            let ids = meta.key_ids();
            let id_digest = hex::encode(ids.id_fingerprint.as_bytes());
            let sk_digest = hex::encode(ids.sk_fingerprint.as_bytes());
            let published: OffsetDateTime = meta.published().into();
            let expires: OffsetDateTime = meta.expires().into();
            stmt.execute(params![id_digest, sk_digest, published, expires, content])?;
        }
        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }

    fn microdescs(&self, digests: &[MdDigest]) -> Result<HashMap<MdDigest, String>> {
        let mut result = HashMap::new();
        let mut stmt = self.conn.prepare(FIND_MD)?;

        // TODO(nickm): Should I speed this up with a transaction, or
        // does it not matter for queries?
        for md_digest in digests {
            let h_digest = hex::encode(md_digest);
            if let Some(contents) = stmt
                .query_row(params![h_digest], |row| row.get::<_, String>(0))
                .optional()?
            {
                result.insert(*md_digest, contents);
            }
        }

        Ok(result)
    }
    fn store_microdescs(&mut self, digests: &[(&str, &MdDigest)], when: SystemTime) -> Result<()> {
        let when: OffsetDateTime = when.into();

        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(INSERT_MD)?;

        for (content, md_digest) in digests {
            let h_digest = hex::encode(md_digest);
            stmt.execute(params![h_digest, when, content])?;
        }
        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }
    fn update_microdescs_listed(&mut self, digests: &[MdDigest], when: SystemTime) -> Result<()> {
        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(UPDATE_MD_LISTED)?;
        let when: OffsetDateTime = when.into();

        for md_digest in digests {
            let h_digest = hex::encode(md_digest);
            stmt.execute(params![when, h_digest])?;
        }

        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }

    #[cfg(feature = "routerdesc")]
    fn routerdescs(&self, digests: &[RdDigest]) -> Result<HashMap<RdDigest, String>> {
        let mut result = HashMap::new();
        let mut stmt = self.conn.prepare(FIND_RD)?;

        // TODO(nickm): Should I speed this up with a transaction, or
        // does it not matter for queries?
        for rd_digest in digests {
            let h_digest = hex::encode(rd_digest);
            if let Some(contents) = stmt
                .query_row(params![h_digest], |row| row.get::<_, String>(0))
                .optional()?
            {
                result.insert(*rd_digest, contents);
            }
        }

        Ok(result)
    }
    #[cfg(feature = "routerdesc")]
    fn store_routerdescs(&mut self, digests: &[(&str, SystemTime, &RdDigest)]) -> Result<()> {
        let tx = self.conn.transaction()?;
        let mut stmt = tx.prepare(INSERT_RD)?;

        for (content, when, rd_digest) in digests {
            let when: OffsetDateTime = (*when).into();
            let h_digest = hex::encode(rd_digest);
            stmt.execute(params![h_digest, when, content])?;
        }
        stmt.finalize()?;
        tx.commit()?;
        Ok(())
    }
}

/// Handle to a blob that we have saved to disk but not yet committed to
/// the database.
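///
/// A minimal sketch of the intended flow (mirroring `save_blob` and
/// `store_consensus` above):
///
/// ```ignore
/// let h = store.save_blob_internal(contents, doctype, dtype, digest, expires)?;
/// // ... optionally add rows referencing h.digeststr within h.tx ...
/// h.tx.commit()?;      // Keep the ExtDocs row,
/// h.unlinker.forget(); // and keep the blob file on disk.
/// ```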
struct SavedBlobHandle<'a> {
    /// Transaction we're using to add the blob to the ExtDocs table.
    tx: Transaction<'a>,
    /// Filename for the file, with respect to the blob directory.
    #[allow(unused)]
    fname: String,
    /// Declared digest string for this blob. Of the format
    /// "digesttype-hexstr".
    digeststr: String,
    /// An 'unlinker' for the blob file.
    unlinker: Unlinker,
}

/// Handle to a file which we might have to delete.
///
/// When this handle is dropped, the file gets deleted, unless you have
/// first called [`Unlinker::forget`].
struct Unlinker {
    /// The location of the file to remove, or None if we shouldn't
    /// remove it.
    p: Option<PathBuf>,
}
impl Unlinker {
    /// Make a new Unlinker for a given filename.
    fn new<P: AsRef<Path>>(p: P) -> Self {
        Unlinker {
            p: Some(p.as_ref().to_path_buf()),
        }
    }
    /// Forget about this unlinker, so that the corresponding file won't
    /// get removed.
    fn forget(mut self) {
        self.p = None;
    }
}
impl Drop for Unlinker {
    fn drop(&mut self) {
        if let Some(p) = self.p.take() {
            let _ignore_err = std::fs::remove_file(p);
        }
    }
}

/// Convert a hexadecimal sha3-256 digest from the database into an array.
fn digest_from_hex(s: &str) -> Result<[u8; 32]> {
    let mut bytes = [0_u8; 32];
    hex::decode_to_slice(s, &mut bytes[..]).map_err(Error::BadHexInCache)?;
    Ok(bytes)
}

/// Convert a hexadecimal sha3-256 "digest string" as used in the
/// digest column from the database into an array.
fn digest_from_dstr(s: &str) -> Result<[u8; 32]> {
    if let Some(stripped) = s.strip_prefix("sha3-256-") {
        digest_from_hex(stripped)
    } else {
        Err(Error::CacheCorruption("Invalid digest in database"))
    }
}

/// Create a ConsensusMeta from a `Row` returned by one of
/// `FIND_LATEST_CONSENSUS_META` or `FIND_CONSENSUS_AND_META_BY_DIGEST`.
fn cmeta_from_row(row: &rusqlite::Row<'_>) -> Result<ConsensusMeta> {
    let va: OffsetDateTime = row.get(0)?;
    let fu: OffsetDateTime = row.get(1)?;
    let vu: OffsetDateTime = row.get(2)?;
    let d_signed: String = row.get(3)?;
    let d_all: String = row.get(4)?;
    let lifetime = Lifetime::new(va.into(), fu.into(), vu.into())
        .map_err(|_| Error::CacheCorruption("inconsistent lifetime in database"))?;
    Ok(ConsensusMeta::new(
        lifetime,
        digest_from_hex(&d_signed)?,
        digest_from_dstr(&d_all)?,
    ))
}

/// Version number used for this version of the arti cache schema.
const SCHEMA_VERSION: u32 = 1;

/// Set up the tables for the arti cache schema in a sqlite database.
const INSTALL_V0_SCHEMA: &str = "
  -- Helps us version the schema.  The schema here corresponds to a
  -- version number called 'version', and it should be readable by
  -- anybody who is compliant with versions of at least 'readable_by'.
  CREATE TABLE TorSchemaMeta (
     name TEXT NOT NULL PRIMARY KEY,
     version INTEGER NOT NULL,
     readable_by INTEGER NOT NULL
  );

  INSERT INTO TorSchemaMeta (name, version, readable_by) VALUES ( 'TorDirStorage', 0, 0 );

  -- Keeps track of external blobs on disk.
  CREATE TABLE ExtDocs (
    -- Records a digest of the file contents, in the form 'dtype-hexstr'
    digest TEXT PRIMARY KEY NOT NULL,
    -- When was this file created?
    created DATE NOT NULL,
    -- After what time will this file definitely be useless?
    expires DATE NOT NULL,
    -- What is the type of this file? Currently supported are 'con:<flavor>'.
    type TEXT NOT NULL,
    -- Filename for this file within our blob directory.
    filename TEXT NOT NULL
  );

  -- All the microdescriptors we know about.
  CREATE TABLE Microdescs (
    sha256_digest TEXT PRIMARY KEY NOT NULL,
    last_listed DATE NOT NULL,
    contents BLOB NOT NULL
  );

  -- All the authority certificates we know.
  CREATE TABLE Authcerts (
    id_digest TEXT NOT NULL,
    sk_digest TEXT NOT NULL,
    published DATE NOT NULL,
    expires DATE NOT NULL,
    contents BLOB NOT NULL,
    PRIMARY KEY (id_digest, sk_digest)
  );

  -- All the consensuses we're storing.
  CREATE TABLE Consensuses (
    valid_after DATE NOT NULL,
    fresh_until DATE NOT NULL,
    valid_until DATE NOT NULL,
    flavor TEXT NOT NULL,
    pending BOOLEAN NOT NULL,
    sha3_of_signed_part TEXT NOT NULL,
    digest TEXT NOT NULL,
    FOREIGN KEY (digest) REFERENCES ExtDocs (digest) ON DELETE CASCADE
  );
  CREATE INDEX Consensuses_vu on CONSENSUSES(valid_until);

";

/// Update the database schema from version 0 to version 1.
const UPDATE_SCHEMA_V0_TO_V1: &str = "
  CREATE TABLE RouterDescs (
    sha1_digest TEXT PRIMARY KEY NOT NULL,
    published DATE NOT NULL,
    contents BLOB NOT NULL
  );

  UPDATE TorSchemaMeta SET version=1 WHERE version<1;
";

/// Query: find the latest-expiring consensus of a given flavor with a
/// given pending status.
const FIND_CONSENSUS_P: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE pending = ? AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";

/// Query: find the latest-expiring consensus of a given flavor,
/// regardless of pending status.
const FIND_CONSENSUS: &str = "
  SELECT valid_after, valid_until, filename
  FROM Consensuses
  INNER JOIN ExtDocs ON ExtDocs.digest = Consensuses.digest
  WHERE flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";

/// Query: Find the metadata for the latest-expiring
/// non-pending consensus of a given flavor.
const FIND_LATEST_CONSENSUS_META: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, digest
  FROM Consensuses
  WHERE pending = 0 AND flavor = ?
  ORDER BY valid_until DESC
  LIMIT 1;
";

/// Look up a consensus by its digest-of-signed-part string.
const FIND_CONSENSUS_AND_META_BY_DIGEST_OF_SIGNED: &str = "
  SELECT valid_after, fresh_until, valid_until, sha3_of_signed_part, Consensuses.digest, filename
  FROM Consensuses
  INNER JOIN ExtDocs on ExtDocs.digest = Consensuses.digest
  WHERE Consensuses.sha3_of_signed_part = ?
  LIMIT 1;
";

/// Query: Update the consensus whose digest field is 'digest' to mark it
/// as no longer pending.
const MARK_CONSENSUS_NON_PENDING: &str = "
  UPDATE Consensuses
  SET pending = 0
  WHERE digest = ?;
";

/// Query: Remove the consensus with a given digest field.
const REMOVE_CONSENSUS: &str = "
  DELETE FROM Consensuses
  WHERE digest = ?;
";

/// Query: Find the authority certificate with given key digests.
const FIND_AUTHCERT: &str = "
  SELECT contents FROM AuthCerts WHERE id_digest = ? AND sk_digest = ?;
";

/// Query: find the microdescriptor with a given hex-encoded sha256 digest
const FIND_MD: &str = "
  SELECT contents
  FROM Microdescs
  WHERE sha256_digest = ?
";

/// Query: find the router descriptor with a given hex-encoded sha1 digest
#[cfg(feature = "routerdesc")]
const FIND_RD: &str = "
  SELECT contents
  FROM RouterDescs
  WHERE sha1_digest = ?
";

/// Query: find every ExtDocs member that has expired.
const FIND_EXPIRED_EXTDOCS: &str = "
  SELECT filename FROM Extdocs where expires < datetime('now');
";

/// Query: Add a new entry to ExtDocs.
const INSERT_EXTDOC: &str = "
  INSERT OR REPLACE INTO ExtDocs ( digest, created, expires, type, filename )
  VALUES ( ?, datetime('now'), ?, ?, ? );
";

/// Query: Add a new consensus.
const INSERT_CONSENSUS: &str = "
  INSERT OR REPLACE INTO Consensuses
    ( valid_after, fresh_until, valid_until, flavor, pending, sha3_of_signed_part, digest )
  VALUES ( ?, ?, ?, ?, ?, ?, ? );
";

/// Query: Add a new AuthCert
const INSERT_AUTHCERT: &str = "
  INSERT OR REPLACE INTO Authcerts
    ( id_digest, sk_digest, published, expires, contents)
  VALUES ( ?, ?, ?, ?, ? );
";

/// Query: Add a new microdescriptor
const INSERT_MD: &str = "
  INSERT OR REPLACE INTO Microdescs ( sha256_digest, last_listed, contents )
  VALUES ( ?, ?, ? );
";

/// Query: Add a new router descriptor
#[allow(unused)]
#[cfg(feature = "routerdesc")]
const INSERT_RD: &str = "
  INSERT OR REPLACE INTO RouterDescs ( sha1_digest, published, contents )
  VALUES ( ?, ?, ? );
";

/// Query: Change the time when a given microdescriptor was last listed.
/// (Thanks to the `max()`, this can only move the time forward.)
const UPDATE_MD_LISTED: &str = "
  UPDATE Microdescs
  SET last_listed = max(last_listed, ?)
  WHERE sha256_digest = ?;
";

/// Query: Discard every expired extdoc.
///
/// External documents aren't exposed through [`Store`].
const DROP_OLD_EXTDOCS: &str = "DELETE FROM ExtDocs WHERE expires < datetime('now');";

/// Query: Discard every router descriptor that was published more than 3
/// months ago.
// TODO: Choose a more realistic time.
const DROP_OLD_ROUTERDESCS: &str = "DELETE FROM RouterDescs WHERE published < ?;";
/// Query: Discard every microdescriptor that hasn't been listed for 3 months.
// TODO: Choose a more realistic time.
const DROP_OLD_MICRODESCS: &str = "DELETE FROM Microdescs WHERE last_listed < ?;";
/// Query: Discard every expired authority certificate.
const DROP_OLD_AUTHCERTS: &str = "DELETE FROM Authcerts WHERE expires < ?;";
/// Query: Discard every consensus that's been expired for at least
/// two days.
const DROP_OLD_CONSENSUSES: &str = "DELETE FROM Consensuses WHERE valid_until < ?;";

#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::storage::EXPIRATION_DEFAULTS;
    use hex_literal::hex;
    use tempfile::{tempdir, TempDir};
    use time::ext::NumericalDuration;

    fn new_empty() -> Result<(TempDir, SqliteStore)> {
        let tmp_dir = tempdir().unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        let conn = rusqlite::Connection::open(&sql_path)?;
        let store = SqliteStore::from_conn(conn, &tmp_dir)?;

        Ok((tmp_dir, store))
    }

    #[test]
    fn init() -> Result<()> {
        let tmp_dir = tempdir().unwrap();
        let sql_path = tmp_dir.path().join("db.sql");
        // Initial setup: everything should work.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, &tmp_dir)?;
        }
        // Second setup: shouldn't need to upgrade.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            let _store = SqliteStore::from_conn(conn, &tmp_dir)?;
        }
        // Third setup: a newer schema version that's still readable by us;
        // this should work too.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET version = 9002;")?;
            let _store = SqliteStore::from_conn(conn, &tmp_dir)?;
        }
        // Fourth: this says we can't read it, so we'll get an error.
        {
            let conn = rusqlite::Connection::open(&sql_path)?;
            conn.execute_batch("UPDATE TorSchemaMeta SET readable_by = 9001;")?;
            let val = SqliteStore::from_conn(conn, &tmp_dir);
            assert!(val.is_err());
        }
        Ok(())
    }

    #[test]
    fn bad_blob_fnames() -> Result<()> {
        let (_tmp_dir, store) = new_empty()?;

        assert!(store.blob_fname("abcd").is_ok());
        assert!(store.blob_fname("abcd..").is_ok());
        assert!(store.blob_fname("..abcd..").is_ok());
        assert!(store.blob_fname(".abcd").is_ok());

        assert!(store.blob_fname(".").is_err());
        assert!(store.blob_fname("..").is_err());
        assert!(store.blob_fname("../abcd").is_err());
        assert!(store.blob_fname("/abcd").is_err());

        Ok(())
    }
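
    #[test]
    fn digest_strings() -> Result<()> {
        // A small added check of the hex-digest helpers (`digest_from_hex`,
        // `digest_from_dstr`); the digest values here are arbitrary.
        let hex64 = "7b".repeat(32);
        let dstr = format!("sha3-256-{}", hex64);
        assert_eq!(digest_from_dstr(&dstr)?, [0x7b_u8; 32]);
        // Without its "sha3-256-" prefix, a digest string is rejected...
        assert!(digest_from_dstr(&hex64).is_err());
        // ...and so is anything that isn't valid hex of the right length.
        assert!(digest_from_hex("not hex at all").is_err());
        Ok(())
    }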

    #[test]
    fn blobs() -> Result<()> {
        let (tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_week = 1.weeks();

        let fname1 = store.save_blob(
            b"Hello world",
            "greeting",
            "sha1",
            &hex!("7b502c3a1f48c8609ae212cdfb639dee39673f5e"),
            now + one_week,
        )?;

        let fname2 = store.save_blob(
            b"Goodbye, dear friends",
            "greeting",
            "sha1",
            &hex!("2149c2a7dbf5be2bb36fb3c5080d0fb14cb3355c"),
            now - one_week,
        )?;

        assert_eq!(
            fname1,
            "greeting_sha1-7b502c3a1f48c8609ae212cdfb639dee39673f5e"
        );
        assert_eq!(store.blob_fname(&fname1)?, tmp_dir.path().join(&fname1));
        assert_eq!(
            &std::fs::read(store.blob_fname(&fname1)?)?[..],
            b"Hello world"
        );
        assert_eq!(
            &std::fs::read(store.blob_fname(&fname2)?)?[..],
            b"Goodbye, dear friends"
        );

        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 2);

        let blob = store.read_blob(&fname2)?;
        assert_eq!(blob.as_str().unwrap(), "Goodbye, dear friends");

        // Now expire: the second file should go away.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        assert_eq!(
            &std::fs::read(store.blob_fname(&fname1)?)?[..],
            b"Hello world"
        );
        assert!(std::fs::read(store.blob_fname(&fname2)?).is_err());
        let n: u32 = store
            .conn
            .query_row("SELECT COUNT(filename) FROM ExtDocs", [], |row| row.get(0))?;
        assert_eq!(n, 1);

        Ok(())
    }

    #[test]
    fn consensus() -> Result<()> {
        use tor_netdoc::doc::netstatus;

        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_hour = 1.hours();

        assert_eq!(
            store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
            None
        );

        let cmeta = ConsensusMeta::new(
            netstatus::Lifetime::new(
                now.into(),
                (now + one_hour).into(),
                (now + one_hour * 2).into(),
            )
            .unwrap(),
            [0xAB; 32],
            [0xBC; 32],
        );

        store.store_consensus(
            &cmeta,
            ConsensusFlavor::Microdesc,
            true,
            "Pretend this is a consensus",
        )?;

        {
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                None
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store.latest_consensus(ConsensusFlavor::Microdesc, Some(false))?;
            assert!(consensus.is_none());
        }

        store.mark_consensus_usable(&cmeta)?;

        {
            assert_eq!(
                store.latest_consensus_time(ConsensusFlavor::Microdesc)?,
                now.into()
            );
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, None)?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
            let consensus = store
                .latest_consensus(ConsensusFlavor::Microdesc, Some(false))?
                .unwrap();
            assert_eq!(consensus.as_str()?, "Pretend this is a consensus");
        }

        {
            let consensus_text = store.consensus_by_meta(&cmeta)?;
            assert_eq!(consensus_text.as_str()?, "Pretend this is a consensus");

            let (is, _cmeta2) = store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .unwrap();
            assert_eq!(is.as_str()?, "Pretend this is a consensus");

            let cmeta3 = ConsensusMeta::new(
                netstatus::Lifetime::new(
                    now.into(),
                    (now + one_hour).into(),
                    (now + one_hour * 2).into(),
                )
                .unwrap(),
                [0x99; 32],
                [0x99; 32],
            );
            assert!(store.consensus_by_meta(&cmeta3).is_err());

            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0x99; 32])?
                .is_none());
        }

        {
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .is_some());
            store.delete_consensus(&cmeta)?;
            assert!(store
                .consensus_by_sha3_digest_of_signed_part(&[0xAB; 32])?
                .is_none());
        }

        Ok(())
    }

    #[test]
    fn authcerts() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;
        let now = OffsetDateTime::now_utc();
        let one_hour = 1.hours();

        let keyids = AuthCertKeyIds {
            id_fingerprint: [3; 20].into(),
            sk_fingerprint: [4; 20].into(),
        };
        let keyids2 = AuthCertKeyIds {
            id_fingerprint: [4; 20].into(),
            sk_fingerprint: [3; 20].into(),
        };

        let m1 = AuthCertMeta::new(keyids, now.into(), (now + one_hour * 24).into());

        store.store_authcerts(&[(m1, "Pretend this is a cert")])?;

        let certs = store.authcerts(&[keyids, keyids2])?;
        assert_eq!(certs.len(), 1);
        assert_eq!(certs.get(&keyids).unwrap(), "Pretend this is a cert");

        Ok(())
    }

    #[test]
    fn microdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_day = 1.days();

        let d1 = [5_u8; 32];
        let d2 = [7; 32];
        let d3 = [42; 32];
        let d4 = [99; 32];

        let long_ago: OffsetDateTime = now - one_day * 100;
        store.store_microdescs(
            &[
                ("Fake micro 1", &d1),
                ("Fake micro 2", &d2),
                ("Fake micro 3", &d3),
            ],
            long_ago.into(),
        )?;

        store.update_microdescs_listed(&[d2], now.into())?;

        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 2);
        assert_eq!(mds.get(&d1), None);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");
        assert_eq!(mds.get(&d3).unwrap(), "Fake micro 3");
        assert_eq!(mds.get(&d4), None);

        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let mds = store.microdescs(&[d2, d3, d4])?;
        assert_eq!(mds.len(), 1);
        assert_eq!(mds.get(&d2).unwrap(), "Fake micro 2");

        Ok(())
    }

    #[test]
    #[cfg(feature = "routerdesc")]
    fn routerdescs() -> Result<()> {
        let (_tmp_dir, mut store) = new_empty()?;

        let now = OffsetDateTime::now_utc();
        let one_day = 1.days();
        let long_ago: OffsetDateTime = now - one_day * 100;
        let recently = now - one_day;

        let d1 = [5_u8; 20];
        let d2 = [7; 20];
        let d3 = [42; 20];
        let d4 = [99; 20];

        store.store_routerdescs(&[
            ("Fake routerdesc 1", long_ago.into(), &d1),
            ("Fake routerdesc 2", recently.into(), &d2),
            ("Fake routerdesc 3", long_ago.into(), &d3),
        ])?;

        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 2);
        assert_eq!(rds.get(&d1), None);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");
        assert_eq!(rds.get(&d3).unwrap(), "Fake routerdesc 3");
        assert_eq!(rds.get(&d4), None);

        // Now we'll expire.  That should drop everything but d2.
        store.expire_all(&EXPIRATION_DEFAULTS)?;
        let rds = store.routerdescs(&[d2, d3, d4])?;
        assert_eq!(rds.len(), 1);
        assert_eq!(rds.get(&d2).unwrap(), "Fake routerdesc 2");

        Ok(())
    }

    #[test]
    fn from_path_rw() -> Result<()> {
        let tmp = tempdir().unwrap();

        // Nothing there: can't open read-only
        let r = SqliteStore::from_path(tmp.path(), true);
        assert!(r.is_err());
        assert!(!tmp.path().join("dir_blobs").exists());

        // Opening it read-write will create the files
        {
            let mut store = SqliteStore::from_path(tmp.path(), false)?;
            assert!(tmp.path().join("dir_blobs").is_dir());
            assert!(store.lockfile.is_some());
            assert!(!store.is_readonly());
            assert!(store.upgrade_to_readwrite()?); // no-op.
        }

        // At this point, we can successfully make a read-only connection.
        {
            let mut store2 = SqliteStore::from_path(tmp.path(), true)?;
            assert!(store2.is_readonly());

            // Nobody else is locking this, so we can upgrade.
            assert!(store2.upgrade_to_readwrite()?); // this one actually upgrades.
            assert!(!store2.is_readonly());
        }
        Ok(())
    }
}