forked from Overtorment/Cashier-BTC
-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker2.js
95 lines (81 loc) · 3.26 KB
/
worker2.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* Cashier-BTC
* -----------
* Self-hosted bitcoin payment gateway
*
* https://github.com/Overtorment/Cashier-BTC
*
**/
/**
* The worker iterates through all paid addresses (which are actually hot wallets)
* and sweeps them, i.e. forwards their funds to the seller's final (aggregation) wallet.
*
*/
let storage = require('./models/storage')
let config = require('./config')
let blockchain = require('./models/blockchain')
let signer = require('./models/signer')
require('./smoke-test')
const { createLogger, format, transports } = require('winston')
const { combine, timestamp, printf } = format
// Renders a single log entry as "<timestamp> <level>: <message>".
const lineFormat = printf(entry => `${entry.timestamp} ${entry.level}: ${entry.message}`)

// Module-wide logger. Its level comes from config.logging_level and it writes
// to two files:
//   - ./logs/error.log    (transport level explicitly set to 'error')
//   - ./logs/combined.log (no transport-specific level is set)
// A console transport (new transports.Console()) or winston's JSON format
// could be swapped in here if needed.
const logger = createLogger({
  level: config.logging_level,
  format: combine(timestamp(), lineFormat),
  transports: [
    new transports.File({ filename: './logs/error.log', level: 'error' }),
    new transports.File({ filename: './logs/combined.log' })
  ]
})
/**
 * Main worker loop.
 *
 * Forever: asks storage for paid addresses newer than
 * `Date.now() - config.process_paid_for_period` (presumably milliseconds, since
 * it is subtracted from Date.now() - confirm against config), hands the result
 * to processJob(), then sleeps before the next pass.
 *
 * A failure inside one pass (storage error, RPC error, ...) is logged and the
 * loop carries on with the next pass; without this, the rejection would escape
 * the async function and the worker would stop polling for good.
 */
;(async () => {
  // Pause between two passes, in milliseconds.
  const POLL_INTERVAL_MS = 15000
  const wait = ms => new Promise(resolve => setTimeout(resolve, ms))

  while (true) {
    logger.debug('worker2.js tick tock')
    try {
      const job = await storage.getPaidAdressesNewerThanPromise(Date.now() - config.process_paid_for_period)
      // processJob() tolerates a missing result, so the log line must too.
      const found = job && job.rows ? job.rows.length : 0
      logger.info('worker2 js found ' + found + ' records')
      await processJob(job)
    } catch (err) {
      logger.error('worker2.js iteration failed: ' + ((err && err.stack) || err))
    }
    await wait(POLL_INTERVAL_MS)
  }
})()
/**
 * Sweeps every paid address of a storage result set to its seller's wallet.
 *
 * Rows are handled one at a time, in order. An error raised while handling one
 * row is logged and does not prevent the remaining rows from being processed,
 * so a single broken record cannot block all the records after it.
 *
 * @param {{rows: Array<{doc: Object}>}} [rows] storage query result. A missing
 *   value or a missing `rows` list means "nothing to do". The argument itself
 *   is not modified (the `doc` objects are, see sweepPaidAddress).
 * @returns {Promise<void>} resolves once every row has been attempted
 */
async function processJob (rows) {
  // Read the list without writing defaults back into the caller's object.
  const records = (rows && rows.rows) || []
  for (const row of records) {
    try {
      await sweepPaidAddress(row.doc)
    } catch (err) {
      const address = row && row.doc && row.doc.address
      logger.error(`worker2.js: failed to process address ${address}: ${(err && err.stack) || err}`)
    }
  }
}

/**
 * Checks the balance of one paid address and, if it is fully confirmed and
 * non-zero, builds, broadcasts and records a transaction forwarding it to the
 * seller.
 *
 * @param {Object} json stored address record; reads `address`, `btc_to_ask`,
 *   `seller` and `WIF`, writes `processed` and `sweep_result`
 * @returns {Promise<void>}
 */
async function sweepPaidAddress (json) {
  const received = await blockchain.getreceivedbyaddress(json.address, config.minimum_confirmation_required)
  // Index 0 is reported as the unconfirmed amount, index 1 as the confirmed one.
  const unconfirmed = received[0].result
  const confirmed = received[1].result
  logger.info(`worker2.js address: ${json.address}, expect: ${json.btc_to_ask}, confirmed:${confirmed}, unconfirmed:${unconfirmed}`)

  // Only sweep when everything received is confirmed and there is something to send.
  const balanceIsOk = +confirmed === +unconfirmed && unconfirmed > 0
  if (!balanceIsOk) {
    logger.warn('worker2.js: balance is not ok, skip')
    return
  }

  const seller = await storage.getSellerPromise(json.seller)
  logger.info(`worker2.js : transferring ${confirmed} BTC (minus fee) from ${json.address} to seller ${seller.seller}(${seller.address})`)
  const unspentOutputs = await blockchain.listunspent(json.address)

  // Addresses starting with '3' are assumed to be SegWit P2SH; that is pretty
  // safe to assume since we generate those addresses ourselves.
  const createTx = json.address[0] === '3' ? signer.createSegwitTransaction : signer.createTransaction

  // 0.0001 is presumably the fee in BTC - confirm against the signer module.
  const tx = createTx(unspentOutputs.result, seller.address, confirmed, 0.0001, json.WIF)
  logger.info(`worker2.js broadcasting ${tx}`)
  const broadcastResult = await blockchain.broadcastTransaction(tx)
  logger.info(`worker2.js broadcast result: ${JSON.stringify(broadcastResult)}`)

  // NOTE(review): the record is marked as swept whatever the broadcast result
  // contains; the raw result is kept under a timestamp key for later inspection.
  json.processed = 'paid_and_sweeped'
  json.sweep_result = json.sweep_result || {}
  json.sweep_result[Date.now()] = { tx, broadcast: broadcastResult }
  await storage.saveJobResultsPromise(json)
}