Source code for morse.middleware.pocolibs.overlays.viam_overlay
import logging; logger = logging.getLogger("morse." + __name__)
from morse.core.services import service, async_service, interruptible
from morse.core.overlay import MorseOverlay
from morse.core import status
from morse.middleware.pocolibs_datastream import DummyPoster
[docs]class ViamModule(MorseOverlay):
def __init__(self, overlaid_object):
# Call the constructor of the parent class
MorseOverlay.__init__(self, overlaid_object)
self._cntrl = DummyPoster("viamCntrl")
[docs] def Acquire_cb(self, answer):
status, res = answer
return status, [ self._bench, self._n]
@service
[docs] def Init(self, *args):
pass
@service
@service
[docs] def Reset(self, *args):
pass
@service
[docs] def DriverLoad(self, *args):
pass
@service
[docs] def BusPrint(self, *args):
pass
@service
[docs] def BankCreate(self, *args):
pass
@service
[docs] def CameraCreate(self, *args):
pass
@service
[docs] def BankAddCamera(self, *args):
pass
@service
@service
@service
[docs] def PushGeoFilter(self, *args):
pass
@service
[docs] def UpdateGeoFilter(self, *args):
pass
@service
[docs] def PushLumFilter(self, *args):
pass
@service
[docs] def UpdateLumFilter(self, *args):
pass
@service
[docs] def PushColorFilter(self, *args):
pass
@service
[docs] def UpdateColorFilter(self, *args):
pass
@service
[docs] def PushImProcFilter(self, *args):
pass
@service
[docs] def UpdateImProcFilter(self, *args):
pass
@service
[docs] def PushMiscFilter(self, *args):
pass
@service
[docs] def UpdateMiscFilter(self, *args):
pass
@service
[docs] def FilterList(self, *args):
pass
@service
[docs] def FilterGetCap(self, *args):
pass
@service
[docs] def CameraListHWModes(self, *args):
pass
@service
[docs] def CameraSetHWMode(self, *args):
pass
@service
[docs] def Display(self, *args):
pass
@service
[docs] def Calibrate(self, *args):
pass
@service
[docs] def CalibrationIO(self, *args):
pass
@service
[docs] def Save(self, *args):
pass
@interruptible
@async_service
[docs] def Acquire(self, bench, n):
n = int(n)
self._n = n
self._bench = bench
if n == 0:
n = -1
self.overlaid_object.capture(self.chain_callback(self.Acquire_cb), n)
@async_service
[docs] def Stop(self, bench):
self.completed(status.SUCCESS)
[docs] def name(self):
return "viam"