Skip to content

Commit

Permalink
add proxy, fix smart contract refactor bug
Browse files Browse the repository at this point in the history
  • Loading branch information
pdxwebdev committed Aug 26, 2022
1 parent dd74926 commit 836945d
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 6 deletions.
10 changes: 10 additions & 0 deletions yadacoin/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from yadacoin.core.identity import Identity
from yadacoin.core.health import Health
from yadacoin.core.smtp import Email
from yadacoin.http.proxy import ProxyHandler
from yadacoin.http.web import WEB_HANDLERS
from yadacoin.http.explorer import EXPLORER_HANDLERS
from yadacoin.http.graph import GRAPH_HANDLERS
Expand Down Expand Up @@ -541,6 +542,8 @@ def init_http(self):
self.config.app_log.info("API: http://{}:{}".format(self.config.serve_host, self.config.serve_port))
if 'web' in self.config.modes:
self.config.app_log.info("Wallet: http://{}:{}/app".format(self.config.serve_host, self.config.serve_port))
if 'proxy' in self.config.modes:
self.config.app_log.info("Proxy: {}:{}".format(self.config.serve_host, self.config.proxy_port))
if os.path.exists(path.join(path.dirname(__file__), '..', 'templates')):
template_path = path.join(path.dirname(__file__), '..', 'templates')
else:
Expand Down Expand Up @@ -659,5 +662,12 @@ def init_config_properties(self, test=False):
if x.__name__ not in self.config.websocketServer.inbound_streams:
self.config.websocketServer.inbound_streams[x.__name__] = {}

if 'proxy' in self.config.modes:
app = tornado.web.Application([
(r'/', ProxyHandler),
(r'.*', ProxyHandler),
])
app.listen(self.config.proxy_port)

if __name__ == "__main__":
NodeApplication()
4 changes: 3 additions & 1 deletion yadacoin/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(self, config):
self.mp = None
self.pp = None
self.stratum_pool_port = config.get('stratum_pool_port', 3333)
self.proxy_port = config.get('proxy_port', 8080)
self.wallet_host_port = config.get('wallet_host_port', 'http://localhost:{}'.format(config['serve_port']))
self.websocket_host_port = config.get('websocket_host_port', 'ws://localhost:{}'.format(config['serve_port']))
self.credits_per_share = config.get('credits_per_share', 5)
Expand Down Expand Up @@ -374,7 +375,8 @@ def to_dict(self):
'skynet_url': self.skynet_url,
'skynet_api_key': self.skynet_api_key,
'web_jwt_expiry': self.web_jwt_expiry,
'stratum_pool_port': self.stratum_pool_port
'stratum_pool_port': self.stratum_pool_port,
'proxy_port': self.proxy_port
}

def to_json(self):
Expand Down
4 changes: 2 additions & 2 deletions yadacoin/core/miningpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ async def get_pending_transactions(self):

# process recurring payments
generated_txns = []
async for x in await TU.get_current_smart_contract_txns(self.config.LatestBlock.block.index):
async for x in await TU.get_current_smart_contract_txns(self.config, self.config.LatestBlock.block.index):
try:
smart_contract_txn = Transaction.from_dict(x['transactions'])
except:
Expand All @@ -459,7 +459,7 @@ async def get_pending_transactions(self):

# process expired contracts
used_public_keys = []
async for x in await TU.get_expired_smart_contract_txns(self.config.LatestBlock.block.index):
async for x in await TU.get_expired_smart_contract_txns(self.config, self.config.LatestBlock.block.index):
expired_blockchain_smart_contract_obj = Transaction.from_dict(x.get('transactions'))
if expired_blockchain_smart_contract_obj.public_key in used_public_keys:
continue
Expand Down
7 changes: 4 additions & 3 deletions yadacoin/core/transactionutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async def rebroadcast_mempool(cls, config, include_zero=False):

