Skip to content

Commit

Permalink
merge 1.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Stubbs committed Jul 18, 2019
2 parents 543aa0f + 75c6c7b commit cd6e04c
Show file tree
Hide file tree
Showing 29 changed files with 1,467 additions and 209 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
# Change Log
All notable changes to this project will be documented in this file.

## 1.2.0 - 2019-07-15
### Added
- Added actor events subsystem with events agent that reads from the events queue.
- Added support for actor links to send an actor's events to another actor.
- Added support for an actor webhook property for sending an actor's events as an HTTP POST to an endpoint.
- Added timing data to messages POST processing.

### Changed
- Executions now change to status "RUNNING" as soon as a worker starts the corresponing actor container.
- Force halting an execution fails if the status is not RUNNING.
- Reading and managing nonces associated with aliases requires permissions on both the alias and the actor.
- Spawner now sets actor to READY state before setting worker to READY state to prevent autoscaler from stopping worker before actor is update to READY.
- Updated ActorMsgQueue to use a new, simpler class, TaskQueue, removing dependency on channelpy.

### Removed
- No change.


## 1.1.0 - 2019-06-18
### Added
- Added support for sending synchronous messages to an actor.
Expand Down
3 changes: 3 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ RUN pip3 install -r /requirements.txt

RUN touch /var/log/abaco.log

# set default threads for gunicorn
ENV threads=3

# todo -- add/remove to toggle between local channelpy and github instance
#ADD channelpy /channelpy
#RUN pip3 install /channelpy
Expand Down
8 changes: 8 additions & 0 deletions abaco.conf
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ dd: unix://var/run/docker.sock
# number of worker containers to initially start when an actor is created
init_count: 1

# set whether autoscaling is enabled
autoscaling = false

# max length of time, in seconds, an actor container is allowed to execute before being killed.
# set to -1 for indefinite execution time.
max_run_time: -1
Expand Down Expand Up @@ -116,6 +119,11 @@ show_traceback: false
# Here we set the to 12 hours.
log_ex: 43200

# Max length (in bytes) to store an actor execution's log. If a log exceeds this length, the log will be truncated.
# Note: max_log_length must not exceed the maximum document length for the log store.
# here we default it to 1 MB
max_log_length: 1000000

# Either camel or snake: Whether to return responses in camel case or snake. Default is snake.
case: snake

Expand Down
47 changes: 37 additions & 10 deletions actors/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,32 @@ def authorization():
if '/actors/aliases' in request.url_rule.rule:
alias_id = get_alias_id()
noun = 'alias'
if request.method == 'GET':
# GET requests require READ access
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.READ)
# all other requests require UPDATE access
elif request.method in ['DELETE', 'POST', 'PUT']:
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.UPDATE)
# we need to compute the db_id since it is not computed in the general case for
# alias endpoints
db_id, _ = get_db_id()
# reading/creating/updating nonces for an alias requires permissions for both the
# alias itself and the underlying actor
if 'nonce' in request.url_rule.rule:
noun = 'alias and actor'
# logger.debug("checking user {} has permissions for "
# "alias: {} and actor: {}".format(g.user, alias_id, db_id))
if request.method == 'GET':
# GET requests require READ access

has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.READ)
has_pem = has_pem and check_permissions(user=g.user, identifier=db_id, level=codes.READ)
elif request.method in ['DELETE', 'POST', 'PUT']:
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.UPDATE)
has_pem = has_pem and check_permissions(user=g.user, identifier=db_id, level=codes.UPDATE)

# otherwise, this is a request to manage the alias itself; only requires permissions on the alias
else:
if request.method == 'GET':
# GET requests require READ access
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.READ)
# all other requests require UPDATE access
elif request.method in ['DELETE', 'POST', 'PUT']:
has_pem = check_permissions(user=g.user, identifier=alias_id, level=codes.UPDATE)
else:
# all other checks are based on actor-id:
noun = 'actor'
Expand Down Expand Up @@ -318,14 +338,21 @@ def check_permissions(user, identifier, level):

def get_db_id():
"""Get the db_id and actor_identifier from the request path."""
# logger.debug("top of get_db_id. request.path: {}".format(request.path))
# the location of the actor identifier is different for aliases vs actor_id's.
# for actors, it is in index 2:
# /actors/<actor_id>
# for aliases, it is in index 3:
# /actors/aliases/<alias_id>
idx = 2
if 'aliases' in request.path:
idx = 3
path_split = request.path.split("/")
if len(path_split) < 3:
logger.error("Unrecognized request -- could not find the actor id. path_split: {}".format(path_split))
raise PermissionsException("Not authorized.")
# logger.debug("path_split: {}".format(path_split))
actor_identifier = path_split[2]
# logger.debug("actor_identifier: {}; tenant: {}".format(actor_identifier, g.tenant))
logger.debug("path_split: {}".format(path_split))
actor_identifier = path_split[idx]
logger.debug("actor_identifier: {}; tenant: {}".format(actor_identifier, g.tenant))
try:
actor_id = Actor.get_actor_id(g.tenant, actor_identifier)
except KeyError:
Expand Down
35 changes: 34 additions & 1 deletion actors/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,26 @@ def put_cmd(self, actor_id, worker_id, image, tenant, stop_existing=True):
self.put(msg)


class EventsChannel(Channel):
"""Work with events on the events channel."""

event_queue_names = ('default',
)

def __init__(self, name='default'):
self.uri = Config.get('rabbit', 'uri')
if name not in EventsChannel.event_queue_names:
raise Exception('Invalid Events Channel Queue name.')

super().__init__(name='events_channel_{}'.format(name),
connection_type=RabbitConnection,
uri=self.uri)

def put_event(self, json_data):
"""Put a new event on the events channel."""
self.put(json_data)


class BinaryChannel(BasicChannel):
"""Override BaseChannel methods to handle binary messages."""

Expand Down Expand Up @@ -131,8 +151,21 @@ def get_one(self):
return self._process(msg.body), msg


from queues import BinaryTaskQueue


class ActorMsgChannel(BinaryTaskQueue):
def __init__(self, actor_id):
super().__init__(name='actor_msg_{}'.format(actor_id))

def put_msg(self, message, d={}, **kwargs):
d['message'] = message
for k, v in kwargs:
d[k] = v
self.put(d)


class ActorMsgChannel(BinaryChannel):
class ActorMSSgChannel(BinaryChannel):
"""Work with messages sent to a specific actor.
"""
def __init__(self, actor_id):
Expand Down
2 changes: 1 addition & 1 deletion actors/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def run(self):
message, msg_obj = self.ch.get_one()
# we directly ack messages from the clients channel because caller expects direct reply_to
msg_obj.ack()
logger.info("cleintg processing message: {}".format(message))
logger.info("clientg processing message: {}".format(message))
anon_ch = message['reply_to']
cmd = message['value']
if cmd.get('command') == 'new':
Expand Down
1 change: 1 addition & 0 deletions actors/codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
PROCESSING = 'PROCESSING'
COMPLETE = 'COMPLETE'
SUBMITTED = 'SUBMITTED'
RUNNING = 'RUNNING'
READY = 'READY'
ERROR = 'ERROR'
BUSY = 'BUSY'
Expand Down

0 comments on commit cd6e04c

Please sign in to comment.