diff --git a/.github/workflows/ci-matrix.yml b/.github/workflows/ci-matrix.yml index 5285afcbd..b444643e8 100644 --- a/.github/workflows/ci-matrix.yml +++ b/.github/workflows/ci-matrix.yml @@ -14,7 +14,7 @@ jobs: matrix: target: - { name: linux, os: ubuntu-22.04 } - - { name: macos, os: macos-13 } + - { name: macos, os: macos-latest } - { name: windows, os: windows-2022 } name: Build node on ${{ matrix.target.os }} diff --git a/masq_lib/src/constants.rs b/masq_lib/src/constants.rs index 20e332f4a..1be85f381 100644 --- a/masq_lib/src/constants.rs +++ b/masq_lib/src/constants.rs @@ -83,7 +83,8 @@ pub const VALUE_EXCEEDS_ALLOWED_LIMIT: u64 = ACCOUNTANT_PREFIX | 3; pub const MASQ_TOTAL_SUPPLY: u64 = 37_500_000; pub const DEFAULT_GAS_PRICE: u64 = 1; //TODO ?? Really -pub const DEFAULT_GAS_PRICE_MARGIN: u64 = 30; +pub const DEFAULT_GAS_PRICE_RETRY_PERCENTAGE: u64 = 30; +pub const DEFAULT_GAS_PRICE_RETRY_CONSTANT: u128 = 5_000; pub const DEFAULT_MAX_BLOCK_COUNT: u64 = 100_000; //chains @@ -142,7 +143,8 @@ mod tests { assert_eq!(CURRENT_LOGFILE_NAME, "MASQNode_rCURRENT.log"); assert_eq!(MASQ_PROMPT, "masq> "); assert_eq!(DEFAULT_GAS_PRICE, 1); - assert_eq!(DEFAULT_GAS_PRICE_MARGIN, 30); + assert_eq!(DEFAULT_GAS_PRICE_RETRY_PERCENTAGE, 30); + assert_eq!(DEFAULT_GAS_PRICE_RETRY_CONSTANT, 5_000); assert_eq!(WALLET_ADDRESS_LENGTH, 42); assert_eq!(MASQ_TOTAL_SUPPLY, 37_500_000); assert_eq!(WEIS_IN_GWEI, 1_000_000_000); diff --git a/multinode_integration_tests/tests/verify_bill_payment.rs b/multinode_integration_tests/tests/verify_bill_payment.rs index d421f82b2..e5fddc67f 100644 --- a/multinode_integration_tests/tests/verify_bill_payment.rs +++ b/multinode_integration_tests/tests/verify_bill_payment.rs @@ -465,7 +465,7 @@ fn verify_pending_payables() { } MASQNodeUtils::assert_node_wrote_log_containing( real_consuming_node.name(), - "Found 3 pending payables and 0 unfinalized failures to process", + "Found 3 pending payables and 0 suspected failures to process", Duration::from_secs(5), ); MASQNodeUtils::assert_node_wrote_log_containing( diff --git a/node/src/accountant/mod.rs b/node/src/accountant/mod.rs index 153158dd4..85bd9e017 100644 --- a/node/src/accountant/mod.rs +++ b/node/src/accountant/mod.rs @@ -30,7 +30,7 @@ use crate::accountant::scanners::pending_payable_scanner::utils::{ PendingPayableScanResult, TxHashByTable, }; use crate::accountant::scanners::scan_schedulers::{ - PayableSequenceScanner, ScanReschedulingAfterEarlyStop, ScanSchedulers, + ScanReschedulingAfterEarlyStop, ScanSchedulers, UnableToStartScanner, }; use crate::accountant::scanners::{Scanners, StartScanError}; use crate::blockchain::blockchain_bridge::{ @@ -312,11 +312,25 @@ impl Handler for Accountant { impl Handler for Accountant { type Result = (); - fn handle(&mut self, msg: ScanForReceivables, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: ScanForReceivables, _ctx: &mut Self::Context) -> Self::Result { // By now we know it is an automatic scan. The ReceivableScanner is independent of other // scanners and rescheduled regularly, just here. - self.handle_request_of_scan_for_receivable(msg.response_skeleton_opt); - self.scan_schedulers.receivable.schedule(ctx, &self.logger); + let scheduling_hint = self.handle_request_of_scan_for_receivable(msg.response_skeleton_opt); + + match scheduling_hint { + ScanReschedulingAfterEarlyStop::Schedule(other_scan_type) => unreachable!( + "Early stopped receivable scan was suggested to be followed up by the scan \ + for {:?}, which is not supported though", + other_scan_type + ), + ScanReschedulingAfterEarlyStop::DoNotSchedule => { + trace!( + self.logger, + "No early rescheduling, as the receivable scan did find results, or this \ + is the NullScanner" + ) + } + } } } @@ -348,8 +362,8 @@ impl Handler for Accountant { if let Some(node_to_ui_msg) = ui_msg_opt { info!( self.logger, - "Re-running the pending payable scan is recommended, as some \ - parts did not finish last time." + "Re-running the pending payable scan is recommended, as some parts \ + did not finish last time." ); self.ui_message_sub_opt .as_ref() @@ -401,13 +415,19 @@ impl Handler for Accountant { impl Handler for Accountant { type Result = (); - fn handle(&mut self, msg: ReceivedPayments, _ctx: &mut Self::Context) -> Self::Result { - if let Some(node_to_ui_msg) = self.scanners.finish_receivable_scan(msg, &self.logger) { - self.ui_message_sub_opt - .as_ref() - .expect("UIGateway is not bound") - .try_send(node_to_ui_msg) - .expect("UIGateway is dead"); + fn handle(&mut self, msg: ReceivedPayments, ctx: &mut Self::Context) -> Self::Result { + match self.scanners.finish_receivable_scan(msg, &self.logger) { + None => self.scan_schedulers.receivable.schedule(ctx, &self.logger), + Some(node_to_ui_msg) => { + self.ui_message_sub_opt + .as_ref() + .expect("UIGateway is not bound") + .try_send(node_to_ui_msg) + .expect("UIGateway is dead"); + // Externally triggered scans are not allowed to provoke an unwinding scan sequence + // with intervals. The only exception is the PendingPayableScanner that is always + // followed by the retry-payable scanner in a tight tandem. + } } } } @@ -417,27 +437,34 @@ impl Handler for Accountant { fn handle(&mut self, scan_error: ScanError, ctx: &mut Self::Context) -> Self::Result { error!(self.logger, "Received ScanError: {:?}", scan_error); + self.scanners .acknowledge_scan_error(&scan_error, &self.logger); match scan_error.response_skeleton_opt { - None => match scan_error.scan_type { - DetailedScanType::NewPayables => self - .scan_schedulers - .payable - .schedule_new_payable_scan(ctx, &self.logger), - DetailedScanType::RetryPayables => self - .scan_schedulers - .payable - .schedule_retry_payable_scan(ctx, None, &self.logger), - DetailedScanType::PendingPayables => self - .scan_schedulers - .pending_payable - .schedule(ctx, &self.logger), - DetailedScanType::Receivables => { - self.scan_schedulers.receivable.schedule(ctx, &self.logger) + None => { + debug!( + self.logger, + "Trying to restore the scan train after a crash" + ); + match scan_error.scan_type { + DetailedScanType::NewPayables => self + .scan_schedulers + .payable + .schedule_new_payable_scan(ctx, &self.logger), + DetailedScanType::RetryPayables => self + .scan_schedulers + .payable + .schedule_retry_payable_scan(ctx, None, &self.logger), + DetailedScanType::PendingPayables => self + .scan_schedulers + .pending_payable + .schedule(ctx, &self.logger), + DetailedScanType::Receivables => { + self.scan_schedulers.receivable.schedule(ctx, &self.logger) + } } - }, + } Some(response_skeleton) => { let error_msg = NodeToUiMessage { target: ClientId(response_skeleton.client_id), @@ -975,7 +1002,7 @@ impl Accountant { None => Err(StartScanError::NoConsumingWalletFound), }; - self.scan_schedulers.payable.reset_scan_timer(); + self.scan_schedulers.payable.reset_scan_timer(&self.logger); match result { Ok(scan_message) => { @@ -986,8 +1013,8 @@ impl Accountant { .expect("BlockchainBridge is dead"); ScanReschedulingAfterEarlyStop::DoNotSchedule } - Err(e) => self.handle_start_scan_error_and_prevent_scan_stall_point( - PayableSequenceScanner::NewPayables, + Err(e) => self.handle_start_scan_error( + UnableToStartScanner::NewPayables, e, response_skeleton_opt, ), @@ -1018,10 +1045,9 @@ impl Accountant { .expect("BlockchainBridge is dead"); } Err(e) => { - // It is thrown away and there is no rescheduling downstream because every error - // happening here on the start resolves into a panic by the current design - let _ = self.handle_start_scan_error_and_prevent_scan_stall_point( - PayableSequenceScanner::RetryPayables, + // Any error here panics by design, so the return value is unreachable/ignored. + let _ = self.handle_start_scan_error( + UnableToStartScanner::RetryPayables, e, response_skeleton_opt, ); @@ -1056,8 +1082,8 @@ impl Accountant { } Err(e) => { let initial_pending_payable_scan = self.scanners.initial_pending_payable_scan(); - self.handle_start_scan_error_and_prevent_scan_stall_point( - PayableSequenceScanner::PendingPayables { + self.handle_start_scan_error( + UnableToStartScanner::PendingPayables { initial_pending_payable_scan, }, e, @@ -1073,9 +1099,9 @@ impl Accountant { hint } - fn handle_start_scan_error_and_prevent_scan_stall_point( + fn handle_start_scan_error( &self, - scanner: PayableSequenceScanner, + scanner: UnableToStartScanner, e: StartScanError, response_skeleton_opt: Option, ) -> ScanReschedulingAfterEarlyStop { @@ -1102,7 +1128,7 @@ impl Accountant { fn handle_request_of_scan_for_receivable( &mut self, response_skeleton_opt: Option, - ) { + ) -> ScanReschedulingAfterEarlyStop { let result: Result = self.scanners.start_receivable_scan_guarded( &self.earning_wallet, @@ -1113,29 +1139,22 @@ impl Accountant { ); match result { - Ok(scan_message) => self - .retrieve_transactions_sub_opt - .as_ref() - .expect("BlockchainBridge is unbound") - .try_send(scan_message) - .expect("BlockchainBridge is dead"), - Err(e) => { - e.log_error( - &self.logger, - ScanType::Receivables, - response_skeleton_opt.is_some(), - ); - - if let Some(skeleton) = response_skeleton_opt { - self.ui_message_sub_opt - .as_ref() - .expect("UiGateway is unbound") - .try_send(NodeToUiMessage { - target: MessageTarget::ClientId(skeleton.client_id), - body: UiScanResponse {}.tmb(skeleton.context_id), - }) - .expect("UiGateway is dead"); - }; + Ok(scan_message) => { + self.retrieve_transactions_sub_opt + .as_ref() + .expect("BlockchainBridge is unbound") + .try_send(scan_message) + .expect("BlockchainBridge is dead"); + ScanReschedulingAfterEarlyStop::DoNotSchedule + } + Err(e) => + // Any error here panics by design, so the return value is unreachable/ignored. + { + self.handle_start_scan_error( + UnableToStartScanner::Receivables, + e, + response_skeleton_opt, + ) } } } @@ -2037,8 +2056,8 @@ mod tests { let system = System::new("test"); subject.scan_schedulers.automatic_scans_enabled = false; // Making sure we would kill the test if any sort of scan was scheduled - subject.scan_schedulers.payable.retry_payable_notify = - Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); + subject.scan_schedulers.payable.retry_payable_notify_later = + Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.payable.new_payable_notify_later = Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.payable.new_payable_notify = @@ -2274,6 +2293,7 @@ mod tests { ]) .build(); subject.scan_schedulers.automatic_scans_enabled = false; + subject.scan_schedulers.payable.retry_payable_scan_interval = Duration::from_millis(1); let (blockchain_bridge, _, blockchain_bridge_recording_arc) = make_recorder(); let (ui_gateway, _, ui_gateway_recording_arc) = make_recorder(); let ui_gateway = @@ -2281,7 +2301,7 @@ mod tests { let (peer_actors, peer_addresses) = peer_actors_builder() .blockchain_bridge(blockchain_bridge) .ui_gateway(ui_gateway) - .build_and_provide_addresses(); + .build_with_addresses(); let subject_addr = subject.start(); let system = System::new("test"); let response_skeleton_opt = Some(ResponseSkeleton { @@ -2715,6 +2735,40 @@ mod tests { ); } + #[test] + #[should_panic( + expected = "internal error: entered unreachable code: Early stopped receivable scan \ + was suggested to be followed up by the scan for PendingPayables, which is not supported though" + )] + fn start_scan_error_in_receivables_and_unexpected_instruction_from_early_stop_scan_rescheduling( + ) { + let mut subject = AccountantBuilder::default().build(); + let reschedule_on_error_resolver = RescheduleScanOnErrorResolverMock::default() + .resolve_rescheduling_on_error_result(ScanReschedulingAfterEarlyStop::Schedule( + ScanType::PendingPayables, + )); + let receivable_scanner = ScannerMock::default() + .scan_started_at_result(None) + .start_scan_result(Err(StartScanError::NoConsumingWalletFound)); + subject + .scanners + .replace_scanner(ScannerReplacement::Receivable(ReplacementType::Mock( + receivable_scanner, + ))); + subject.scan_schedulers.reschedule_on_error_resolver = + Box::new(reschedule_on_error_resolver); + let system = System::new("test"); + let subject_addr = subject.start(); + + subject_addr + .try_send(ScanForReceivables { + response_skeleton_opt: None, + }) + .unwrap(); + + system.run(); + } + #[test] fn received_payments_with_response_skeleton_sends_response_to_ui_gateway() { let mut config = bc_from_earning_wallet(make_wallet("earning_wallet")); @@ -2724,7 +2778,7 @@ mod tests { receivable_scan_interval: Duration::from_millis(10_000), }); config.automatic_scans_enabled = false; - let subject = AccountantBuilder::default() + let mut subject = AccountantBuilder::default() .bootstrapper_config(config) .config_dao( ConfigDaoMock::new() @@ -2732,6 +2786,9 @@ mod tests { .set_result(Ok(())), ) .build(); + // Another scan must not be scheduled + subject.scan_schedulers.receivable.handle = + Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); let (ui_gateway, _, ui_gateway_recording_arc) = make_recorder(); let subject_addr = subject.start(); let system = System::new("test"); @@ -2838,11 +2895,13 @@ mod tests { let test_name = "accountant_scans_after_startup_and_does_not_detect_any_pending_payables"; let scan_params = ScanParams::default(); let notify_and_notify_later_params = NotifyAndNotifyLaterParams::default(); - let time_until_next_scan_params_arc = Arc::new(Mutex::new(vec![])); + let compute_time_to_next_scan_params_arc = Arc::new(Mutex::new(vec![])); let earning_wallet = make_wallet("earning"); let consuming_wallet = make_wallet("consuming"); let system = System::new(test_name); - let _ = SystemKillerActor::new(Duration::from_secs(10)).start(); + let (blockchain_bridge, _, _) = make_recorder(); + let blockchain_bridge = blockchain_bridge + .system_stop_conditions(match_lazily_every_type_id!(RetrieveTransactions)); let config = bc_from_wallets(consuming_wallet.clone(), earning_wallet.clone()); let payable_scanner = ScannerMock::new() .scan_started_at_result(None) @@ -2855,25 +2914,31 @@ mod tests { let receivable_scanner = ScannerMock::new() .scan_started_at_result(None) .start_scan_params(&scan_params.receivable_start_scan) - .start_scan_result(Err(StartScanError::NothingToProcess)); - let (subject, new_payable_expected_computed_interval, receivable_scan_interval) = + .start_scan_result(Ok(RetrieveTransactions { + recipient: earning_wallet.clone(), + response_skeleton_opt: None, + })); + let (subject, new_payable_expected_computed_interval) = configure_accountant_for_startup_with_preexisting_pending_payables( test_name, ¬ify_and_notify_later_params, - &time_until_next_scan_params_arc, + &compute_time_to_next_scan_params_arc, config, pending_payable_scanner, receivable_scanner, payable_scanner, ); - let peer_actors = peer_actors_builder().build(); + let peer_actors = peer_actors_builder() + .blockchain_bridge(blockchain_bridge) + .build(); let subject_addr: Addr = subject.start(); let subject_subs = Accountant::make_subs_from(&subject_addr); send_bind_message!(subject_subs, peer_actors); send_start_message!(subject_subs); - // The system is stopped by the NotifyLaterHandleMock for the Receivable scanner + // The system is stopped by the time the RetrieveTransactions msg arrives at the mocked + // BlockchainBridge. let before = SystemTime::now(); system.run(); let after = SystemTime::now(); @@ -2888,15 +2953,13 @@ mod tests { assert_payable_scanner_for_no_pending_payable_found( &scan_params.payable_start_scan, ¬ify_and_notify_later_params, - time_until_next_scan_params_arc, + compute_time_to_next_scan_params_arc, new_payable_expected_computed_interval, ); assert_receivable_scanner( test_name, earning_wallet, &scan_params.receivable_start_scan, - ¬ify_and_notify_later_params.receivables_notify_later, - receivable_scan_interval, ); // The test lays down evidences that the NewPayableScanner couldn't run before // the PendingPayableScanner, which is an intention. @@ -2954,8 +3017,11 @@ mod tests { let receivable_scanner = ScannerMock::new() .scan_started_at_result(None) .start_scan_params(&scan_params.receivable_start_scan) - .start_scan_result(Err(StartScanError::NothingToProcess)); - let (subject, expected_pending_payable_notify_later_interval, receivable_scan_interval) = + .start_scan_result(Ok(RetrieveTransactions { + recipient: earning_wallet.clone(), + response_skeleton_opt: None, + })); + let (subject, expected_pending_payable_notify_later_interval) = configure_accountant_for_startup_with_no_preexisting_pending_payables( test_name, ¬ify_and_notify_later_params, @@ -2964,7 +3030,7 @@ mod tests { pending_payable_scanner, receivable_scanner, ); - let (peer_actors, addresses) = peer_actors_builder().build_and_provide_addresses(); + let (peer_actors, addresses) = peer_actors_builder().build_with_addresses(); let subject_addr: Addr = subject.start(); let subject_subs = Accountant::make_subs_from(&subject_addr); let expected_tx_receipts_msg = TxReceiptsMessage { @@ -3031,8 +3097,6 @@ mod tests { test_name, earning_wallet, &scan_params.receivable_start_scan, - ¬ify_and_notify_later_params.receivables_notify_later, - receivable_scan_interval, ); // Since the assertions proved that the pending payable scanner had run multiple times // before the new payable scanner started or was scheduled, the front position definitely @@ -3055,15 +3119,14 @@ mod tests { struct NotifyAndNotifyLaterParams { new_payables_notify_later: Arc>>, new_payables_notify: Arc>>, - retry_payables_notify: Arc>>, + retry_payables_notify_later: Arc>>, pending_payables_notify_later: Arc>>, - receivables_notify_later: Arc>>, } fn configure_accountant_for_startup_with_preexisting_pending_payables( test_name: &str, notify_and_notify_later_params: &NotifyAndNotifyLaterParams, - time_until_next_scan_params_arc: &Arc>>, + compute_time_to_next_scan_params_arc: &Arc>>, config: BootstrapperConfig, pending_payable_scanner: ScannerMock< RequestTransactionReceipts, @@ -3076,7 +3139,7 @@ mod tests { Option, >, payable_scanner: ScannerMock, - ) -> (Accountant, Duration, Duration) { + ) -> (Accountant, Duration) { let mut subject = make_subject_and_inject_scanners( test_name, config, @@ -3086,7 +3149,6 @@ mod tests { ); let new_payable_expected_computed_interval = Duration::from_secs(3600); // Important that this is made short because the test relies on it with the system stop. - let receivable_scan_interval = Duration::from_millis(50); subject.scan_schedulers.pending_payable.handle = Box::new( NotifyLaterHandleMock::default() .notify_later_params(¬ify_and_notify_later_params.pending_payables_notify_later), @@ -3095,30 +3157,21 @@ mod tests { NotifyLaterHandleMock::default() .notify_later_params(¬ify_and_notify_later_params.new_payables_notify_later), ); - subject.scan_schedulers.payable.retry_payable_notify = Box::new( - NotifyHandleMock::default() - .notify_params(¬ify_and_notify_later_params.retry_payables_notify), + subject.scan_schedulers.payable.retry_payable_notify_later = Box::new( + NotifyLaterHandleMock::default() + .notify_later_params(¬ify_and_notify_later_params.retry_payables_notify_later), ); subject.scan_schedulers.payable.new_payable_notify = Box::new( NotifyHandleMock::default() .notify_params(¬ify_and_notify_later_params.new_payables_notify), ); - let receivable_notify_later_handle_mock = NotifyLaterHandleMock::default() - .notify_later_params(¬ify_and_notify_later_params.receivables_notify_later) - .stop_system_on_count_received(1); - subject.scan_schedulers.receivable.handle = Box::new(receivable_notify_later_handle_mock); - subject.scan_schedulers.receivable.interval = receivable_scan_interval; let interval_computer = NewPayableScanIntervalComputerMock::default() - .time_until_next_scan_params(&time_until_next_scan_params_arc) - .time_until_next_scan_result(ScanTiming::WaitFor( + .compute_time_to_next_scan_params(&compute_time_to_next_scan_params_arc) + .compute_time_to_next_scan_result(ScanTiming::WaitFor( new_payable_expected_computed_interval, )); subject.scan_schedulers.payable.interval_computer = Box::new(interval_computer); - ( - subject, - new_payable_expected_computed_interval, - receivable_scan_interval, - ) + (subject, new_payable_expected_computed_interval) } fn configure_accountant_for_startup_with_no_preexisting_pending_payables( @@ -3136,7 +3189,7 @@ mod tests { ReceivedPayments, Option, >, - ) -> (Accountant, Duration, Duration) { + ) -> (Accountant, Duration) { let mut subject = make_subject_and_inject_scanners( test_name, config, @@ -3144,37 +3197,30 @@ mod tests { receivable_scanner, payable_scanner, ); + let retry_payable_scan_interval = Duration::from_millis(1); let pending_payable_scan_interval = Duration::from_secs(3600); - let receivable_scan_interval = Duration::from_secs(3600); let pending_payable_notify_later_handle_mock = NotifyLaterHandleMock::default() .notify_later_params(¬ify_and_notify_later_params.pending_payables_notify_later) // This should stop the system .stop_system_on_count_received(1); subject.scan_schedulers.pending_payable.handle = Box::new(pending_payable_notify_later_handle_mock); + subject.scan_schedulers.payable.retry_payable_scan_interval = retry_payable_scan_interval; subject.scan_schedulers.pending_payable.interval = pending_payable_scan_interval; subject.scan_schedulers.payable.new_payable_notify_later = Box::new( NotifyLaterHandleMock::default() .notify_later_params(¬ify_and_notify_later_params.new_payables_notify_later), ); - subject.scan_schedulers.payable.retry_payable_notify = Box::new( - NotifyHandleMock::default() - .notify_params(¬ify_and_notify_later_params.retry_payables_notify) + subject.scan_schedulers.payable.retry_payable_notify_later = Box::new( + NotifyLaterHandleMock::default() + .notify_later_params(¬ify_and_notify_later_params.retry_payables_notify_later) .capture_msg_and_let_it_fly_on(), ); subject.scan_schedulers.payable.new_payable_notify = Box::new( NotifyHandleMock::default() .notify_params(¬ify_and_notify_later_params.new_payables_notify), ); - let receivable_notify_later_handle_mock = NotifyLaterHandleMock::default() - .notify_later_params(¬ify_and_notify_later_params.receivables_notify_later); - subject.scan_schedulers.receivable.interval = receivable_scan_interval; - subject.scan_schedulers.receivable.handle = Box::new(receivable_notify_later_handle_mock); - ( - subject, - pending_payable_scan_interval, - receivable_scan_interval, - ) + (subject, pending_payable_scan_interval) } fn make_subject_and_inject_scanners( @@ -3333,7 +3379,7 @@ mod tests { Mutex, Logger, String)>>, >, notify_and_notify_later_params: &NotifyAndNotifyLaterParams, - time_until_next_scan_until_next_new_payable_scan_params_arc: Arc>>, + time_until_next_new_payable_scan_params_arc: Arc>>, new_payable_expected_computed_interval: Duration, ) { // Note that there is no functionality from the payable scanner actually running. @@ -3351,14 +3397,9 @@ mod tests { new_payable_expected_computed_interval )] ); - let time_until_next_scan_until_next_new_payable_scan_params = - time_until_next_scan_until_next_new_payable_scan_params_arc - .lock() - .unwrap(); - assert_eq!( - *time_until_next_scan_until_next_new_payable_scan_params, - vec![()] - ); + let time_until_next_new_payable_scan_params = + time_until_next_new_payable_scan_params_arc.lock().unwrap(); + assert_eq!(*time_until_next_new_payable_scan_params, vec![()]); let payable_scanner_start_scan = payable_scanner_start_scan_arc.lock().unwrap(); assert!( payable_scanner_start_scan.is_empty(), @@ -3374,7 +3415,7 @@ mod tests { scan_for_new_payables_notify_params ); let scan_for_retry_payables_notify_params = notify_and_notify_later_params - .retry_payables_notify + .retry_payables_notify_later .lock() .unwrap(); assert!( @@ -3460,14 +3501,17 @@ mod tests { scan_for_new_payables_notify_params ); let scan_for_retry_payables_notify_params = notify_and_notify_later_params - .retry_payables_notify + .retry_payables_notify_later .lock() .unwrap(); assert_eq!( *scan_for_retry_payables_notify_params, - vec![ScanForRetryPayables { - response_skeleton_opt: None - }], + vec![( + ScanForRetryPayables { + response_skeleton_opt: None + }, + Duration::from_millis(1) + )], ); } @@ -3477,16 +3521,8 @@ mod tests { receivable_start_scan_params_arc: &Arc< Mutex, Logger, String)>>, >, - scan_for_receivables_notify_later_params_arc: &Arc< - Mutex>, - >, - receivable_scan_interval: Duration, ) { assert_receivable_scan_ran(test_name, receivable_start_scan_params_arc, earning_wallet); - assert_another_receivable_scan_scheduled( - scan_for_receivables_notify_later_params_arc, - receivable_scan_interval, - ) } fn assert_receivable_scan_ran( @@ -3514,25 +3550,6 @@ mod tests { assert_using_the_same_logger(&r_logger, test_name, Some("r")); } - fn assert_another_receivable_scan_scheduled( - scan_for_receivables_notify_later_params_arc: &Arc< - Mutex>, - >, - receivable_scan_interval: Duration, - ) { - let scan_for_receivables_notify_later_params = - scan_for_receivables_notify_later_params_arc.lock().unwrap(); - assert_eq!( - *scan_for_receivables_notify_later_params, - vec![( - ScanForReceivables { - response_skeleton_opt: None - }, - receivable_scan_interval - )] - ); - } - #[test] fn initial_pending_payable_scan_if_some_payables_found() { let sent_payable_dao = @@ -3629,23 +3646,31 @@ mod tests { let start_scan_params_arc = Arc::new(Mutex::new(vec![])); let notify_later_receivable_params_arc = Arc::new(Mutex::new(vec![])); let system = System::new(test_name); + let (blockchain_bridge, _, blockchain_bridge_recording_arc) = make_recorder(); + let blockchain_bridge = blockchain_bridge.system_stop_conditions( + match_lazily_every_type_id!(RetrieveTransactions, RetrieveTransactions), + ); SystemKillerActor::new(Duration::from_secs(10)).start(); // a safety net for GitHub Actions + let recipient = make_wallet("some_recipient"); let receivable_scanner = ScannerMock::new() .scan_started_at_result(None) .scan_started_at_result(None) .start_scan_params(&start_scan_params_arc) - .start_scan_result(Err(StartScanError::NothingToProcess)) .start_scan_result(Ok(RetrieveTransactions { - recipient: make_wallet("some_recipient"), + recipient: recipient.clone(), + response_skeleton_opt: None, + })) + .start_scan_result(Ok(RetrieveTransactions { + recipient: recipient.clone(), response_skeleton_opt: None, })) - .stop_the_system_after_last_msg(); + .finish_scan_result(None); let earning_wallet = make_wallet("earning"); let mut config = bc_from_earning_wallet(earning_wallet.clone()); config.scan_intervals_opt = Some(ScanIntervals { payable_scan_interval: Duration::from_secs(100), pending_payable_scan_interval: Duration::from_secs(10), - receivable_scan_interval: Duration::from_millis(99), + receivable_scan_interval: Duration::from_millis(15), }); let mut subject = AccountantBuilder::default() .bootstrapper_config(config) @@ -3663,8 +3688,25 @@ mod tests { ); let subject_addr = subject.start(); let subject_subs = Accountant::make_subs_from(&subject_addr); - let peer_actors = peer_actors_builder().build(); + let (peer_actors, peer_actors_addrs) = peer_actors_builder() + .blockchain_bridge(blockchain_bridge) + .build_with_addresses(); send_bind_message!(subject_subs, peer_actors); + let counter_msg = ReceivedPayments { + timestamp: SystemTime::now(), + new_start_block: BlockMarker::Value(1234), + transactions: vec![], + response_skeleton_opt: None, + }; + let counter_msg_setup = setup_for_counter_msg_triggered_via_type_id!( + RetrieveTransactions, + counter_msg, + subject_addr + ); + peer_actors_addrs + .blockchain_bridge_addr + .try_send(SetUpCounterMsgs::new(vec![counter_msg_setup])) + .unwrap(); subject_addr .try_send(ScanForReceivables { @@ -3702,37 +3744,37 @@ mod tests { assert!(start_scan_params.is_empty()); debug!( first_attempt_logger, - "first attempt verifying receivable scanner" + "first attempt receivable scanner logger verification" ); debug!( second_attempt_logger, - "second attempt verifying receivable scanner" + "second attempt receivable scanner logger verification" ); + let blockchain_bridge_recording = blockchain_bridge_recording_arc.lock().unwrap(); + let first_msg_towards_bb = + blockchain_bridge_recording.get_record::(0); + let expected_msg = RetrieveTransactions { + recipient, + response_skeleton_opt: None, + }; + assert_eq!(first_msg_towards_bb, &expected_msg); + let second_msg_towards_bb = + blockchain_bridge_recording.get_record::(1); + assert_eq!(second_msg_towards_bb, &expected_msg); assert_eq!( *notify_later_receivable_params, - vec![ - ( - ScanForReceivables { - response_skeleton_opt: None - }, - Duration::from_millis(99) - ), - ( - ScanForReceivables { - response_skeleton_opt: None - }, - Duration::from_millis(99) - ), - ] + vec![( + ScanForReceivables { + response_skeleton_opt: None + }, + Duration::from_millis(15) + ),] ); tlh.exists_log_containing(&format!( - "DEBUG: {test_name}: There was nothing to process during Receivables scan." - )); - tlh.exists_log_containing(&format!( - "DEBUG: {test_name}: first attempt verifying receivable scanner", + "DEBUG: {test_name}: first attempt receivable scanner logger verification", )); tlh.exists_log_containing(&format!( - "DEBUG: {test_name}: second attempt verifying receivable scanner", + "DEBUG: {test_name}: second attempt receivable scanner logger verification", )); } @@ -4207,7 +4249,8 @@ mod tests { expected = "internal error: entered unreachable code: Early stopped new payable scan \ was suggested to be followed up by the scan for Receivables, which is not supported though" )] - fn start_scan_error_in_new_payables_and_unexpected_reaction_by_receivable_scan_scheduling() { + fn start_scan_error_in_new_payables_and_unexpected_instruction_from_early_stop_scan_rescheduling( + ) { let mut subject = AccountantBuilder::default().build(); let reschedule_on_error_resolver = RescheduleScanOnErrorResolverMock::default() .resolve_rescheduling_on_error_result(ScanReschedulingAfterEarlyStop::Schedule( @@ -4427,32 +4470,6 @@ mod tests { ); } - #[test] - #[should_panic( - expected = "internal error: entered unreachable code: Early stopped pending payable scan \ - was suggested to be followed up by the scan for Receivables, which is not supported though" - )] - fn start_scan_error_in_pending_payables_and_unexpected_reaction_by_receivable_scan_scheduling() - { - let mut subject = AccountantBuilder::default().build(); - let reschedule_on_error_resolver = RescheduleScanOnErrorResolverMock::default() - .resolve_rescheduling_on_error_result(ScanReschedulingAfterEarlyStop::Schedule( - ScanType::Receivables, - )); - subject.scan_schedulers.reschedule_on_error_resolver = - Box::new(reschedule_on_error_resolver); - let system = System::new("test"); - let subject_addr = subject.start(); - - subject_addr - .try_send(ScanForPendingPayables { - response_skeleton_opt: None, - }) - .unwrap(); - - system.run(); - } - #[test] fn report_routing_service_provided_message_is_received() { init_test_logging(); @@ -5090,8 +5107,8 @@ mod tests { Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.payable.new_payable_notify_later = Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); - subject.scan_schedulers.payable.retry_payable_notify = - Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); + subject.scan_schedulers.payable.retry_payable_notify_later = + Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); let expected_tx = TxBuilder::default().hash(expected_hash.clone()).build(); let sent_payable = SentPayables { payment_procedure_result: Ok(BatchResults { @@ -5149,8 +5166,8 @@ mod tests { Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.payable.new_payable_notify_later = Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); - subject.scan_schedulers.payable.retry_payable_notify = - Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); + subject.scan_schedulers.payable.retry_payable_notify_later = + Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); let expected_tx = TxBuilder::default().hash(expected_hash.clone()).build(); let sent_payable = SentPayables { payment_procedure_result: Ok(BatchResults { @@ -5184,7 +5201,7 @@ mod tests { init_test_logging(); let test_name = "retry_payable_scan_is_requested_to_be_repeated"; let finish_scan_params_arc = Arc::new(Mutex::new(vec![])); - let retry_payable_notify_params_arc = Arc::new(Mutex::new(vec![])); + let retry_payable_notify_later_params_arc = Arc::new(Mutex::new(vec![])); let system = System::new(test_name); let consuming_wallet = make_paying_wallet(b"paying wallet"); let mut subject = AccountantBuilder::default() @@ -5201,8 +5218,10 @@ mod tests { result: NextScanToRun::RetryPayableScan, }), ))); - subject.scan_schedulers.payable.retry_payable_notify = - Box::new(NotifyHandleMock::default().notify_params(&retry_payable_notify_params_arc)); + subject.scan_schedulers.payable.retry_payable_notify_later = Box::new( + NotifyLaterHandleMock::default() + .notify_later_params(&retry_payable_notify_later_params_arc), + ); subject.scan_schedulers.payable.new_payable_notify = Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.payable.new_payable_notify_later = @@ -5228,9 +5247,10 @@ mod tests { let (actual_sent_payable, logger) = finish_scan_params.remove(0); assert_eq!(actual_sent_payable, sent_payable,); assert_using_the_same_logger(&logger, test_name, None); - let mut payable_notify_params = retry_payable_notify_params_arc.lock().unwrap(); - let scheduled_msg = payable_notify_params.remove(0); + let mut payable_notify_params = retry_payable_notify_later_params_arc.lock().unwrap(); + let (scheduled_msg, duration) = payable_notify_params.remove(0); assert_eq!(scheduled_msg, ScanForRetryPayables::default()); + assert_eq!(duration, Duration::from_secs(5 * 60)); assert!( payable_notify_params.is_empty(), "Should be empty but {:?}", @@ -5245,7 +5265,7 @@ mod tests { let test_name = "accountant_in_automatic_mode_schedules_tx_retry_as_some_pending_payables_have_not_completed"; let finish_scan_params_arc = Arc::new(Mutex::new(vec![])); - let retry_payable_notify_params_arc = Arc::new(Mutex::new(vec![])); + let retry_payable_notify_later_params_arc = Arc::new(Mutex::new(vec![])); let mut subject = AccountantBuilder::default() .logger(Logger::new(test_name)) .build(); @@ -5263,8 +5283,10 @@ mod tests { Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.pending_payable.handle = Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); - subject.scan_schedulers.payable.retry_payable_notify = - Box::new(NotifyHandleMock::default().notify_params(&retry_payable_notify_params_arc)); + subject.scan_schedulers.payable.retry_payable_notify_later = Box::new( + NotifyLaterHandleMock::default() + .notify_later_params(&retry_payable_notify_later_params_arc), + ); let system = System::new(test_name); let (mut msg, _) = make_tx_receipts_msg(vec![ SeedsToMakeUpPayableWithStatus { @@ -5286,12 +5308,15 @@ mod tests { let mut finish_scan_params = finish_scan_params_arc.lock().unwrap(); let (msg_actual, logger) = finish_scan_params.remove(0); assert_eq!(msg_actual, msg); - let retry_payable_notify_params = retry_payable_notify_params_arc.lock().unwrap(); + let retry_payable_notify_params = retry_payable_notify_later_params_arc.lock().unwrap(); assert_eq!( *retry_payable_notify_params, - vec![ScanForRetryPayables { - response_skeleton_opt: None - }] + vec![( + ScanForRetryPayables { + response_skeleton_opt: None + }, + Duration::from_secs(5 * 60) + )] ); assert_using_the_same_logger(&logger, test_name, None) } @@ -5314,8 +5339,8 @@ mod tests { .replace_scanner(ScannerReplacement::PendingPayable(ReplacementType::Mock( pending_payable_scanner, ))); - subject.scan_schedulers.payable.retry_payable_notify = - Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); + subject.scan_schedulers.payable.retry_payable_notify_later = + Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.payable.new_payable_notify = Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.payable.new_payable_notify_later = @@ -5380,8 +5405,8 @@ mod tests { .replace_scanner(ScannerReplacement::PendingPayable(ReplacementType::Mock( pending_payable_scanner, ))); - subject.scan_schedulers.payable.retry_payable_notify = - Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); + subject.scan_schedulers.payable.retry_payable_notify_later = + Box::new(NotifyLaterHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.payable.new_payable_notify = Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.payable.new_payable_notify_later = @@ -5443,8 +5468,9 @@ mod tests { .replace_scanner(ScannerReplacement::PendingPayable(ReplacementType::Mock( pending_payable_scanner, ))); - subject.scan_schedulers.payable.retry_payable_notify = - Box::new(NotifyHandleMock::default().notify_params(&retry_payable_notify_params_arc)); + subject.scan_schedulers.payable.retry_payable_notify_later = Box::new( + NotifyLaterHandleMock::default().notify_later_params(&retry_payable_notify_params_arc), + ); subject.scan_schedulers.payable.new_payable_notify = Box::new(NotifyHandleMock::default().panic_on_schedule_attempt()); subject.scan_schedulers.payable.new_payable_notify_later = @@ -5468,9 +5494,12 @@ mod tests { let retry_payable_notify_params = retry_payable_notify_params_arc.lock().unwrap(); assert_eq!( *retry_payable_notify_params, - vec![ScanForRetryPayables { - response_skeleton_opt: Some(response_skeleton) - }] + vec![( + ScanForRetryPayables { + response_skeleton_opt: Some(response_skeleton) + }, + Duration::from_secs(5 * 60) + )] ); assert_using_the_same_logger(&logger, test_name, None) } @@ -5481,7 +5510,7 @@ mod tests { let test_name = "accountant_confirms_all_pending_txs_and_schedules_new_payable_scanner_timely"; let finish_scan_params_arc = Arc::new(Mutex::new(vec![])); - let time_until_next_scan_params_arc = Arc::new(Mutex::new(vec![])); + let compute_time_to_next_scan_params_arc = Arc::new(Mutex::new(vec![])); let new_payable_notify_later_arc = Arc::new(Mutex::new(vec![])); let new_payable_notify_arc = Arc::new(Mutex::new(vec![])); let system = System::new("new_payable_scanner_timely"); @@ -5498,9 +5527,9 @@ mod tests { ))); let expected_computed_interval = Duration::from_secs(3); let interval_computer = NewPayableScanIntervalComputerMock::default() - .time_until_next_scan_params(&time_until_next_scan_params_arc) + .compute_time_to_next_scan_params(&compute_time_to_next_scan_params_arc) // This determines the test - .time_until_next_scan_result(ScanTiming::WaitFor(expected_computed_interval)); + .compute_time_to_next_scan_result(ScanTiming::WaitFor(expected_computed_interval)); subject.scan_schedulers.payable.interval_computer = Box::new(interval_computer); subject.scan_schedulers.payable.new_payable_notify_later = Box::new( NotifyLaterHandleMock::default().notify_later_params(&new_payable_notify_later_arc), @@ -5558,7 +5587,7 @@ mod tests { let test_name = "accountant_confirms_payable_txs_and_schedules_the_delayed_new_payable_scanner_asap"; let finish_scan_params_arc = Arc::new(Mutex::new(vec![])); - let time_until_next_scan_params_arc = Arc::new(Mutex::new(vec![])); + let compute_time_to_next_scan_params_arc = Arc::new(Mutex::new(vec![])); let new_payable_notify_later_arc = Arc::new(Mutex::new(vec![])); let new_payable_notify_arc = Arc::new(Mutex::new(vec![])); let mut subject = AccountantBuilder::default() @@ -5573,9 +5602,9 @@ mod tests { pending_payable_scanner, ))); let interval_computer = NewPayableScanIntervalComputerMock::default() - .time_until_next_scan_params(&time_until_next_scan_params_arc) + .compute_time_to_next_scan_params(&compute_time_to_next_scan_params_arc) // This determines the test - .time_until_next_scan_result(ScanTiming::ReadyNow); + .compute_time_to_next_scan_result(ScanTiming::ReadyNow); subject.scan_schedulers.payable.interval_computer = Box::new(interval_computer); subject.scan_schedulers.payable.new_payable_notify_later = Box::new( NotifyLaterHandleMock::default().notify_later_params(&new_payable_notify_later_arc), @@ -5610,8 +5639,8 @@ mod tests { "Should be empty but {:?}", finish_scan_params ); - let time_until_next_scan_params = time_until_next_scan_params_arc.lock().unwrap(); - assert_eq!(*time_until_next_scan_params, vec![()]); + let compute_time_to_next_scan_params = compute_time_to_next_scan_params_arc.lock().unwrap(); + assert_eq!(*compute_time_to_next_scan_params, vec![()]); let new_payable_notify_later = new_payable_notify_later_arc.lock().unwrap(); assert!( new_payable_notify_later.is_empty(), @@ -5662,7 +5691,7 @@ mod tests { }), }]); let left_side_bound = if let ScanTiming::WaitFor(interval) = - assertion_interval_computer.time_until_next_scan() + assertion_interval_computer.compute_time_to_next_scan() { interval } else { @@ -5676,7 +5705,7 @@ mod tests { let new_payable_notify_later = new_payable_notify_later_arc.lock().unwrap(); let (_, actual_interval) = new_payable_notify_later[0]; let right_side_bound = if let ScanTiming::WaitFor(interval) = - assertion_interval_computer.time_until_next_scan() + assertion_interval_computer.compute_time_to_next_scan() { interval } else { @@ -5854,8 +5883,9 @@ mod tests { // Setup let notify_later_params_arc = Arc::new(Mutex::new(vec![])); scan_schedulers.payable.interval_computer = Box::new( - NewPayableScanIntervalComputerMock::default() - .time_until_next_scan_result(ScanTiming::WaitFor(Duration::from_secs(152))), + NewPayableScanIntervalComputerMock::default().compute_time_to_next_scan_result( + ScanTiming::WaitFor(Duration::from_secs(152)), + ), ); scan_schedulers.payable.new_payable_notify_later = Box::new( NotifyLaterHandleMock::default().notify_later_params(¬ify_later_params_arc), @@ -5887,28 +5917,32 @@ mod tests { Box::new( |_scanners: &mut Scanners, scan_schedulers: &mut ScanSchedulers| { // Setup - let notify_params_arc = Arc::new(Mutex::new(vec![])); - scan_schedulers.payable.retry_payable_notify = - Box::new(NotifyHandleMock::default().notify_params(¬ify_params_arc)); + let notify_later_params_arc = Arc::new(Mutex::new(vec![])); + scan_schedulers.payable.retry_payable_notify_later = Box::new( + NotifyLaterHandleMock::default().notify_later_params(¬ify_later_params_arc), + ); // Assertions Box::new(move |response_skeleton_opt| { - let notify_params = notify_params_arc.lock().unwrap(); + let notify_later_params = notify_later_params_arc.lock().unwrap(); match response_skeleton_opt { None => { // Response skeleton must be None assert_eq!( - *notify_params, - vec![ScanForRetryPayables { - response_skeleton_opt: None - }] + *notify_later_params, + vec![( + ScanForRetryPayables { + response_skeleton_opt: None + }, + Duration::from_secs(5 * 60) + )] ) } Some(_) => { assert!( - notify_params.is_empty(), + notify_later_params.is_empty(), "Should be empty but contained {:?}", - notify_params + notify_later_params ) } } diff --git a/node/src/accountant/scanners/mod.rs b/node/src/accountant/scanners/mod.rs index 787b9a8b2..d14814a5b 100644 --- a/node/src/accountant/scanners/mod.rs +++ b/node/src/accountant/scanners/mod.rs @@ -122,7 +122,10 @@ impl Scanners { }); } - Self::start_correct_payable_scanner::( + <(dyn MultistageDualPayableScanner) as StartableScanner< + ScanForNewPayables, + InitialTemplatesMessage, + >>::start_scan( &mut *self.payable, wallet, timestamp, @@ -150,7 +153,10 @@ impl Scanners { ) } - Self::start_correct_payable_scanner::( + <(dyn MultistageDualPayableScanner) as StartableScanner< + ScanForRetryPayables, + InitialTemplatesMessage, + >>::start_scan( &mut *self.payable, wallet, timestamp, @@ -168,10 +174,14 @@ impl Scanners { automatic_scans_enabled: bool, ) -> Result { let triggered_manually = response_skeleton_opt.is_some(); - self.check_general_conditions_for_pending_payable_scan( - triggered_manually, - automatic_scans_enabled, - )?; + if triggered_manually && automatic_scans_enabled { + return Err(StartScanError::ManualTriggerError( + ManulTriggerError::AutomaticScanConflict, + )); + } + + self.check_pending_payable_existence(triggered_manually)?; + match ( self.pending_payable.scan_started_at(), self.payable.scan_started_at(), @@ -256,6 +266,7 @@ impl Scanners { } pub fn acknowledge_scan_error(&mut self, error: &ScanError, logger: &Logger) { + debug!(logger, "Acknowledging a scan that couldn't finish"); match error.scan_type { DetailedScanType::NewPayables | DetailedScanType::RetryPayables => { self.payable.mark_as_ended(logger) @@ -298,41 +309,14 @@ impl Scanners { self.initial_pending_payable_scan = false } - // This is a helper function reducing a boilerplate of complex trait resolving where - // the compiler requires to specify which trigger message distinguishes the scan to run. - // The payable scanner offers two modes through doubled implementations of StartableScanner - // which uses the trigger message type as the only distinction between them. - fn start_correct_payable_scanner<'a, TriggerMessage>( - scanner: &'a mut (dyn MultistageDualPayableScanner + 'a), - wallet: &Wallet, - timestamp: SystemTime, - response_skeleton_opt: Option, - logger: &Logger, - ) -> Result - where - TriggerMessage: Message, - (dyn MultistageDualPayableScanner + 'a): - StartableScanner, - { - <(dyn MultistageDualPayableScanner + 'a) as StartableScanner< - TriggerMessage, - InitialTemplatesMessage, - >>::start_scan(scanner, wallet, timestamp, response_skeleton_opt, logger) - } - - fn check_general_conditions_for_pending_payable_scan( + fn check_pending_payable_existence( &mut self, triggered_manually: bool, - automatic_scans_enabled: bool, ) -> Result<(), StartScanError> { - if triggered_manually && automatic_scans_enabled { - return Err(StartScanError::ManualTriggerError( - ManulTriggerError::AutomaticScanConflict, - )); - } if self.initial_pending_payable_scan { return Ok(()); } + if triggered_manually && !self.aware_of_unresolved_pending_payable { return Err(StartScanError::ManualTriggerError( ManulTriggerError::UnnecessaryRequest { @@ -794,11 +778,11 @@ mod tests { false ); let dumped_records = pending_payable_scanner - .yet_unproven_failed_payables + .suspected_failed_payables .dump_cache(); assert!( dumped_records.is_empty(), - "There should be no yet unproven failures but found {:?}.", + "There should be no suspected failures but found {:?}.", dumped_records ); assert_eq!( @@ -1199,7 +1183,9 @@ mod tests { ); TestLogHandler::new().assert_logs_match_in_order(vec![ &format!("INFO: {test_name}: Scanning for pending payable"), - &format!("DEBUG: {test_name}: Found 1 pending payables and 1 unfinalized failures to process"), + &format!( + "DEBUG: {test_name}: Found 1 pending payables and 1 suspected failures to process" + ), ]) } @@ -1340,8 +1326,8 @@ mod tests { #[test] #[should_panic( - expected = "internal error: entered unreachable code: Automatic pending payable \ - scan should never start if there are no pending payables to process." + expected = "internal error: entered unreachable code: Automatic pending payable scan should \ + never start if there are no pending payables to process." )] fn pending_payable_scanner_bumps_into_zero_pending_payable_awareness_in_the_automatic_mode() { let consuming_wallet = make_paying_wallet(b"consuming"); @@ -1360,11 +1346,12 @@ mod tests { } #[test] - fn check_general_conditions_for_pending_payable_scan_if_it_is_initial_pending_payable_scan() { + fn check_pending_payable_existence_for_initial_pending_payable_scan_and_zero_awareness() { let mut subject = make_dull_subject(); + subject.aware_of_unresolved_pending_payable = false; subject.initial_pending_payable_scan = true; - let result = subject.check_general_conditions_for_pending_payable_scan(false, true); + let result = subject.check_pending_payable_existence(false); assert_eq!(result, Ok(())); assert_eq!(subject.initial_pending_payable_scan, true); diff --git a/node/src/accountant/scanners/payable_scanner/mod.rs b/node/src/accountant/scanners/payable_scanner/mod.rs index b82d2c46e..4c9ac9804 100644 --- a/node/src/accountant/scanners/payable_scanner/mod.rs +++ b/node/src/accountant/scanners/payable_scanner/mod.rs @@ -251,6 +251,10 @@ impl PayableScanner { self.insert_records_in_sent_payables(&batch_results.sent_txs); } if failed > 0 { + debug!( + logger, + "Recording failed txs: {:?}", batch_results.failed_txs + ); self.insert_records_in_failed_payables(&batch_results.failed_txs); } } diff --git a/node/src/accountant/scanners/payable_scanner/start_scan.rs b/node/src/accountant/scanners/payable_scanner/start_scan.rs index 35cbd3ab2..2684407f7 100644 --- a/node/src/accountant/scanners/payable_scanner/start_scan.rs +++ b/node/src/accountant/scanners/payable_scanner/start_scan.rs @@ -66,7 +66,7 @@ impl StartableScanner for Payable info!(logger, "Scanning for retry payables"); let failed_txs = self.get_txs_to_retry(); let amount_from_payables = self.find_amount_from_payables(&failed_txs); - let retry_tx_templates = RetryTxTemplates::new(&failed_txs, &amount_from_payables); + let retry_tx_templates = RetryTxTemplates::new(&failed_txs, &amount_from_payables, logger); Ok(InitialTemplatesMessage { initial_templates: Either::Right(retry_tx_templates), @@ -89,7 +89,7 @@ mod tests { use crate::accountant::scanners::payable_scanner::tx_templates::initial::retry::{ RetryTxTemplate, RetryTxTemplates, }; - use crate::accountant::scanners::Scanners; + use crate::accountant::scanners::payable_scanner::MultistageDualPayableScanner; use crate::accountant::test_utils::{ make_payable_account, FailedPayableDaoMock, PayableDaoMock, }; @@ -144,7 +144,10 @@ mod tests { .payable_dao(payable_dao) .build(); - let result = Scanners::start_correct_payable_scanner::( + let result = <(dyn MultistageDualPayableScanner) as StartableScanner< + ScanForRetryPayables, + InitialTemplatesMessage, + >>::start_scan( &mut subject, &consuming_wallet, timestamp, @@ -157,11 +160,8 @@ mod tests { let retrieve_payables_params = retrieve_payables_params_arc.lock().unwrap(); let expected_tx_templates = { let mut tx_template_1 = RetryTxTemplate::from(&failed_tx_1); - tx_template_1.base.amount_in_wei = - tx_template_1.base.amount_in_wei + payable_account_1.balance_wei; - + tx_template_1.base.amount_in_wei = payable_account_1.balance_wei; let tx_template_2 = RetryTxTemplate::from(&failed_tx_2); - RetryTxTemplates(vec![tx_template_1, tx_template_2]) }; assert_eq!( diff --git a/node/src/accountant/scanners/payable_scanner/tx_templates/initial/retry.rs b/node/src/accountant/scanners/payable_scanner/tx_templates/initial/retry.rs index 9990635cd..8157a373b 100644 --- a/node/src/accountant/scanners/payable_scanner/tx_templates/initial/retry.rs +++ b/node/src/accountant/scanners/payable_scanner/tx_templates/initial/retry.rs @@ -1,6 +1,7 @@ // Copyright (c) 2025, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. use crate::accountant::db_access_objects::failed_payable_dao::FailedTx; use crate::accountant::scanners::payable_scanner::tx_templates::BaseTxTemplate; +use masq_lib::logger::Logger; use std::collections::{BTreeSet, HashMap}; use std::ops::{Deref, DerefMut}; use web3::types::Address; @@ -13,11 +14,25 @@ pub struct RetryTxTemplate { } impl RetryTxTemplate { - pub fn new(failed_tx: &FailedTx, payable_scan_amount_opt: Option) -> Self { + pub fn new( + failed_tx: &FailedTx, + updated_payable_balance_opt: Option, + logger: &Logger, + ) -> Self { let mut retry_template = RetryTxTemplate::from(failed_tx); - if let Some(payable_scan_amount) = payable_scan_amount_opt { - retry_template.base.amount_in_wei += payable_scan_amount; + debug!(logger, "Tx to retry {:?}", failed_tx); + + if let Some(updated_payable_balance) = updated_payable_balance_opt { + debug!( + logger, + "Updating the pay for {:?} from former {} to latest accounted balance {} of minor", + failed_tx.receiver_address, + failed_tx.amount_minor, + updated_payable_balance + ); + + retry_template.base.amount_in_wei = updated_payable_balance; } retry_template @@ -44,6 +59,7 @@ impl RetryTxTemplates { pub fn new( txs_to_retry: &BTreeSet, amounts_from_payables: &HashMap, + logger: &Logger, ) -> Self { Self( txs_to_retry @@ -52,7 +68,7 @@ impl RetryTxTemplates { let payable_scan_amount_opt = amounts_from_payables .get(&tx_to_retry.receiver_address) .copied(); - RetryTxTemplate::new(tx_to_retry, payable_scan_amount_opt) + RetryTxTemplate::new(tx_to_retry, payable_scan_amount_opt, logger) }) .collect(), ) @@ -98,6 +114,49 @@ mod tests { }; use crate::accountant::scanners::payable_scanner::tx_templates::BaseTxTemplate; use crate::blockchain::test_utils::{make_address, make_tx_hash}; + use masq_lib::logger::Logger; + + #[test] + fn retry_tx_template_constructor_works() { + let receiver_address = make_address(42); + let amount_in_wei = 1_000_000; + let gas_price = 20_000_000_000; + let nonce = 123; + let tx_hash = make_tx_hash(789); + let failed_tx = FailedTx { + hash: tx_hash, + receiver_address, + amount_minor: amount_in_wei, + gas_price_minor: gas_price, + nonce, + timestamp: 1234567, + reason: FailureReason::PendingTooLong, + status: FailureStatus::RetryRequired, + }; + let logger = Logger::new("test"); + let fetched_balance_from_payable_table_opt_1 = None; + let fetched_balance_from_payable_table_opt_2 = Some(1_234_567); + + let result_1 = RetryTxTemplate::new( + &failed_tx, + fetched_balance_from_payable_table_opt_1, + &logger, + ); + let result_2 = RetryTxTemplate::new( + &failed_tx, + fetched_balance_from_payable_table_opt_2, + &logger, + ); + + let assert = |result: RetryTxTemplate, expected_amount_in_wei: u128| { + assert_eq!(result.base.receiver_address, receiver_address); + assert_eq!(result.base.amount_in_wei, expected_amount_in_wei); + assert_eq!(result.prev_gas_price_wei, gas_price); + assert_eq!(result.prev_nonce, nonce); + }; + assert(result_1, amount_in_wei); + assert(result_2, fetched_balance_from_payable_table_opt_2.unwrap()); + } #[test] fn retry_tx_template_can_be_created_from_failed_tx() { diff --git a/node/src/accountant/scanners/payable_scanner/tx_templates/priced/new.rs b/node/src/accountant/scanners/payable_scanner/tx_templates/priced/new.rs index 6de54e4c9..200b9602b 100644 --- a/node/src/accountant/scanners/payable_scanner/tx_templates/priced/new.rs +++ b/node/src/accountant/scanners/payable_scanner/tx_templates/priced/new.rs @@ -4,7 +4,7 @@ use crate::accountant::scanners::payable_scanner::tx_templates::initial::new::{ NewTxTemplate, NewTxTemplates, }; use crate::accountant::scanners::payable_scanner::tx_templates::BaseTxTemplate; -use crate::blockchain::blockchain_bridge::increase_gas_price_by_margin; +use crate::blockchain::blockchain_bridge::increase_by_percentage; use masq_lib::logger::Logger; use std::ops::Deref; use thousands::Separable; @@ -63,7 +63,7 @@ impl PricedNewTxTemplates { ceil: u128, logger: &Logger, ) -> Self { - let computed_gas_price_wei = increase_gas_price_by_margin(latest_gas_price_wei); + let computed_gas_price_wei = increase_by_percentage(latest_gas_price_wei); let safe_gas_price_wei = if computed_gas_price_wei > ceil { warning!( diff --git a/node/src/accountant/scanners/payable_scanner/tx_templates/priced/retry.rs b/node/src/accountant/scanners/payable_scanner/tx_templates/priced/retry.rs index 48e41f4b9..3477b206a 100644 --- a/node/src/accountant/scanners/payable_scanner/tx_templates/priced/retry.rs +++ b/node/src/accountant/scanners/payable_scanner/tx_templates/priced/retry.rs @@ -4,7 +4,8 @@ use crate::accountant::scanners::payable_scanner::tx_templates::initial::retry:: RetryTxTemplate, RetryTxTemplates, }; use crate::accountant::scanners::payable_scanner::tx_templates::BaseTxTemplate; -use crate::blockchain::blockchain_bridge::increase_gas_price_by_margin; +use crate::blockchain::blockchain_bridge::increase_by_percentage; +use masq_lib::constants::DEFAULT_GAS_PRICE_RETRY_CONSTANT; use masq_lib::logger::Logger; use std::ops::{Deref, DerefMut}; use thousands::Separable; @@ -34,7 +35,7 @@ impl PricedRetryTxTemplate { ) -> PricedRetryTxTemplate { let receiver = retry_tx_template.base.receiver_address; let computed_gas_price_wei = - Self::compute_gas_price(retry_tx_template.prev_gas_price_wei, latest_gas_price_wei); + Self::compute_gas_price(latest_gas_price_wei, retry_tx_template.prev_gas_price_wei); let safe_gas_price_wei = if computed_gas_price_wei > ceil { log_builder.push(receiver, computed_gas_price_wei); @@ -46,10 +47,12 @@ impl PricedRetryTxTemplate { PricedRetryTxTemplate::new(retry_tx_template, safe_gas_price_wei) } - fn compute_gas_price(latest_gas_price_wei: u128, prev_gas_price_wei: u128) -> u128 { - let gas_price_wei = latest_gas_price_wei.max(prev_gas_price_wei); - - increase_gas_price_by_margin(gas_price_wei) + pub fn compute_gas_price(latest_gas_price_wei: u128, prev_gas_price_wei: u128) -> u128 { + if latest_gas_price_wei >= prev_gas_price_wei { + increase_by_percentage(latest_gas_price_wei) + } else { + prev_gas_price_wei + DEFAULT_GAS_PRICE_RETRY_CONSTANT + } } } @@ -167,3 +170,44 @@ impl RetryLogBuilder { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn compute_gas_price_increases_by_percentage_if_latest_is_higher() { + let latest_gas_price_wei = 101; + let prev_gas_price_wei = 100; + + let computed_gas_price = + PricedRetryTxTemplate::compute_gas_price(latest_gas_price_wei, prev_gas_price_wei); + + let expected_gas_price = increase_by_percentage(latest_gas_price_wei); + assert_eq!(computed_gas_price, expected_gas_price); + } + + #[test] + fn compute_gas_price_increases_by_percentage_if_latest_is_equal() { + let latest_gas_price_wei = 100; + let prev_gas_price_wei = 100; + + let computed_gas_price = + PricedRetryTxTemplate::compute_gas_price(latest_gas_price_wei, prev_gas_price_wei); + + let expected_gas_price = increase_by_percentage(latest_gas_price_wei); + assert_eq!(computed_gas_price, expected_gas_price); + } + + #[test] + fn compute_gas_price_increments_previous_by_constant_if_latest_is_lower() { + let latest_gas_price_wei = 99; + let prev_gas_price_wei = 100; + + let computed_gas_price = + PricedRetryTxTemplate::compute_gas_price(latest_gas_price_wei, prev_gas_price_wei); + + let expected_gas_price = prev_gas_price_wei + DEFAULT_GAS_PRICE_RETRY_CONSTANT; + assert_eq!(computed_gas_price, expected_gas_price); + } +} diff --git a/node/src/accountant/scanners/pending_payable_scanner/mod.rs b/node/src/accountant/scanners/pending_payable_scanner/mod.rs index 499ee0179..4fe12add1 100644 --- a/node/src/accountant/scanners/pending_payable_scanner/mod.rs +++ b/node/src/accountant/scanners/pending_payable_scanner/mod.rs @@ -65,7 +65,7 @@ pub struct PendingPayableScanner { pub failed_payable_dao: Box, pub financial_statistics: Rc>, pub current_sent_payables: Box>, - pub yet_unproven_failed_payables: Box>, + pub suspected_failed_payables: Box>, pub clock: Box, } @@ -119,6 +119,8 @@ impl Scanner for PendingPayableScan let retry_opt = scan_report.requires_payments_retry(); + debug!(logger, "Payment retry requirement: {:?}", retry_opt); + self.process_txs_by_state(scan_report, logger); self.mark_as_ended(logger); @@ -136,7 +138,7 @@ impl Scanner for PendingPayableScan impl CachesEmptiableScanner for PendingPayableScanner { fn empty_caches(&mut self, logger: &Logger) { self.current_sent_payables.ensure_empty_cache(logger); - self.yet_unproven_failed_payables.ensure_empty_cache(logger); + self.suspected_failed_payables.ensure_empty_cache(logger); } } @@ -155,14 +157,16 @@ impl PendingPayableScanner { failed_payable_dao, financial_statistics, current_sent_payables: Box::new(CurrentPendingPayables::default()), - yet_unproven_failed_payables: Box::new(RecheckRequiringFailures::default()), + suspected_failed_payables: Box::new(RecheckRequiringFailures::default()), clock: Box::new(SimpleClockReal::default()), } } fn harvest_tables(&mut self, logger: &Logger) -> Result, StartScanError> { + debug!(logger, "Harvesting sent_payable and failed_payable tables"); + let pending_tx_hashes_opt = self.harvest_pending_payables(); - let failure_hashes_opt = self.harvest_unproven_failures(); + let failure_hashes_opt = self.harvest_suspected_failures(); if Self::is_there_nothing_to_process( pending_tx_hashes_opt.as_ref(), @@ -199,7 +203,7 @@ impl PendingPayableScanner { Some(pending_tx_hashes) } - fn harvest_unproven_failures(&mut self) -> Option> { + fn harvest_suspected_failures(&mut self) -> Option> { let failures = self .failed_payable_dao .retrieve_txs(Some(FailureRetrieveCondition::EveryRecheckRequiredRecord)) @@ -211,7 +215,7 @@ impl PendingPayableScanner { } let failure_hashes = Self::wrap_hashes(&failures, TxHashByTable::FailedPayable); - self.yet_unproven_failed_payables.load_cache(failures); + self.suspected_failed_payables.load_cache(failures); Some(failure_hashes) } @@ -250,8 +254,8 @@ impl PendingPayableScanner { fn emptiness_check(&self, msg: &TxReceiptsMessage) { if msg.results.is_empty() { panic!( - "We should never receive an empty list of results. \ - Even receipts that could not be retrieved can be interpreted" + "We should never receive an empty list of results. Even receipts that could \ + not be retrieved can be interpreted" ) } } @@ -325,7 +329,7 @@ impl PendingPayableScanner { }; self.current_sent_payables.ensure_empty_cache(logger); - self.yet_unproven_failed_payables.ensure_empty_cache(logger); + self.suspected_failed_payables.ensure_empty_cache(logger); cases } @@ -350,10 +354,7 @@ impl PendingPayableScanner { } } TxHashByTable::FailedPayable(tx_hash) => { - match self - .yet_unproven_failed_payables - .get_record_by_hash(tx_hash) - { + match self.suspected_failed_payables.get_record_by_hash(tx_hash) { Some(failed_tx) => { cases.push(TxCaseToBeInterpreted::new( TxByTable::FailedPayable(failed_tx), @@ -378,10 +379,10 @@ impl PendingPayableScanner { panic!( "Looking up '{:?}' in the cache, the record could not be found. Dumping \ - the remaining values. Pending payables: {:?}. Unproven failures: {:?}.", + the remaining values. Pending payables: {:?}. Suspected failures: {:?}.", missing_entry, rearrange(self.current_sent_payables.dump_cache()), - rearrange(self.yet_unproven_failed_payables.dump_cache()), + rearrange(self.suspected_failed_payables.dump_cache()), ) } @@ -396,14 +397,18 @@ impl PendingPayableScanner { logger: &Logger, ) { self.handle_tx_failure_reclaims(confirmed_txs.reclaims, logger); - self.handle_normal_confirmations(confirmed_txs.normal_confirmations, logger); + self.handle_standard_confirmations(confirmed_txs.standard_confirmations, logger); } fn handle_tx_failure_reclaims(&mut self, reclaimed: Vec, logger: &Logger) { if reclaimed.is_empty() { + debug!(logger, "No failure reclaim to process"); + return; } + debug!(logger, "Processing failure reclaims: {:?}", reclaimed); + let hashes_and_blocks = Self::collect_and_sort_hashes_and_blocks(&reclaimed); self.replace_sent_tx_records(&reclaimed, &hashes_and_blocks, logger); @@ -495,11 +500,19 @@ impl PendingPayableScanner { } } - fn handle_normal_confirmations(&mut self, confirmed_txs: Vec, logger: &Logger) { + fn handle_standard_confirmations(&mut self, confirmed_txs: Vec, logger: &Logger) { if confirmed_txs.is_empty() { + debug!(logger, "No standard tx confirmations to process"); return; } + debug!( + logger, + "Processing {} standard tx confirmations", + confirmed_txs.len() + ); + trace!(logger, "{:?}", confirmed_txs); + self.confirm_transactions(&confirmed_txs); self.update_tx_blocks(&confirmed_txs, logger); @@ -616,7 +629,7 @@ impl PendingPayableScanner { }); self.add_new_failures(grouped_failures.new_failures, logger); - self.finalize_unproven_failures(grouped_failures.rechecks_completed, logger); + self.finalize_suspected_failures(grouped_failures.rechecks_completed, logger); } fn add_new_failures(&self, new_failures: Vec, logger: &Logger) { @@ -632,9 +645,12 @@ impl PendingPayableScanner { } if new_failures.is_empty() { + debug!(logger, "No reverted txs to process"); return; } + debug!(logger, "Processing reverted txs {:?}", new_failures); + let new_failures_btree_set: BTreeSet = new_failures.iter().cloned().collect(); if let Err(e) = self @@ -665,7 +681,7 @@ impl PendingPayableScanner { } } - fn finalize_unproven_failures(&self, rechecks_completed: Vec, logger: &Logger) { + fn finalize_suspected_failures(&self, rechecks_completed: Vec, logger: &Logger) { fn prepare_hashmap(rechecks_completed: &[TxHash]) -> HashMap { rechecks_completed .iter() @@ -674,9 +690,17 @@ impl PendingPayableScanner { } if rechecks_completed.is_empty() { + debug!(logger, "No recheck-requiring failures to finalize"); return; } + debug!( + logger, + "Finalizing {} double-checked failures", + rechecks_completed.len() + ); + trace!(logger, "{:?}", rechecks_completed); + match self .failed_payable_dao .update_statuses(&prepare_hashmap(&rechecks_completed)) @@ -830,7 +854,7 @@ impl PendingPayableScanner { debug!( logger, - "Found {} pending payables and {} unfinalized failures to process", + "Found {} pending payables and {} suspected failures to process", resolve_optional_vec(pending_tx_hashes_opt), resolve_optional_vec(failure_hashes_opt) ); @@ -905,7 +929,7 @@ mod tests { .build(); let logger = Logger::new("start_scan_fills_in_caches_and_returns_msg"); let pending_payable_cache_before = subject.current_sent_payables.dump_cache(); - let failed_payable_cache_before = subject.yet_unproven_failed_payables.dump_cache(); + let failed_payable_cache_before = subject.suspected_failed_payables.dump_cache(); let result = subject.start_scan(&make_wallet("blah"), SystemTime::now(), None, &logger); @@ -932,7 +956,7 @@ mod tests { failed_payable_cache_before ); let pending_payable_cache_after = subject.current_sent_payables.dump_cache(); - let failed_payable_cache_after = subject.yet_unproven_failed_payables.dump_cache(); + let failed_payable_cache_after = subject.suspected_failed_payables.dump_cache(); assert_eq!( pending_payable_cache_after, hashmap!(sent_tx_hash_1 => sent_tx_1, sent_tx_hash_2 => sent_tx_2) @@ -1040,7 +1064,7 @@ mod tests { failed_payable_cache.load_cache(vec![failed_tx_1, failed_tx_2]); let mut subject = PendingPayableScannerBuilder::new().build(); subject.current_sent_payables = Box::new(pending_payable_cache); - subject.yet_unproven_failed_payables = Box::new(failed_payable_cache); + subject.suspected_failed_payables = Box::new(failed_payable_cache); let logger = Logger::new("test"); let msg = TxReceiptsMessage { results: btreemap![TxHashByTable::SentPayable(sent_tx_hash_1) => Ok( @@ -1061,7 +1085,7 @@ mod tests { values. Pending payables: [SentTx { hash: 0x0000000000000000000000000000000000000000000000\ 000000000000000890, receiver_address: 0x0000000000000000000558000000000558000000, \ amount_minor: 43237380096, timestamp: 29942784, gas_price_minor: 94818816, nonce: 456, \ - status: Pending(Waiting) }]. Unproven failures: []."; + status: Pending(Waiting) }]. Suspected failures: []."; assert_eq!(panic_msg, expected); } @@ -1080,7 +1104,7 @@ mod tests { failed_payable_cache.load_cache(vec![failed_tx_1]); let mut subject = PendingPayableScannerBuilder::new().build(); subject.current_sent_payables = Box::new(pending_payable_cache); - subject.yet_unproven_failed_payables = Box::new(failed_payable_cache); + subject.suspected_failed_payables = Box::new(failed_payable_cache); let logger = Logger::new("test"); let msg = TxReceiptsMessage { results: btreemap![TxHashByTable::SentPayable(sent_tx_hash_1) => Ok(StatusReadFromReceiptCheck::Pending), @@ -1103,7 +1127,7 @@ mod tests { Pending(Waiting) }, SentTx { hash: 0x0000000000000000000000000000000000000000000000000000000\ 000000315, receiver_address: 0x000000000000000000093f00000000093f000000, amount_minor: \ 387532395441, timestamp: 89643024, gas_price_minor: 491169069, nonce: 789, status: \ - Pending(Waiting) }]. Unproven failures: []."; + Pending(Waiting) }]. Suspected failures: []."; assert_eq!(panic_msg, expected); } @@ -1720,7 +1744,7 @@ mod tests { subject.handle_confirmed_transactions( DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![sent_tx_1.clone(), sent_tx_2.clone()], }, &logger, @@ -1785,7 +1809,7 @@ mod tests { subject.handle_confirmed_transactions( DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![sent_tx_1.clone(), sent_tx_2.clone()], }, &Logger::new("test"), @@ -1832,7 +1856,7 @@ mod tests { subject.handle_confirmed_transactions( DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![sent_tx_1.clone(), sent_tx_2.clone()], }, &Logger::new("test"), @@ -1854,7 +1878,7 @@ mod tests { subject.handle_confirmed_transactions( DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![sent_tx.clone()], }, &Logger::new("test"), @@ -1862,9 +1886,9 @@ mod tests { } #[test] - fn handles_normal_confirmations_alone() { + fn handles_standard_confirmations_alone() { init_test_logging(); - let test_name = "handles_normal_confirmations_alone"; + let test_name = "handles_standard_confirmations_alone"; let transactions_confirmed_params_arc = Arc::new(Mutex::new(vec![])); let confirm_tx_params_arc = Arc::new(Mutex::new(vec![])); let payable_dao = PayableDaoMock::default() @@ -1905,7 +1929,7 @@ mod tests { subject.handle_confirmed_transactions( DetectedConfirmations { - normal_confirmations: vec![sent_tx_1.clone(), sent_tx_2.clone()], + standard_confirmations: vec![sent_tx_1.clone(), sent_tx_2.clone()], reclaims: vec![], }, &logger, @@ -1981,7 +2005,7 @@ mod tests { subject.handle_confirmed_transactions( DetectedConfirmations { - normal_confirmations: vec![sent_tx_1.clone()], + standard_confirmations: vec![sent_tx_1.clone()], reclaims: vec![sent_tx_2.clone()], }, &logger, @@ -2045,7 +2069,7 @@ mod tests { subject.handle_confirmed_transactions( DetectedConfirmations { - normal_confirmations: vec![sent_tx_1, sent_tx_2], + standard_confirmations: vec![sent_tx_1, sent_tx_2], reclaims: vec![], }, &Logger::new("test"), @@ -2071,7 +2095,7 @@ mod tests { subject.handle_confirmed_transactions( DetectedConfirmations { - normal_confirmations: vec![sent_tx], + standard_confirmations: vec![sent_tx], reclaims: vec![], }, &Logger::new("test"), @@ -2153,7 +2177,7 @@ mod tests { subject.handle_confirmed_transactions( DetectedConfirmations { - normal_confirmations: vec![sent_tx_1, sent_tx_2], + standard_confirmations: vec![sent_tx_1, sent_tx_2], reclaims: vec![sent_tx_3], }, &Logger::new(test_name), diff --git a/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs b/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs index fc16c5713..d04d458b4 100644 --- a/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs +++ b/node/src/accountant/scanners/pending_payable_scanner/tx_receipt_interpreter.rs @@ -31,6 +31,7 @@ impl TxReceiptInterpreter { pending_payable_scanner: &PendingPayableScanner, logger: &Logger, ) -> ReceiptScanReport { + debug!(logger, "Composing receipt scan report"); let scan_report = ReceiptScanReport::default(); tx_cases .into_iter() @@ -162,7 +163,7 @@ impl TxReceiptInterpreter { scan_report } - //TODO: failures handling might need enhancement suggested by GH-693 + //TODO: if wanted, address GH-693 for more detailed failures fn handle_reverted_tx( mut scan_report: ReceiptScanReport, tx: TxByTable, @@ -185,7 +186,7 @@ impl TxReceiptInterpreter { failed_tx.reason, ); - scan_report.register_finalization_of_unproven_failure(failed_tx.hash); + scan_report.register_finalization_of_suspected_failure(failed_tx.hash); } } scan_report @@ -279,7 +280,7 @@ mod tests { ReceiptScanReport { failures: DetectedFailures::default(), confirmations: DetectedConfirmations { - normal_confirmations: vec![updated_tx], + standard_confirmations: vec![updated_tx], reclaims: vec![] } } @@ -327,7 +328,7 @@ mod tests { ReceiptScanReport { failures: DetectedFailures::default(), confirmations: DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![sent_tx] } } diff --git a/node/src/accountant/scanners/pending_payable_scanner/utils.rs b/node/src/accountant/scanners/pending_payable_scanner/utils.rs index 7a1d18eaa..2b8052496 100644 --- a/node/src/accountant/scanners/pending_payable_scanner/utils.rs +++ b/node/src/accountant/scanners/pending_payable_scanner/utils.rs @@ -38,7 +38,9 @@ impl ReceiptScanReport { confirmation_type: ConfirmationType, ) { match confirmation_type { - ConfirmationType::Normal => self.confirmations.normal_confirmations.push(confirmed_tx), + ConfirmationType::Normal => { + self.confirmations.standard_confirmations.push(confirmed_tx) + } ConfirmationType::Reclaim => self.confirmations.reclaims.push(confirmed_tx), } } @@ -49,7 +51,7 @@ impl ReceiptScanReport { .push(PresortedTxFailure::NewEntry(failed_tx)); } - pub(super) fn register_finalization_of_unproven_failure(&mut self, tx_hash: TxHash) { + pub(super) fn register_finalization_of_suspected_failure(&mut self, tx_hash: TxHash) { self.failures .tx_failures .push(PresortedTxFailure::RecheckCompleted(tx_hash)); @@ -62,13 +64,13 @@ impl ReceiptScanReport { #[derive(Debug, Default, PartialEq, Eq, Clone)] pub struct DetectedConfirmations { - pub normal_confirmations: Vec, + pub standard_confirmations: Vec, pub reclaims: Vec, } impl DetectedConfirmations { pub(super) fn is_empty(&self) -> bool { - self.normal_confirmations.is_empty() && self.reclaims.is_empty() + self.standard_confirmations.is_empty() && self.reclaims.is_empty() } } @@ -410,7 +412,7 @@ mod tests { #[test] fn detected_confirmations_is_empty_works() { let subject = DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![], }; @@ -462,19 +464,19 @@ mod tests { ]; let detected_confirmations_feeding = vec![ DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![], }, DetectedConfirmations { - normal_confirmations: vec![make_sent_tx(456)], + standard_confirmations: vec![make_sent_tx(456)], reclaims: vec![make_sent_tx(999)], }, DetectedConfirmations { - normal_confirmations: vec![make_sent_tx(777)], + standard_confirmations: vec![make_sent_tx(777)], reclaims: vec![], }, DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![make_sent_tx(999)], }, ]; @@ -550,19 +552,19 @@ mod tests { ]; let detected_confirmations_feeding = vec![ DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![], }, DetectedConfirmations { - normal_confirmations: vec![make_sent_tx(777)], + standard_confirmations: vec![make_sent_tx(777)], reclaims: vec![make_sent_tx(999)], }, DetectedConfirmations { - normal_confirmations: vec![make_sent_tx(777)], + standard_confirmations: vec![make_sent_tx(777)], reclaims: vec![], }, DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![make_sent_tx(999)], }, ]; @@ -594,15 +596,15 @@ mod tests { fn requires_payments_retry_says_no() { let detected_confirmations_feeding = vec![ DetectedConfirmations { - normal_confirmations: vec![make_sent_tx(777)], + standard_confirmations: vec![make_sent_tx(777)], reclaims: vec![make_sent_tx(999)], }, DetectedConfirmations { - normal_confirmations: vec![make_sent_tx(777)], + standard_confirmations: vec![make_sent_tx(777)], reclaims: vec![], }, DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![make_sent_tx(999)], }, ]; @@ -638,7 +640,7 @@ mod tests { tx_receipt_rpc_failures: vec![], }, confirmations: DetectedConfirmations { - normal_confirmations: vec![], + standard_confirmations: vec![], reclaims: vec![], }, }; diff --git a/node/src/accountant/scanners/scan_schedulers.rs b/node/src/accountant/scanners/scan_schedulers.rs index dedfee1e5..5de151c73 100644 --- a/node/src/accountant/scanners/scan_schedulers.rs +++ b/node/src/accountant/scanners/scan_schedulers.rs @@ -14,6 +14,7 @@ use masq_lib::logger::Logger; use masq_lib::messages::ScanType; use masq_lib::simple_clock::{SimpleClock, SimpleClockReal}; use std::fmt::{Debug, Display, Formatter}; +use std::ops::Div; use std::time::{Duration, SystemTime, UNIX_EPOCH}; pub struct ScanSchedulers { @@ -50,28 +51,31 @@ pub enum ScanReschedulingAfterEarlyStop { } #[derive(Debug, PartialEq, Eq, Clone, Copy)] -pub enum PayableSequenceScanner { +pub enum UnableToStartScanner { NewPayables, RetryPayables, PendingPayables { initial_pending_payable_scan: bool }, + Receivables, } -impl Display for PayableSequenceScanner { +impl Display for UnableToStartScanner { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - PayableSequenceScanner::NewPayables => write!(f, "NewPayables"), - PayableSequenceScanner::RetryPayables => write!(f, "RetryPayables"), - PayableSequenceScanner::PendingPayables { .. } => write!(f, "PendingPayables"), + UnableToStartScanner::NewPayables => write!(f, "NewPayables"), + UnableToStartScanner::RetryPayables => write!(f, "RetryPayables"), + UnableToStartScanner::PendingPayables { .. } => write!(f, "PendingPayables"), + UnableToStartScanner::Receivables => write!(f, "Receivables"), } } } -impl From for ScanType { - fn from(scanner: PayableSequenceScanner) -> Self { +impl From for ScanType { + fn from(scanner: UnableToStartScanner) -> Self { match scanner { - PayableSequenceScanner::NewPayables => ScanType::Payables, - PayableSequenceScanner::RetryPayables => ScanType::Payables, - PayableSequenceScanner::PendingPayables { .. } => ScanType::PendingPayables, + UnableToStartScanner::NewPayables => ScanType::Payables, + UnableToStartScanner::RetryPayables => ScanType::Payables, + UnableToStartScanner::PendingPayables { .. } => ScanType::PendingPayables, + UnableToStartScanner::Receivables => ScanType::Receivables, } } } @@ -80,23 +84,25 @@ pub struct PayableScanScheduler { pub new_payable_notify_later: Box>, pub interval_computer: Box, pub new_payable_notify: Box>, - pub retry_payable_notify: Box>, + pub retry_payable_notify_later: Box>, + pub retry_payable_scan_interval: Duration, } impl PayableScanScheduler { - fn new(new_payable_interval: Duration) -> Self { + fn new(payable_scan_interval: Duration) -> Self { Self { new_payable_notify_later: Box::new(NotifyLaterHandleReal::default()), interval_computer: Box::new(NewPayableScanIntervalComputerReal::new( - new_payable_interval, + payable_scan_interval, )), new_payable_notify: Box::new(NotifyHandleReal::default()), - retry_payable_notify: Box::new(NotifyHandleReal::default()), + retry_payable_notify_later: Box::new(NotifyLaterHandleReal::default()), + retry_payable_scan_interval: payable_scan_interval.div(2), } } pub fn schedule_new_payable_scan(&self, ctx: &mut Context, logger: &Logger) { - if let ScanTiming::WaitFor(interval) = self.interval_computer.time_until_next_scan() { + if let ScanTiming::WaitFor(interval) = self.interval_computer.compute_time_to_next_scan() { debug!( logger, "Scheduling a new-payable scan in {}ms", @@ -122,7 +128,8 @@ impl PayableScanScheduler { } } - pub fn reset_scan_timer(&mut self) { + pub fn reset_scan_timer(&mut self, logger: &Logger) { + debug!(logger, "NewPayableScanIntervalComputer timer reset"); self.interval_computer.reset_last_scan_timestamp(); } @@ -136,18 +143,20 @@ impl PayableScanScheduler { logger: &Logger, ) { debug!(logger, "Scheduling a retry-payable scan asap"); + let delay = self.retry_payable_scan_interval; - self.retry_payable_notify.notify( + let _ = self.retry_payable_notify_later.notify_later( ScanForRetryPayables { response_skeleton_opt, }, + delay, ctx, - ) + ); } } pub trait NewPayableScanIntervalComputer { - fn time_until_next_scan(&self) -> ScanTiming; + fn compute_time_to_next_scan(&self) -> ScanTiming; fn reset_last_scan_timestamp(&mut self); @@ -161,7 +170,7 @@ pub struct NewPayableScanIntervalComputerReal { } impl NewPayableScanIntervalComputer for NewPayableScanIntervalComputerReal { - fn time_until_next_scan(&self) -> ScanTiming { + fn compute_time_to_next_scan(&self) -> ScanTiming { let current_time = self.clock.now(); let time_since_last_scan = current_time .duration_since(self.last_scan_timestamp) @@ -244,7 +253,7 @@ where pub trait RescheduleScanOnErrorResolver { fn resolve_rescheduling_on_error( &self, - scanner: PayableSequenceScanner, + scanner: UnableToStartScanner, error: &StartScanError, is_externally_triggered: bool, logger: &Logger, @@ -257,25 +266,28 @@ pub struct RescheduleScanOnErrorResolverReal {} impl RescheduleScanOnErrorResolver for RescheduleScanOnErrorResolverReal { fn resolve_rescheduling_on_error( &self, - scanner: PayableSequenceScanner, + scanner: UnableToStartScanner, error: &StartScanError, is_externally_triggered: bool, logger: &Logger, ) -> ScanReschedulingAfterEarlyStop { let reschedule_hint = match scanner { - PayableSequenceScanner::NewPayables => { + UnableToStartScanner::NewPayables => { Self::resolve_new_payables(error, is_externally_triggered) } - PayableSequenceScanner::RetryPayables => { + UnableToStartScanner::RetryPayables => { Self::resolve_retry_payables(error, is_externally_triggered) } - PayableSequenceScanner::PendingPayables { + UnableToStartScanner::PendingPayables { initial_pending_payable_scan, } => Self::resolve_pending_payables( error, initial_pending_payable_scan, is_externally_triggered, ), + UnableToStartScanner::Receivables => { + Self::resolve_receivables(error, is_externally_triggered) + } }; Self::log_rescheduling(scanner, is_externally_triggered, logger, &reschedule_hint); @@ -290,14 +302,18 @@ impl RescheduleScanOnErrorResolverReal { is_externally_triggered: bool, ) -> ScanReschedulingAfterEarlyStop { if is_externally_triggered { - ScanReschedulingAfterEarlyStop::DoNotSchedule - } else if matches!(err, StartScanError::ScanAlreadyRunning { .. }) { - unreachable!( - "an automatic scan of NewPayableScanner should never interfere with itself {:?}", - err - ) - } else { - ScanReschedulingAfterEarlyStop::Schedule(ScanType::Payables) + return ScanReschedulingAfterEarlyStop::DoNotSchedule; + } + + match err { + StartScanError::CalledFromNullScanner => ScanReschedulingAfterEarlyStop::DoNotSchedule, + StartScanError::ScanAlreadyRunning { .. } => { + unreachable!( + "an automatic scan of NewPayableScanner should never interfere with itself {:?}", + err + ) + } + _ => ScanReschedulingAfterEarlyStop::Schedule(ScanType::Payables), } } @@ -324,18 +340,26 @@ impl RescheduleScanOnErrorResolverReal { is_externally_triggered: bool, ) -> ScanReschedulingAfterEarlyStop { if is_externally_triggered { - ScanReschedulingAfterEarlyStop::DoNotSchedule - } else if err == &StartScanError::NothingToProcess { - if initial_pending_payable_scan { + return ScanReschedulingAfterEarlyStop::DoNotSchedule; + } + + match err { + StartScanError::NothingToProcess => { + if !initial_pending_payable_scan { + unreachable!( + "the automatic pending payable scan should always be requested only in need, \ + which contradicts the current StartScanError::NothingToProcess" + ) + } ScanReschedulingAfterEarlyStop::Schedule(ScanType::Payables) - } else { - unreachable!( - "the automatic pending payable scan should always be requested only in need, \ - which contradicts the current StartScanError::NothingToProcess" - ) } - } else if err == &StartScanError::NoConsumingWalletFound { - if initial_pending_payable_scan { + StartScanError::NoConsumingWalletFound => { + if !initial_pending_payable_scan { + unreachable!( + "PendingPayableScanner called later than the initial attempt, but \ + the consuming wallet is still missing; this should not be possible" + ) + } // Cannot deduce there are strayed pending payables from the previous Node's run // (StartScanError::NoConsumingWalletFound is thrown before // StartScanError::NothingToProcess can be evaluated); but may be cautious and @@ -344,22 +368,34 @@ impl RescheduleScanOnErrorResolverReal { // TODO Correctly, a check-point during the bootstrap, not allowing to come // this far, should be the solution. Part of the issue mentioned in GH-799 ScanReschedulingAfterEarlyStop::Schedule(ScanType::PendingPayables) - } else { - unreachable!( - "PendingPayableScanner called later than the initial attempt, but \ - the consuming wallet is still missing; this should not be possible" - ) } - } else { - unreachable!( + StartScanError::CalledFromNullScanner => ScanReschedulingAfterEarlyStop::DoNotSchedule, + _ => unreachable!( "{:?} should be impossible with PendingPayableScanner in automatic mode", err - ) + ), + } + } + + fn resolve_receivables( + err: &StartScanError, + is_externally_triggered: bool, + ) -> ScanReschedulingAfterEarlyStop { + if is_externally_triggered { + return ScanReschedulingAfterEarlyStop::DoNotSchedule; + } + + match err { + StartScanError::CalledFromNullScanner => ScanReschedulingAfterEarlyStop::DoNotSchedule, + _ => unreachable!( + "{:?} should be impossible with ReceivableScanner in automatic mode", + err + ), } } fn log_rescheduling( - scanner: PayableSequenceScanner, + scanner: UnableToStartScanner, is_externally_triggered: bool, logger: &Logger, reschedule_hint: &ScanReschedulingAfterEarlyStop, @@ -383,8 +419,8 @@ impl RescheduleScanOnErrorResolverReal { #[cfg(test)] mod tests { use crate::accountant::scanners::scan_schedulers::{ - NewPayableScanIntervalComputer, NewPayableScanIntervalComputerReal, PayableSequenceScanner, - ScanReschedulingAfterEarlyStop, ScanSchedulers, ScanTiming, + NewPayableScanIntervalComputer, NewPayableScanIntervalComputerReal, + ScanReschedulingAfterEarlyStop, ScanSchedulers, ScanTiming, UnableToStartScanner, }; use crate::accountant::scanners::test_utils::NewPayableScanIntervalComputerMock; use crate::accountant::scanners::{ManulTriggerError, StartScanError}; @@ -463,7 +499,7 @@ mod tests { subject.scan_interval = standard_interval; subject.last_scan_timestamp = past_instant; - let result = subject.time_until_next_scan(); + let result = subject.compute_time_to_next_scan(); assert_eq!( result, @@ -498,7 +534,7 @@ mod tests { subject.scan_interval = standard_interval; subject.last_scan_timestamp = past_instant; - let result = subject.time_until_next_scan(); + let result = subject.compute_time_to_next_scan(); assert_eq!( result, @@ -518,7 +554,7 @@ mod tests { subject.scan_interval = Duration::from_secs(180); subject.clock = Box::new(SimpleClockMock::default().now_result(now)); - let result = subject.time_until_next_scan(); + let result = subject.compute_time_to_next_scan(); assert_eq!( result, @@ -564,7 +600,7 @@ mod tests { subject.clock = Box::new(SimpleClockMock::default().now_result(now)); subject.last_scan_timestamp = now.checked_add(Duration::from_secs(1)).unwrap(); - let _ = subject.time_until_next_scan(); + let _ = subject.compute_time_to_next_scan(); } #[test] @@ -612,7 +648,7 @@ mod tests { .reset_last_scan_timestamp_params(&reset_last_scan_timestamp_params_arc), ); - subject.payable.reset_scan_timer(); + subject.payable.reset_scan_timer(&Logger::new("test")); let reset_last_scan_timestamp_params = reset_last_scan_timestamp_params_arc.lock().unwrap(); assert_eq!(*reset_last_scan_timestamp_params, vec![()]) @@ -642,7 +678,6 @@ mod tests { StartScanError::CalledFromNullScanner ]; - let mut check_vec = candidates .iter() .fold(vec![],|mut acc, current|{ @@ -703,14 +738,14 @@ mod tests { test_what_if_externally_triggered( &format!("{}(initial_pending_payable_scan = false)", test_name), &subject, - PayableSequenceScanner::PendingPayables { + UnableToStartScanner::PendingPayables { initial_pending_payable_scan: false, }, ); test_what_if_externally_triggered( &format!("{}(initial_pending_payable_scan = true)", test_name), &subject, - PayableSequenceScanner::PendingPayables { + UnableToStartScanner::PendingPayables { initial_pending_payable_scan: true, }, ); @@ -719,7 +754,7 @@ mod tests { fn test_what_if_externally_triggered( test_name: &str, subject: &ScanSchedulers, - scanner: PayableSequenceScanner, + scanner: UnableToStartScanner, ) { init_test_logging(); let logger = Logger::new(test_name); @@ -749,17 +784,16 @@ mod tests { } #[test] - fn resolve_error_for_pending_payables_if_nothing_to_process_and_initial_pending_payable_scan_true( - ) { + fn resolve_error_if_nothing_to_process_and_initial_pending_payable_scan_true() { init_test_logging(); let subject = ScanSchedulers::new(*TEST_SCAN_INTERVALS, true); - let test_name = "resolve_error_for_pending_payables_if_nothing_to_process_and_initial_pending_payable_scan_true"; + let test_name = "resolve_error_if_nothing_to_process_and_initial_pending_payable_scan_true"; let logger = Logger::new(test_name); let result = subject .reschedule_on_error_resolver .resolve_rescheduling_on_error( - PayableSequenceScanner::PendingPayables { + UnableToStartScanner::PendingPayables { initial_pending_payable_scan: true, }, &StartScanError::NothingToProcess, @@ -792,7 +826,7 @@ mod tests { let _ = subject .reschedule_on_error_resolver .resolve_rescheduling_on_error( - PayableSequenceScanner::PendingPayables { + UnableToStartScanner::PendingPayables { initial_pending_payable_scan: false, }, &StartScanError::NothingToProcess, @@ -802,12 +836,13 @@ mod tests { } #[test] - fn resolve_error_for_pending_p_if_no_consuming_wallet_found_in_initial_pending_payable_scan() { + fn resolve_error_if_no_consuming_wallet_found_in_initial_pending_payable_scan() { init_test_logging(); - let test_name = "resolve_error_for_pending_p_if_no_consuming_wallet_found_in_initial_pending_payable_scan"; + let test_name = + "resolve_error_if_no_consuming_wallet_found_in_initial_pending_payable_scan"; let logger = Logger::new(test_name); let subject = ScanSchedulers::new(*TEST_SCAN_INTERVALS, true); - let scanner = PayableSequenceScanner::PendingPayables { + let scanner = UnableToStartScanner::PendingPayables { initial_pending_payable_scan: true, }; @@ -839,9 +874,10 @@ mod tests { than the initial attempt, but the consuming wallet is still missing; this should not be \ possible" )] - fn pending_p_scan_attempt_if_no_consuming_wallet_found_mustnt_happen_if_not_initial_scan() { + fn pending_payable_scan_attempt_if_no_consuming_wallet_found_mustnt_happen_if_not_initial_scan() + { let subject = ScanSchedulers::new(*TEST_SCAN_INTERVALS, true); - let scanner = PayableSequenceScanner::PendingPayables { + let scanner = UnableToStartScanner::PendingPayables { initial_pending_payable_scan: false, }; @@ -855,6 +891,20 @@ mod tests { ); } + #[test] + fn resolve_error_for_pending_payables_if_null_scanner_is_used_in_automatic_mode() { + test_resolve_error_if_null_scanner_is_used_in_automatic_mode( + UnableToStartScanner::PendingPayables { + initial_pending_payable_scan: true, + }, + ); + test_resolve_error_if_null_scanner_is_used_in_automatic_mode( + UnableToStartScanner::PendingPayables { + initial_pending_payable_scan: false, + }, + ); + } + #[test] fn resolve_error_for_pending_payables_forbidden_states() { fn test_forbidden_states( @@ -867,7 +917,7 @@ mod tests { subject .reschedule_on_error_resolver .resolve_rescheduling_on_error( - PayableSequenceScanner::PendingPayables { + UnableToStartScanner::PendingPayables { initial_pending_payable_scan, }, *error, @@ -894,6 +944,7 @@ mod tests { let inputs = ListOfStartScanErrors::default().eliminate_already_tested_variants(vec![ StartScanError::NothingToProcess, StartScanError::NoConsumingWalletFound, + StartScanError::CalledFromNullScanner, ]); let subject = ScanSchedulers::new(*TEST_SCAN_INTERVALS, true); @@ -910,7 +961,7 @@ mod tests { test_what_if_externally_triggered( test_name, &subject, - PayableSequenceScanner::RetryPayables {}, + UnableToStartScanner::RetryPayables {}, ); } @@ -923,7 +974,7 @@ mod tests { subject .reschedule_on_error_resolver .resolve_rescheduling_on_error( - PayableSequenceScanner::RetryPayables, + UnableToStartScanner::RetryPayables, error, false, &Logger::new("test"), @@ -946,15 +997,14 @@ mod tests { } #[test] - fn resolve_rescheduling_on_error_works_for_new_payables_if_externally_triggered() { - let test_name = - "resolve_rescheduling_on_error_works_for_new_payables_if_externally_triggered"; + fn resolve_rescheduling_on_error_for_new_payables_if_externally_triggered() { + let test_name = "resolve_rescheduling_on_error_for_new_payables_if_externally_triggered"; let subject = ScanSchedulers::new(*TEST_SCAN_INTERVALS, true); test_what_if_externally_triggered( test_name, &subject, - PayableSequenceScanner::NewPayables {}, + UnableToStartScanner::NewPayables {}, ); } @@ -963,13 +1013,13 @@ mod tests { expected = "internal error: entered unreachable code: an automatic scan of NewPayableScanner \ should never interfere with itself ScanAlreadyRunning { cross_scan_cause_opt: None, started_at:" )] - fn resolve_hint_for_new_payables_if_scan_is_already_running_error_and_is_automatic_scan() { + fn resolve_hint_for_new_payables_error_if_scan_is_already_running_in_automatic_scan() { let subject = ScanSchedulers::new(*TEST_SCAN_INTERVALS, true); let _ = subject .reschedule_on_error_resolver .resolve_rescheduling_on_error( - PayableSequenceScanner::NewPayables, + UnableToStartScanner::NewPayables, &StartScanError::ScanAlreadyRunning { cross_scan_cause_opt: None, started_at: SystemTime::now(), @@ -979,10 +1029,18 @@ mod tests { ); } + #[test] + fn resolve_error_for_new_payables_if_null_scanner_is_used_in_automatic_mode() { + test_resolve_error_if_null_scanner_is_used_in_automatic_mode( + UnableToStartScanner::NewPayables, + ); + } + #[test] fn resolve_new_payables_with_error_cases_resulting_in_future_rescheduling() { let test_name = "resolve_new_payables_with_error_cases_resulting_in_future_rescheduling"; let inputs = ListOfStartScanErrors::default().eliminate_already_tested_variants(vec![ + StartScanError::CalledFromNullScanner, StartScanError::ScanAlreadyRunning { cross_scan_cause_opt: None, started_at: SystemTime::now(), @@ -996,7 +1054,7 @@ mod tests { let result = subject .reschedule_on_error_resolver .resolve_rescheduling_on_error( - PayableSequenceScanner::NewPayables, + UnableToStartScanner::NewPayables, *error, false, &logger, @@ -1015,27 +1073,100 @@ mod tests { }) } + #[test] + fn resolve_rescheduling_on_error_for_receivables_if_externally_triggered() { + let test_name = "resolve_rescheduling_on_error_for_receivables_if_externally_triggered"; + let subject = ScanSchedulers::new(*TEST_SCAN_INTERVALS, true); + + test_what_if_externally_triggered(test_name, &subject, UnableToStartScanner::Receivables); + } + + #[test] + fn resolve_error_for_receivables_if_null_scanner_is_used_in_automatic_mode() { + test_resolve_error_if_null_scanner_is_used_in_automatic_mode( + UnableToStartScanner::Receivables, + ); + } + + #[test] + fn resolve_error_for_receivables_all_fatal_cases_in_automatic_mode() { + let inputs = ListOfStartScanErrors::default() + .eliminate_already_tested_variants(vec![StartScanError::CalledFromNullScanner]); + let subject = ScanSchedulers::new(*TEST_SCAN_INTERVALS, true); + + inputs.errors.iter().for_each(|error| { + let panic = catch_unwind(AssertUnwindSafe(|| { + subject + .reschedule_on_error_resolver + .resolve_rescheduling_on_error( + UnableToStartScanner::Receivables, + *error, + false, + &Logger::new("test"), + ) + })) + .unwrap_err(); + + let panic_msg = panic.downcast_ref::().unwrap(); + let expected_msg = format!( + "internal error: entered unreachable code: {:?} should be impossible with \ + ReceivableScanner in automatic mode", + error + ); + assert_eq!( + panic_msg, &expected_msg, + "We expected '{}' but got '{}'", + expected_msg, panic_msg + ) + }) + } + #[test] fn conversion_between_hintable_scanner_and_scan_type_works() { assert_eq!( - ScanType::from(PayableSequenceScanner::NewPayables), + ScanType::from(UnableToStartScanner::NewPayables), ScanType::Payables ); assert_eq!( - ScanType::from(PayableSequenceScanner::RetryPayables), + ScanType::from(UnableToStartScanner::RetryPayables), ScanType::Payables ); assert_eq!( - ScanType::from(PayableSequenceScanner::PendingPayables { + ScanType::from(UnableToStartScanner::PendingPayables { initial_pending_payable_scan: false }), ScanType::PendingPayables ); assert_eq!( - ScanType::from(PayableSequenceScanner::PendingPayables { + ScanType::from(UnableToStartScanner::PendingPayables { initial_pending_payable_scan: true }), ScanType::PendingPayables ); + assert_eq!( + ScanType::from(UnableToStartScanner::Receivables), + ScanType::Receivables + ) + } + + fn test_resolve_error_if_null_scanner_is_used_in_automatic_mode(scanner: UnableToStartScanner) { + let subject = ScanSchedulers::new(*TEST_SCAN_INTERVALS, true); + + let result = subject + .reschedule_on_error_resolver + .resolve_rescheduling_on_error( + scanner, + &StartScanError::CalledFromNullScanner, + false, + &Logger::new("test"), + ); + + assert_eq!( + result, + ScanReschedulingAfterEarlyStop::DoNotSchedule, + "We expected DoNotSchedule but got {:?} for {:?}", + result, + scanner + ); } } diff --git a/node/src/accountant/scanners/test_utils.rs b/node/src/accountant/scanners/test_utils.rs index 08325dedc..031e1bb7d 100644 --- a/node/src/accountant/scanners/test_utils.rs +++ b/node/src/accountant/scanners/test_utils.rs @@ -18,8 +18,8 @@ use crate::accountant::scanners::pending_payable_scanner::{ CachesEmptiableScanner, ExtendedPendingPayablePrivateScanner, }; use crate::accountant::scanners::scan_schedulers::{ - NewPayableScanIntervalComputer, PayableSequenceScanner, RescheduleScanOnErrorResolver, - ScanReschedulingAfterEarlyStop, ScanTiming, + NewPayableScanIntervalComputer, RescheduleScanOnErrorResolver, ScanReschedulingAfterEarlyStop, + ScanTiming, UnableToStartScanner, }; use crate::accountant::scanners::{ PendingPayableScanner, PrivateScanner, RealScannerMarker, ReceivableScanner, Scanner, @@ -336,15 +336,20 @@ impl ScannerMockMarker for ScannerMock>>, - time_until_next_scan_results: RefCell>, + compute_time_to_next_scan_params: Arc>>, + compute_time_to_next_scan_results: RefCell>, reset_last_scan_timestamp_params: Arc>>, } impl NewPayableScanIntervalComputer for NewPayableScanIntervalComputerMock { - fn time_until_next_scan(&self) -> ScanTiming { - self.time_until_next_scan_params.lock().unwrap().push(()); - self.time_until_next_scan_results.borrow_mut().remove(0) + fn compute_time_to_next_scan(&self) -> ScanTiming { + self.compute_time_to_next_scan_params + .lock() + .unwrap() + .push(()); + self.compute_time_to_next_scan_results + .borrow_mut() + .remove(0) } fn reset_last_scan_timestamp(&mut self) { @@ -358,13 +363,15 @@ impl NewPayableScanIntervalComputer for NewPayableScanIntervalComputerMock { } impl NewPayableScanIntervalComputerMock { - pub fn time_until_next_scan_params(mut self, params: &Arc>>) -> Self { - self.time_until_next_scan_params = params.clone(); + pub fn compute_time_to_next_scan_params(mut self, params: &Arc>>) -> Self { + self.compute_time_to_next_scan_params = params.clone(); self } - pub fn time_until_next_scan_result(self, result: ScanTiming) -> Self { - self.time_until_next_scan_results.borrow_mut().push(result); + pub fn compute_time_to_next_scan_result(self, result: ScanTiming) -> Self { + self.compute_time_to_next_scan_results + .borrow_mut() + .push(result); self } @@ -470,14 +477,14 @@ pub fn assert_timestamps_from_str(examined_str: &str, expected_timestamps: Vec>>, + Arc>>, resolve_rescheduling_on_error_results: RefCell>, } impl RescheduleScanOnErrorResolver for RescheduleScanOnErrorResolverMock { fn resolve_rescheduling_on_error( &self, - scanner: PayableSequenceScanner, + scanner: UnableToStartScanner, error: &StartScanError, is_externally_triggered: bool, logger: &Logger, @@ -500,7 +507,7 @@ impl RescheduleScanOnErrorResolver for RescheduleScanOnErrorResolverMock { impl RescheduleScanOnErrorResolverMock { pub fn resolve_rescheduling_on_error_params( mut self, - params: &Arc>>, + params: &Arc>>, ) -> Self { self.resolve_rescheduling_on_error_params = params.clone(); self diff --git a/node/src/accountant/test_utils.rs b/node/src/accountant/test_utils.rs index b460363ee..9e3e06575 100644 --- a/node/src/accountant/test_utils.rs +++ b/node/src/accountant/test_utils.rs @@ -1281,7 +1281,7 @@ pub struct PendingPayableScannerBuilder { payment_thresholds: PaymentThresholds, financial_statistics: FinancialStatistics, current_sent_payables: Box>, - yet_unproven_failed_payables: Box>, + suspected_failed_payables: Box>, clock: Box, } @@ -1294,7 +1294,7 @@ impl PendingPayableScannerBuilder { payment_thresholds: PaymentThresholds::default(), financial_statistics: FinancialStatistics::default(), current_sent_payables: Box::new(PendingPayableCacheMock::default()), - yet_unproven_failed_payables: Box::new(PendingPayableCacheMock::default()), + suspected_failed_payables: Box::new(PendingPayableCacheMock::default()), clock: Box::new(SimpleClockMock::default()), } } @@ -1323,7 +1323,7 @@ impl PendingPayableScannerBuilder { mut self, failures: Box>, ) -> Self { - self.yet_unproven_failed_payables = failures; + self.suspected_failed_payables = failures; self } @@ -1341,7 +1341,7 @@ impl PendingPayableScannerBuilder { Rc::new(RefCell::new(self.financial_statistics)), ); scanner.current_sent_payables = self.current_sent_payables; - scanner.yet_unproven_failed_payables = self.yet_unproven_failed_payables; + scanner.suspected_failed_payables = self.suspected_failed_payables; scanner.clock = self.clock; scanner } diff --git a/node/src/blockchain/blockchain_agent/agent_web3.rs b/node/src/blockchain/blockchain_agent/agent_web3.rs index 66df08d57..4446642cd 100644 --- a/node/src/blockchain/blockchain_agent/agent_web3.rs +++ b/node/src/blockchain/blockchain_agent/agent_web3.rs @@ -118,11 +118,13 @@ mod tests { BlockchainAgentWeb3, WEB3_MAXIMAL_GAS_LIMIT_MARGIN, }; use crate::blockchain::blockchain_agent::BlockchainAgent; - use crate::blockchain::blockchain_bridge::increase_gas_price_by_margin; + use crate::blockchain::blockchain_bridge::increase_by_percentage; use crate::test_utils::make_wallet; use itertools::{Either, Itertools}; use masq_lib::blockchains::chains::Chain; - use masq_lib::constants::DEFAULT_GAS_PRICE_MARGIN; + use masq_lib::constants::{ + DEFAULT_GAS_PRICE_RETRY_CONSTANT, DEFAULT_GAS_PRICE_RETRY_PERCENTAGE, + }; use masq_lib::logger::Logger; use masq_lib::test_utils::logging::{init_test_logging, TestLogHandler}; use masq_lib::test_utils::utils::TEST_DEFAULT_CHAIN; @@ -155,7 +157,7 @@ mod tests { let result = subject.price_qualified_payables(Either::Left(new_tx_templates.clone())); - let gas_price_with_margin_wei = increase_gas_price_by_margin(rpc_gas_price_wei); + let gas_price_with_margin_wei = increase_by_percentage(rpc_gas_price_wei); let expected_result = Either::Left(PricedNewTxTemplates::new( new_tx_templates, gas_price_with_margin_wei, @@ -170,30 +172,32 @@ mod tests { let test_name = "returns_correct_priced_qualified_payables_for_retry_payable_scan"; let consuming_wallet = make_wallet("efg"); let consuming_wallet_balances = make_zeroed_consuming_wallet_balances(); - let rpc_gas_price_wei = 444_555_666; + let latest_gas_price_wei = 444_555_666; let chain = TEST_DEFAULT_CHAIN; + let prev_gas_prices = vec![ + latest_gas_price_wei - 1, + latest_gas_price_wei, + latest_gas_price_wei + 1, + latest_gas_price_wei - 123_456, + latest_gas_price_wei + 456_789, + ]; let retry_tx_templates: Vec = { - vec![ - rpc_gas_price_wei - 1, - rpc_gas_price_wei, - rpc_gas_price_wei + 1, - rpc_gas_price_wei - 123_456, - rpc_gas_price_wei + 456_789, - ] - .into_iter() - .enumerate() - .map(|(idx, prev_gas_price_wei)| { - let account = make_payable_account((idx as u64 + 1) * 3_000); - RetryTxTemplate { - base: BaseTxTemplate::from(&account), - prev_gas_price_wei, - prev_nonce: idx as u64, - } - }) - .collect_vec() + prev_gas_prices + .iter() + .copied() + .enumerate() + .map(|(idx, prev_gas_price_wei)| { + let account = make_payable_account((idx as u64 + 1) * 3_000); + RetryTxTemplate { + base: BaseTxTemplate::from(&account), + prev_gas_price_wei, + prev_nonce: idx as u64, + } + }) + .collect_vec() }; let mut subject = BlockchainAgentWeb3::new( - rpc_gas_price_wei, + latest_gas_price_wei, 77_777, consuming_wallet, consuming_wallet_balances, @@ -205,23 +209,27 @@ mod tests { .price_qualified_payables(Either::Right(RetryTxTemplates(retry_tx_templates.clone()))); let expected_result = { - let price_wei_for_accounts_from_1_to_5 = vec![ - increase_gas_price_by_margin(rpc_gas_price_wei), - increase_gas_price_by_margin(rpc_gas_price_wei), - increase_gas_price_by_margin(rpc_gas_price_wei + 1), - increase_gas_price_by_margin(rpc_gas_price_wei), - increase_gas_price_by_margin(rpc_gas_price_wei + 456_789), - ]; - if price_wei_for_accounts_from_1_to_5.len() != retry_tx_templates.len() { + let computed_gas_prices = prev_gas_prices + .into_iter() + .map(|prev_gas_price_wei| { + PricedRetryTxTemplate::compute_gas_price( + latest_gas_price_wei, + prev_gas_price_wei, + ) + }) + .collect::>(); + if computed_gas_prices.len() != retry_tx_templates.len() { panic!("Corrupted test") } - Either::Right(PricedRetryTxTemplates( retry_tx_templates .iter() - .zip(price_wei_for_accounts_from_1_to_5.into_iter()) - .map(|(retry_tx_template, increased_gas_price)| { - PricedRetryTxTemplate::new(retry_tx_template.clone(), increased_gas_price) + .zip(computed_gas_prices.into_iter()) + .map(|(retry_tx_template, computed_gas_price_wei)| { + PricedRetryTxTemplate::new( + retry_tx_template.clone(), + computed_gas_price_wei, + ) }) .collect_vec(), )) @@ -238,9 +246,10 @@ mod tests { // This should be the value that would surplus the ceiling just slightly if the margin is // applied. // Adding just 1 didn't work, therefore 2 - let rpc_gas_price_wei = - ((ceiling_gas_price_wei * 100) / (DEFAULT_GAS_PRICE_MARGIN as u128 + 100)) + 2; - let check_value_wei = increase_gas_price_by_margin(rpc_gas_price_wei); + let rpc_gas_price_wei = ((ceiling_gas_price_wei * 100) + / (DEFAULT_GAS_PRICE_RETRY_PERCENTAGE as u128 + 100)) + + 2; + let check_value_wei = increase_by_percentage(rpc_gas_price_wei); test_gas_price_must_not_break_through_ceiling_value_in_the_new_payable_mode( test_name, @@ -340,8 +349,8 @@ mod tests { // applied. // Adding just 1 didn't work, therefore 2 let rpc_gas_price_wei = - (ceiling_gas_price_wei * 100) / (DEFAULT_GAS_PRICE_MARGIN as u128 + 100) + 2; - let check_value_wei = increase_gas_price_by_margin(rpc_gas_price_wei); + (ceiling_gas_price_wei * 100) / (DEFAULT_GAS_PRICE_RETRY_PERCENTAGE as u128 + 100) + 2; + let check_value_wei = increase_by_percentage(rpc_gas_price_wei); let template_1 = RetryTxTemplateBuilder::new() .payable_account(&account_1) .prev_gas_price_wei(rpc_gas_price_wei - 1) @@ -382,18 +391,17 @@ mod tests { let account_1 = make_payable_account(12); let account_2 = make_payable_account(34); let ceiling_gas_price_wei = chain.rec().gas_price_safe_ceiling_minor; - // This should be the value that would surplus the ceiling just slightly if the margin is applied - let border_gas_price_wei = - (ceiling_gas_price_wei * 100) / (DEFAULT_GAS_PRICE_MARGIN as u128 + 100) + 2; - let rpc_gas_price_wei = border_gas_price_wei - 1; - let check_value_wei = increase_gas_price_by_margin(border_gas_price_wei); + // Once the gas price is computed from latest and prev gas price values, it'll break the ceiling + let prev_gas_price_wei = ceiling_gas_price_wei + 1 - DEFAULT_GAS_PRICE_RETRY_CONSTANT; + let latest_gas_price_wei = prev_gas_price_wei - 1; + let check_value_wei = prev_gas_price_wei + DEFAULT_GAS_PRICE_RETRY_CONSTANT; let template_1 = RetryTxTemplateBuilder::new() .payable_account(&account_1) - .prev_gas_price_wei(border_gas_price_wei) + .prev_gas_price_wei(prev_gas_price_wei) .build(); let template_2 = RetryTxTemplateBuilder::new() .payable_account(&account_2) - .prev_gas_price_wei(border_gas_price_wei) + .prev_gas_price_wei(prev_gas_price_wei) .build(); let retry_tx_templates = vec![template_1, template_2]; let expected_log_msg = format!( @@ -406,7 +414,7 @@ mod tests { test_gas_price_must_not_break_through_ceiling_value_in_the_retry_payable_mode( test_name, chain, - rpc_gas_price_wei, + latest_gas_price_wei, Either::Right(RetryTxTemplates(retry_tx_templates)), &expected_log_msg, ); @@ -466,8 +474,8 @@ mod tests { let expected_log_msg = format!( "The computed gas price(s) in wei is above the ceil value of 50,000,000,000 wei computed by this Node.\n\ Transaction(s) to following receivers are affected:\n\ - 0x00000000000000000000000077616c6c65743132 with gas price 64,999,999,998\n\ - 0x00000000000000000000000077616c6c65743334 with gas price 64,999,999,997" + 0x00000000000000000000000077616c6c65743132 with gas price 50,000,004,999\n\ + 0x00000000000000000000000077616c6c65743334 with gas price 50,000,004,998" ); test_gas_price_must_not_break_through_ceiling_value_in_the_retry_payable_mode( @@ -602,8 +610,7 @@ mod tests { assert_eq!( result, - (2 * (77_777 + WEB3_MAXIMAL_GAS_LIMIT_MARGIN)) - * increase_gas_price_by_margin(444_555_666) + (2 * (77_777 + WEB3_MAXIMAL_GAS_LIMIT_MARGIN)) * increase_by_percentage(444_555_666) ); } @@ -611,30 +618,32 @@ mod tests { fn estimate_transaction_fee_total_works_for_retry_txs() { let consuming_wallet = make_wallet("efg"); let consuming_wallet_balances = make_zeroed_consuming_wallet_balances(); - let rpc_gas_price_wei = 444_555_666; + let latest_gas_price_wei = 444_555_666; let chain = TEST_DEFAULT_CHAIN; + let prev_gas_prices = vec![ + latest_gas_price_wei - 1, + latest_gas_price_wei, + latest_gas_price_wei + 1, + latest_gas_price_wei - 123_456, + latest_gas_price_wei + 456_789, + ]; let retry_tx_templates: Vec = { - vec![ - rpc_gas_price_wei - 1, - rpc_gas_price_wei, - rpc_gas_price_wei + 1, - rpc_gas_price_wei - 123_456, - rpc_gas_price_wei + 456_789, - ] - .into_iter() - .enumerate() - .map(|(idx, prev_gas_price_wei)| { - let account = make_payable_account((idx as u64 + 1) * 3_000); - RetryTxTemplate { - base: BaseTxTemplate::from(&account), - prev_gas_price_wei, - prev_nonce: idx as u64, - } - }) - .collect() + prev_gas_prices + .iter() + .copied() + .enumerate() + .map(|(idx, prev_gas_price_wei)| { + let account = make_payable_account((idx as u64 + 1) * 3_000); + RetryTxTemplate { + base: BaseTxTemplate::from(&account), + prev_gas_price_wei, + prev_nonce: idx as u64, + } + }) + .collect() }; let subject = BlockchainAgentWeb3::new( - rpc_gas_price_wei, + latest_gas_price_wei, 77_777, consuming_wallet, consuming_wallet_balances, @@ -645,15 +654,11 @@ mod tests { let result = subject.estimate_transaction_fee_total(&priced_qualified_payables); - let gas_prices_for_accounts_from_1_to_5 = vec![ - increase_gas_price_by_margin(rpc_gas_price_wei), - increase_gas_price_by_margin(rpc_gas_price_wei), - increase_gas_price_by_margin(rpc_gas_price_wei + 1), - increase_gas_price_by_margin(rpc_gas_price_wei), - increase_gas_price_by_margin(rpc_gas_price_wei + 456_789), - ]; - let expected_result = gas_prices_for_accounts_from_1_to_5 + let expected_result = prev_gas_prices .into_iter() + .map(|prev_gas_price_wei| { + PricedRetryTxTemplate::compute_gas_price(latest_gas_price_wei, prev_gas_price_wei) + }) .sum::() * (77_777 + WEB3_MAXIMAL_GAS_LIMIT_MARGIN); assert_eq!(result, expected_result) diff --git a/node/src/blockchain/blockchain_bridge.rs b/node/src/blockchain/blockchain_bridge.rs index 183f58659..0d13ead5d 100644 --- a/node/src/blockchain/blockchain_bridge.rs +++ b/node/src/blockchain/blockchain_bridge.rs @@ -39,7 +39,7 @@ use actix::{Addr, Recipient}; use futures::Future; use itertools::{Either, Itertools}; use masq_lib::blockchains::chains::Chain; -use masq_lib::constants::DEFAULT_GAS_PRICE_MARGIN; +use masq_lib::constants::DEFAULT_GAS_PRICE_RETRY_PERCENTAGE; use masq_lib::logger::Logger; use masq_lib::ui_gateway::NodeFromUiMessage; use regex::Regex; @@ -312,28 +312,32 @@ impl BlockchainBridge { Box::new( self.process_payments(msg.agent, msg.priced_templates) - .map_err(move |e: LocalPayableError| { - sent_payable_subs_err - .try_send(SentPayables { - payment_procedure_result: Self::payment_procedure_result_from_error( - e.clone(), - ), - payable_scan_type, - response_skeleton_opt: skeleton_opt, - }) - .expect("Accountant is dead"); - - format!("ReportAccountsPayable: {}", e) - }) - .and_then(move |batch_results| { - sent_payable_subs_success - .try_send(SentPayables { - payment_procedure_result: Ok(batch_results), - payable_scan_type, - response_skeleton_opt: skeleton_opt, - }) - .expect("Accountant is dead"); - + .then(move |result| { + match result { + Ok(batch_results) => { + sent_payable_subs_success + .try_send(SentPayables { + payment_procedure_result: Ok(batch_results), + payable_scan_type, + response_skeleton_opt: skeleton_opt, + }) + .expect("Accountant is dead"); + } + Err(e) => { + sent_payable_subs_err + .try_send(SentPayables { + payment_procedure_result: + Self::payment_procedure_result_from_error(e), + payable_scan_type, + response_skeleton_opt: skeleton_opt, + }) + .expect("Accountant is dead"); + } + } + // TODO Temporary workaround: prevents the Accountant from receiving two messages + // describing the same error. Duplicate notifications could previously trigger + // a panic in the scanners, because the substitution call for a given scanner + // was executed twice and tripped the guard that enforces a single concurrent scan. Ok(()) }), ) @@ -509,8 +513,8 @@ impl BlockchainBridge { pub fn extract_max_block_count(error: BlockchainInterfaceError) -> Option { let regex_result = - Regex::new(r".* (max: |allowed for your plan: |is limited to |block range limit \(|exceeds max block range )(?P\d+).*") - .expect("Invalid regex"); + Regex::new(r".* (max: |allowed for your plan: |is limited to |block range limit \(|exceeds max block range |maximum allowed is )(?P\d+).*") + .expect("Invalid regex"); let max_block_count = match error { BlockchainInterfaceError::QueryFailed(msg) => match regex_result.captures(msg.as_str()) { @@ -538,8 +542,8 @@ struct PendingTxInfo { when_sent: SystemTime, } -pub fn increase_gas_price_by_margin(gas_price: u128) -> u128 { - (gas_price * (100 + DEFAULT_GAS_PRICE_MARGIN as u128)) / 100 +pub fn increase_by_percentage(gas_price: u128) -> u128 { + (gas_price * (100 + DEFAULT_GAS_PRICE_RETRY_PERCENTAGE as u128)) / 100 } pub struct BlockchainBridgeSubsFactoryReal {} @@ -771,7 +775,7 @@ mod tests { let accountant_received_payment = accountant_recording_arc.lock().unwrap(); let blockchain_agent_with_context_msg_actual: &PricedTemplatesMessage = accountant_received_payment.get_record(0); - let computed_gas_price_wei = increase_gas_price_by_margin(0x230000000); + let computed_gas_price_wei = increase_by_percentage(0x230000000); let expected_tx_templates = tx_templates .iter() .map(|tx_template| PricedNewTxTemplate { @@ -1021,7 +1025,7 @@ mod tests { // let pending_payable_fingerprint_seeds_msg = // accountant_recording.get_record::(0); let sent_payables_msg = accountant_recording.get_record::(0); - let scan_error_msg = accountant_recording.get_record::(1); + // let scan_error_msg = accountant_recording.get_record::(1); let batch_results = sent_payables_msg.clone().payment_procedure_result.unwrap(); let failed_tx = FailedTx { hash: H256::from_str( @@ -1048,22 +1052,23 @@ mod tests { // amount: account.balance_wei // }] // ); - assert_eq!(scan_error_msg.scan_type, DetailedScanType::NewPayables); - assert_eq!( - scan_error_msg.response_skeleton_opt, - Some(ResponseSkeleton { - client_id: 1234, - context_id: 4321 - }) - ); - assert!(scan_error_msg - .msg - .contains("ReportAccountsPayable: Sending error: \"Transport error: Error(IncompleteMessage)\". Signed and hashed transactions:"), "This string didn't contain the expected: {}", scan_error_msg.msg); - assert!(scan_error_msg.msg.contains( - "FailedTx { hash: 0x81d20df32920161727cd20e375e53c2f9df40fd80256a236fb39e444c999fb6c," - )); - assert!(scan_error_msg.msg.contains("FailedTx { hash: 0x81d20df32920161727cd20e375e53c2f9df40fd80256a236fb39e444c999fb6c, receiver_address: 0x00000000000000000000000000000000626c6168, amount_minor: 111420204, timestamp:"), "This string didn't contain the expected: {}", scan_error_msg.msg); - assert_eq!(accountant_recording.len(), 2); + // assert_eq!(scan_error_msg.scan_type, DetailedScanType::NewPayables); + // assert_eq!( + // scan_error_msg.response_skeleton_opt, + // Some(ResponseSkeleton { + // client_id: 1234, + // context_id: 4321 + // }) + // ); + // assert!(scan_error_msg + // .msg + // .contains("ReportAccountsPayable: Sending error: \"Transport error: Error(IncompleteMessage)\". Signed and hashed transactions:"), "This string didn't contain the expected: {}", scan_error_msg.msg); + // assert!(scan_error_msg.msg.contains( + // "FailedTx { hash: 0x81d20df32920161727cd20e375e53c2f9df40fd80256a236fb39e444c999fb6c," + // )); + // assert!(scan_error_msg.msg.contains("FailedTx { hash: 0x81d20df32920161727cd20e375e53c2f9df40fd80256a236fb39e444c999fb6c, receiver_address: 0x00000000000000000000000000000000626c6168, amount_minor: 111420204, timestamp:"), "This string didn't contain the expected: {}", scan_error_msg.msg); + //assert_eq!(accountant_recording.len(), 2); + assert_eq!(accountant_recording.len(), 1); } #[test] @@ -1534,7 +1539,7 @@ mod tests { system.run(); let after = SystemTime::now(); let expected_transactions = RetrievedBlockchainTransactions { - new_start_block: BlockMarker::Value(42 + 9_000_000 + 1), + new_start_block: BlockMarker::Value(42 + 9_000_000), transactions: vec![ BlockchainTransaction { block_number: 6040059, @@ -1735,7 +1740,7 @@ mod tests { let received_payments_message = accountant_recording.get_record::(0); check_timestamp(before, received_payments_message.timestamp, after); let expected_transactions = RetrievedBlockchainTransactions { - new_start_block: BlockMarker::Value(6 + 5000 + 1), + new_start_block: BlockMarker::Value(6 + 5000), transactions: vec![BlockchainTransaction { block_number: 2000, from: earning_wallet.clone(), @@ -2198,6 +2203,15 @@ mod tests { assert_eq!(Some(100000), max_block_count); } + #[test] + fn extract_max_block_range_for_nodies_error_response_v2() { + let result = BlockchainInterfaceError::QueryFailed("RPC error: Error { code: ServerError(-32001), message: \"Block range too large: maximum allowed is 20000 blocks\", data: None }".to_string()); + + let max_block_count = BlockchainBridge::extract_max_block_count(result); + + assert_eq!(Some(20000), max_block_count); + } + #[test] fn extract_max_block_range_for_expected_batch_got_single_error_response() { let result = BlockchainInterfaceError::QueryFailed( @@ -2225,7 +2239,7 @@ mod tests { #[test] fn increase_gas_price_by_margin_works() { - assert_eq!(increase_gas_price_by_margin(1_000_000_000), 1_300_000_000); - assert_eq!(increase_gas_price_by_margin(9_000_000_000), 11_700_000_000); + assert_eq!(increase_by_percentage(1_000_000_000), 1_300_000_000); + assert_eq!(increase_by_percentage(9_000_000_000), 11_700_000_000); } } diff --git a/node/src/blockchain/blockchain_interface/blockchain_interface_web3/lower_level_interface_web3.rs b/node/src/blockchain/blockchain_interface/blockchain_interface_web3/lower_level_interface_web3.rs index 7a4d6ddfb..e2c6f4dcc 100644 --- a/node/src/blockchain/blockchain_interface/blockchain_interface_web3/lower_level_interface_web3.rs +++ b/node/src/blockchain/blockchain_interface/blockchain_interface_web3/lower_level_interface_web3.rs @@ -9,7 +9,7 @@ use futures::Future; use serde_json::Value; use web3::contract::{Contract, Options}; use web3::transports::{Batch, Http}; -use web3::types::{Address, BlockNumber, Filter, Log}; +use web3::types::{Address, Filter, Log}; use web3::{Error, Web3}; pub struct LowBlockchainIntWeb3 { @@ -68,7 +68,7 @@ impl LowBlockchainInt for LowBlockchainIntWeb3 { Box::new( self.web3 .eth() - .transaction_count(address, Some(BlockNumber::Pending)) + .transaction_count(address, None) .map_err(move |e| QueryFailed(format!("{} for wallet {}", e, address))), ) } diff --git a/node/src/blockchain/blockchain_interface/blockchain_interface_web3/mod.rs b/node/src/blockchain/blockchain_interface/blockchain_interface_web3/mod.rs index 9249c6ee0..f7e779cc3 100644 --- a/node/src/blockchain/blockchain_interface/blockchain_interface_web3/mod.rs +++ b/node/src/blockchain/blockchain_interface/blockchain_interface_web3/mod.rs @@ -279,6 +279,8 @@ impl BlockchainInterface for BlockchainInterfaceWeb3 { get_transaction_id .map_err(LocalPayableError::TransactionID) .and_then(move |latest_nonce| { + debug!(logger, "Latest nonce: {:?}", latest_nonce); + let templates = SignableTxTemplates::new(priced_templates, latest_nonce.as_u64()); @@ -394,7 +396,9 @@ impl BlockchainInterfaceWeb3 { ) -> BlockMarker { let locally_determined_end_block_marker = match (start_block_marker, scan_range) { (BlockMarker::Value(start_block), BlockScanRange::Range(scan_range_number)) => { - BlockMarker::Value(start_block + scan_range_number) + // Subtract 1 because the range is inclusive: [start_block, end_block] + // Example: If max range is 20000, we need start_block to start_block+20000-1 (ending up with 20000 blocks total) + BlockMarker::Value(start_block + scan_range_number - 1) } (_, _) => BlockMarker::Uninitialized, }; @@ -469,7 +473,6 @@ mod tests { use super::*; use crate::accountant::scanners::pending_payable_scanner::utils::TxHashByTable; use crate::accountant::test_utils::make_payable_account; - use crate::blockchain::blockchain_bridge::increase_gas_price_by_margin; use crate::blockchain::blockchain_interface::blockchain_interface_web3::{ BlockchainInterfaceWeb3, CONTRACT_ABI, REQUESTS_IN_PARALLEL, TRANSACTION_LITERAL, TRANSFER_METHOD_ID, @@ -501,10 +504,12 @@ mod tests { use itertools::Either; use web3::transports::Http; use web3::types::{H256, U256}; + use masq_lib::constants::DEFAULT_GAS_PRICE_RETRY_CONSTANT; use crate::accountant::scanners::payable_scanner::tx_templates::initial::new::NewTxTemplates; use crate::accountant::scanners::payable_scanner::tx_templates::initial::retry::RetryTxTemplates; use crate::accountant::scanners::payable_scanner::tx_templates::priced::retry::PricedRetryTxTemplate; use crate::accountant::scanners::payable_scanner::tx_templates::test_utils::RetryTxTemplateBuilder; + use crate::blockchain::blockchain_bridge::increase_by_percentage; #[test] fn constants_are_correct() { @@ -560,8 +565,8 @@ mod tests { let start_block_marker = BlockMarker::Value(42); let scan_range = BlockScanRange::Range(1000); let block_response = "0x7d0"; // 2_000 - let expected_new_start_block = BlockMarker::Value(42 + 1000 + 1); - let expected_log = "from start block: Number(42) to end block: Number(1042)"; + let expected_new_start_block = BlockMarker::Value(42 + 1000); + let expected_log = "from start block: Number(42) to end block: Number(1041)"; assert_on_retrieves_transactions( start_block_marker, scan_range, @@ -884,7 +889,7 @@ mod tests { let gas_price_wei_from_rpc_u128_wei = u128::from_str_radix(&gas_price_wei_from_rpc_hex[2..], 16).unwrap(); let gas_price_wei_from_rpc_u128_wei_with_margin = - increase_gas_price_by_margin(gas_price_wei_from_rpc_u128_wei); + increase_by_percentage(gas_price_wei_from_rpc_u128_wei); let expected_result = Either::Left(PricedNewTxTemplates::new( tx_templates.clone(), gas_price_wei_from_rpc_u128_wei_with_margin, @@ -902,32 +907,32 @@ mod tests { #[test] fn blockchain_interface_web3_can_introduce_blockchain_agent_in_the_retry_payables_mode() { let gas_price_wei = "0x3B9ACA00"; // 1000000000 - let gas_price_from_rpc = u128::from_str_radix(&gas_price_wei[2..], 16).unwrap(); + let latest_gas_price_wei = u128::from_str_radix(&gas_price_wei[2..], 16).unwrap(); let retry_1 = RetryTxTemplateBuilder::default() .payable_account(&make_payable_account(12)) - .prev_gas_price_wei(gas_price_from_rpc - 1) + .prev_gas_price_wei(latest_gas_price_wei - 1) .build(); let retry_2 = RetryTxTemplateBuilder::default() .payable_account(&make_payable_account(34)) - .prev_gas_price_wei(gas_price_from_rpc) + .prev_gas_price_wei(latest_gas_price_wei) .build(); let retry_3 = RetryTxTemplateBuilder::default() .payable_account(&make_payable_account(56)) - .prev_gas_price_wei(gas_price_from_rpc + 1) + .prev_gas_price_wei(latest_gas_price_wei + 1) .build(); let retry_tx_templates = RetryTxTemplates(vec![retry_1.clone(), retry_2.clone(), retry_3.clone()]); let expected_retry_tx_templates = PricedRetryTxTemplates(vec![ - PricedRetryTxTemplate::new(retry_1, increase_gas_price_by_margin(gas_price_from_rpc)), - PricedRetryTxTemplate::new(retry_2, increase_gas_price_by_margin(gas_price_from_rpc)), + PricedRetryTxTemplate::new(retry_1, increase_by_percentage(latest_gas_price_wei)), + PricedRetryTxTemplate::new(retry_2, increase_by_percentage(latest_gas_price_wei)), PricedRetryTxTemplate::new( retry_3, - increase_gas_price_by_margin(gas_price_from_rpc + 1), + (latest_gas_price_wei + 1) + DEFAULT_GAS_PRICE_RETRY_CONSTANT, ), ]); - let expected_estimated_transaction_fee_total = 285_979_200_073_328; + let expected_estimated_transaction_fee_total = 263_981_166_713_328; test_blockchain_interface_web3_can_introduce_blockchain_agent( Either::Right(retry_tx_templates), @@ -1270,7 +1275,7 @@ mod tests { Err(BlockchainInterfaceError::InvalidResponse), &logger ), - BlockMarker::Value(150) + BlockMarker::Value(149) ); assert_eq!( Subject::calculate_end_block_marker( @@ -1288,7 +1293,7 @@ mod tests { Ok(120.into()), &logger ), - BlockMarker::Value(50 + 10) + BlockMarker::Value(59) ); } diff --git a/node/src/blockchain/blockchain_interface/blockchain_interface_web3/utils.rs b/node/src/blockchain/blockchain_interface/blockchain_interface_web3/utils.rs index 16cab8c2d..fa893b19f 100644 --- a/node/src/blockchain/blockchain_interface/blockchain_interface_web3/utils.rs +++ b/node/src/blockchain/blockchain_interface/blockchain_interface_web3/utils.rs @@ -282,11 +282,16 @@ pub fn send_payables_within_batch( transmission_log(chain, &signable_tx_templates) ); + let logger_clone = logger.clone(); + Box::new( web3_batch .transport() .submit_batch() - .map_err(move |e| return_sending_error(&sent_txs_for_err, &e)) + .map_err(move |e| { + warning!(logger_clone, "Failed to submit batch to Web3 client: {}", e); + return_sending_error(&sent_txs_for_err, &e) + }) .and_then(move |batch_responses| Ok(return_batch_results(sent_txs, batch_responses))), ) } @@ -727,6 +732,7 @@ mod tests { #[test] fn send_payables_within_batch_fails_on_submit_batch_call() { + let test_name = "send_payables_within_batch_fails_on_submit_batch_call"; let port = find_free_port(); let (_event_loop_handle, transport) = Http::with_max_parallel( &format!("http://{}:{}", &Ipv4Addr::LOCALHOST.to_string(), port), @@ -779,12 +785,15 @@ mod tests { failed_txs, }); - test_send_payables_within_batch( - "send_payables_within_batch_fails_on_submit_batch_call", - signable_tx_templates, - expected_result, - port, - ); + test_send_payables_within_batch(test_name, signable_tx_templates, expected_result, port); + + let os_specific_code = transport_error_code(); + let os_specific_msg = transport_error_message(); + TestLogHandler::new().exists_log_containing(&format!( + "WARN: {test_name}: Failed to submit batch to Web3 client: Transport error: \ + Error(Connect, Os {{ code: {}, kind: ConnectionRefused, message: \"{}\" }}", + os_specific_code, os_specific_msg + )); } #[test] diff --git a/node/src/blockchain/blockchain_interface_initializer.rs b/node/src/blockchain/blockchain_interface_initializer.rs index 04838f312..01b97a3b8 100644 --- a/node/src/blockchain/blockchain_interface_initializer.rs +++ b/node/src/blockchain/blockchain_interface_initializer.rs @@ -47,7 +47,7 @@ mod tests { use crate::accountant::scanners::payable_scanner::tx_templates::initial::new::NewTxTemplates; use crate::accountant::scanners::payable_scanner::tx_templates::priced::new::PricedNewTxTemplates; use crate::accountant::test_utils::make_payable_account; - use crate::blockchain::blockchain_bridge::increase_gas_price_by_margin; + use crate::blockchain::blockchain_bridge::increase_by_percentage; use crate::blockchain::blockchain_interface_initializer::BlockchainInterfaceInitializer; use crate::test_utils::make_wallet; use futures::Future; @@ -88,7 +88,7 @@ mod tests { .unwrap(); assert_eq!(blockchain_agent.consuming_wallet(), &payable_wallet); let result = blockchain_agent.price_qualified_payables(Either::Left(tx_templates.clone())); - let gas_price_with_margin = increase_gas_price_by_margin(1_000_000_000); + let gas_price_with_margin = increase_by_percentage(1_000_000_000); let expected_result = Either::Left(PricedNewTxTemplates::new( tx_templates, gas_price_with_margin, diff --git a/node/src/test_utils/recorder.rs b/node/src/test_utils/recorder.rs index f52b1a0c8..569a5f804 100644 --- a/node/src/test_utils/recorder.rs +++ b/node/src/test_utils/recorder.rs @@ -641,8 +641,9 @@ impl PeerActorsBuilder { } // This must be called after System.new and before System.run. - // These addresses may be helpful for setting up the Counter Messages. - pub fn build_and_provide_addresses(self) -> (PeerActors, PeerActorAddrs) { + // + // The addresses may be helpful for setting up the Counter Messages. + pub fn build_with_addresses(self) -> (PeerActors, PeerActorAddrs) { let proxy_server_addr = self.proxy_server.start(); let dispatcher_addr = self.dispatcher.start(); let hopper_addr = self.hopper.start(); @@ -683,7 +684,7 @@ impl PeerActorsBuilder { // This must be called after System.new and before System.run pub fn build(self) -> PeerActors { - let (peer_actors, _) = self.build_and_provide_addresses(); + let (peer_actors, _) = self.build_with_addresses(); peer_actors } } diff --git a/node/tests/contract_test.rs b/node/tests/contract_test.rs index 42d6fce37..35b7e0f85 100644 --- a/node/tests/contract_test.rs +++ b/node/tests/contract_test.rs @@ -133,7 +133,10 @@ fn masq_erc20_contract_exists_on_polygon_mainnet_integration() { #[test] fn masq_erc20_contract_exists_on_ethereum_mainnet_integration() { - let blockchain_urls = vec!["https://mainnet.infura.io/v3/0ead23143b174f6983c76f69ddcf4026"]; + let blockchain_urls = vec![ + "https://eth.llamarpc.com", + "https://mainnet.infura.io/v3/0ead23143b174f6983c76f69ddcf4026", + ]; let chain = Chain::EthMainnet; let assertion_body = |url, chain| assert_contract_existence(url, chain, "MASQ", 18); @@ -207,7 +210,10 @@ fn assert_total_supply( #[test] fn max_token_supply_matches_corresponding_constant_integration() { - let blockchain_urls = vec!["https://mainnet.infura.io/v3/0ead23143b174f6983c76f69ddcf4026"]; + let blockchain_urls = vec![ + "https://eth.llamarpc.com", + "https://mainnet.infura.io/v3/0ead23143b174f6983c76f69ddcf4026", + ]; let chain = Chain::EthMainnet; let assertion_body = |url, chain| assert_total_supply(url, chain, MASQ_TOTAL_SUPPLY);