@classmethod
async def get_current_smart_contract_txns(cls, config, start_index):
return self.config.mongo.async_db.blocks.aggregate([
return config.mongo.async_db.blocks.aggregate([
{
'$match': {
'transactions': {'$elemMatch': {'relationship.smart_contract.expiry': {'$gt': start_index}}}
Expand All @@ -180,7 +180,7 @@ async def get_current_smart_contract_txns(cls, config, start_index):

@classmethod
async def get_expired_smart_contract_txns(cls, config, start_index):
return self.config.mongo.async_db.blocks.aggregate([
return config.mongo.async_db.blocks.aggregate([
{
'$match': {
'transactions.relationship.smart_contract.expiry': start_index
Expand Down Expand Up @@ -231,5 +231,6 @@ async def get_trigger_txns(cls, config, smart_contract_txn, start_index=None, en
async for x in trigger_txn_blocks:
yield x

def get_transaction_objs_list(self, transaction_objs):
@classmethod
def get_transaction_objs_list(cls, transaction_objs):
return [y for x in list(transaction_objs.values()) for y in x]
189 changes: 189 additions & 0 deletions yadacoin/http/proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
#!/usr/bin/env python
#
# Simple asynchronous HTTP proxy with tunnelling (CONNECT).
#
# GET/POST proxying based on
# http://groups.google.com/group/python-tornado/msg/7bea08e7a049cf26
#
# Copyright (C) 2012 Senko Rasic <senko.rasic@dobarkod.hr>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

import asyncio
import os
import sys
import socket
if sys.version_info[0] == 2:
from urlparse import urlparse
else:
from urllib.parse import urlparse

import tornado.httpserver
import tornado.ioloop
import tornado.iostream
import tornado.web
import tornado.httpclient
import tornado.httputil
from yadacoin.core.config import get_config


__all__ = ['ProxyHandler']


def get_proxy(url):
url_parsed = urlparse(url, scheme='http')
proxy_key = '%s_proxy' % url_parsed.scheme
return os.environ.get(proxy_key)


def parse_proxy(proxy):
proxy_parsed = urlparse(proxy, scheme='http')
return proxy_parsed.hostname, proxy_parsed.port


def fetch_request(url, **kwargs):
proxy = get_proxy(url)
if proxy:
get_config().app_log.debug('Forward request via upstream proxy %s', proxy)
tornado.httpclient.AsyncHTTPClient.configure(
'tornado.curl_httpclient.CurlAsyncHTTPClient')
host, port = parse_proxy(proxy)
kwargs['proxy_host'] = host
kwargs['proxy_port'] = port

req = tornado.httpclient.HTTPRequest(url, **kwargs)
client = tornado.httpclient.AsyncHTTPClient()
return client.fetch(req, raise_error=False)


class ProxyHandler(tornado.web.RequestHandler):
SUPPORTED_METHODS = ['GET', 'POST', 'CONNECT']

def compute_etag(self):
return None # disable tornado Etag

async def get(self):
get_config().app_log.debug('Handle %s request to %s', self.request.method,
self.request.uri)

def handle_response(response):
if (response.error and not
isinstance(response.error, tornado.httpclient.HTTPError)):
self.set_status(500)
self.write('Internal server error:\n' + str(response.error))
else:
self.set_status(response.code, response.reason)
self._headers = tornado.httputil.HTTPHeaders() # clear tornado default header

for header, v in response.headers.get_all():
if header not in ('Content-Length', 'Transfer-Encoding', 'Content-Encoding', 'Connection'):
self.add_header(header, v) # some header appear multiple times, eg 'Set-Cookie'

if response.body:
self.set_header('Content-Length', len(response.body))
self.write(response.body)
self.finish()

body = self.request.body
if not body:
body = None
try:
if 'Proxy-Connection' in self.request.headers:
del self.request.headers['Proxy-Connection']
resp = await fetch_request(
self.request.uri,
method=self.request.method, body=body,
headers=self.request.headers, follow_redirects=False,
allow_nonstandard_methods=True)
handle_response(resp)
except tornado.httpclient.HTTPError as e:
if hasattr(e, 'response') and e.response:
handle_response(e.response)
else:
self.set_status(500)
self.write('Internal server error:\n' + str(e))
self.finish()

async def post(self):
return await self.get()

async def connect(self):
config = get_config()
config.app_log.debug('Start CONNECT to %s', self.request.uri)
host, port = self.request.uri.split(':')
client = self.request.connection.stream

async def relay(reader, writer):
try:
alldata = b''
while True:
data = await reader.read_bytes(1024*64, partial=True)
if writer.closed():
get_config().app_log.debug(alldata)
return
if data:
alldata += data
writer.write(data)
else:
break
get_config().app_log.debug(alldata)
except tornado.iostream.StreamClosedError:
pass

async def start_tunnel():
get_config().app_log.debug('CONNECT tunnel established to %s', self.request.uri)
client.write(b'HTTP/1.0 407 Proxy Authentication Required\r\n')
client.write(b'Proxy-Authenticate: Basic realm="Access to internal site"\r\n\r\n')
await asyncio.gather(
relay(client, upstream),
relay(upstream, client)
)
client.close()
upstream.close()

async def on_proxy_response(data=None):
if data:
first_line = data.splitlines()[0]
http_v, status, text = first_line.split(None, 2)
if int(status) == 200:
get_config().app_log.debug('Connected to upstream proxy %s', proxy)
await start_tunnel()
return

self.set_status(500)
self.finish()

async def start_proxy_tunnel():
upstream.write('CONNECT %s HTTP/1.1\r\n' % self.request.uri)
upstream.write('Host: %s\r\n' % self.request.uri)
upstream.write('Proxy-Connection: Keep-Alive\r\n\r\n')
data = await upstream.read_until('\r\n\r\n')
on_proxy_response(data)

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
upstream = tornado.iostream.IOStream(s)

proxy = get_proxy(self.request.uri)
if proxy:
proxy_host, proxy_port = parse_proxy(proxy)
await upstream.connect((proxy_host, proxy_port))
await start_proxy_tunnel()
else:
await upstream.connect((host, int(port)))
await start_tunnel()

0 comments on commit 836945d

Please sign in to comment.